From 4ca1e2cb1b9fa5b9edf0d9248a2649d422a2bae4 Mon Sep 17 00:00:00 2001 From: BelKed <66956532+BelKed@users.noreply.github.com> Date: Sun, 8 Mar 2026 06:57:23 +0100 Subject: [PATCH] Improve Betamax cassette sanitization --- test/__init__.py | 4 +- test/cassette_sanitizer.py | 133 ++++++++++++++++++++++++++ test/helpers.py | 30 ------ tests_new/conftest.py | 33 +------ tests_new/test_cassette_sanitizer.py | 137 +++++++++++++++++++++++++++ 5 files changed, 274 insertions(+), 63 deletions(-) create mode 100644 test/cassette_sanitizer.py delete mode 100644 test/helpers.py create mode 100644 tests_new/test_cassette_sanitizer.py diff --git a/test/__init__.py b/test/__init__.py index bf67bbb..f10e80a 100644 --- a/test/__init__.py +++ b/test/__init__.py @@ -10,7 +10,7 @@ from pycaching.errors import Error from pycaching.geocaching import Geocaching -from .helpers import sanitize_cookies +from .cassette_sanitizer import sanitize_betamax_interaction username = os.environ.get("PYCACHING_TEST_USERNAME") or "USERNAMEPLACEHOLDER" password = os.environ.get("PYCACHING_TEST_PASSWORD") or "PASSWORDPLACEHOLDER" @@ -25,7 +25,7 @@ config.cassette_library_dir = str(cassette_dir) config.define_cassette_placeholder("", quote_plus(username)) config.define_cassette_placeholder("", quote_plus(password)) -config.before_record(callback=sanitize_cookies) +config.before_record(callback=sanitize_betamax_interaction) Betamax.register_serializer(PrettyJSONSerializer) diff --git a/test/cassette_sanitizer.py b/test/cassette_sanitizer.py new file mode 100644 index 0000000..4c23eab --- /dev/null +++ b/test/cassette_sanitizer.py @@ -0,0 +1,133 @@ +"""Shared Betamax cassette sanitization helpers for recorded fixtures. + +The suite records authenticated traffic, so cassettes may capture user-specific +values that are irrelevant to assertions: auth tokens, page bootstrap user +metadata, ASP.NET hidden fields, and home-location coordinates in URLs. +This module keeps the scrubbing rules in one place for both unittest and +pytest-based Betamax setups. +""" + +import re + +from betamax.cassette.cassette import Placeholder + +CLASSIFIED_COOKIES = ( + "gspkauth", + "__RequestVerificationToken", + "jwt", +) + +# Each rule must contain exactly one capture group with the sensitive value. +# Variables are exported for use in tests that assert on placeholder values. +PLACEHOLDER_RULES = { + # This bootstrap object is not parsed by library code, so replacing its + # contents wholesale keeps fixtures smaller and hides a lot of account data. + "": (re.compile(r"window\['chromeSettings'\]\s*=\s*\{([\s\S]*?)\};"),), + "": ( + re.compile(r"__RequestVerificationToken=([^&\"\s]+)"), + re.compile(r'"__RequestVerificationToken"\s*:\s*"([^"]+)"'), + re.compile(r'name="__RequestVerificationToken"[^>]*value="([^"]+)"'), + ), + "": ( + re.compile(r'name="__VIEWSTATE"[^>]*value="([^"]+)"'), + re.compile(r"__VIEWSTATE=([^&\"\s]+)"), + ), + "": ( + re.compile(r'name="__VIEWSTATE1"[^>]*value="([^"]+)"'), + re.compile(r"__VIEWSTATE1=([^&\"\s]+)"), + ), + "": ( + re.compile(r'name="__VIEWSTATEGENERATOR"[^>]*value="([^"]+)"'), + re.compile(r"__VIEWSTATEGENERATOR=([^&\"\s]+)"), + ), + "": ( + re.compile(r'"username"\s*:\s*"([^"]+)"'), + re.compile(r'"Username"\s*:\s*"([^"]+)"'), + ), + "": (re.compile(r'"(?:publicGuid|PublicGuid|userPublicGuid)"\s*:\s*"([^"]+)"'),), + "": ( + re.compile(r'"accountId"\s*:\s*(\d+)'), + re.compile(r'"gcUser"\s*:\s*\{[\s\S]*?"id"\s*:\s*(\d+)'), + ), + "": ( + re.compile(r'"referenceCode"\s*:\s*"(P[A-Z0-9]+)"'), + re.compile(r"window\['userRef'\]\s*=\s*'([^']+)'"), + re.compile(r'"userRef"\s*:\s*"([^"]+)"'), + ), + "": ( + re.compile(r"\buserToken\s*=\s*'([^']+)'"), + re.compile(r'"userToken"\s*:\s*"([^"]+)"'), + re.compile(r"([?&]tkn=)([^&\"\s]+)"), + ), + "": (re.compile(r'"(?:homeLocation|HomeLocation)"\s*:\s*"([^"]+)"'),), + "": ( + re.compile(r'"(?:homeCoords|HomeCoords)"\s*:\s*"([^"]+)"'), + re.compile(r"(?:[?&;]saddr=)(-?\d+(?:\.\d+)?(?:%2C|,)-?\d+(?:\.\d+)?)"), + ), + "": (re.compile(r'"dateCreated"\s*:\s*"([^"]+)"'),), + "": (re.compile(r'"clientIpCoordinate"\s*:\s*(\{[^}]+\})'),), +} + + +def sanitize_betamax_interaction(interaction, cassette): + """Register placeholders for sensitive values found in one Betamax interaction.""" + _collect_cookie_placeholders(interaction, cassette) + + for text in _iter_interaction_texts(interaction): + for placeholder, patterns in PLACEHOLDER_RULES.items(): + for pattern in patterns: + for value in pattern.findall(text): + if isinstance(value, tuple): + value = value[-1] + _add_placeholder(cassette, placeholder, value) + + +def _collect_cookie_placeholders(interaction, cassette): + response = interaction.as_response() + response_cookies = response.cookies + request_cookies = {} + response_headers = interaction.data.get("response", {}).get("headers", {}) + response_set_cookies = response_headers.get("Set-Cookie", []) + + for cookie in (response.request.headers.get("Cookie") or "").split("; "): + name, sep, value = cookie.partition("=") + if sep: + request_cookies[name] = value + + for name in CLASSIFIED_COOKIES: + _add_placeholder(cassette, "", response_cookies.get(name)) + _add_placeholder(cassette, "", request_cookies.get(name)) + for header in response_set_cookies: + match = re.search(rf"(?:^|[;,]\s*){re.escape(name)}=([^;,\s]+)", header) + if match: + _add_placeholder(cassette, "", match.group(1)) + + +def _iter_interaction_texts(interaction): + for obj, key in (("request", "uri"), ("response", "url")): + value = interaction.data.get(obj, {}).get(key) + if value: + yield value + + for obj in ("request", "response"): + headers = interaction.data.get(obj, {}).get("headers", {}) + for value in headers.values(): + if isinstance(value, list): + yield "\n".join(value) + elif value: + yield value + + body = interaction.data.get(obj, {}).get("body", "") + value = body.get("string") if isinstance(body, dict) else body + if value: + yield value + + +def _add_placeholder(cassette, placeholder, value): + if not value or value.startswith("<"): + return + + if any(item.placeholder == placeholder and item.replace == value for item in cassette.placeholders): + return + + cassette.placeholders.append(Placeholder(placeholder=placeholder, replace=value)) diff --git a/test/helpers.py b/test/helpers.py deleted file mode 100644 index be7eabc..0000000 --- a/test/helpers.py +++ /dev/null @@ -1,30 +0,0 @@ -from betamax.cassette.cassette import Placeholder - -CLASSIFIED_COOKIES = ( - "gspkauth", - "__RequestVerificationToken", - "jwt", # NOTE: JWT token, contains user related informations: username, ids, oauth token -) - - -def sanitize_cookies(interaction, cassette): - response = interaction.as_response() - response_cookies = response.cookies - request_cookies = dict() - for cookie in (interaction.as_response().request.headers.get("Cookie") or "").split("; "): - name, sep, val = cookie.partition("=") - if sep: - request_cookies[name] = val - - secret_values = set() - for name in CLASSIFIED_COOKIES: - potential_val = response_cookies.get(name) - if potential_val: - secret_values.add(potential_val) - - potential_val = request_cookies.get(name) - if potential_val: - secret_values.add(potential_val) - - for val in secret_values: - cassette.placeholders.append(Placeholder(placeholder="", replace=val)) diff --git a/tests_new/conftest.py b/tests_new/conftest.py index 9305138..610a9a3 100644 --- a/tests_new/conftest.py +++ b/tests_new/conftest.py @@ -1,11 +1,11 @@ import os from pathlib import Path +from test.cassette_sanitizer import sanitize_betamax_interaction from urllib.parse import quote_plus import pytest import requests from betamax import Betamax -from betamax.cassette.cassette import Placeholder from betamax_serializers.pretty_json import PrettyJSONSerializer from pycaching.geocaching import Geocaching @@ -13,11 +13,6 @@ USERNAME = os.environ.get("PYCACHING_TEST_USERNAME") or "USERNAMEPLACEHOLDER" PASSWORD = os.environ.get("PYCACHING_TEST_PASSWORD") or "PASSWORDPLACEHOLDER" COOKIE = os.environ.get("PYCACHING_TEST_COOKIE") -CLASSIFIED_COOKIES = ( - "gspkauth", - "__RequestVerificationToken", - "jwt", # NOTE: JWT token, contains user related informations: username, ids, oauth token -) CASSETTE_DIR = Path(__file__).parent / "cassettes" @@ -29,7 +24,7 @@ def betamax_config(): _betamax_config.cassette_library_dir = str(CASSETTE_DIR) _betamax_config.define_cassette_placeholder("", quote_plus(USERNAME)) _betamax_config.define_cassette_placeholder("", quote_plus(PASSWORD)) - _betamax_config.before_record(callback=_sanitize_betamax_cookies) + _betamax_config.before_record(callback=sanitize_betamax_interaction) _betamax_config.default_cassette_options["serialize_with"] = "prettyjson" @@ -58,27 +53,3 @@ def geocaching_logged_in(betamax_session: requests.Session): else: gc.login(USERNAME, PASSWORD) return gc - - -def _sanitize_betamax_cookies(interaction, cassette): - # TODO handle also request body occurence of __RequestVerificationToken - response = interaction.as_response() - response_cookies = response.cookies - request_cookies = dict() - for cookie in (interaction.as_response().request.headers.get("Cookie") or "").split("; "): - name, sep, val = cookie.partition("=") - if sep: - request_cookies[name] = val - - secret_values = set() - for name in CLASSIFIED_COOKIES: - potential_val = response_cookies.get(name) - if potential_val: - secret_values.add(potential_val) - - potential_val = request_cookies.get(name) - if potential_val: - secret_values.add(potential_val) - - for val in secret_values: - cassette.placeholders.append(Placeholder(placeholder="", replace=val)) diff --git a/tests_new/test_cassette_sanitizer.py b/tests_new/test_cassette_sanitizer.py new file mode 100644 index 0000000..0b229ea --- /dev/null +++ b/tests_new/test_cassette_sanitizer.py @@ -0,0 +1,137 @@ +import json +import secrets +from test.cassette_sanitizer import sanitize_betamax_interaction +from types import SimpleNamespace +from uuid import uuid4 + +import pytest +import requests + + +@pytest.fixture(autouse=True) +def betamax_forgotten_recording_env_vars_fuse(): + """Override the autouse fuse; this unit test does not record network traffic.""" + + +def test_sanitize_betamax_interaction_collects_requested_placeholders(): + auth_cookie = secrets.token_urlsafe(16) + rotated_auth_cookie = secrets.token_urlsafe(16) + request_token = secrets.token_urlsafe(18) + response_token = secrets.token_urlsafe(20) + viewstate = secrets.token_urlsafe(24) + viewstate_generator = secrets.token_hex(4).upper() + username = "user_" + secrets.token_hex(4) + user_guid = str(uuid4()) + user_id = str(secrets.randbelow(90_000_000) + 10_000_000) + user_code = "PR" + secrets.token_hex(3).upper() + user_token = secrets.token_urlsafe(40) + user_created_date = "2011-11-11T11:11:11" + membership_level = "1" + locale = "en-US" + date_format = "dd.MM.yyyy" + home_coords = "12.123456,12.123456" + client_ip_coordinate = json.dumps({"latitude": 12.123456, "longitude": 12.123456}) + next_data = json.dumps( + { + "props": { + "pageProps": { + "gcUser": { + "id": int(user_id), + "username": username, + "publicGuid": user_guid, + "referenceCode": user_code, + "dateCreated": user_created_date, + "locale": locale, + "membershipLevel": int(membership_level), + "dateFormat": date_format, + "clientIpCoordinate": json.loads(client_ip_coordinate), + } + } + } + } + ) + + interaction = SimpleNamespace( + data={ + "request": { + "uri": "https://www.geocaching.com/seek/geocache.logbook?tkn=TOKEN", + "headers": { + "Cookie": ["gspkauth={}; __RequestVerificationToken={}".format(auth_cookie, request_token)], + }, + "body": { + "string": "__RequestVerificationToken={}&__VIEWSTATE={}".format(request_token, viewstate), + }, + }, + "response": { + "url": "https://www.geocaching.com/cache?saddr={}".format(home_coords), + "headers": { + "Set-Cookie": [ + "gspkauth={}; path=/; secure; HttpOnly".format(rotated_auth_cookie), + ], + }, + "body": { + "string": """ + + + + + """.format( + response_token=response_token, + viewstate_generator=viewstate_generator, + user_id=user_id, + username=username, + user_guid=user_guid, + user_code=user_code, + user_token=user_token, + home_coords=home_coords, + next_data=next_data, + ), + }, + }, + } + ) + + response = SimpleNamespace( + cookies=requests.cookies.cookiejar_from_dict( + {"gspkauth": auth_cookie, "__RequestVerificationToken": response_token} + ), + request=SimpleNamespace( + headers={"Cookie": "gspkauth={}; __RequestVerificationToken={}".format(auth_cookie, request_token)} + ), + ) + interaction.as_response = lambda: response + + cassette = SimpleNamespace(placeholders=[]) + + sanitize_betamax_interaction(interaction, cassette) + + placeholders = {(item.placeholder, item.replace) for item in cassette.placeholders} + chrome_settings_values = [value for placeholder, value in placeholders if placeholder == ""] + assert len(chrome_settings_values) == 1 + assert user_id in chrome_settings_values[0] + assert username in chrome_settings_values[0] + assert user_guid in chrome_settings_values[0] + assert user_code in chrome_settings_values[0] + assert home_coords in chrome_settings_values[0] + assert ("", auth_cookie) in placeholders + assert ("", rotated_auth_cookie) in placeholders + assert ("", request_token) in placeholders + assert ("", response_token) in placeholders + assert ("", viewstate) in placeholders + assert ("", viewstate_generator) in placeholders + assert ("", username) in placeholders + assert ("", user_guid) in placeholders + assert ("", user_id) in placeholders + assert ("", user_code) in placeholders + assert ("", user_token) in placeholders + assert ("", user_created_date) in placeholders + assert ("", client_ip_coordinate) in placeholders + assert ("", home_coords) in placeholders