diff --git a/changes/1064.added b/changes/1064.added new file mode 100644 index 000000000..07f477cfa --- /dev/null +++ b/changes/1064.added @@ -0,0 +1 @@ +Adding API remediation feature to support API dispatchers. diff --git a/docs/user/app_feature_remediation.md b/docs/user/app_feature_remediation.md index bc6e6c6fb..2cfbd70a4 100644 --- a/docs/user/app_feature_remediation.md +++ b/docs/user/app_feature_remediation.md @@ -1,6 +1,6 @@ # Navigating Configuration Remediation -Automated network configuration remediation is a systematic approach that leverages technology and processes to address and rectify configuration issues in network devices. +Automated network configuration remediation is a systematic approach that leverages technology and processes to address and rectify configuration issues in network devices. It involves the use of the Golden Configuration app to understand the current configuration state, compare it against the intended configuration state, and automatically generate remediation data. Automated network configuration remediation improves efficiency by eliminating manual efforts and reducing the risk of human errors. It enables rapid response to security vulnerabilities, minimizes downtime, and enhances compliance with regulatory and industry standards. @@ -48,6 +48,28 @@ Default Hier config options can be used or customized on a per platform basis, a For additional information on how to customize Hier Config options, please refer to the Hierarchical Configuration development guide: https://hier-config.readthedocs.io/en/latest/ +### API Remediation Type + +You can use the TYPE_API option to enable a device to use the API type of remediation. To use this, you would need to pass the settings +that the API request would use as config context. Here is an example using Cisco Meraki platform. + +```json +org_remediation: + - endpoint: "/organizations/{{ obj.get_config_context().get('organization_id', '')}}" + method: "PUT" + query: [] + fields: + - "name" +``` + +The way to create this is like this: + +- The high level key should be '**feature-name**\_remediation', in this case the feature is **org** + - endpoint: This is the endpoint you should call. You could pass jinja to the endpoint to dynamically create the endpoint. + - method: This is the HTTP method to use for the call. + - query: You add strings here, used as a filter if the endpoint supports it, like for example '?user=NTC' if you would like to filter a response searching for the NTC user. + - fields: This is also a list of strings, and it should hold the key names of the response you got from the device, to include that in the payload you will send to the device when you execute the Config Plan. In this example, we only want the "name" field from the response. + ### Custom Config Remediation Type When a Network Operating System delivers configuration data in a format that is not CLI/Hierarchical, we can still perform remediation by using the Custom Remediation options. Custom Remediation is defined within a Python function that takes as input a Configuration Compliance object and returns a Remediation Field. @@ -66,4 +88,4 @@ Once remediation settings are configured for a particular platform, remediation Once remediation is configured for a particular Platform/Feature pair, it is possible to validate remediation operations by running a compliance job. Navigate to **Jobs -> Perform Configuration Compliance** and run a compliance job for a device that has remediation enabled. Verify that remediation data has been generated by navigating to **Golden Config -> Config Compliance**, select the device and check the compliance status for the feature with remediation enabled and the "Remediating Configuration" field, as shown below: -![Validate Configuration Remediation](../images/remediation_validate_feature.png) \ No newline at end of file +![Validate Configuration Remediation](../images/remediation_validate_feature.png) diff --git a/nautobot_golden_config/choices.py b/nautobot_golden_config/choices.py index ef896fa1b..bbe51de72 100644 --- a/nautobot_golden_config/choices.py +++ b/nautobot_golden_config/choices.py @@ -21,10 +21,12 @@ class RemediationTypeChoice(ChoiceSet): """Choiceset used by RemediationSetting.""" TYPE_HIERCONFIG = "hierconfig" + TYPE_API = "api" TYPE_CUSTOM = "custom_remediation" CHOICES = ( (TYPE_HIERCONFIG, "HIERCONFIG"), + (TYPE_API, "API"), (TYPE_CUSTOM, "CUSTOM_REMEDIATION"), ) diff --git a/nautobot_golden_config/models.py b/nautobot_golden_config/models.py index 472e1b206..e84bdf016 100644 --- a/nautobot_golden_config/models.py +++ b/nautobot_golden_config/models.py @@ -3,6 +3,8 @@ import json import logging import os +from dataclasses import dataclass +from typing import Any from deepdiff import DeepDiff from django.core.exceptions import ValidationError @@ -24,6 +26,8 @@ from nautobot_golden_config.choices import ComplianceRuleConfigTypeChoice, ConfigPlanTypeChoice, RemediationTypeChoice from nautobot_golden_config.utilities.constant import ENABLE_SOTAGG, PLUGIN_CFG +# pylint: disable=too-many-lines + LOGGER = logging.getLogger(__name__) GRAPHQL_STR_START = "query ($device_id: ID!)" @@ -178,8 +182,7 @@ def _verify_get_custom_compliance_data(compliance_details): def _get_hierconfig_remediation(obj): - """ - Generate the remediation configuration for a device using HierConfig. + """Generate the remediation configuration for a device using HierConfig. This function determines the remediating configuration required to bring a device's actual configuration in line with its intended configuration, using the HierConfig library. It performs the following steps: @@ -238,12 +241,373 @@ def _get_hierconfig_remediation(obj): return remediation_config +@dataclass(frozen=True, slots=True) +class DictKey: + """Dict key dataclass. + + A wrapper class for dictionary keys to enable custom behavior or identification. + Primarily used to distinguish dictionary keys with list indexes. + + Attrs: + key (Any): The dictionary key. + """ + + key: Any + + def __str__(self) -> str: + """Return the string representation of the key. + + Returns: + str: The string representation of the key. + """ + return str(self.key) + + def __repr__(self) -> str: + """Return the official string representation of the DictKey. + + Returns: + str: The official string representation of the DictKey. + """ + return f"DictKey({self.key!r})" + + +def _wrap_dict_keys(obj: list[Any] | dict[Any, Any]) -> list[Any] | dict[Any, Any]: + """Recursively walk dicts and lists and wrap *only* dict keys as DictKey(key=). + + Scalars are returned unchanged. + + Args: + obj (list[Any] | dict[Any, Any]): The object to wrap. + + Returns: + list[Any] | dict[Any, Any]: The object with dict keys wrapped as DictKey, or the original scalar. + """ + if isinstance(obj, dict): + return {DictKey(k): _wrap_dict_keys(v) for k, v in obj.items()} + + if isinstance(obj, list): + return [_wrap_dict_keys(x) for x in obj] + return obj + + +def _create_deepdiff_object(actual: list[Any] | dict[Any, Any], intended: list[Any] | dict[Any, Any]) -> DeepDiff: + """Create a DeepDiff object. + + Args: + actual (list[Any] | dict[Any, Any]): Actual configuration. + intended (list[Any] | dict[Any, Any]): Intended configuration. + + Returns: + DeepDiff: DeepDiff object representing the differences. + """ + dd = DeepDiff( + t1=actual, + t2=intended, + view="tree", + verbose_level=2, + ) + return dd + + +class ApiRemediation: # pylint: disable=too-few-public-methods + """Remediation class for controllers.""" + + _DEEPCONFIG_CHANGE_TYPES: tuple[str, ...] = ( + "values_changed", + "dictionary_item_added", + "iterable_item_added", + ) + + def __init__( + self, + compliance_obj, + ) -> None: + """Initialize controller remediation. + + Args: + compliance_obj (ConfigCompliance): Golden Config Compliance object. + """ + self.compliance_obj = compliance_obj + self.feature_name: str = compliance_obj.rule.feature.name.lower() + self.intended_config: dict[str, Any] | str = compliance_obj.intended + self.backup_config: dict[str, Any] | str = compliance_obj.actual + + def _filter_allowed_params( # pylint: disable=too-many-branches + self, + feature_name: str, + config: dict[str, Any], + config_context: list[dict[str, Any]] | None, + ) -> dict[str, Any]: + """Filter allowed parameters and remove unwanted parameters. + + Args: + feature_name (str): Compliance feature name. + config (dict[str, Any]): Intended or actual config. + config_context (list[dict[str, Any]] | None): Device config context. + + Returns: + dict[str, Any]: Filtered config. + """ + if not config_context: + return {} + endpoint_fields: list[str] = [] + for endpoint in config_context: + if not endpoint.get("fields"): + return {} + endpoint_fields.extend(endpoint["fields"]) + + feature_value: Any = config.get(feature_name) + + if feature_value is None: + return {} + + if isinstance(feature_value, dict): + valid_payload_config: dict[str, Any] = {feature_name: {}} + for key, value in feature_value.items(): + if key in endpoint_fields: + valid_payload_config[feature_name][key] = value + return valid_payload_config + + if isinstance(feature_value, list): + valid_payload_config: dict[str, Any] = {feature_name: []} + for item in feature_value: + if not isinstance(item, dict): + continue + params_dict = {} + for key, value in item.items(): + if key in endpoint_fields: + params_dict[key] = value + if params_dict: + valid_payload_config[feature_name].append(params_dict) + return valid_payload_config + return {} + + def _prune_empty_containers(self, obj: list[Any] | dict[Any, Any]) -> list[Any] | dict[Any, Any]: + """Recursively remove empty dicts/lists while preserving None. + + This is appropriate for API payload generation where `None` (JSON null) is a valid value. + + Args: + obj (list[Any] | dict[Any, Any]): Arbitrary nested structure. + + Returns: + list[Any] | dict[Any, Any]: Structure with empty dict/list containers removed; None preserved. + """ + if isinstance(obj, dict): + cleaned: dict[Any, Any] = {} + for k, v in obj.items(): + cv = self._prune_empty_containers(v) + if cv not in ({}, []): + cleaned[k] = cv + return cleaned + + if isinstance(obj, list): + cleaned_list = [self._prune_empty_containers(x) for x in obj] + cleaned_list = [x for x in cleaned_list if x not in ({}, [])] + return cleaned_list or [] + + return obj + + def _process_diff( # pylint: disable=too-many-branches + self, + diff: dict[Any, Any], + path: tuple[Any, ...], + value: Any, + ) -> None: + """Populate a nested delta structure given a DeepDiff path and value. + + This method walks the supplied path tokens and creates intermediate dicts/lists + as required, then sets the final leaf node to `value`. + + Args: + diff (dict[Any, Any]): Delta dictionary being populated. + path (tuple[Any, ...]): Path tokens (DictKey, str, or int list indices). + value (Any): Value to assign at the leaf path. + + Raises: + TypeError: If an unexpected container type is encountered during traversal. + """ + cur: Any = diff + + for i, raw_key in enumerate(path): + is_last: bool = i == len(path) - 1 + next_key = None if is_last else path[i + 1] + + key = raw_key.key if isinstance(raw_key, DictKey) else raw_key + + if isinstance(key, int): + if not isinstance(cur, list): + err_msg: str = f"Expected list at path[{i}] (index {key}), got {type(cur)}" + raise TypeError(err_msg) + + while len(cur) <= key: + cur.append(None) + + if is_last: + cur[key] = value + return + + if cur[key] is None or not isinstance(cur[key], (dict, list)): + cur[key] = [] if isinstance(next_key, int) else {} + + cur = cur[key] + continue + + if isinstance(key, (str, float)): + if not isinstance(cur, dict): + err_msg: str = f"Expected dict at path[{i}] (key {key!r}), got {type(cur)}" + raise TypeError(err_msg) + + if is_last: + cur[key] = value + return + + if key not in cur or not isinstance(cur[key], (dict, list)): + cur[key] = [] if isinstance(next_key, int) else {} + + cur = cur[key] + continue + + err_msg: str = f"Unsupported key type at path[{i}]: {type(key)} ({key!r})" + raise TypeError(err_msg) + + def _apply_deepdiff_changes(self, delta: dict[Any, Any], changes: list[Any]) -> None: + """Apply DeepDiff change objects onto the delta payload using intended-side values (t2). + + Args: + delta (dict[Any, Any]): Delta payload being constructed. + changes (list[Any]): DeepDiff tree change objects. + """ + for change in changes: + if not hasattr(change, "t2"): + continue + + if not hasattr(change, "path"): + continue + + try: + tokens = change.path(output_format="list") + except (TypeError, AttributeError): + continue + + if tokens and tokens[0] == "root": + tokens = tokens[1:] + + self._process_diff( + diff=delta, + path=tuple(tokens), + value=change.t2, + ) + + def _clean_diff(self, diff: Any) -> list[Any] | dict[Any, Any]: + """Convert DeepDiff(tree) into an intended-shaped delta, then prune empty containers. + + If `diff` resembles DeepDiff(tree) output, a delta is built from intended-side values (t2). + Otherwise, the object is only pruned for empty dict/list containers. + + None values are preserved because they are meaningful in API payloads. + + Args: + diff (Any): DeepDiff(tree) result or empty dictionary. + + Returns: + list[Any] | dict[Any, Any]: Intended-shaped delta with empty containers removed (None preserved). + """ + if not isinstance(diff, dict): + return self._prune_empty_containers(obj=diff) + + if not any(k in diff for k in self._DEEPCONFIG_CHANGE_TYPES): + return self._prune_empty_containers(obj=diff) + + delta: dict[Any, Any] = {} + for change_type in self._DEEPCONFIG_CHANGE_TYPES: + self._apply_deepdiff_changes(delta=delta, changes=diff.get(change_type, []) or []) + return self._prune_empty_containers(obj=delta) + + def api_remediation(self) -> str: + """Generate the remediation payload for the current feature. + + Workflow: + 1. Load device config context and honor the "remediate_full_intended" flag. + 2. Filter intended/actual configs down to the fields allowed by the remediation context. + 3. Wrap dictionary keys with DictKey to preserve dict-key semantics in DeepDiff paths. + 4. Compute DeepDiff(tree) between actual (t1) and intended (t2). + 5. Convert DeepDiff output into an intended-shaped delta via `_clean_diff`. + 6. Return the remediation payload as JSON. + + Raises: + ValidationError: If remediation context is missing or intended/actual cannot be filtered. + + Returns: + str: JSON remediation payload (delta), or an empty string when there are no differences. + """ + config_context: dict[str, Any] = self.compliance_obj.device.get_config_context() + try: + if isinstance(self.backup_config, str): + self.backup_config = json.loads(self.backup_config) + if isinstance(self.intended_config, str): + self.intended_config = json.loads(self.intended_config) + except json.JSONDecodeError as exc: + err_msg: str = f"Invalid JSON config: {exc}" + raise ValidationError(err_msg) from exc + + if config_context.get("remediate_full_intended"): + return json.dumps(obj=self.intended_config, indent=4) + + intended: list[Any] | dict[Any, Any] = self._filter_allowed_params( + feature_name=self.feature_name, + config=self.intended_config, + config_context=config_context.get(f"{self.feature_name}_remediation"), + ) + actual: list[Any] | dict[Any, Any] = self._filter_allowed_params( + feature_name=self.feature_name, + config=self.backup_config, + config_context=config_context.get(f"{self.feature_name}_remediation"), + ) + + if not actual or not intended: + err_msg: str = "There was no config context fields that matched the intended or actual configuration." + raise ValidationError(err_msg) + + dict_key_intended: list[Any] | dict[Any, Any] = _wrap_dict_keys(obj=intended) + dict_key_actual: list[Any] | dict[Any, Any] = _wrap_dict_keys(obj=actual) + + dd: DeepDiff = _create_deepdiff_object( + actual=dict_key_actual, + intended=dict_key_intended, + ) + + if not dd: + return "" + + cleaned_diff: list[Any] | dict[Any, Any] = self._clean_diff(diff=dd) + + if not cleaned_diff: + return "" + + return json.dumps(cleaned_diff, indent=4) + + +def _get_api_remediation(obj) -> str: + """Generate the remediation configuration for a device using API. + + Args: + obj (ConfigCompliance): The ConfigCompliance instance. + + Returns: + str: The remediation configuration as a string. + """ + json_controller = ApiRemediation(compliance_obj=obj) + return json_controller.api_remediation() + + # The below maps the provided compliance types FUNC_MAPPER = { ComplianceRuleConfigTypeChoice.TYPE_CLI: _get_cli_compliance, ComplianceRuleConfigTypeChoice.TYPE_JSON: _get_json_compliance, ComplianceRuleConfigTypeChoice.TYPE_XML: _get_xml_compliance, RemediationTypeChoice.TYPE_HIERCONFIG: _get_hierconfig_remediation, + RemediationTypeChoice.TYPE_API: _get_api_remediation, } # The below conditionally add the custom provided compliance type for custom_function, custom_type in CUSTOM_FUNCTIONS.items(): diff --git a/nautobot_golden_config/tests/fixtures/remediation/dict_actual_config.json b/nautobot_golden_config/tests/fixtures/remediation/dict_actual_config.json new file mode 100644 index 000000000..12e4fa117 --- /dev/null +++ b/nautobot_golden_config/tests/fixtures/remediation/dict_actual_config.json @@ -0,0 +1,37 @@ +{ + "feature": { + "param1": "value1", + "param2": "DIFFERENT", + "param4": "extra", + "nested_dict": { + "a": 1, + "b": false, + "c": null, + "d": 2.71, + "e": [ + { "x": "foo", "y": 10 }, + { "x": "baz", "y": 99 } + ] + }, + "nested_list": [ + { + "id": 1, + "values": [1, 2, 3], + "flag": false + }, + { + "id": 2, + "values": [4, 5], + "flag": false + }, + { + "id": 3, + "values": [], + "flag": true + } + ], + "scalar_none": null, + "scalar_float": 2.718, + "scalar_bool": true + } +} diff --git a/nautobot_golden_config/tests/fixtures/remediation/dict_config_context.json b/nautobot_golden_config/tests/fixtures/remediation/dict_config_context.json new file mode 100644 index 000000000..126b840fb --- /dev/null +++ b/nautobot_golden_config/tests/fixtures/remediation/dict_config_context.json @@ -0,0 +1,14 @@ +[ + { + "fields": [ + "param1", + "param2", + "param3", + "nested_dict", + "nested_list", + "scalar_none", + "scalar_float", + "scalar_bool" + ] + } +] diff --git a/nautobot_golden_config/tests/fixtures/remediation/dict_intended_config.json b/nautobot_golden_config/tests/fixtures/remediation/dict_intended_config.json new file mode 100644 index 000000000..47665b04a --- /dev/null +++ b/nautobot_golden_config/tests/fixtures/remediation/dict_intended_config.json @@ -0,0 +1,32 @@ +{ + "feature": { + "param1": "value1", + "param2": "value2", + "param3": "value3", + "nested_dict": { + "a": 1, + "b": true, + "c": null, + "d": 3.14, + "e": [ + { "x": "foo", "y": 10 }, + { "x": "bar", "y": 20 } + ] + }, + "nested_list": [ + { + "id": 1, + "values": [1, 2, 3], + "flag": false + }, + { + "id": 2, + "values": [4, 5, 6], + "flag": true + } + ], + "scalar_none": null, + "scalar_float": 2.718, + "scalar_bool": false + } +} diff --git a/nautobot_golden_config/tests/fixtures/remediation/list_actual_config.json b/nautobot_golden_config/tests/fixtures/remediation/list_actual_config.json new file mode 100644 index 000000000..67169da03 --- /dev/null +++ b/nautobot_golden_config/tests/fixtures/remediation/list_actual_config.json @@ -0,0 +1,37 @@ +{ + "feature": [ + { + "id": 1, + "name": "alpha", + "active": false, + "values": [1, 2], + "meta": { + "score": 9.5, + "tags": ["a", "b", "x"], + "extra": null + } + }, + { + "id": 2, + "name": "beta", + "active": false, + "values": [4, 5], + "meta": { + "score": 6.5, + "tags": ["c", "d"], + "extra": null + } + }, + { + "id": 4, + "name": "delta", + "active": true, + "values": [7, 8], + "meta": { + "score": 1.0, + "tags": ["z"], + "extra": "new" + } + } + ] +} diff --git a/nautobot_golden_config/tests/fixtures/remediation/list_config_context.json b/nautobot_golden_config/tests/fixtures/remediation/list_config_context.json new file mode 100644 index 000000000..e5cc1a070 --- /dev/null +++ b/nautobot_golden_config/tests/fixtures/remediation/list_config_context.json @@ -0,0 +1,5 @@ +[ + { + "fields": ["id", "name", "active", "values", "meta"] + } +] diff --git a/nautobot_golden_config/tests/fixtures/remediation/list_intended_config.json b/nautobot_golden_config/tests/fixtures/remediation/list_intended_config.json new file mode 100644 index 000000000..144623da8 --- /dev/null +++ b/nautobot_golden_config/tests/fixtures/remediation/list_intended_config.json @@ -0,0 +1,37 @@ +{ + "feature": [ + { + "id": 1, + "name": "alpha", + "active": true, + "values": [1, 2, 3], + "meta": { + "score": 9.5, + "tags": ["a", "b"], + "extra": null + } + }, + { + "id": 2, + "name": "beta", + "active": false, + "values": [4, 5, 6], + "meta": { + "score": 7.0, + "tags": ["c"], + "extra": "something" + } + }, + { + "id": 3, + "name": "gamma", + "active": true, + "values": [], + "meta": { + "score": 0.0, + "tags": [], + "extra": null + } + } + ] +} diff --git a/nautobot_golden_config/tests/test_models.py b/nautobot_golden_config/tests/test_models.py index e94bb3c8d..3aa4ee3eb 100644 --- a/nautobot_golden_config/tests/test_models.py +++ b/nautobot_golden_config/tests/test_models.py @@ -1,6 +1,9 @@ """Unit tests for nautobot_golden_config models.""" -from unittest.mock import patch +import json +import pathlib +from typing import Any +from unittest.mock import MagicMock, patch from django.contrib.contenttypes.models import ContentType from django.core.exceptions import ValidationError @@ -11,13 +14,17 @@ from nautobot_golden_config.choices import RemediationTypeChoice from nautobot_golden_config.models import ( + ApiRemediation, ConfigCompliance, ConfigPlan, ConfigRemove, ConfigReplace, + DictKey, GoldenConfigSetting, RemediationSetting, + _create_deepdiff_object, _get_hierconfig_remediation, + _wrap_dict_keys, ) from nautobot_golden_config.tests.conftest import create_git_repos @@ -31,6 +38,8 @@ create_saved_queries, ) +# pylint: disable=protected-access + class ConfigComplianceModelTestCase(TestCase): """Test CRUD operations for ConfigCompliance Model.""" @@ -734,3 +743,189 @@ def test_hierconfig_instantiation_error(self, mock_workflow_remediation, mock_ge mock_get_hconfig.assert_called_once() # WorkflowRemediation should never be called since get_hconfig raises exception mock_workflow_remediation.assert_not_called() + + +def load_fixture(filename: str) -> Any: + """Load a JSON fixture from a file. + + Args: + filename (str): Path to the JSON file. + + Returns: + Any: The loaded JSON data. + """ + with pathlib.Path(filename).open(encoding="utf-8") as f: + return json.load(fp=f) + + +class TestDictKey(TestCase): + """Test cases for the DictKey class.""" + + def test_str_and_repr(self): + k = DictKey("foo") + self.assertEqual(str(k), "foo") + self.assertEqual(repr(k), "DictKey('foo')") + + +class TestWrapDictKeys(TestCase): + """Test cases for the _wrap_dict_keys function.""" + + def test_wrap_dict_keys(self): + obj = {"a": 1, "b": {"c": 2}, "d": [3, {"e": 4}]} + wrapped = _wrap_dict_keys(obj) + # Check top-level keys are DictKey + self.assertTrue(all(isinstance(k, DictKey) for k in wrapped.keys())) + # Check nested dict keys are DictKey + self.assertTrue(all(isinstance(k, DictKey) for k in wrapped[DictKey("b")].keys())) + # Check list elements are preserved + self.assertEqual(wrapped[DictKey("d")][0], 3) + self.assertTrue(isinstance(wrapped[DictKey("d")][1], dict)) + self.assertTrue(isinstance(list(wrapped[DictKey("d")][1].keys())[0], DictKey)) + + +class TestCreateDeepDiffObject(TestCase): + """Test cases for the _create_deepdiff_object function.""" + + def test_deepdiff_object(self): + a = {"foo": 1, "bar": 2} + b = {"foo": 1, "bar": 3} + dd = _create_deepdiff_object(a, b) + self.assertIn("values_changed", dd) + + +class TestApiRemediation(TestCase): + """Test ApiRemediation class using mocks and fixture files.""" + + @classmethod + def setUpClass(cls): + cls.base_fixtures_path = "nautobot_golden_config/tests/fixtures/remediation/" + cls.dict_intended_config = load_fixture(filename=f"{cls.base_fixtures_path}dict_intended_config.json") + cls.dict_actual_config = load_fixture(filename=f"{cls.base_fixtures_path}dict_actual_config.json") + cls.list_intended_config = load_fixture(filename=f"{cls.base_fixtures_path}list_intended_config.json") + cls.list_actual_config = load_fixture(filename=f"{cls.base_fixtures_path}list_actual_config.json") + cls.dict_config_context = load_fixture(filename=f"{cls.base_fixtures_path}dict_config_context.json") + cls.list_config_context = load_fixture(filename=f"{cls.base_fixtures_path}list_config_context.json") + super().setUpClass() + + def setUp(self): + rule = MagicMock() + rule.feature.name = "feature" + rule.config_type = "json" + device = MagicMock() + device.get_config_context.return_value = {"feature_remediation": self.dict_config_context} + compliance_obj = MagicMock() + compliance_obj.rule = rule + compliance_obj.device = device + compliance_obj.intended = self.dict_intended_config + compliance_obj.actual = self.dict_actual_config + self.compliance_obj = compliance_obj + self.api = ApiRemediation(self.compliance_obj) + + def test_dict_remediation_delta(self): + api = ApiRemediation(compliance_obj=self.compliance_obj) + payload = api.api_remediation() + self.assertTrue(isinstance(payload, str)) + if payload: + data = json.loads(payload) + self.assertIn("feature", data) + self.assertIsInstance(data["feature"], dict) + + def test_list_remediation_delta(self): + rule = MagicMock() + rule.feature.name = "feature" + rule.config_type = "json" + device = MagicMock() + device.get_config_context.return_value = {"feature_remediation": self.list_config_context} + compliance_obj = MagicMock() + compliance_obj.rule = rule + compliance_obj.device = device + compliance_obj.intended = self.list_intended_config + compliance_obj.actual = self.list_actual_config + self.compliance_obj = compliance_obj + api = ApiRemediation(compliance_obj=self.compliance_obj) + payload = api.api_remediation() + self.assertTrue(isinstance(payload, str)) + if payload: + data = json.loads(payload) + self.assertIn("feature", data) + self.assertIsInstance(data["feature"], list) + + def test_clean_diff(self): + a = {"feature": {"x": 1, "y": 0}} + b = {"feature": {"x": 1, "y": 2}} + dd = _create_deepdiff_object(a, b) + cleaned = self.api._clean_diff(dd) + self.assertIn("feature", cleaned) + self.assertIn("y", cleaned["feature"]) + + def test_api_remediation_delta(self): + payload = self.api.api_remediation() + # Should be a JSON string with only changed fields + if payload: + data = json.loads(payload) + self.assertIn("feature", data) + # The actual changed fields depend on the fixture content + self.assertIsInstance(data["feature"], dict) + + def test_dict_remediation_full_intended(self): + # If remediate_full_intended is True, should return full intended config + rule = MagicMock() + rule.feature.name = "feature" + rule.config_type = "json" + device = MagicMock() + device.get_config_context.return_value = {"remediate_full_intended": True} + compliance_obj = MagicMock() + compliance_obj.rule = rule + compliance_obj.device = device + compliance_obj.intended = self.dict_intended_config + compliance_obj.actual = self.dict_actual_config + api = ApiRemediation(compliance_obj) + payload = api.api_remediation() + self.assertEqual(json.loads(payload), self.dict_intended_config) + + def test_list_remediation_full_intended(self): + # If remediate_full_intended is True, should return full intended config + rule = MagicMock() + rule.feature.name = "feature" + rule.config_type = "json" + device = MagicMock() + device.get_config_context.return_value = {"remediate_full_intended": True} + compliance_obj = MagicMock() + compliance_obj.rule = rule + compliance_obj.device = device + compliance_obj.intended = self.list_intended_config + compliance_obj.actual = self.list_actual_config + api = ApiRemediation(compliance_obj) + payload = api.api_remediation() + self.assertEqual(json.loads(payload), self.list_intended_config) + + def test_api_remediation_no_context(self): + rule = MagicMock() + rule.feature.name = "feature" + rule.config_type = "json" + device = MagicMock() + device.get_config_context.return_value = {} + compliance_obj = MagicMock() + compliance_obj.rule = rule + compliance_obj.device = device + compliance_obj.intended = {"feature": {"x": 1}} + compliance_obj.actual = {"feature": {"x": 0}} + api = ApiRemediation(compliance_obj) + with self.assertRaises(ValidationError): + api.api_remediation() + + def test_api_remediation_no_diff(self): + # Should return empty string if no diff + rule = MagicMock() + rule.feature.name = "feature" + rule.config_type = "json" + device = MagicMock() + device.get_config_context.return_value = {"feature_remediation": self.dict_config_context} + compliance_obj = MagicMock() + compliance_obj.rule = rule + compliance_obj.device = device + compliance_obj.intended = {"feature": {"x": 1}} + compliance_obj.actual = {"feature": {"x": 1}} + api = ApiRemediation(compliance_obj) + payload = api.api_remediation() + self.assertEqual(payload, "")