diff --git a/changelog/8272-aws-iam-authentication-strategy.yaml b/changelog/8272-aws-iam-authentication-strategy.yaml new file mode 100644 index 00000000000..bd27b5e4775 --- /dev/null +++ b/changelog/8272-aws-iam-authentication-strategy.yaml @@ -0,0 +1,4 @@ +type: Added +description: Added aws_iam authentication strategy for SaaS connectors, supporting AWS Signature V4 signing with static credentials and STS AssumeRole with automatic credential caching +pr: 8272 +labels: [] diff --git a/src/fides/api/schemas/saas/strategy_configuration.py b/src/fides/api/schemas/saas/strategy_configuration.py index e5e8b92f4e9..9f727df3acd 100644 --- a/src/fides/api/schemas/saas/strategy_configuration.py +++ b/src/fides/api/schemas/saas/strategy_configuration.py @@ -198,3 +198,29 @@ class GoogleCloudServiceAccountConfiguration(StrategyConfiguration): "'https://www.googleapis.com/auth/devstorage.read_write' for Cloud Storage, " ), ) + + +class AWSIAMAuthenticationConfiguration(StrategyConfiguration): + """ + Configuration for AWS IAM (Signature V4) authentication. + + Signs HTTP requests using AWS credentials so they can be sent to + IAM-protected endpoints such as API Gateway with IAM authorization. + Supports both static credentials and STS AssumeRole. + """ + + region: Optional[str] = Field( + default=None, + description=( + "AWS region for signing requests (e.g. 'us-east-1'). " + "If not specified, the region is resolved from the connector secrets " + "('aws_region') or inferred from the API Gateway endpoint hostname." + ), + ) + service: str = Field( + default="execute-api", + description=( + "The AWS service name used for Signature V4 signing. " + "Defaults to 'execute-api' for API Gateway." + ), + ) diff --git a/src/fides/api/service/authentication/__init__.py b/src/fides/api/service/authentication/__init__.py index 96200b7b9cc..a30488d3a31 100644 --- a/src/fides/api/service/authentication/__init__.py +++ b/src/fides/api/service/authentication/__init__.py @@ -1,5 +1,6 @@ from fides.api.service.authentication import ( authentication_strategy_api_key, + authentication_strategy_aws_iam, authentication_strategy_basic, authentication_strategy_bearer, authentication_strategy_google_cloud_service_account, diff --git a/src/fides/api/service/authentication/authentication_strategy_aws_iam.py b/src/fides/api/service/authentication/authentication_strategy_aws_iam.py new file mode 100644 index 00000000000..f6677a97ffb --- /dev/null +++ b/src/fides/api/service/authentication/authentication_strategy_aws_iam.py @@ -0,0 +1,250 @@ +from datetime import datetime, timedelta, timezone +from typing import Any, Dict, NoReturn, Optional +from urllib.parse import urlparse + +from botocore.auth import SigV4Auth +from botocore.awsrequest import AWSRequest +from botocore.credentials import Credentials +from botocore.exceptions import ClientError, NoCredentialsError +from loguru import logger +from requests import PreparedRequest +from sqlalchemy.orm import Session + +from fides.api.common_exceptions import FidesopsException +from fides.api.models.connectionconfig import ConnectionConfig +from fides.api.schemas.saas.strategy_configuration import ( + AWSIAMAuthenticationConfiguration, + StrategyConfiguration, +) +from fides.api.service.authentication.authentication_strategy import ( + AuthenticationStrategy, +) +from fides.api.util.logger import Pii + +TOKEN_REFRESH_BUFFER_SECONDS = 300 + + +class AWSIAMAuthenticationStrategy(AuthenticationStrategy): + """ + Authenticates HTTP requests using AWS IAM (Signature V4). + + Supports two modes: + - AssumeRole: Customer provides an IAM Role ARN. Fides assumes the role + via STS to get temporary credentials, then signs requests with SigV4. + - Static keys: Customer provides AWS access key ID and secret access key + directly. + + Designed for authenticating against AWS API Gateway endpoints protected + by IAM authorization. + """ + + name = "aws_iam" + configuration_model = AWSIAMAuthenticationConfiguration + + def __init__(self, configuration: AWSIAMAuthenticationConfiguration): + self.aws_region = configuration.region + self.service = configuration.service + + def add_authentication( + self, request: PreparedRequest, connection_config: ConnectionConfig + ) -> PreparedRequest: + credentials = self._get_credentials(connection_config) + region = self._resolve_region(request.url, connection_config) + + aws_request = AWSRequest( + method=request.method, + url=request.url, + headers=dict(request.headers) if request.headers else {}, + data=request.body or "", + ) + + SigV4Auth(credentials, self.service, region).add_auth(aws_request) + + request.headers.update(dict(aws_request.headers)) + return request + + def _get_credentials(self, connection_config: ConnectionConfig) -> Credentials: + secrets = connection_config.secrets + if not secrets: + raise FidesopsException( + "Secrets are not configured for this connector. " + "AWS IAM authentication requires either an assume_role_arn " + "or aws_access_key_id and aws_secret_access_key." + ) + + assume_role_arn = secrets.get("aws_assume_role_arn") + if assume_role_arn: + return self._get_assumed_role_credentials(secrets, connection_config) + + access_key_id = secrets.get("aws_access_key_id") + secret_access_key = secrets.get("aws_secret_access_key") + if not access_key_id or not secret_access_key: + raise FidesopsException( + "AWS IAM authentication requires either 'aws_assume_role_arn' " + "or both 'aws_access_key_id' and 'aws_secret_access_key'." + ) + session_token = secrets.get("aws_session_token") + return Credentials(access_key_id, secret_access_key, session_token) + + def _get_assumed_role_credentials( + self, + secrets: Dict[str, Any], + connection_config: ConnectionConfig, + ) -> Credentials: + cached_key = secrets.get("aws_iam_access_key_id") + cached_secret = secrets.get("aws_iam_secret_access_key") + cached_token = secrets.get("aws_iam_session_token") + cached_expiry = secrets.get("aws_iam_credentials_expire_at") + + if cached_key and cached_secret and cached_token and cached_expiry: + if not self._is_close_to_expiration(cached_expiry): + return Credentials(cached_key, cached_secret, cached_token) + + return self._refresh_assumed_role_credentials(secrets, connection_config) + + def _refresh_assumed_role_credentials( + self, + secrets: Dict[str, Any], + connection_config: ConnectionConfig, + ) -> Credentials: + import boto3 + + assume_role_arn = secrets["aws_assume_role_arn"] + + logger.info( + "Assuming AWS IAM role for {}", + connection_config.key, + ) + + try: + access_key_id = secrets.get("aws_access_key_id") + secret_access_key = secrets.get("aws_secret_access_key") + + if access_key_id and secret_access_key: + session = boto3.Session( + aws_access_key_id=access_key_id, + aws_secret_access_key=secret_access_key, + aws_session_token=secrets.get("aws_session_token"), + ) + else: + session = boto3.Session() + + sts_client = session.client("sts") + response = sts_client.assume_role( + RoleArn=assume_role_arn, + RoleSessionName="FidesSaaSConnectorSession", + ) + + temp_creds = response["Credentials"] + access_key = temp_creds["AccessKeyId"] + secret_key = temp_creds["SecretAccessKey"] + session_token = temp_creds["SessionToken"] + expiration = temp_creds["Expiration"] + + expires_at = int(expiration.timestamp()) + self._store_credentials( + connection_config, access_key, secret_key, session_token, expires_at + ) + + logger.info( + "Successfully assumed AWS IAM role for {}", + connection_config.key, + ) + + return Credentials(access_key, secret_key, session_token) + + except (ClientError, NoCredentialsError) as exc: + self._handle_credential_error(exc, connection_config) + + def _resolve_region( + self, url: Optional[str], connection_config: ConnectionConfig + ) -> str: + if self.aws_region: + return self.aws_region + + secrets = connection_config.secrets or {} + region_from_secrets = secrets.get("aws_region") + if region_from_secrets: + return region_from_secrets + + if url: + parsed = urlparse(url) + hostname = parsed.hostname or "" + parts = hostname.split(".") + if len(parts) >= 4 and parts[-2] == "amazonaws" and parts[-1] == "com": + return parts[-3] + + return "us-east-1" + + def _is_close_to_expiration(self, expires_at: int) -> bool: + buffer_time = datetime.now(timezone.utc) + timedelta( + seconds=TOKEN_REFRESH_BUFFER_SECONDS + ) + return expires_at < buffer_time.timestamp() + + def _store_credentials( + self, + connection_config: ConnectionConfig, + access_key_id: str, + secret_access_key: str, + session_token: str, + expires_at: int, + ) -> None: + db: Optional[Session] = Session.object_session(connection_config) + if db is None: + logger.warning( + "Unable to cache AWS IAM credentials for {} - no database session available", + connection_config.key, + ) + return + + updated_secrets = { + **(connection_config.secrets or {}), + "aws_iam_access_key_id": access_key_id, + "aws_iam_secret_access_key": secret_access_key, + "aws_iam_session_token": session_token, + "aws_iam_credentials_expire_at": expires_at, + } + connection_config.update(db, data={"secrets": updated_secrets}) + logger.debug( + "Cached AWS IAM credentials for {} (expires at {})", + connection_config.key, + datetime.fromtimestamp(expires_at, tz=timezone.utc).isoformat(), + ) + + def _handle_credential_error( + self, exc: Exception, connection_config: ConnectionConfig + ) -> NoReturn: + error_msg = str(exc) + logger.error( + "Error assuming AWS IAM role for {}: {}", + connection_config.key, + Pii(error_msg), + ) + + if isinstance(exc, NoCredentialsError): + user_message = ( + "No AWS credentials found. Provide either aws_access_key_id " + "and aws_secret_access_key, or ensure the Fides environment " + "has AWS credentials configured (e.g. via instance profile)." + ) + elif isinstance(exc, ClientError): + error_code = exc.response.get("Error", {}).get("Code", "") + if error_code == "AccessDenied": + user_message = ( + "Access denied when assuming the IAM role. Verify that " + "the role's trust policy allows Fides to assume it and " + "that the provided credentials have sts:AssumeRole permission." + ) + elif error_code in ("MalformedPolicyDocument", "PackedPolicyTooLarge"): + user_message = f"IAM role configuration error: {error_code}. Check the role ARN and trust policy." + else: + user_message = f"AWS STS error ({error_code}): {error_msg}" + else: + user_message = f"Failed to assume AWS IAM role: {error_msg}" + + raise FidesopsException(user_message) from exc + + @staticmethod + def get_configuration_model() -> StrategyConfiguration: + return AWSIAMAuthenticationConfiguration # type: ignore diff --git a/src/fides/api/service/authentication/authentication_strategy_factory.py b/src/fides/api/service/authentication/authentication_strategy_factory.py index 408c5009a32..0c03e544ca2 100644 --- a/src/fides/api/service/authentication/authentication_strategy_factory.py +++ b/src/fides/api/service/authentication/authentication_strategy_factory.py @@ -24,6 +24,9 @@ from fides.api.service.authentication.authentication_strategy_oauth2_client_credentials import ( OAuth2ClientCredentialsAuthenticationStrategy, ) +from fides.api.service.authentication.authentication_strategy_aws_iam import ( + AWSIAMAuthenticationStrategy, +) from fides.api.service.authentication.authentication_strategy_query_param import ( QueryParamAuthenticationStrategy, ) @@ -40,6 +43,7 @@ class SupportedAuthenticationStrategies(Enum): oauth2_authorization_code = OAuth2AuthorizationCodeAuthenticationStrategy oauth2_client_credentials = OAuth2ClientCredentialsAuthenticationStrategy google_cloud_service_account = GoogleCloudServiceAccountAuthenticationStrategy + aws_iam = AWSIAMAuthenticationStrategy @classmethod def __contains__(cls, item: str) -> bool: diff --git a/tests/ops/service/authentication/test_authentication_strategy_aws_iam.py b/tests/ops/service/authentication/test_authentication_strategy_aws_iam.py new file mode 100644 index 00000000000..36b504e9b71 --- /dev/null +++ b/tests/ops/service/authentication/test_authentication_strategy_aws_iam.py @@ -0,0 +1,458 @@ +from datetime import datetime, timedelta, timezone +from unittest.mock import MagicMock, Mock, patch + +import pytest +from botocore.exceptions import ClientError, NoCredentialsError +from requests import Request + +from fides.api.common_exceptions import FidesopsException +from fides.api.models.connectionconfig import ConnectionConfig +from fides.api.service.authentication.authentication_strategy import ( + AuthenticationStrategy, +) +from fides.api.service.authentication.authentication_strategy_aws_iam import ( + TOKEN_REFRESH_BUFFER_SECONDS, + AWSIAMAuthenticationStrategy, +) + + +@pytest.fixture +def static_key_secrets(): + return { + "aws_access_key_id": "AKIAIOSFODNN7EXAMPLE", + "aws_secret_access_key": "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY", + "aws_region": "us-west-2", + } + + +@pytest.fixture +def assume_role_secrets(): + return { + "aws_assume_role_arn": "arn:aws:iam::123456789012:role/CustomerFidesRole", + "aws_region": "us-east-1", + } + + +@pytest.fixture +def static_key_connection_config(static_key_secrets): + return ConnectionConfig( + key="aws_iam_test_connector", + secrets=static_key_secrets, + ) + + +@pytest.fixture +def assume_role_connection_config(assume_role_secrets): + return ConnectionConfig( + key="aws_iam_assume_role_connector", + secrets=assume_role_secrets, + ) + + +class TestStrategyConfiguration: + def test_strategy_registered(self): + strategy = AuthenticationStrategy.get_strategy("aws_iam", {}) + assert isinstance(strategy, AWSIAMAuthenticationStrategy) + + def test_default_service(self): + strategy = AuthenticationStrategy.get_strategy("aws_iam", {}) + assert strategy.service == "execute-api" + + def test_custom_service(self): + strategy = AuthenticationStrategy.get_strategy( + "aws_iam", {"service": "lambda"} + ) + assert strategy.service == "lambda" + + def test_custom_region(self): + strategy = AuthenticationStrategy.get_strategy( + "aws_iam", {"region": "eu-west-1"} + ) + assert strategy.aws_region == "eu-west-1" + + def test_default_region_is_none(self): + strategy = AuthenticationStrategy.get_strategy("aws_iam", {}) + assert strategy.aws_region is None + + +class TestCredentialResolution: + def test_missing_secrets(self): + connection_config = ConnectionConfig(key="test", secrets=None) + req = Request( + method="GET", + url="https://abc123.execute-api.us-east-1.amazonaws.com/prod/resource", + ).prepare() + + strategy = AuthenticationStrategy.get_strategy("aws_iam", {}) + + with pytest.raises(FidesopsException) as exc: + strategy.add_authentication(req, connection_config) + assert "Secrets are not configured" in str(exc.value) + + def test_missing_both_role_and_keys(self): + connection_config = ConnectionConfig( + key="test", secrets={"aws_region": "us-east-1"} + ) + req = Request( + method="GET", + url="https://abc123.execute-api.us-east-1.amazonaws.com/prod/resource", + ).prepare() + + strategy = AuthenticationStrategy.get_strategy("aws_iam", {}) + + with pytest.raises(FidesopsException) as exc: + strategy.add_authentication(req, connection_config) + assert "aws_assume_role_arn" in str(exc.value) + + def test_static_keys_sign_request(self, static_key_connection_config): + req = Request( + method="GET", + url="https://abc123.execute-api.us-west-2.amazonaws.com/prod/users", + ).prepare() + + strategy = AuthenticationStrategy.get_strategy("aws_iam", {}) + authenticated_request = strategy.add_authentication( + req, static_key_connection_config + ) + + assert "Authorization" in authenticated_request.headers + assert "AWS4-HMAC-SHA256" in authenticated_request.headers["Authorization"] + assert "X-Amz-Date" in authenticated_request.headers + + def test_static_keys_with_session_token(self): + connection_config = ConnectionConfig( + key="test", + secrets={ + "aws_access_key_id": "AKIAIOSFODNN7EXAMPLE", + "aws_secret_access_key": "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY", + "aws_session_token": "FwoGZXIvYXdzEBYaDH...", + "aws_region": "us-east-1", + }, + ) + req = Request( + method="GET", + url="https://abc123.execute-api.us-east-1.amazonaws.com/prod/users", + ).prepare() + + strategy = AuthenticationStrategy.get_strategy("aws_iam", {}) + authenticated_request = strategy.add_authentication(req, connection_config) + + assert "X-Amz-Security-Token" in authenticated_request.headers + + +class TestAssumeRole: + @patch("fides.api.service.authentication.authentication_strategy_aws_iam.boto3") + def test_assume_role_signs_request( + self, mock_boto3, assume_role_connection_config + ): + future_expiry = datetime.now(timezone.utc) + timedelta(hours=1) + mock_sts = MagicMock() + mock_sts.assume_role.return_value = { + "Credentials": { + "AccessKeyId": "ASIA_TEMP_KEY", + "SecretAccessKey": "temp_secret", + "SessionToken": "temp_session_token", + "Expiration": future_expiry, + } + } + mock_session = MagicMock() + mock_session.client.return_value = mock_sts + mock_boto3.Session.return_value = mock_session + + req = Request( + method="GET", + url="https://abc123.execute-api.us-east-1.amazonaws.com/prod/users", + ).prepare() + + strategy = AuthenticationStrategy.get_strategy("aws_iam", {}) + authenticated_request = strategy.add_authentication( + req, assume_role_connection_config + ) + + assert "Authorization" in authenticated_request.headers + assert "AWS4-HMAC-SHA256" in authenticated_request.headers["Authorization"] + mock_sts.assume_role.assert_called_once_with( + RoleArn="arn:aws:iam::123456789012:role/CustomerFidesRole", + RoleSessionName="FidesSaaSConnectorSession", + ) + + @patch("fides.api.service.authentication.authentication_strategy_aws_iam.boto3") + def test_assume_role_with_base_credentials(self, mock_boto3): + connection_config = ConnectionConfig( + key="test", + secrets={ + "aws_assume_role_arn": "arn:aws:iam::123456789012:role/SomeRole", + "aws_access_key_id": "AKIAIOSFODNN7EXAMPLE", + "aws_secret_access_key": "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY", + "aws_region": "us-east-1", + }, + ) + + future_expiry = datetime.now(timezone.utc) + timedelta(hours=1) + mock_sts = MagicMock() + mock_sts.assume_role.return_value = { + "Credentials": { + "AccessKeyId": "ASIA_TEMP", + "SecretAccessKey": "temp_secret", + "SessionToken": "temp_token", + "Expiration": future_expiry, + } + } + mock_session = MagicMock() + mock_session.client.return_value = mock_sts + mock_boto3.Session.return_value = mock_session + + req = Request( + method="GET", + url="https://abc123.execute-api.us-east-1.amazonaws.com/prod/users", + ).prepare() + + strategy = AuthenticationStrategy.get_strategy("aws_iam", {}) + strategy.add_authentication(req, connection_config) + + mock_boto3.Session.assert_called_once_with( + aws_access_key_id="AKIAIOSFODNN7EXAMPLE", + aws_secret_access_key="wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY", + aws_session_token=None, + ) + + @patch("fides.api.service.authentication.authentication_strategy_aws_iam.boto3") + def test_assume_role_access_denied(self, mock_boto3, assume_role_connection_config): + mock_sts = MagicMock() + mock_sts.assume_role.side_effect = ClientError( + {"Error": {"Code": "AccessDenied", "Message": "Not authorized"}}, + "AssumeRole", + ) + mock_session = MagicMock() + mock_session.client.return_value = mock_sts + mock_boto3.Session.return_value = mock_session + + req = Request( + method="GET", + url="https://abc123.execute-api.us-east-1.amazonaws.com/prod/users", + ).prepare() + + strategy = AuthenticationStrategy.get_strategy("aws_iam", {}) + + with pytest.raises(FidesopsException) as exc: + strategy.add_authentication(req, assume_role_connection_config) + assert "Access denied" in str(exc.value) + + @patch("fides.api.service.authentication.authentication_strategy_aws_iam.boto3") + def test_assume_role_no_credentials( + self, mock_boto3, assume_role_connection_config + ): + mock_boto3.Session.side_effect = NoCredentialsError() + + req = Request( + method="GET", + url="https://abc123.execute-api.us-east-1.amazonaws.com/prod/users", + ).prepare() + + strategy = AuthenticationStrategy.get_strategy("aws_iam", {}) + + with pytest.raises(FidesopsException) as exc: + strategy.add_authentication(req, assume_role_connection_config) + assert "No AWS credentials found" in str(exc.value) + + +class TestCredentialCaching: + def test_uses_cached_credentials_when_valid(self, assume_role_secrets): + future_expiry = int( + (datetime.now(timezone.utc) + timedelta(hours=1)).timestamp() + ) + connection_config = ConnectionConfig( + key="test", + secrets={ + **assume_role_secrets, + "aws_iam_access_key_id": "ASIA_CACHED", + "aws_iam_secret_access_key": "cached_secret", + "aws_iam_session_token": "cached_token", + "aws_iam_credentials_expire_at": future_expiry, + }, + ) + + req = Request( + method="GET", + url="https://abc123.execute-api.us-east-1.amazonaws.com/prod/users", + ).prepare() + + strategy = AuthenticationStrategy.get_strategy("aws_iam", {}) + authenticated_request = strategy.add_authentication(req, connection_config) + + assert "Authorization" in authenticated_request.headers + assert "AWS4-HMAC-SHA256" in authenticated_request.headers["Authorization"] + + @patch( + "fides.api.service.authentication.authentication_strategy_aws_iam.AWSIAMAuthenticationStrategy._refresh_assumed_role_credentials" + ) + def test_refreshes_credentials_when_close_to_expiration( + self, mock_refresh, assume_role_secrets + ): + from botocore.credentials import Credentials + + mock_refresh.return_value = Credentials( + "ASIA_NEW", "new_secret", "new_token" + ) + + close_expiry = int( + ( + datetime.now(timezone.utc) + + timedelta(seconds=TOKEN_REFRESH_BUFFER_SECONDS - 60) + ).timestamp() + ) + connection_config = ConnectionConfig( + key="test", + secrets={ + **assume_role_secrets, + "aws_iam_access_key_id": "ASIA_OLD", + "aws_iam_secret_access_key": "old_secret", + "aws_iam_session_token": "old_token", + "aws_iam_credentials_expire_at": close_expiry, + }, + ) + + req = Request( + method="GET", + url="https://abc123.execute-api.us-east-1.amazonaws.com/prod/users", + ).prepare() + + strategy = AuthenticationStrategy.get_strategy("aws_iam", {}) + strategy.add_authentication(req, connection_config) + + mock_refresh.assert_called_once() + + +class TestRegionResolution: + def test_region_from_configuration(self, static_key_secrets): + connection_config = ConnectionConfig( + key="test", secrets=static_key_secrets + ) + req = Request( + method="GET", + url="https://abc123.execute-api.us-east-1.amazonaws.com/prod/users", + ).prepare() + + strategy = AuthenticationStrategy.get_strategy( + "aws_iam", {"region": "ap-southeast-1"} + ) + authenticated_request = strategy.add_authentication(req, connection_config) + + auth_header = authenticated_request.headers["Authorization"] + assert "ap-southeast-1" in auth_header + + def test_region_from_secrets(self, static_key_secrets): + connection_config = ConnectionConfig( + key="test", secrets=static_key_secrets + ) + req = Request( + method="GET", + url="https://custom-domain.example.com/users", + ).prepare() + + strategy = AuthenticationStrategy.get_strategy("aws_iam", {}) + authenticated_request = strategy.add_authentication(req, connection_config) + + auth_header = authenticated_request.headers["Authorization"] + assert "us-west-2" in auth_header + + def test_region_inferred_from_url(self): + connection_config = ConnectionConfig( + key="test", + secrets={ + "aws_access_key_id": "AKIAIOSFODNN7EXAMPLE", + "aws_secret_access_key": "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY", + }, + ) + req = Request( + method="GET", + url="https://abc123.execute-api.eu-central-1.amazonaws.com/prod/users", + ).prepare() + + strategy = AuthenticationStrategy.get_strategy("aws_iam", {}) + authenticated_request = strategy.add_authentication(req, connection_config) + + auth_header = authenticated_request.headers["Authorization"] + assert "eu-central-1" in auth_header + + def test_region_falls_back_to_us_east_1(self): + connection_config = ConnectionConfig( + key="test", + secrets={ + "aws_access_key_id": "AKIAIOSFODNN7EXAMPLE", + "aws_secret_access_key": "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY", + }, + ) + req = Request( + method="GET", + url="https://custom-domain.example.com/users", + ).prepare() + + strategy = AuthenticationStrategy.get_strategy("aws_iam", {}) + authenticated_request = strategy.add_authentication(req, connection_config) + + auth_header = authenticated_request.headers["Authorization"] + assert "us-east-1" in auth_header + + +class TestRequestSigning: + def test_preserves_existing_headers(self, static_key_connection_config): + req = Request( + method="GET", + url="https://abc123.execute-api.us-west-2.amazonaws.com/prod/users", + headers={"Content-Type": "application/json", "X-Custom": "value"}, + ).prepare() + + strategy = AuthenticationStrategy.get_strategy("aws_iam", {}) + authenticated_request = strategy.add_authentication( + req, static_key_connection_config + ) + + assert authenticated_request.headers["Content-Type"] == "application/json" + assert authenticated_request.headers["X-Custom"] == "value" + assert "Authorization" in authenticated_request.headers + + def test_signs_post_with_body(self, static_key_connection_config): + req = Request( + method="POST", + url="https://abc123.execute-api.us-west-2.amazonaws.com/prod/users", + json={"email": "test@example.com"}, + ).prepare() + + strategy = AuthenticationStrategy.get_strategy("aws_iam", {}) + authenticated_request = strategy.add_authentication( + req, static_key_connection_config + ) + + assert "Authorization" in authenticated_request.headers + assert "AWS4-HMAC-SHA256" in authenticated_request.headers["Authorization"] + + def test_service_name_in_signature(self, static_key_connection_config): + req = Request( + method="GET", + url="https://abc123.execute-api.us-west-2.amazonaws.com/prod/users", + ).prepare() + + strategy = AuthenticationStrategy.get_strategy("aws_iam", {}) + authenticated_request = strategy.add_authentication( + req, static_key_connection_config + ) + + auth_header = authenticated_request.headers["Authorization"] + assert "execute-api" in auth_header + + def test_custom_service_name(self, static_key_connection_config): + req = Request( + method="GET", + url="https://abc123.execute-api.us-west-2.amazonaws.com/prod/users", + ).prepare() + + strategy = AuthenticationStrategy.get_strategy( + "aws_iam", {"service": "lambda"} + ) + authenticated_request = strategy.add_authentication( + req, static_key_connection_config + ) + + auth_header = authenticated_request.headers["Authorization"] + assert "lambda" in auth_header