1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774177517761777177817791780178117821783178417851786178717881789179017911792179317941795179617971798179918001801180218031804180518061807180818091810181118121813181418151816181718181819182018211822182318241825182618271828182918301831183218331834183518361837183818391840184118421843184418451846184718481849185018511852185318541855185618571858185918601861186218631864186518661867186818691870187118721873187418751876187718781879188018811882188318841885188618871888188918901891189218931894189518961897189818991900190119021903190419051906190719081909191019111912191319141915191619171918191919201921192219231924192519261927192819291930193119321933193419351936193719381939194019411942194319441945194619471948194919501951195219531954195519561957195819591960196119621963196419651966196719681969197019711972197319741975197619771978197919801981198219831984198519861987198819891990199119921993199419951996199719981999200020012002200320042005200620072008200920102011201220132014201520162017201820192020202120222023202420252026202720282029203020312032203320342035203620372038203920402041204220432044204520462047204820492050205120522053205420552056205720582059206020612062206320642065206620672068206920702071207220732074207520762077207820792080208120822083208420852086208720882089209020912092209320942095209620972098209921002101210221032104210521062107210821092110211121122113211421152116211721182119212021212122212321242125 |
- # Copyright 2020 Google LLC
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- import datetime
- import http.client as http_client
- import json
- import os
- import urllib.parse
- import mock
- import pytest # type: ignore
- from google.auth import _helpers
- from google.auth import aws
- from google.auth import environment_vars
- from google.auth import exceptions
- from google.auth import transport
- IMPERSONATE_ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE = (
- "gl-python/3.7 auth/1.1 auth-request-type/at cred-type/imp"
- )
- LANG_LIBRARY_METRICS_HEADER_VALUE = "gl-python/3.7 auth/1.1"
- CLIENT_ID = "username"
- CLIENT_SECRET = "password"
- # Base64 encoding of "username:password".
- BASIC_AUTH_ENCODING = "dXNlcm5hbWU6cGFzc3dvcmQ="
- SERVICE_ACCOUNT_EMAIL = "service-1234@service-name.iam.gserviceaccount.com"
- SERVICE_ACCOUNT_IMPERSONATION_URL_BASE = (
- "https://us-east1-iamcredentials.googleapis.com"
- )
- SERVICE_ACCOUNT_IMPERSONATION_URL_ROUTE = "/v1/projects/-/serviceAccounts/{}:generateAccessToken".format(
- SERVICE_ACCOUNT_EMAIL
- )
- SERVICE_ACCOUNT_IMPERSONATION_URL = (
- SERVICE_ACCOUNT_IMPERSONATION_URL_BASE + SERVICE_ACCOUNT_IMPERSONATION_URL_ROUTE
- )
- QUOTA_PROJECT_ID = "QUOTA_PROJECT_ID"
- SCOPES = ["scope1", "scope2"]
- TOKEN_URL = "https://sts.googleapis.com/v1/token"
- TOKEN_INFO_URL = "https://sts.googleapis.com/v1/introspect"
- SUBJECT_TOKEN_TYPE = "urn:ietf:params:aws:token-type:aws4_request"
- AUDIENCE = "//iam.googleapis.com/projects/123456/locations/global/workloadIdentityPools/POOL_ID/providers/PROVIDER_ID"
- REGION_URL = "http://169.254.169.254/latest/meta-data/placement/availability-zone"
- IMDSV2_SESSION_TOKEN_URL = "http://169.254.169.254/latest/api/token"
- SECURITY_CREDS_URL = "http://169.254.169.254/latest/meta-data/iam/security-credentials"
- REGION_URL_IPV6 = "http://[fd00:ec2::254]/latest/meta-data/placement/availability-zone"
- IMDSV2_SESSION_TOKEN_URL_IPV6 = "http://[fd00:ec2::254]/latest/api/token"
- SECURITY_CREDS_URL_IPV6 = (
- "http://[fd00:ec2::254]/latest/meta-data/iam/security-credentials"
- )
- CRED_VERIFICATION_URL = (
- "https://sts.{region}.amazonaws.com?Action=GetCallerIdentity&Version=2011-06-15"
- )
- # Sample fictitious AWS security credentials to be used with tests that require a session token.
- ACCESS_KEY_ID = "AKIAIOSFODNN7EXAMPLE"
- SECRET_ACCESS_KEY = "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"
- TOKEN = "AQoEXAMPLEH4aoAH0gNCAPyJxz4BlCFFxWNE1OPTgk5TthT+FvwqnKwRcOIfrRh3c/LTo6UDdyJwOOvEVPvLXCrrrUtdnniCEXAMPLE/IvU1dYUg2RVAJBanLiHb4IgRmpRV3zrkuWJOgQs8IZZaIv2BXIa2R4OlgkBN9bkUDNCJiBeb/AXlzBBko7b15fjrBs2+cTQtpZ3CYWFXG8C5zqx37wnOE49mRl/+OtkIKGO7fAE"
- # To avoid json.dumps() differing behavior from one version to other,
- # the JSON payload is hardcoded.
- REQUEST_PARAMS = '{"KeySchema":[{"KeyType":"HASH","AttributeName":"Id"}],"TableName":"TestTable","AttributeDefinitions":[{"AttributeName":"Id","AttributeType":"S"}],"ProvisionedThroughput":{"WriteCapacityUnits":5,"ReadCapacityUnits":5}}'
- # Each tuple contains the following entries:
- # region, time, credentials, original_request, signed_request
- DEFAULT_UNIVERSE_DOMAIN = "googleapis.com"
- VALID_TOKEN_URLS = [
- "https://sts.googleapis.com",
- "https://us-east-1.sts.googleapis.com",
- "https://US-EAST-1.sts.googleapis.com",
- "https://sts.us-east-1.googleapis.com",
- "https://sts.US-WEST-1.googleapis.com",
- "https://us-east-1-sts.googleapis.com",
- "https://US-WEST-1-sts.googleapis.com",
- "https://us-west-1-sts.googleapis.com/path?query",
- "https://sts-us-east-1.p.googleapis.com",
- ]
- INVALID_TOKEN_URLS = [
- "https://iamcredentials.googleapis.com",
- "sts.googleapis.com",
- "https://",
- "http://sts.googleapis.com",
- "https://st.s.googleapis.com",
- "https://us-eas\t-1.sts.googleapis.com",
- "https:/us-east-1.sts.googleapis.com",
- "https://US-WE/ST-1-sts.googleapis.com",
- "https://sts-us-east-1.googleapis.com",
- "https://sts-US-WEST-1.googleapis.com",
- "testhttps://us-east-1.sts.googleapis.com",
- "https://us-east-1.sts.googleapis.comevil.com",
- "https://us-east-1.us-east-1.sts.googleapis.com",
- "https://us-ea.s.t.sts.googleapis.com",
- "https://sts.googleapis.comevil.com",
- "hhttps://us-east-1.sts.googleapis.com",
- "https://us- -1.sts.googleapis.com",
- "https://-sts.googleapis.com",
- "https://us-east-1.sts.googleapis.com.evil.com",
- "https://sts.pgoogleapis.com",
- "https://p.googleapis.com",
- "https://sts.p.com",
- "http://sts.p.googleapis.com",
- "https://xyz-sts.p.googleapis.com",
- "https://sts-xyz.123.p.googleapis.com",
- "https://sts-xyz.p1.googleapis.com",
- "https://sts-xyz.p.foo.com",
- "https://sts-xyz.p.foo.googleapis.com",
- ]
- VALID_SERVICE_ACCOUNT_IMPERSONATION_URLS = [
- "https://iamcredentials.googleapis.com",
- "https://us-east-1.iamcredentials.googleapis.com",
- "https://US-EAST-1.iamcredentials.googleapis.com",
- "https://iamcredentials.us-east-1.googleapis.com",
- "https://iamcredentials.US-WEST-1.googleapis.com",
- "https://us-east-1-iamcredentials.googleapis.com",
- "https://US-WEST-1-iamcredentials.googleapis.com",
- "https://us-west-1-iamcredentials.googleapis.com/path?query",
- "https://iamcredentials-us-east-1.p.googleapis.com",
- ]
- INVALID_SERVICE_ACCOUNT_IMPERSONATION_URLS = [
- "https://sts.googleapis.com",
- "iamcredentials.googleapis.com",
- "https://",
- "http://iamcredentials.googleapis.com",
- "https://iamcre.dentials.googleapis.com",
- "https://us-eas\t-1.iamcredentials.googleapis.com",
- "https:/us-east-1.iamcredentials.googleapis.com",
- "https://US-WE/ST-1-iamcredentials.googleapis.com",
- "https://iamcredentials-us-east-1.googleapis.com",
- "https://iamcredentials-US-WEST-1.googleapis.com",
- "testhttps://us-east-1.iamcredentials.googleapis.com",
- "https://us-east-1.iamcredentials.googleapis.comevil.com",
- "https://us-east-1.us-east-1.iamcredentials.googleapis.com",
- "https://us-ea.s.t.iamcredentials.googleapis.com",
- "https://iamcredentials.googleapis.comevil.com",
- "hhttps://us-east-1.iamcredentials.googleapis.com",
- "https://us- -1.iamcredentials.googleapis.com",
- "https://-iamcredentials.googleapis.com",
- "https://us-east-1.iamcredentials.googleapis.com.evil.com",
- "https://iamcredentials.pgoogleapis.com",
- "https://p.googleapis.com",
- "https://iamcredentials.p.com",
- "http://iamcredentials.p.googleapis.com",
- "https://xyz-iamcredentials.p.googleapis.com",
- "https://iamcredentials-xyz.123.p.googleapis.com",
- "https://iamcredentials-xyz.p1.googleapis.com",
- "https://iamcredentials-xyz.p.foo.com",
- "https://iamcredentials-xyz.p.foo.googleapis.com",
- ]
- TEST_FIXTURES = [
- # GET request (AWS botocore tests).
- # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/get-vanilla.req
- # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/get-vanilla.sreq
- (
- "us-east-1",
- "2011-09-09T23:36:00Z",
- {
- "access_key_id": "AKIDEXAMPLE",
- "secret_access_key": "wJalrXUtnFEMI/K7MDENG+bPxRfiCYEXAMPLEKEY",
- },
- {
- "method": "GET",
- "url": "https://host.foo.com",
- "headers": {"date": "Mon, 09 Sep 2011 23:36:00 GMT"},
- },
- {
- "url": "https://host.foo.com",
- "method": "GET",
- "headers": {
- "Authorization": "AWS4-HMAC-SHA256 Credential=AKIDEXAMPLE/20110909/us-east-1/host/aws4_request, SignedHeaders=date;host, Signature=b27ccfbfa7df52a200ff74193ca6e32d4b48b8856fab7ebf1c595d0670a7e470",
- "host": "host.foo.com",
- "date": "Mon, 09 Sep 2011 23:36:00 GMT",
- },
- },
- ),
- # GET request with relative path (AWS botocore tests).
- # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/get-relative-relative.req
- # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/get-relative-relative.sreq
- (
- "us-east-1",
- "2011-09-09T23:36:00Z",
- {
- "access_key_id": "AKIDEXAMPLE",
- "secret_access_key": "wJalrXUtnFEMI/K7MDENG+bPxRfiCYEXAMPLEKEY",
- },
- {
- "method": "GET",
- "url": "https://host.foo.com/foo/bar/../..",
- "headers": {"date": "Mon, 09 Sep 2011 23:36:00 GMT"},
- },
- {
- "url": "https://host.foo.com/foo/bar/../..",
- "method": "GET",
- "headers": {
- "Authorization": "AWS4-HMAC-SHA256 Credential=AKIDEXAMPLE/20110909/us-east-1/host/aws4_request, SignedHeaders=date;host, Signature=b27ccfbfa7df52a200ff74193ca6e32d4b48b8856fab7ebf1c595d0670a7e470",
- "host": "host.foo.com",
- "date": "Mon, 09 Sep 2011 23:36:00 GMT",
- },
- },
- ),
- # GET request with /./ path (AWS botocore tests).
- # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/get-slash-dot-slash.req
- # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/get-slash-dot-slash.sreq
- (
- "us-east-1",
- "2011-09-09T23:36:00Z",
- {
- "access_key_id": "AKIDEXAMPLE",
- "secret_access_key": "wJalrXUtnFEMI/K7MDENG+bPxRfiCYEXAMPLEKEY",
- },
- {
- "method": "GET",
- "url": "https://host.foo.com/./",
- "headers": {"date": "Mon, 09 Sep 2011 23:36:00 GMT"},
- },
- {
- "url": "https://host.foo.com/./",
- "method": "GET",
- "headers": {
- "Authorization": "AWS4-HMAC-SHA256 Credential=AKIDEXAMPLE/20110909/us-east-1/host/aws4_request, SignedHeaders=date;host, Signature=b27ccfbfa7df52a200ff74193ca6e32d4b48b8856fab7ebf1c595d0670a7e470",
- "host": "host.foo.com",
- "date": "Mon, 09 Sep 2011 23:36:00 GMT",
- },
- },
- ),
- # GET request with pointless dot path (AWS botocore tests).
- # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/get-slash-pointless-dot.req
- # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/get-slash-pointless-dot.sreq
- (
- "us-east-1",
- "2011-09-09T23:36:00Z",
- {
- "access_key_id": "AKIDEXAMPLE",
- "secret_access_key": "wJalrXUtnFEMI/K7MDENG+bPxRfiCYEXAMPLEKEY",
- },
- {
- "method": "GET",
- "url": "https://host.foo.com/./foo",
- "headers": {"date": "Mon, 09 Sep 2011 23:36:00 GMT"},
- },
- {
- "url": "https://host.foo.com/./foo",
- "method": "GET",
- "headers": {
- "Authorization": "AWS4-HMAC-SHA256 Credential=AKIDEXAMPLE/20110909/us-east-1/host/aws4_request, SignedHeaders=date;host, Signature=910e4d6c9abafaf87898e1eb4c929135782ea25bb0279703146455745391e63a",
- "host": "host.foo.com",
- "date": "Mon, 09 Sep 2011 23:36:00 GMT",
- },
- },
- ),
- # GET request with utf8 path (AWS botocore tests).
- # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/get-utf8.req
- # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/get-utf8.sreq
- (
- "us-east-1",
- "2011-09-09T23:36:00Z",
- {
- "access_key_id": "AKIDEXAMPLE",
- "secret_access_key": "wJalrXUtnFEMI/K7MDENG+bPxRfiCYEXAMPLEKEY",
- },
- {
- "method": "GET",
- "url": "https://host.foo.com/%E1%88%B4",
- "headers": {"date": "Mon, 09 Sep 2011 23:36:00 GMT"},
- },
- {
- "url": "https://host.foo.com/%E1%88%B4",
- "method": "GET",
- "headers": {
- "Authorization": "AWS4-HMAC-SHA256 Credential=AKIDEXAMPLE/20110909/us-east-1/host/aws4_request, SignedHeaders=date;host, Signature=8d6634c189aa8c75c2e51e106b6b5121bed103fdb351f7d7d4381c738823af74",
- "host": "host.foo.com",
- "date": "Mon, 09 Sep 2011 23:36:00 GMT",
- },
- },
- ),
- # GET request with duplicate query key (AWS botocore tests).
- # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/get-vanilla-query-order-key-case.req
- # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/get-vanilla-query-order-key-case.sreq
- (
- "us-east-1",
- "2011-09-09T23:36:00Z",
- {
- "access_key_id": "AKIDEXAMPLE",
- "secret_access_key": "wJalrXUtnFEMI/K7MDENG+bPxRfiCYEXAMPLEKEY",
- },
- {
- "method": "GET",
- "url": "https://host.foo.com/?foo=Zoo&foo=aha",
- "headers": {"date": "Mon, 09 Sep 2011 23:36:00 GMT"},
- },
- {
- "url": "https://host.foo.com/?foo=Zoo&foo=aha",
- "method": "GET",
- "headers": {
- "Authorization": "AWS4-HMAC-SHA256 Credential=AKIDEXAMPLE/20110909/us-east-1/host/aws4_request, SignedHeaders=date;host, Signature=be7148d34ebccdc6423b19085378aa0bee970bdc61d144bd1a8c48c33079ab09",
- "host": "host.foo.com",
- "date": "Mon, 09 Sep 2011 23:36:00 GMT",
- },
- },
- ),
- # GET request with duplicate out of order query key (AWS botocore tests).
- # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/get-vanilla-query-order-value.req
- # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/get-vanilla-query-order-value.sreq
- (
- "us-east-1",
- "2011-09-09T23:36:00Z",
- {
- "access_key_id": "AKIDEXAMPLE",
- "secret_access_key": "wJalrXUtnFEMI/K7MDENG+bPxRfiCYEXAMPLEKEY",
- },
- {
- "method": "GET",
- "url": "https://host.foo.com/?foo=b&foo=a",
- "headers": {"date": "Mon, 09 Sep 2011 23:36:00 GMT"},
- },
- {
- "url": "https://host.foo.com/?foo=b&foo=a",
- "method": "GET",
- "headers": {
- "Authorization": "AWS4-HMAC-SHA256 Credential=AKIDEXAMPLE/20110909/us-east-1/host/aws4_request, SignedHeaders=date;host, Signature=feb926e49e382bec75c9d7dcb2a1b6dc8aa50ca43c25d2bc51143768c0875acc",
- "host": "host.foo.com",
- "date": "Mon, 09 Sep 2011 23:36:00 GMT",
- },
- },
- ),
- # GET request with utf8 query (AWS botocore tests).
- # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/get-vanilla-ut8-query.req
- # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/get-vanilla-ut8-query.sreq
- (
- "us-east-1",
- "2011-09-09T23:36:00Z",
- {
- "access_key_id": "AKIDEXAMPLE",
- "secret_access_key": "wJalrXUtnFEMI/K7MDENG+bPxRfiCYEXAMPLEKEY",
- },
- {
- "method": "GET",
- "url": "https://host.foo.com/?{}=bar".format(
- urllib.parse.unquote("%E1%88%B4")
- ),
- "headers": {"date": "Mon, 09 Sep 2011 23:36:00 GMT"},
- },
- {
- "url": "https://host.foo.com/?{}=bar".format(
- urllib.parse.unquote("%E1%88%B4")
- ),
- "method": "GET",
- "headers": {
- "Authorization": "AWS4-HMAC-SHA256 Credential=AKIDEXAMPLE/20110909/us-east-1/host/aws4_request, SignedHeaders=date;host, Signature=6fb359e9a05394cc7074e0feb42573a2601abc0c869a953e8c5c12e4e01f1a8c",
- "host": "host.foo.com",
- "date": "Mon, 09 Sep 2011 23:36:00 GMT",
- },
- },
- ),
- # POST request with sorted headers (AWS botocore tests).
- # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/post-header-key-sort.req
- # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/post-header-key-sort.sreq
- (
- "us-east-1",
- "2011-09-09T23:36:00Z",
- {
- "access_key_id": "AKIDEXAMPLE",
- "secret_access_key": "wJalrXUtnFEMI/K7MDENG+bPxRfiCYEXAMPLEKEY",
- },
- {
- "method": "POST",
- "url": "https://host.foo.com/",
- "headers": {"date": "Mon, 09 Sep 2011 23:36:00 GMT", "ZOO": "zoobar"},
- },
- {
- "url": "https://host.foo.com/",
- "method": "POST",
- "headers": {
- "Authorization": "AWS4-HMAC-SHA256 Credential=AKIDEXAMPLE/20110909/us-east-1/host/aws4_request, SignedHeaders=date;host;zoo, Signature=b7a95a52518abbca0964a999a880429ab734f35ebbf1235bd79a5de87756dc4a",
- "host": "host.foo.com",
- "date": "Mon, 09 Sep 2011 23:36:00 GMT",
- "ZOO": "zoobar",
- },
- },
- ),
- # POST request with upper case header value from AWS Python test harness.
- # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/post-header-value-case.req
- # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/post-header-value-case.sreq
- (
- "us-east-1",
- "2011-09-09T23:36:00Z",
- {
- "access_key_id": "AKIDEXAMPLE",
- "secret_access_key": "wJalrXUtnFEMI/K7MDENG+bPxRfiCYEXAMPLEKEY",
- },
- {
- "method": "POST",
- "url": "https://host.foo.com/",
- "headers": {"date": "Mon, 09 Sep 2011 23:36:00 GMT", "zoo": "ZOOBAR"},
- },
- {
- "url": "https://host.foo.com/",
- "method": "POST",
- "headers": {
- "Authorization": "AWS4-HMAC-SHA256 Credential=AKIDEXAMPLE/20110909/us-east-1/host/aws4_request, SignedHeaders=date;host;zoo, Signature=273313af9d0c265c531e11db70bbd653f3ba074c1009239e8559d3987039cad7",
- "host": "host.foo.com",
- "date": "Mon, 09 Sep 2011 23:36:00 GMT",
- "zoo": "ZOOBAR",
- },
- },
- ),
- # POST request with header and no body (AWS botocore tests).
- # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/get-header-value-trim.req
- # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/get-header-value-trim.sreq
- (
- "us-east-1",
- "2011-09-09T23:36:00Z",
- {
- "access_key_id": "AKIDEXAMPLE",
- "secret_access_key": "wJalrXUtnFEMI/K7MDENG+bPxRfiCYEXAMPLEKEY",
- },
- {
- "method": "POST",
- "url": "https://host.foo.com/",
- "headers": {"date": "Mon, 09 Sep 2011 23:36:00 GMT", "p": "phfft"},
- },
- {
- "url": "https://host.foo.com/",
- "method": "POST",
- "headers": {
- "Authorization": "AWS4-HMAC-SHA256 Credential=AKIDEXAMPLE/20110909/us-east-1/host/aws4_request, SignedHeaders=date;host;p, Signature=debf546796015d6f6ded8626f5ce98597c33b47b9164cf6b17b4642036fcb592",
- "host": "host.foo.com",
- "date": "Mon, 09 Sep 2011 23:36:00 GMT",
- "p": "phfft",
- },
- },
- ),
- # POST request with body and no header (AWS botocore tests).
- # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/post-x-www-form-urlencoded.req
- # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/post-x-www-form-urlencoded.sreq
- (
- "us-east-1",
- "2011-09-09T23:36:00Z",
- {
- "access_key_id": "AKIDEXAMPLE",
- "secret_access_key": "wJalrXUtnFEMI/K7MDENG+bPxRfiCYEXAMPLEKEY",
- },
- {
- "method": "POST",
- "url": "https://host.foo.com/",
- "headers": {
- "Content-Type": "application/x-www-form-urlencoded",
- "date": "Mon, 09 Sep 2011 23:36:00 GMT",
- },
- "data": "foo=bar",
- },
- {
- "url": "https://host.foo.com/",
- "method": "POST",
- "headers": {
- "Authorization": "AWS4-HMAC-SHA256 Credential=AKIDEXAMPLE/20110909/us-east-1/host/aws4_request, SignedHeaders=content-type;date;host, Signature=5a15b22cf462f047318703b92e6f4f38884e4a7ab7b1d6426ca46a8bd1c26cbc",
- "host": "host.foo.com",
- "Content-Type": "application/x-www-form-urlencoded",
- "date": "Mon, 09 Sep 2011 23:36:00 GMT",
- },
- "data": "foo=bar",
- },
- ),
- # POST request with querystring (AWS botocore tests).
- # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/post-vanilla-query.req
- # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/post-vanilla-query.sreq
- (
- "us-east-1",
- "2011-09-09T23:36:00Z",
- {
- "access_key_id": "AKIDEXAMPLE",
- "secret_access_key": "wJalrXUtnFEMI/K7MDENG+bPxRfiCYEXAMPLEKEY",
- },
- {
- "method": "POST",
- "url": "https://host.foo.com/?foo=bar",
- "headers": {"date": "Mon, 09 Sep 2011 23:36:00 GMT"},
- },
- {
- "url": "https://host.foo.com/?foo=bar",
- "method": "POST",
- "headers": {
- "Authorization": "AWS4-HMAC-SHA256 Credential=AKIDEXAMPLE/20110909/us-east-1/host/aws4_request, SignedHeaders=date;host, Signature=b6e3b79003ce0743a491606ba1035a804593b0efb1e20a11cba83f8c25a57a92",
- "host": "host.foo.com",
- "date": "Mon, 09 Sep 2011 23:36:00 GMT",
- },
- },
- ),
- # GET request with session token credentials.
- (
- "us-east-2",
- "2020-08-11T06:55:22Z",
- {
- "access_key_id": ACCESS_KEY_ID,
- "secret_access_key": SECRET_ACCESS_KEY,
- "security_token": TOKEN,
- },
- {
- "method": "GET",
- "url": "https://ec2.us-east-2.amazonaws.com?Action=DescribeRegions&Version=2013-10-15",
- },
- {
- "url": "https://ec2.us-east-2.amazonaws.com?Action=DescribeRegions&Version=2013-10-15",
- "method": "GET",
- "headers": {
- "Authorization": "AWS4-HMAC-SHA256 Credential="
- + ACCESS_KEY_ID
- + "/20200811/us-east-2/ec2/aws4_request, SignedHeaders=host;x-amz-date;x-amz-security-token, Signature=41e226f997bf917ec6c9b2b14218df0874225f13bb153236c247881e614fafc9",
- "host": "ec2.us-east-2.amazonaws.com",
- "x-amz-date": "20200811T065522Z",
- "x-amz-security-token": TOKEN,
- },
- },
- ),
- # POST request with session token credentials.
- (
- "us-east-2",
- "2020-08-11T06:55:22Z",
- {
- "access_key_id": ACCESS_KEY_ID,
- "secret_access_key": SECRET_ACCESS_KEY,
- "security_token": TOKEN,
- },
- {
- "method": "POST",
- "url": "https://sts.us-east-2.amazonaws.com?Action=GetCallerIdentity&Version=2011-06-15",
- },
- {
- "url": "https://sts.us-east-2.amazonaws.com?Action=GetCallerIdentity&Version=2011-06-15",
- "method": "POST",
- "headers": {
- "Authorization": "AWS4-HMAC-SHA256 Credential="
- + ACCESS_KEY_ID
- + "/20200811/us-east-2/sts/aws4_request, SignedHeaders=host;x-amz-date;x-amz-security-token, Signature=596aa990b792d763465d73703e684ca273c45536c6d322c31be01a41d02e5b60",
- "host": "sts.us-east-2.amazonaws.com",
- "x-amz-date": "20200811T065522Z",
- "x-amz-security-token": TOKEN,
- },
- },
- ),
- # POST request with computed x-amz-date and no data.
- (
- "us-east-2",
- "2020-08-11T06:55:22Z",
- {"access_key_id": ACCESS_KEY_ID, "secret_access_key": SECRET_ACCESS_KEY},
- {
- "method": "POST",
- "url": "https://sts.us-east-2.amazonaws.com?Action=GetCallerIdentity&Version=2011-06-15",
- },
- {
- "url": "https://sts.us-east-2.amazonaws.com?Action=GetCallerIdentity&Version=2011-06-15",
- "method": "POST",
- "headers": {
- "Authorization": "AWS4-HMAC-SHA256 Credential="
- + ACCESS_KEY_ID
- + "/20200811/us-east-2/sts/aws4_request, SignedHeaders=host;x-amz-date, Signature=9e722e5b7bfa163447e2a14df118b45ebd283c5aea72019bdf921d6e7dc01a9a",
- "host": "sts.us-east-2.amazonaws.com",
- "x-amz-date": "20200811T065522Z",
- },
- },
- ),
- # POST request with session token and additional headers/data.
- (
- "us-east-2",
- "2020-08-11T06:55:22Z",
- {
- "access_key_id": ACCESS_KEY_ID,
- "secret_access_key": SECRET_ACCESS_KEY,
- "security_token": TOKEN,
- },
- {
- "method": "POST",
- "url": "https://dynamodb.us-east-2.amazonaws.com/",
- "headers": {
- "Content-Type": "application/x-amz-json-1.0",
- "x-amz-target": "DynamoDB_20120810.CreateTable",
- },
- "data": REQUEST_PARAMS,
- },
- {
- "url": "https://dynamodb.us-east-2.amazonaws.com/",
- "method": "POST",
- "headers": {
- "Authorization": "AWS4-HMAC-SHA256 Credential="
- + ACCESS_KEY_ID
- + "/20200811/us-east-2/dynamodb/aws4_request, SignedHeaders=content-type;host;x-amz-date;x-amz-security-token;x-amz-target, Signature=eb8bce0e63654bba672d4a8acb07e72d69210c1797d56ce024dbbc31beb2a2c7",
- "host": "dynamodb.us-east-2.amazonaws.com",
- "x-amz-date": "20200811T065522Z",
- "Content-Type": "application/x-amz-json-1.0",
- "x-amz-target": "DynamoDB_20120810.CreateTable",
- "x-amz-security-token": TOKEN,
- },
- "data": REQUEST_PARAMS,
- },
- ),
- ]
- class TestRequestSigner(object):
- @pytest.mark.parametrize(
- "region, time, credentials, original_request, signed_request", TEST_FIXTURES
- )
- @mock.patch("google.auth._helpers.utcnow")
- def test_get_request_options(
- self, utcnow, region, time, credentials, original_request, signed_request
- ):
- utcnow.return_value = datetime.datetime.strptime(time, "%Y-%m-%dT%H:%M:%SZ")
- request_signer = aws.RequestSigner(region)
- actual_signed_request = request_signer.get_request_options(
- credentials,
- original_request.get("url"),
- original_request.get("method"),
- original_request.get("data"),
- original_request.get("headers"),
- )
- assert actual_signed_request == signed_request
- def test_get_request_options_with_missing_scheme_url(self):
- request_signer = aws.RequestSigner("us-east-2")
- with pytest.raises(ValueError) as excinfo:
- request_signer.get_request_options(
- {
- "access_key_id": ACCESS_KEY_ID,
- "secret_access_key": SECRET_ACCESS_KEY,
- },
- "invalid",
- "POST",
- )
- assert excinfo.match(r"Invalid AWS service URL")
- def test_get_request_options_with_invalid_scheme_url(self):
- request_signer = aws.RequestSigner("us-east-2")
- with pytest.raises(ValueError) as excinfo:
- request_signer.get_request_options(
- {
- "access_key_id": ACCESS_KEY_ID,
- "secret_access_key": SECRET_ACCESS_KEY,
- },
- "http://invalid",
- "POST",
- )
- assert excinfo.match(r"Invalid AWS service URL")
- def test_get_request_options_with_missing_hostname_url(self):
- request_signer = aws.RequestSigner("us-east-2")
- with pytest.raises(ValueError) as excinfo:
- request_signer.get_request_options(
- {
- "access_key_id": ACCESS_KEY_ID,
- "secret_access_key": SECRET_ACCESS_KEY,
- },
- "https://",
- "POST",
- )
- assert excinfo.match(r"Invalid AWS service URL")
- class TestCredentials(object):
- AWS_REGION = "us-east-2"
- AWS_ROLE = "gcp-aws-role"
- AWS_SECURITY_CREDENTIALS_RESPONSE = {
- "AccessKeyId": ACCESS_KEY_ID,
- "SecretAccessKey": SECRET_ACCESS_KEY,
- "Token": TOKEN,
- }
- AWS_IMDSV2_SESSION_TOKEN = "awsimdsv2sessiontoken"
- AWS_SIGNATURE_TIME = "2020-08-11T06:55:22Z"
- CREDENTIAL_SOURCE = {
- "environment_id": "aws1",
- "region_url": REGION_URL,
- "url": SECURITY_CREDS_URL,
- "regional_cred_verification_url": CRED_VERIFICATION_URL,
- }
- CREDENTIAL_SOURCE_IPV6 = {
- "environment_id": "aws1",
- "region_url": REGION_URL_IPV6,
- "url": SECURITY_CREDS_URL_IPV6,
- "regional_cred_verification_url": CRED_VERIFICATION_URL,
- "imdsv2_session_token_url": IMDSV2_SESSION_TOKEN_URL_IPV6,
- }
- SUCCESS_RESPONSE = {
- "access_token": "ACCESS_TOKEN",
- "issued_token_type": "urn:ietf:params:oauth:token-type:access_token",
- "token_type": "Bearer",
- "expires_in": 3600,
- "scope": " ".join(SCOPES),
- }
- @classmethod
- def make_serialized_aws_signed_request(
- cls,
- aws_security_credentials,
- region_name="us-east-2",
- url="https://sts.us-east-2.amazonaws.com?Action=GetCallerIdentity&Version=2011-06-15",
- ):
- """Utility to generate serialize AWS signed requests.
- This makes it easy to assert generated subject tokens based on the
- provided AWS security credentials, regions and AWS STS endpoint.
- """
- request_signer = aws.RequestSigner(region_name)
- signed_request = request_signer.get_request_options(
- aws_security_credentials, url, "POST"
- )
- reformatted_signed_request = {
- "url": signed_request.get("url"),
- "method": signed_request.get("method"),
- "headers": [
- {
- "key": "Authorization",
- "value": signed_request.get("headers").get("Authorization"),
- },
- {"key": "host", "value": signed_request.get("headers").get("host")},
- {
- "key": "x-amz-date",
- "value": signed_request.get("headers").get("x-amz-date"),
- },
- ],
- }
- # Include security token if available.
- if "security_token" in aws_security_credentials:
- reformatted_signed_request.get("headers").append(
- {
- "key": "x-amz-security-token",
- "value": signed_request.get("headers").get("x-amz-security-token"),
- }
- )
- # Append x-goog-cloud-target-resource header.
- reformatted_signed_request.get("headers").append(
- {"key": "x-goog-cloud-target-resource", "value": AUDIENCE}
- ),
- return urllib.parse.quote(
- json.dumps(
- reformatted_signed_request, separators=(",", ":"), sort_keys=True
- )
- )
- @classmethod
- def make_mock_request(
- cls,
- region_status=None,
- region_name=None,
- role_status=None,
- role_name=None,
- security_credentials_status=None,
- security_credentials_data=None,
- token_status=None,
- token_data=None,
- impersonation_status=None,
- impersonation_data=None,
- imdsv2_session_token_status=None,
- imdsv2_session_token_data=None,
- ):
- """Utility function to generate a mock HTTP request object.
- This will facilitate testing various edge cases by specify how the
- various endpoints will respond while generating a Google Access token
- in an AWS environment.
- """
- responses = []
- if imdsv2_session_token_status:
- # AWS session token request
- imdsv2_session_response = mock.create_autospec(
- transport.Response, instance=True
- )
- imdsv2_session_response.status = imdsv2_session_token_status
- imdsv2_session_response.data = imdsv2_session_token_data
- responses.append(imdsv2_session_response)
- if region_status:
- # AWS region request.
- region_response = mock.create_autospec(transport.Response, instance=True)
- region_response.status = region_status
- if region_name:
- region_response.data = "{}b".format(region_name).encode("utf-8")
- responses.append(region_response)
- if role_status:
- # AWS role name request.
- role_response = mock.create_autospec(transport.Response, instance=True)
- role_response.status = role_status
- if role_name:
- role_response.data = role_name.encode("utf-8")
- responses.append(role_response)
- if security_credentials_status:
- # AWS security credentials request.
- security_credentials_response = mock.create_autospec(
- transport.Response, instance=True
- )
- security_credentials_response.status = security_credentials_status
- if security_credentials_data:
- security_credentials_response.data = json.dumps(
- security_credentials_data
- ).encode("utf-8")
- responses.append(security_credentials_response)
- if token_status:
- # GCP token exchange request.
- token_response = mock.create_autospec(transport.Response, instance=True)
- token_response.status = token_status
- token_response.data = json.dumps(token_data).encode("utf-8")
- responses.append(token_response)
- if impersonation_status:
- # Service account impersonation request.
- impersonation_response = mock.create_autospec(
- transport.Response, instance=True
- )
- impersonation_response.status = impersonation_status
- impersonation_response.data = json.dumps(impersonation_data).encode("utf-8")
- responses.append(impersonation_response)
- request = mock.create_autospec(transport.Request)
- request.side_effect = responses
- return request
- @classmethod
- def make_credentials(
- cls,
- credential_source,
- token_url=TOKEN_URL,
- token_info_url=TOKEN_INFO_URL,
- client_id=None,
- client_secret=None,
- quota_project_id=None,
- scopes=None,
- default_scopes=None,
- service_account_impersonation_url=None,
- ):
- return aws.Credentials(
- audience=AUDIENCE,
- subject_token_type=SUBJECT_TOKEN_TYPE,
- token_url=token_url,
- token_info_url=token_info_url,
- service_account_impersonation_url=service_account_impersonation_url,
- credential_source=credential_source,
- client_id=client_id,
- client_secret=client_secret,
- quota_project_id=quota_project_id,
- scopes=scopes,
- default_scopes=default_scopes,
- )
- @classmethod
- def assert_aws_metadata_request_kwargs(
- cls, request_kwargs, url, headers=None, method="GET"
- ):
- assert request_kwargs["url"] == url
- # All used AWS metadata server endpoints use GET HTTP method.
- assert request_kwargs["method"] == method
- if headers:
- assert request_kwargs["headers"] == headers
- else:
- assert "headers" not in request_kwargs or request_kwargs["headers"] is None
- # None of the endpoints used require any data in request.
- assert "body" not in request_kwargs
- @classmethod
- def assert_token_request_kwargs(
- cls, request_kwargs, headers, request_data, token_url=TOKEN_URL
- ):
- assert request_kwargs["url"] == token_url
- assert request_kwargs["method"] == "POST"
- assert request_kwargs["headers"] == headers
- assert request_kwargs["body"] is not None
- body_tuples = urllib.parse.parse_qsl(request_kwargs["body"])
- assert len(body_tuples) == len(request_data.keys())
- for (k, v) in body_tuples:
- assert v.decode("utf-8") == request_data[k.decode("utf-8")]
- @classmethod
- def assert_impersonation_request_kwargs(
- cls,
- request_kwargs,
- headers,
- request_data,
- service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL,
- ):
- assert request_kwargs["url"] == service_account_impersonation_url
- assert request_kwargs["method"] == "POST"
- assert request_kwargs["headers"] == headers
- assert request_kwargs["body"] is not None
- body_json = json.loads(request_kwargs["body"].decode("utf-8"))
- assert body_json == request_data
- @mock.patch.object(aws.Credentials, "__init__", return_value=None)
- def test_from_info_full_options(self, mock_init):
- credentials = aws.Credentials.from_info(
- {
- "audience": AUDIENCE,
- "subject_token_type": SUBJECT_TOKEN_TYPE,
- "token_url": TOKEN_URL,
- "token_info_url": TOKEN_INFO_URL,
- "service_account_impersonation_url": SERVICE_ACCOUNT_IMPERSONATION_URL,
- "service_account_impersonation": {"token_lifetime_seconds": 2800},
- "client_id": CLIENT_ID,
- "client_secret": CLIENT_SECRET,
- "quota_project_id": QUOTA_PROJECT_ID,
- "credential_source": self.CREDENTIAL_SOURCE,
- }
- )
- # Confirm aws.Credentials instance initialized with the expected parameters.
- assert isinstance(credentials, aws.Credentials)
- mock_init.assert_called_once_with(
- audience=AUDIENCE,
- subject_token_type=SUBJECT_TOKEN_TYPE,
- token_url=TOKEN_URL,
- token_info_url=TOKEN_INFO_URL,
- service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL,
- service_account_impersonation_options={"token_lifetime_seconds": 2800},
- client_id=CLIENT_ID,
- client_secret=CLIENT_SECRET,
- credential_source=self.CREDENTIAL_SOURCE,
- quota_project_id=QUOTA_PROJECT_ID,
- workforce_pool_user_project=None,
- universe_domain=DEFAULT_UNIVERSE_DOMAIN,
- )
- @mock.patch.object(aws.Credentials, "__init__", return_value=None)
- def test_from_info_required_options_only(self, mock_init):
- credentials = aws.Credentials.from_info(
- {
- "audience": AUDIENCE,
- "subject_token_type": SUBJECT_TOKEN_TYPE,
- "token_url": TOKEN_URL,
- "credential_source": self.CREDENTIAL_SOURCE,
- }
- )
- # Confirm aws.Credentials instance initialized with the expected parameters.
- assert isinstance(credentials, aws.Credentials)
- mock_init.assert_called_once_with(
- audience=AUDIENCE,
- subject_token_type=SUBJECT_TOKEN_TYPE,
- token_url=TOKEN_URL,
- token_info_url=None,
- service_account_impersonation_url=None,
- service_account_impersonation_options={},
- client_id=None,
- client_secret=None,
- credential_source=self.CREDENTIAL_SOURCE,
- quota_project_id=None,
- workforce_pool_user_project=None,
- universe_domain=DEFAULT_UNIVERSE_DOMAIN,
- )
- @mock.patch.object(aws.Credentials, "__init__", return_value=None)
- def test_from_file_full_options(self, mock_init, tmpdir):
- info = {
- "audience": AUDIENCE,
- "subject_token_type": SUBJECT_TOKEN_TYPE,
- "token_url": TOKEN_URL,
- "token_info_url": TOKEN_INFO_URL,
- "service_account_impersonation_url": SERVICE_ACCOUNT_IMPERSONATION_URL,
- "service_account_impersonation": {"token_lifetime_seconds": 2800},
- "client_id": CLIENT_ID,
- "client_secret": CLIENT_SECRET,
- "quota_project_id": QUOTA_PROJECT_ID,
- "credential_source": self.CREDENTIAL_SOURCE,
- "universe_domain": DEFAULT_UNIVERSE_DOMAIN,
- }
- config_file = tmpdir.join("config.json")
- config_file.write(json.dumps(info))
- credentials = aws.Credentials.from_file(str(config_file))
- # Confirm aws.Credentials instance initialized with the expected parameters.
- assert isinstance(credentials, aws.Credentials)
- mock_init.assert_called_once_with(
- audience=AUDIENCE,
- subject_token_type=SUBJECT_TOKEN_TYPE,
- token_url=TOKEN_URL,
- token_info_url=TOKEN_INFO_URL,
- service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL,
- service_account_impersonation_options={"token_lifetime_seconds": 2800},
- client_id=CLIENT_ID,
- client_secret=CLIENT_SECRET,
- credential_source=self.CREDENTIAL_SOURCE,
- quota_project_id=QUOTA_PROJECT_ID,
- workforce_pool_user_project=None,
- universe_domain=DEFAULT_UNIVERSE_DOMAIN,
- )
- @mock.patch.object(aws.Credentials, "__init__", return_value=None)
- def test_from_file_required_options_only(self, mock_init, tmpdir):
- info = {
- "audience": AUDIENCE,
- "subject_token_type": SUBJECT_TOKEN_TYPE,
- "token_url": TOKEN_URL,
- "credential_source": self.CREDENTIAL_SOURCE,
- }
- config_file = tmpdir.join("config.json")
- config_file.write(json.dumps(info))
- credentials = aws.Credentials.from_file(str(config_file))
- # Confirm aws.Credentials instance initialized with the expected parameters.
- assert isinstance(credentials, aws.Credentials)
- mock_init.assert_called_once_with(
- audience=AUDIENCE,
- subject_token_type=SUBJECT_TOKEN_TYPE,
- token_url=TOKEN_URL,
- token_info_url=None,
- service_account_impersonation_url=None,
- service_account_impersonation_options={},
- client_id=None,
- client_secret=None,
- credential_source=self.CREDENTIAL_SOURCE,
- quota_project_id=None,
- workforce_pool_user_project=None,
- universe_domain=DEFAULT_UNIVERSE_DOMAIN,
- )
- def test_constructor_invalid_credential_source(self):
- # Provide invalid credential source.
- credential_source = {"unsupported": "value"}
- with pytest.raises(ValueError) as excinfo:
- self.make_credentials(credential_source=credential_source)
- assert excinfo.match(r"No valid AWS 'credential_source' provided")
- def test_constructor_invalid_environment_id(self):
- # Provide invalid environment_id.
- credential_source = self.CREDENTIAL_SOURCE.copy()
- credential_source["environment_id"] = "azure1"
- with pytest.raises(ValueError) as excinfo:
- self.make_credentials(credential_source=credential_source)
- assert excinfo.match(r"No valid AWS 'credential_source' provided")
- def test_constructor_missing_cred_verification_url(self):
- # regional_cred_verification_url is a required field.
- credential_source = self.CREDENTIAL_SOURCE.copy()
- credential_source.pop("regional_cred_verification_url")
- with pytest.raises(ValueError) as excinfo:
- self.make_credentials(credential_source=credential_source)
- assert excinfo.match(r"No valid AWS 'credential_source' provided")
- def test_constructor_invalid_environment_id_version(self):
- # Provide an unsupported version.
- credential_source = self.CREDENTIAL_SOURCE.copy()
- credential_source["environment_id"] = "aws3"
- with pytest.raises(ValueError) as excinfo:
- self.make_credentials(credential_source=credential_source)
- assert excinfo.match(r"aws version '3' is not supported in the current build.")
- def test_info(self):
- credentials = self.make_credentials(
- credential_source=self.CREDENTIAL_SOURCE.copy()
- )
- assert credentials.info == {
- "type": "external_account",
- "audience": AUDIENCE,
- "subject_token_type": SUBJECT_TOKEN_TYPE,
- "token_url": TOKEN_URL,
- "token_info_url": TOKEN_INFO_URL,
- "credential_source": self.CREDENTIAL_SOURCE,
- "universe_domain": DEFAULT_UNIVERSE_DOMAIN,
- }
- def test_token_info_url(self):
- credentials = self.make_credentials(
- credential_source=self.CREDENTIAL_SOURCE.copy()
- )
- assert credentials.token_info_url == TOKEN_INFO_URL
- def test_token_info_url_custom(self):
- for url in VALID_TOKEN_URLS:
- credentials = self.make_credentials(
- credential_source=self.CREDENTIAL_SOURCE.copy(),
- token_info_url=(url + "/introspect"),
- )
- assert credentials.token_info_url == (url + "/introspect")
- def test_token_info_url_negative(self):
- credentials = self.make_credentials(
- credential_source=self.CREDENTIAL_SOURCE.copy(), token_info_url=None
- )
- assert not credentials.token_info_url
- def test_token_url_custom(self):
- for url in VALID_TOKEN_URLS:
- credentials = self.make_credentials(
- credential_source=self.CREDENTIAL_SOURCE.copy(),
- token_url=(url + "/token"),
- )
- assert credentials._token_url == (url + "/token")
- def test_service_account_impersonation_url_custom(self):
- for url in VALID_SERVICE_ACCOUNT_IMPERSONATION_URLS:
- credentials = self.make_credentials(
- credential_source=self.CREDENTIAL_SOURCE.copy(),
- service_account_impersonation_url=(
- url + SERVICE_ACCOUNT_IMPERSONATION_URL_ROUTE
- ),
- )
- assert credentials._service_account_impersonation_url == (
- url + SERVICE_ACCOUNT_IMPERSONATION_URL_ROUTE
- )
- def test_retrieve_subject_token_missing_region_url(self):
- # When AWS_REGION envvar is not available, region_url is required for
- # determining the current AWS region.
- credential_source = self.CREDENTIAL_SOURCE.copy()
- credential_source.pop("region_url")
- credentials = self.make_credentials(credential_source=credential_source)
- with pytest.raises(exceptions.RefreshError) as excinfo:
- credentials.retrieve_subject_token(None)
- assert excinfo.match(r"Unable to determine AWS region")
- @mock.patch("google.auth._helpers.utcnow")
- def test_retrieve_subject_token_success_temp_creds_no_environment_vars(
- self, utcnow
- ):
- utcnow.return_value = datetime.datetime.strptime(
- self.AWS_SIGNATURE_TIME, "%Y-%m-%dT%H:%M:%SZ"
- )
- request = self.make_mock_request(
- region_status=http_client.OK,
- region_name=self.AWS_REGION,
- role_status=http_client.OK,
- role_name=self.AWS_ROLE,
- security_credentials_status=http_client.OK,
- security_credentials_data=self.AWS_SECURITY_CREDENTIALS_RESPONSE,
- )
- credentials = self.make_credentials(credential_source=self.CREDENTIAL_SOURCE)
- subject_token = credentials.retrieve_subject_token(request)
- assert subject_token == self.make_serialized_aws_signed_request(
- {
- "access_key_id": ACCESS_KEY_ID,
- "secret_access_key": SECRET_ACCESS_KEY,
- "security_token": TOKEN,
- }
- )
- # Assert region request.
- self.assert_aws_metadata_request_kwargs(
- request.call_args_list[0][1], REGION_URL
- )
- # Assert role request.
- self.assert_aws_metadata_request_kwargs(
- request.call_args_list[1][1], SECURITY_CREDS_URL
- )
- # Assert security credentials request.
- self.assert_aws_metadata_request_kwargs(
- request.call_args_list[2][1],
- "{}/{}".format(SECURITY_CREDS_URL, self.AWS_ROLE),
- {"Content-Type": "application/json"},
- )
- # Retrieve subject_token again. Region should not be queried again.
- new_request = self.make_mock_request(
- role_status=http_client.OK,
- role_name=self.AWS_ROLE,
- security_credentials_status=http_client.OK,
- security_credentials_data=self.AWS_SECURITY_CREDENTIALS_RESPONSE,
- )
- credentials.retrieve_subject_token(new_request)
- # Only 3 requests should be sent as the region is cached.
- assert len(new_request.call_args_list) == 2
- # Assert role request.
- self.assert_aws_metadata_request_kwargs(
- new_request.call_args_list[0][1], SECURITY_CREDS_URL
- )
- # Assert security credentials request.
- self.assert_aws_metadata_request_kwargs(
- new_request.call_args_list[1][1],
- "{}/{}".format(SECURITY_CREDS_URL, self.AWS_ROLE),
- {"Content-Type": "application/json"},
- )
- @mock.patch("google.auth._helpers.utcnow")
- @mock.patch.dict(os.environ, {})
- def test_retrieve_subject_token_success_temp_creds_no_environment_vars_idmsv2(
- self, utcnow
- ):
- utcnow.return_value = datetime.datetime.strptime(
- self.AWS_SIGNATURE_TIME, "%Y-%m-%dT%H:%M:%SZ"
- )
- request = self.make_mock_request(
- region_status=http_client.OK,
- region_name=self.AWS_REGION,
- role_status=http_client.OK,
- role_name=self.AWS_ROLE,
- security_credentials_status=http_client.OK,
- security_credentials_data=self.AWS_SECURITY_CREDENTIALS_RESPONSE,
- imdsv2_session_token_status=http_client.OK,
- imdsv2_session_token_data=self.AWS_IMDSV2_SESSION_TOKEN,
- )
- credential_source_token_url = self.CREDENTIAL_SOURCE.copy()
- credential_source_token_url[
- "imdsv2_session_token_url"
- ] = IMDSV2_SESSION_TOKEN_URL
- credentials = self.make_credentials(
- credential_source=credential_source_token_url
- )
- subject_token = credentials.retrieve_subject_token(request)
- assert subject_token == self.make_serialized_aws_signed_request(
- {
- "access_key_id": ACCESS_KEY_ID,
- "secret_access_key": SECRET_ACCESS_KEY,
- "security_token": TOKEN,
- }
- )
- # Assert session token request
- self.assert_aws_metadata_request_kwargs(
- request.call_args_list[0][1],
- IMDSV2_SESSION_TOKEN_URL,
- {"X-aws-ec2-metadata-token-ttl-seconds": "300"},
- "PUT",
- )
- # Assert region request.
- self.assert_aws_metadata_request_kwargs(
- request.call_args_list[1][1],
- REGION_URL,
- {"X-aws-ec2-metadata-token": self.AWS_IMDSV2_SESSION_TOKEN},
- )
- # Assert role request.
- self.assert_aws_metadata_request_kwargs(
- request.call_args_list[2][1],
- SECURITY_CREDS_URL,
- {"X-aws-ec2-metadata-token": self.AWS_IMDSV2_SESSION_TOKEN},
- )
- # Assert security credentials request.
- self.assert_aws_metadata_request_kwargs(
- request.call_args_list[3][1],
- "{}/{}".format(SECURITY_CREDS_URL, self.AWS_ROLE),
- {
- "Content-Type": "application/json",
- "X-aws-ec2-metadata-token": self.AWS_IMDSV2_SESSION_TOKEN,
- },
- )
- # Retrieve subject_token again. Region should not be queried again.
- new_request = self.make_mock_request(
- role_status=http_client.OK,
- role_name=self.AWS_ROLE,
- security_credentials_status=http_client.OK,
- security_credentials_data=self.AWS_SECURITY_CREDENTIALS_RESPONSE,
- imdsv2_session_token_status=http_client.OK,
- imdsv2_session_token_data=self.AWS_IMDSV2_SESSION_TOKEN,
- )
- credentials.retrieve_subject_token(new_request)
- # Only 3 requests should be sent as the region is cached.
- assert len(new_request.call_args_list) == 3
- # Assert session token request.
- self.assert_aws_metadata_request_kwargs(
- request.call_args_list[0][1],
- IMDSV2_SESSION_TOKEN_URL,
- {"X-aws-ec2-metadata-token-ttl-seconds": "300"},
- "PUT",
- )
- # Assert role request.
- self.assert_aws_metadata_request_kwargs(
- new_request.call_args_list[1][1],
- SECURITY_CREDS_URL,
- {"X-aws-ec2-metadata-token": self.AWS_IMDSV2_SESSION_TOKEN},
- )
- # Assert security credentials request.
- self.assert_aws_metadata_request_kwargs(
- new_request.call_args_list[2][1],
- "{}/{}".format(SECURITY_CREDS_URL, self.AWS_ROLE),
- {
- "Content-Type": "application/json",
- "X-aws-ec2-metadata-token": self.AWS_IMDSV2_SESSION_TOKEN,
- },
- )
- @mock.patch("google.auth._helpers.utcnow")
- @mock.patch.dict(
- os.environ,
- {
- environment_vars.AWS_REGION: AWS_REGION,
- environment_vars.AWS_ACCESS_KEY_ID: ACCESS_KEY_ID,
- },
- )
- def test_retrieve_subject_token_success_temp_creds_environment_vars_missing_secret_access_key_idmsv2(
- self, utcnow
- ):
- utcnow.return_value = datetime.datetime.strptime(
- self.AWS_SIGNATURE_TIME, "%Y-%m-%dT%H:%M:%SZ"
- )
- request = self.make_mock_request(
- role_status=http_client.OK,
- role_name=self.AWS_ROLE,
- security_credentials_status=http_client.OK,
- security_credentials_data=self.AWS_SECURITY_CREDENTIALS_RESPONSE,
- imdsv2_session_token_status=http_client.OK,
- imdsv2_session_token_data=self.AWS_IMDSV2_SESSION_TOKEN,
- )
- credential_source_token_url = self.CREDENTIAL_SOURCE.copy()
- credential_source_token_url[
- "imdsv2_session_token_url"
- ] = IMDSV2_SESSION_TOKEN_URL
- credentials = self.make_credentials(
- credential_source=credential_source_token_url
- )
- subject_token = credentials.retrieve_subject_token(request)
- assert subject_token == self.make_serialized_aws_signed_request(
- {
- "access_key_id": ACCESS_KEY_ID,
- "secret_access_key": SECRET_ACCESS_KEY,
- "security_token": TOKEN,
- }
- )
- # Assert session token request.
- self.assert_aws_metadata_request_kwargs(
- request.call_args_list[0][1],
- IMDSV2_SESSION_TOKEN_URL,
- {"X-aws-ec2-metadata-token-ttl-seconds": "300"},
- "PUT",
- )
- # Assert role request.
- self.assert_aws_metadata_request_kwargs(
- request.call_args_list[1][1],
- SECURITY_CREDS_URL,
- {"X-aws-ec2-metadata-token": self.AWS_IMDSV2_SESSION_TOKEN},
- )
- # Assert security credentials request.
- self.assert_aws_metadata_request_kwargs(
- request.call_args_list[2][1],
- "{}/{}".format(SECURITY_CREDS_URL, self.AWS_ROLE),
- {
- "Content-Type": "application/json",
- "X-aws-ec2-metadata-token": self.AWS_IMDSV2_SESSION_TOKEN,
- },
- )
- @mock.patch("google.auth._helpers.utcnow")
- @mock.patch.dict(
- os.environ,
- {
- environment_vars.AWS_REGION: AWS_REGION,
- environment_vars.AWS_SECRET_ACCESS_KEY: SECRET_ACCESS_KEY,
- },
- )
- def test_retrieve_subject_token_success_temp_creds_environment_vars_missing_access_key_id_idmsv2(
- self, utcnow
- ):
- utcnow.return_value = datetime.datetime.strptime(
- self.AWS_SIGNATURE_TIME, "%Y-%m-%dT%H:%M:%SZ"
- )
- request = self.make_mock_request(
- role_status=http_client.OK,
- role_name=self.AWS_ROLE,
- security_credentials_status=http_client.OK,
- security_credentials_data=self.AWS_SECURITY_CREDENTIALS_RESPONSE,
- imdsv2_session_token_status=http_client.OK,
- imdsv2_session_token_data=self.AWS_IMDSV2_SESSION_TOKEN,
- )
- credential_source_token_url = self.CREDENTIAL_SOURCE.copy()
- credential_source_token_url[
- "imdsv2_session_token_url"
- ] = IMDSV2_SESSION_TOKEN_URL
- credentials = self.make_credentials(
- credential_source=credential_source_token_url
- )
- subject_token = credentials.retrieve_subject_token(request)
- assert subject_token == self.make_serialized_aws_signed_request(
- {
- "access_key_id": ACCESS_KEY_ID,
- "secret_access_key": SECRET_ACCESS_KEY,
- "security_token": TOKEN,
- }
- )
- # Assert session token request.
- self.assert_aws_metadata_request_kwargs(
- request.call_args_list[0][1],
- IMDSV2_SESSION_TOKEN_URL,
- {"X-aws-ec2-metadata-token-ttl-seconds": "300"},
- "PUT",
- )
- # Assert role request.
- self.assert_aws_metadata_request_kwargs(
- request.call_args_list[1][1],
- SECURITY_CREDS_URL,
- {"X-aws-ec2-metadata-token": self.AWS_IMDSV2_SESSION_TOKEN},
- )
- # Assert security credentials request.
- self.assert_aws_metadata_request_kwargs(
- request.call_args_list[2][1],
- "{}/{}".format(SECURITY_CREDS_URL, self.AWS_ROLE),
- {
- "Content-Type": "application/json",
- "X-aws-ec2-metadata-token": self.AWS_IMDSV2_SESSION_TOKEN,
- },
- )
- @mock.patch("google.auth._helpers.utcnow")
- @mock.patch.dict(os.environ, {environment_vars.AWS_REGION: AWS_REGION})
- def test_retrieve_subject_token_success_temp_creds_environment_vars_missing_creds_idmsv2(
- self, utcnow
- ):
- utcnow.return_value = datetime.datetime.strptime(
- self.AWS_SIGNATURE_TIME, "%Y-%m-%dT%H:%M:%SZ"
- )
- request = self.make_mock_request(
- role_status=http_client.OK,
- role_name=self.AWS_ROLE,
- security_credentials_status=http_client.OK,
- security_credentials_data=self.AWS_SECURITY_CREDENTIALS_RESPONSE,
- imdsv2_session_token_status=http_client.OK,
- imdsv2_session_token_data=self.AWS_IMDSV2_SESSION_TOKEN,
- )
- credential_source_token_url = self.CREDENTIAL_SOURCE.copy()
- credential_source_token_url[
- "imdsv2_session_token_url"
- ] = IMDSV2_SESSION_TOKEN_URL
- credentials = self.make_credentials(
- credential_source=credential_source_token_url
- )
- subject_token = credentials.retrieve_subject_token(request)
- assert subject_token == self.make_serialized_aws_signed_request(
- {
- "access_key_id": ACCESS_KEY_ID,
- "secret_access_key": SECRET_ACCESS_KEY,
- "security_token": TOKEN,
- }
- )
- # Assert session token request.
- self.assert_aws_metadata_request_kwargs(
- request.call_args_list[0][1],
- IMDSV2_SESSION_TOKEN_URL,
- {"X-aws-ec2-metadata-token-ttl-seconds": "300"},
- "PUT",
- )
- # Assert role request.
- self.assert_aws_metadata_request_kwargs(
- request.call_args_list[1][1],
- SECURITY_CREDS_URL,
- {"X-aws-ec2-metadata-token": self.AWS_IMDSV2_SESSION_TOKEN},
- )
- # Assert security credentials request.
- self.assert_aws_metadata_request_kwargs(
- request.call_args_list[2][1],
- "{}/{}".format(SECURITY_CREDS_URL, self.AWS_ROLE),
- {
- "Content-Type": "application/json",
- "X-aws-ec2-metadata-token": self.AWS_IMDSV2_SESSION_TOKEN,
- },
- )
- @mock.patch("google.auth._helpers.utcnow")
- @mock.patch.dict(
- os.environ,
- {
- environment_vars.AWS_REGION: AWS_REGION,
- environment_vars.AWS_ACCESS_KEY_ID: ACCESS_KEY_ID,
- environment_vars.AWS_SECRET_ACCESS_KEY: SECRET_ACCESS_KEY,
- },
- )
- def test_retrieve_subject_token_success_temp_creds_idmsv2(self, utcnow):
- utcnow.return_value = datetime.datetime.strptime(
- self.AWS_SIGNATURE_TIME, "%Y-%m-%dT%H:%M:%SZ"
- )
- request = self.make_mock_request(
- role_status=http_client.OK, role_name=self.AWS_ROLE
- )
- credential_source_token_url = self.CREDENTIAL_SOURCE.copy()
- credential_source_token_url[
- "imdsv2_session_token_url"
- ] = IMDSV2_SESSION_TOKEN_URL
- credentials = self.make_credentials(
- credential_source=credential_source_token_url
- )
- credentials.retrieve_subject_token(request)
- assert not request.called
- @mock.patch("google.auth._helpers.utcnow")
- def test_retrieve_subject_token_success_ipv6(self, utcnow):
- utcnow.return_value = datetime.datetime.strptime(
- self.AWS_SIGNATURE_TIME, "%Y-%m-%dT%H:%M:%SZ"
- )
- request = self.make_mock_request(
- region_status=http_client.OK,
- region_name=self.AWS_REGION,
- role_status=http_client.OK,
- role_name=self.AWS_ROLE,
- security_credentials_status=http_client.OK,
- security_credentials_data=self.AWS_SECURITY_CREDENTIALS_RESPONSE,
- imdsv2_session_token_status=http_client.OK,
- imdsv2_session_token_data=self.AWS_IMDSV2_SESSION_TOKEN,
- )
- credential_source_token_url = self.CREDENTIAL_SOURCE_IPV6.copy()
- credentials = self.make_credentials(
- credential_source=credential_source_token_url
- )
- subject_token = credentials.retrieve_subject_token(request)
- assert subject_token == self.make_serialized_aws_signed_request(
- {
- "access_key_id": ACCESS_KEY_ID,
- "secret_access_key": SECRET_ACCESS_KEY,
- "security_token": TOKEN,
- }
- )
- # Assert session token request.
- self.assert_aws_metadata_request_kwargs(
- request.call_args_list[0][1],
- IMDSV2_SESSION_TOKEN_URL_IPV6,
- {"X-aws-ec2-metadata-token-ttl-seconds": "300"},
- "PUT",
- )
- # Assert region request.
- self.assert_aws_metadata_request_kwargs(
- request.call_args_list[1][1],
- REGION_URL_IPV6,
- {"X-aws-ec2-metadata-token": self.AWS_IMDSV2_SESSION_TOKEN},
- )
- # Assert role request.
- self.assert_aws_metadata_request_kwargs(
- request.call_args_list[2][1],
- SECURITY_CREDS_URL_IPV6,
- {"X-aws-ec2-metadata-token": self.AWS_IMDSV2_SESSION_TOKEN},
- )
- # Assert security credentials request.
- self.assert_aws_metadata_request_kwargs(
- request.call_args_list[3][1],
- "{}/{}".format(SECURITY_CREDS_URL_IPV6, self.AWS_ROLE),
- {
- "Content-Type": "application/json",
- "X-aws-ec2-metadata-token": self.AWS_IMDSV2_SESSION_TOKEN,
- },
- )
- @mock.patch("google.auth._helpers.utcnow")
- def test_retrieve_subject_token_session_error_idmsv2(self, utcnow):
- utcnow.return_value = datetime.datetime.strptime(
- self.AWS_SIGNATURE_TIME, "%Y-%m-%dT%H:%M:%SZ"
- )
- request = self.make_mock_request(
- imdsv2_session_token_status=http_client.UNAUTHORIZED,
- imdsv2_session_token_data="unauthorized",
- )
- credential_source_token_url = self.CREDENTIAL_SOURCE.copy()
- credential_source_token_url[
- "imdsv2_session_token_url"
- ] = IMDSV2_SESSION_TOKEN_URL
- credentials = self.make_credentials(
- credential_source=credential_source_token_url
- )
- with pytest.raises(exceptions.RefreshError) as excinfo:
- credentials.retrieve_subject_token(request)
- assert excinfo.match(r"Unable to retrieve AWS Session Token")
- # Assert session token request
- self.assert_aws_metadata_request_kwargs(
- request.call_args_list[0][1],
- IMDSV2_SESSION_TOKEN_URL,
- {"X-aws-ec2-metadata-token-ttl-seconds": "300"},
- "PUT",
- )
- @mock.patch("google.auth._helpers.utcnow")
- def test_retrieve_subject_token_success_permanent_creds_no_environment_vars(
- self, utcnow
- ):
- # Simualte a permanent credential without a session token is
- # returned by the security-credentials endpoint.
- security_creds_response = self.AWS_SECURITY_CREDENTIALS_RESPONSE.copy()
- security_creds_response.pop("Token")
- utcnow.return_value = datetime.datetime.strptime(
- self.AWS_SIGNATURE_TIME, "%Y-%m-%dT%H:%M:%SZ"
- )
- request = self.make_mock_request(
- region_status=http_client.OK,
- region_name=self.AWS_REGION,
- role_status=http_client.OK,
- role_name=self.AWS_ROLE,
- security_credentials_status=http_client.OK,
- security_credentials_data=security_creds_response,
- )
- credentials = self.make_credentials(credential_source=self.CREDENTIAL_SOURCE)
- subject_token = credentials.retrieve_subject_token(request)
- assert subject_token == self.make_serialized_aws_signed_request(
- {"access_key_id": ACCESS_KEY_ID, "secret_access_key": SECRET_ACCESS_KEY}
- )
- @mock.patch("google.auth._helpers.utcnow")
- def test_retrieve_subject_token_success_environment_vars(self, utcnow, monkeypatch):
- monkeypatch.setenv(environment_vars.AWS_ACCESS_KEY_ID, ACCESS_KEY_ID)
- monkeypatch.setenv(environment_vars.AWS_SECRET_ACCESS_KEY, SECRET_ACCESS_KEY)
- monkeypatch.setenv(environment_vars.AWS_SESSION_TOKEN, TOKEN)
- monkeypatch.setenv(environment_vars.AWS_REGION, self.AWS_REGION)
- utcnow.return_value = datetime.datetime.strptime(
- self.AWS_SIGNATURE_TIME, "%Y-%m-%dT%H:%M:%SZ"
- )
- credentials = self.make_credentials(credential_source=self.CREDENTIAL_SOURCE)
- subject_token = credentials.retrieve_subject_token(None)
- assert subject_token == self.make_serialized_aws_signed_request(
- {
- "access_key_id": ACCESS_KEY_ID,
- "secret_access_key": SECRET_ACCESS_KEY,
- "security_token": TOKEN,
- }
- )
- @mock.patch("google.auth._helpers.utcnow")
- def test_retrieve_subject_token_success_environment_vars_with_default_region(
- self, utcnow, monkeypatch
- ):
- monkeypatch.setenv(environment_vars.AWS_ACCESS_KEY_ID, ACCESS_KEY_ID)
- monkeypatch.setenv(environment_vars.AWS_SECRET_ACCESS_KEY, SECRET_ACCESS_KEY)
- monkeypatch.setenv(environment_vars.AWS_SESSION_TOKEN, TOKEN)
- monkeypatch.setenv(environment_vars.AWS_DEFAULT_REGION, self.AWS_REGION)
- utcnow.return_value = datetime.datetime.strptime(
- self.AWS_SIGNATURE_TIME, "%Y-%m-%dT%H:%M:%SZ"
- )
- credentials = self.make_credentials(credential_source=self.CREDENTIAL_SOURCE)
- subject_token = credentials.retrieve_subject_token(None)
- assert subject_token == self.make_serialized_aws_signed_request(
- {
- "access_key_id": ACCESS_KEY_ID,
- "secret_access_key": SECRET_ACCESS_KEY,
- "security_token": TOKEN,
- }
- )
- @mock.patch("google.auth._helpers.utcnow")
- def test_retrieve_subject_token_success_environment_vars_with_both_regions_set(
- self, utcnow, monkeypatch
- ):
- monkeypatch.setenv(environment_vars.AWS_ACCESS_KEY_ID, ACCESS_KEY_ID)
- monkeypatch.setenv(environment_vars.AWS_SECRET_ACCESS_KEY, SECRET_ACCESS_KEY)
- monkeypatch.setenv(environment_vars.AWS_SESSION_TOKEN, TOKEN)
- monkeypatch.setenv(environment_vars.AWS_DEFAULT_REGION, "Malformed AWS Region")
- # This test makes sure that the AWS_REGION gets used over AWS_DEFAULT_REGION,
- # So, AWS_DEFAULT_REGION is set to something that would cause the test to fail,
- # And AWS_REGION is set to the a valid value, and it should succeed
- monkeypatch.setenv(environment_vars.AWS_REGION, self.AWS_REGION)
- utcnow.return_value = datetime.datetime.strptime(
- self.AWS_SIGNATURE_TIME, "%Y-%m-%dT%H:%M:%SZ"
- )
- credentials = self.make_credentials(credential_source=self.CREDENTIAL_SOURCE)
- subject_token = credentials.retrieve_subject_token(None)
- assert subject_token == self.make_serialized_aws_signed_request(
- {
- "access_key_id": ACCESS_KEY_ID,
- "secret_access_key": SECRET_ACCESS_KEY,
- "security_token": TOKEN,
- }
- )
- @mock.patch("google.auth._helpers.utcnow")
- def test_retrieve_subject_token_success_environment_vars_no_session_token(
- self, utcnow, monkeypatch
- ):
- monkeypatch.setenv(environment_vars.AWS_ACCESS_KEY_ID, ACCESS_KEY_ID)
- monkeypatch.setenv(environment_vars.AWS_SECRET_ACCESS_KEY, SECRET_ACCESS_KEY)
- monkeypatch.setenv(environment_vars.AWS_REGION, self.AWS_REGION)
- utcnow.return_value = datetime.datetime.strptime(
- self.AWS_SIGNATURE_TIME, "%Y-%m-%dT%H:%M:%SZ"
- )
- credentials = self.make_credentials(credential_source=self.CREDENTIAL_SOURCE)
- subject_token = credentials.retrieve_subject_token(None)
- assert subject_token == self.make_serialized_aws_signed_request(
- {"access_key_id": ACCESS_KEY_ID, "secret_access_key": SECRET_ACCESS_KEY}
- )
- @mock.patch("google.auth._helpers.utcnow")
- def test_retrieve_subject_token_success_environment_vars_except_region(
- self, utcnow, monkeypatch
- ):
- monkeypatch.setenv(environment_vars.AWS_ACCESS_KEY_ID, ACCESS_KEY_ID)
- monkeypatch.setenv(environment_vars.AWS_SECRET_ACCESS_KEY, SECRET_ACCESS_KEY)
- monkeypatch.setenv(environment_vars.AWS_SESSION_TOKEN, TOKEN)
- utcnow.return_value = datetime.datetime.strptime(
- self.AWS_SIGNATURE_TIME, "%Y-%m-%dT%H:%M:%SZ"
- )
- # Region will be queried since it is not found in envvars.
- request = self.make_mock_request(
- region_status=http_client.OK, region_name=self.AWS_REGION
- )
- credentials = self.make_credentials(credential_source=self.CREDENTIAL_SOURCE)
- subject_token = credentials.retrieve_subject_token(request)
- assert subject_token == self.make_serialized_aws_signed_request(
- {
- "access_key_id": ACCESS_KEY_ID,
- "secret_access_key": SECRET_ACCESS_KEY,
- "security_token": TOKEN,
- }
- )
- def test_retrieve_subject_token_error_determining_aws_region(self):
- # Simulate error in retrieving the AWS region.
- request = self.make_mock_request(region_status=http_client.BAD_REQUEST)
- credentials = self.make_credentials(credential_source=self.CREDENTIAL_SOURCE)
- with pytest.raises(exceptions.RefreshError) as excinfo:
- credentials.retrieve_subject_token(request)
- assert excinfo.match(r"Unable to retrieve AWS region")
- def test_retrieve_subject_token_error_determining_aws_role(self):
- # Simulate error in retrieving the AWS role name.
- request = self.make_mock_request(
- region_status=http_client.OK,
- region_name=self.AWS_REGION,
- role_status=http_client.BAD_REQUEST,
- )
- credentials = self.make_credentials(credential_source=self.CREDENTIAL_SOURCE)
- with pytest.raises(exceptions.RefreshError) as excinfo:
- credentials.retrieve_subject_token(request)
- assert excinfo.match(r"Unable to retrieve AWS role name")
- def test_retrieve_subject_token_error_determining_security_creds_url(self):
- # Simulate the security-credentials url is missing. This is needed for
- # determining the AWS security credentials when not found in envvars.
- credential_source = self.CREDENTIAL_SOURCE.copy()
- credential_source.pop("url")
- request = self.make_mock_request(
- region_status=http_client.OK, region_name=self.AWS_REGION
- )
- credentials = self.make_credentials(credential_source=credential_source)
- with pytest.raises(exceptions.RefreshError) as excinfo:
- credentials.retrieve_subject_token(request)
- assert excinfo.match(
- r"Unable to determine the AWS metadata server security credentials endpoint"
- )
- def test_retrieve_subject_token_error_determining_aws_security_creds(self):
- # Simulate error in retrieving the AWS security credentials.
- request = self.make_mock_request(
- region_status=http_client.OK,
- region_name=self.AWS_REGION,
- role_status=http_client.OK,
- role_name=self.AWS_ROLE,
- security_credentials_status=http_client.BAD_REQUEST,
- )
- credentials = self.make_credentials(credential_source=self.CREDENTIAL_SOURCE)
- with pytest.raises(exceptions.RefreshError) as excinfo:
- credentials.retrieve_subject_token(request)
- assert excinfo.match(r"Unable to retrieve AWS security credentials")
- @mock.patch(
- "google.auth.metrics.python_and_auth_lib_version",
- return_value=LANG_LIBRARY_METRICS_HEADER_VALUE,
- )
- @mock.patch("google.auth._helpers.utcnow")
- def test_refresh_success_without_impersonation_ignore_default_scopes(
- self, utcnow, mock_auth_lib_value
- ):
- utcnow.return_value = datetime.datetime.strptime(
- self.AWS_SIGNATURE_TIME, "%Y-%m-%dT%H:%M:%SZ"
- )
- expected_subject_token = self.make_serialized_aws_signed_request(
- {
- "access_key_id": ACCESS_KEY_ID,
- "secret_access_key": SECRET_ACCESS_KEY,
- "security_token": TOKEN,
- }
- )
- token_headers = {
- "Content-Type": "application/x-www-form-urlencoded",
- "Authorization": "Basic " + BASIC_AUTH_ENCODING,
- "x-goog-api-client": "gl-python/3.7 auth/1.1 google-byoid-sdk sa-impersonation/false config-lifetime/false source/aws",
- }
- token_request_data = {
- "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
- "audience": AUDIENCE,
- "requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
- "scope": " ".join(SCOPES),
- "subject_token": expected_subject_token,
- "subject_token_type": SUBJECT_TOKEN_TYPE,
- }
- request = self.make_mock_request(
- region_status=http_client.OK,
- region_name=self.AWS_REGION,
- role_status=http_client.OK,
- role_name=self.AWS_ROLE,
- security_credentials_status=http_client.OK,
- security_credentials_data=self.AWS_SECURITY_CREDENTIALS_RESPONSE,
- token_status=http_client.OK,
- token_data=self.SUCCESS_RESPONSE,
- )
- credentials = self.make_credentials(
- client_id=CLIENT_ID,
- client_secret=CLIENT_SECRET,
- credential_source=self.CREDENTIAL_SOURCE,
- quota_project_id=QUOTA_PROJECT_ID,
- scopes=SCOPES,
- # Default scopes should be ignored.
- default_scopes=["ignored"],
- )
- credentials.refresh(request)
- assert len(request.call_args_list) == 4
- # Fourth request should be sent to GCP STS endpoint.
- self.assert_token_request_kwargs(
- request.call_args_list[3][1], token_headers, token_request_data
- )
- assert credentials.token == self.SUCCESS_RESPONSE["access_token"]
- assert credentials.quota_project_id == QUOTA_PROJECT_ID
- assert credentials.scopes == SCOPES
- assert credentials.default_scopes == ["ignored"]
- @mock.patch(
- "google.auth.metrics.python_and_auth_lib_version",
- return_value=LANG_LIBRARY_METRICS_HEADER_VALUE,
- )
- @mock.patch("google.auth._helpers.utcnow")
- def test_refresh_success_without_impersonation_use_default_scopes(
- self, utcnow, mock_auth_lib_value
- ):
- utcnow.return_value = datetime.datetime.strptime(
- self.AWS_SIGNATURE_TIME, "%Y-%m-%dT%H:%M:%SZ"
- )
- expected_subject_token = self.make_serialized_aws_signed_request(
- {
- "access_key_id": ACCESS_KEY_ID,
- "secret_access_key": SECRET_ACCESS_KEY,
- "security_token": TOKEN,
- }
- )
- token_headers = {
- "Content-Type": "application/x-www-form-urlencoded",
- "Authorization": "Basic " + BASIC_AUTH_ENCODING,
- "x-goog-api-client": "gl-python/3.7 auth/1.1 google-byoid-sdk sa-impersonation/false config-lifetime/false source/aws",
- }
- token_request_data = {
- "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
- "audience": AUDIENCE,
- "requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
- "scope": " ".join(SCOPES),
- "subject_token": expected_subject_token,
- "subject_token_type": SUBJECT_TOKEN_TYPE,
- }
- request = self.make_mock_request(
- region_status=http_client.OK,
- region_name=self.AWS_REGION,
- role_status=http_client.OK,
- role_name=self.AWS_ROLE,
- security_credentials_status=http_client.OK,
- security_credentials_data=self.AWS_SECURITY_CREDENTIALS_RESPONSE,
- token_status=http_client.OK,
- token_data=self.SUCCESS_RESPONSE,
- )
- credentials = self.make_credentials(
- client_id=CLIENT_ID,
- client_secret=CLIENT_SECRET,
- credential_source=self.CREDENTIAL_SOURCE,
- quota_project_id=QUOTA_PROJECT_ID,
- scopes=None,
- # Default scopes should be used since user specified scopes are none.
- default_scopes=SCOPES,
- )
- credentials.refresh(request)
- assert len(request.call_args_list) == 4
- # Fourth request should be sent to GCP STS endpoint.
- self.assert_token_request_kwargs(
- request.call_args_list[3][1], token_headers, token_request_data
- )
- assert credentials.token == self.SUCCESS_RESPONSE["access_token"]
- assert credentials.quota_project_id == QUOTA_PROJECT_ID
- assert credentials.scopes is None
- assert credentials.default_scopes == SCOPES
- @mock.patch(
- "google.auth.metrics.token_request_access_token_impersonate",
- return_value=IMPERSONATE_ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE,
- )
- @mock.patch(
- "google.auth.metrics.python_and_auth_lib_version",
- return_value=LANG_LIBRARY_METRICS_HEADER_VALUE,
- )
- @mock.patch("google.auth._helpers.utcnow")
- def test_refresh_success_with_impersonation_ignore_default_scopes(
- self, utcnow, mock_metrics_header_value, mock_auth_lib_value
- ):
- utcnow.return_value = datetime.datetime.strptime(
- self.AWS_SIGNATURE_TIME, "%Y-%m-%dT%H:%M:%SZ"
- )
- expire_time = (
- _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=3600)
- ).isoformat("T") + "Z"
- expected_subject_token = self.make_serialized_aws_signed_request(
- {
- "access_key_id": ACCESS_KEY_ID,
- "secret_access_key": SECRET_ACCESS_KEY,
- "security_token": TOKEN,
- }
- )
- token_headers = {
- "Content-Type": "application/x-www-form-urlencoded",
- "Authorization": "Basic " + BASIC_AUTH_ENCODING,
- "x-goog-api-client": "gl-python/3.7 auth/1.1 google-byoid-sdk sa-impersonation/true config-lifetime/false source/aws",
- }
- token_request_data = {
- "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
- "audience": AUDIENCE,
- "requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
- "scope": "https://www.googleapis.com/auth/iam",
- "subject_token": expected_subject_token,
- "subject_token_type": SUBJECT_TOKEN_TYPE,
- }
- # Service account impersonation request/response.
- impersonation_response = {
- "accessToken": "SA_ACCESS_TOKEN",
- "expireTime": expire_time,
- }
- impersonation_headers = {
- "Content-Type": "application/json",
- "authorization": "Bearer {}".format(self.SUCCESS_RESPONSE["access_token"]),
- "x-goog-user-project": QUOTA_PROJECT_ID,
- "x-goog-api-client": IMPERSONATE_ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE,
- "x-allowed-locations": "0x0",
- }
- impersonation_request_data = {
- "delegates": None,
- "scope": SCOPES,
- "lifetime": "3600s",
- }
- request = self.make_mock_request(
- region_status=http_client.OK,
- region_name=self.AWS_REGION,
- role_status=http_client.OK,
- role_name=self.AWS_ROLE,
- security_credentials_status=http_client.OK,
- security_credentials_data=self.AWS_SECURITY_CREDENTIALS_RESPONSE,
- token_status=http_client.OK,
- token_data=self.SUCCESS_RESPONSE,
- impersonation_status=http_client.OK,
- impersonation_data=impersonation_response,
- )
- credentials = self.make_credentials(
- client_id=CLIENT_ID,
- client_secret=CLIENT_SECRET,
- credential_source=self.CREDENTIAL_SOURCE,
- service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL,
- quota_project_id=QUOTA_PROJECT_ID,
- scopes=SCOPES,
- # Default scopes should be ignored.
- default_scopes=["ignored"],
- )
- credentials.refresh(request)
- assert len(request.call_args_list) == 5
- # Fourth request should be sent to GCP STS endpoint.
- self.assert_token_request_kwargs(
- request.call_args_list[3][1], token_headers, token_request_data
- )
- # Fifth request should be sent to iamcredentials endpoint for service
- # account impersonation.
- self.assert_impersonation_request_kwargs(
- request.call_args_list[4][1],
- impersonation_headers,
- impersonation_request_data,
- )
- assert credentials.token == impersonation_response["accessToken"]
- assert credentials.quota_project_id == QUOTA_PROJECT_ID
- assert credentials.scopes == SCOPES
- assert credentials.default_scopes == ["ignored"]
- @mock.patch(
- "google.auth.metrics.token_request_access_token_impersonate",
- return_value=IMPERSONATE_ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE,
- )
- @mock.patch(
- "google.auth.metrics.python_and_auth_lib_version",
- return_value=LANG_LIBRARY_METRICS_HEADER_VALUE,
- )
- @mock.patch("google.auth._helpers.utcnow")
- def test_refresh_success_with_impersonation_use_default_scopes(
- self, utcnow, mock_metrics_header_value, mock_auth_lib_value
- ):
- utcnow.return_value = datetime.datetime.strptime(
- self.AWS_SIGNATURE_TIME, "%Y-%m-%dT%H:%M:%SZ"
- )
- expire_time = (
- _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=3600)
- ).isoformat("T") + "Z"
- expected_subject_token = self.make_serialized_aws_signed_request(
- {
- "access_key_id": ACCESS_KEY_ID,
- "secret_access_key": SECRET_ACCESS_KEY,
- "security_token": TOKEN,
- }
- )
- token_headers = {
- "Content-Type": "application/x-www-form-urlencoded",
- "Authorization": "Basic " + BASIC_AUTH_ENCODING,
- "x-goog-api-client": "gl-python/3.7 auth/1.1 google-byoid-sdk sa-impersonation/true config-lifetime/false source/aws",
- }
- token_request_data = {
- "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
- "audience": AUDIENCE,
- "requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
- "scope": "https://www.googleapis.com/auth/iam",
- "subject_token": expected_subject_token,
- "subject_token_type": SUBJECT_TOKEN_TYPE,
- }
- # Service account impersonation request/response.
- impersonation_response = {
- "accessToken": "SA_ACCESS_TOKEN",
- "expireTime": expire_time,
- }
- impersonation_headers = {
- "Content-Type": "application/json",
- "authorization": "Bearer {}".format(self.SUCCESS_RESPONSE["access_token"]),
- "x-goog-user-project": QUOTA_PROJECT_ID,
- "x-goog-api-client": IMPERSONATE_ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE,
- "x-allowed-locations": "0x0",
- }
- impersonation_request_data = {
- "delegates": None,
- "scope": SCOPES,
- "lifetime": "3600s",
- }
- request = self.make_mock_request(
- region_status=http_client.OK,
- region_name=self.AWS_REGION,
- role_status=http_client.OK,
- role_name=self.AWS_ROLE,
- security_credentials_status=http_client.OK,
- security_credentials_data=self.AWS_SECURITY_CREDENTIALS_RESPONSE,
- token_status=http_client.OK,
- token_data=self.SUCCESS_RESPONSE,
- impersonation_status=http_client.OK,
- impersonation_data=impersonation_response,
- )
- credentials = self.make_credentials(
- client_id=CLIENT_ID,
- client_secret=CLIENT_SECRET,
- credential_source=self.CREDENTIAL_SOURCE,
- service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL,
- quota_project_id=QUOTA_PROJECT_ID,
- scopes=None,
- # Default scopes should be used since user specified scopes are none.
- default_scopes=SCOPES,
- )
- credentials.refresh(request)
- assert len(request.call_args_list) == 5
- # Fourth request should be sent to GCP STS endpoint.
- self.assert_token_request_kwargs(
- request.call_args_list[3][1], token_headers, token_request_data
- )
- # Fifth request should be sent to iamcredentials endpoint for service
- # account impersonation.
- self.assert_impersonation_request_kwargs(
- request.call_args_list[4][1],
- impersonation_headers,
- impersonation_request_data,
- )
- assert credentials.token == impersonation_response["accessToken"]
- assert credentials.quota_project_id == QUOTA_PROJECT_ID
- assert credentials.scopes is None
- assert credentials.default_scopes == SCOPES
- def test_refresh_with_retrieve_subject_token_error(self):
- request = self.make_mock_request(region_status=http_client.BAD_REQUEST)
- credentials = self.make_credentials(credential_source=self.CREDENTIAL_SOURCE)
- with pytest.raises(exceptions.RefreshError) as excinfo:
- credentials.refresh(request)
- assert excinfo.match(r"Unable to retrieve AWS region")
|