Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 24 additions & 51 deletions nettacker/core/utils/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,29 +191,20 @@ def find_args_value(args_name):
return None


def re_address_repeaters_key_name(key_name):
return "".join(["['" + _key + "']" for _key in key_name.split("/")[:-1]])
def set_nested_value(d, key_path, value):
keys = [k for k in key_path.split("/") if k]
for key in keys[:-1]:
d = d[key]
d[keys[-1]] = value


def generate_new_sub_steps(sub_steps, data_matrix, arrays):
original_sub_steps = copy.deepcopy(sub_steps)
steps_array = []
array_names = list(arrays.keys())
for array in data_matrix:
array_name_position = 0
for array_name in arrays:
for sub_step in sub_steps:
exec(
"original_sub_steps{key_name} = {matrix_value}".format(
key_name=re_address_repeaters_key_name(array_name),
matrix_value=(
'"' + str(array[array_name_position]) + '"'
if isinstance(array[array_name_position], int)
or isinstance(array[array_name_position], str)
else array[array_name_position]
),
)
)
array_name_position += 1
for i, array_name in enumerate(array_names):
set_nested_value(original_sub_steps, array_name, array[i])
steps_array.append(copy.deepcopy(original_sub_steps))
return steps_array

Expand All @@ -232,6 +223,7 @@ def find_repeaters(sub_content, root, arrays):
isinstance(sub_content, list) or "nettacker_fuzzer" in sub_content
):
arrays[root] = sub_content

return (sub_content, root, arrays) if root != "" else arrays


Expand Down Expand Up @@ -309,34 +301,23 @@ def fuzzer_function_read_file_as_array(filename):


def apply_data_functions(data):
def apply_data_functions_new():
original_data = copy.deepcopy(data)
for item in data:
if item not in AVAILABLE_DATA_FUNCTIONS:
return
continue

for fn_name in data[item]:
if fn_name in AVAILABLE_DATA_FUNCTIONS[item]:
fn = getattr(importlib.import_module("nettacker.core.fuzzer"), fn_name)
if fn is not None:
original_data[item] = fn(data[item][fn_name])

def apply_data_functions_old():
function_results = {}
globals().update(locals())
exec(
"fuzzer_function = {fuzzer_function}".format(fuzzer_function=data[item]),
globals(),
function_results,
)
original_data[item] = function_results["fuzzer_function"]
return original_data

original_data = copy.deepcopy(data)
for item in data:
if isinstance((data[item]), str) and data[item].startswith("fuzzer_function"):
apply_data_functions_old()
else:
apply_data_functions_new()

return original_data
ALLOWED_INTERCEPTORS = {
"generate_and_replace_md5": generate_and_replace_md5,
}


def fuzzer_repeater_perform(arrays):
Expand All @@ -361,24 +342,16 @@ def fuzzer_repeater_perform(arrays):
for value in sub_data:
formatted_data[list(data.keys())[index_input]] = value
index_input += 1
interceptors_function = ""
interceptors_function_processed = ""

interceptors_function_processed = input_format.format(**formatted_data)

if interceptors:
interceptors_function += "interceptors_function_processed = "
for interceptor in interceptors[::-1]:
interceptors_function += "{interceptor}(".format(interceptor=interceptor)
interceptors_function += "input_format.format(**formatted_data)" + str(
")" * interceptors_function.count("(")
)
expected_variables = {}
globals().update(locals())
exec(interceptors_function, globals(), expected_variables)
interceptors_function_processed = expected_variables[
"interceptors_function_processed"
]
else:
interceptors_function_processed = input_format.format(**formatted_data)
for interceptor in interceptors:
if interceptor not in ALLOWED_INTERCEPTORS:
raise ValueError(f"Interceptor '{interceptor}' is not allowed")
interceptors_function_processed = ALLOWED_INTERCEPTORS[interceptor](
interceptors_function_processed
)

processed_sub_data = interceptors_function_processed
if prefix:
Expand Down
109 changes: 109 additions & 0 deletions tests/core/utils/test_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
import time
from unittest.mock import MagicMock, patch

import pytest

from nettacker.core.utils import common as common_utils


Expand Down Expand Up @@ -183,3 +185,110 @@ def quick():
result = common_utils.wait_for_threads_to_finish(threads)
assert result is True
assert len(threads) == 0


def test_set_nested_value_single_key():
d = {"a": 1}
common_utils.set_nested_value(d, "a/", "X")
assert d == {"a": "X"}


def test_set_nested_value_nested_path():
d = {"a": {"b": {"c": 0}}}
common_utils.set_nested_value(d, "a/b/c/", "Z")
assert d == {"a": {"b": {"c": "Z"}}}


def test_set_nested_value_ignores_empty_segments():
d = {"a": {"b": None}}
# Trailing slash -> last segment is empty and filtered out.
common_utils.set_nested_value(d, "a/b/", 42)
assert d["a"]["b"] == 42


def test_generate_new_sub_steps_substitutes_values_without_exec():
sub_steps = {"url": {"schema": None, "ports": None}}
arrays = {"url/schema/": ["http"], "url/ports/": [80]}
data_matrix = [["http", 80], ["https", 443]]

result = common_utils.generate_new_sub_steps(sub_steps, data_matrix, arrays)

assert result == [
{"url": {"schema": "http", "ports": 80}},
{"url": {"schema": "https", "ports": 443}},
]
# Caller's sub_steps must not be mutated.
assert sub_steps == {"url": {"schema": None, "ports": None}}


def test_apply_data_functions_skips_unknown_item():
data = {"unrelated": {"read_from_file": "x.txt"}}
assert common_utils.apply_data_functions(data) == data


def test_apply_data_functions_skips_unknown_function_name():
data = {"passwords": {"not_allowed_fn": "payload"}}
# Unknown function name is simply skipped, returning the original payload.
assert common_utils.apply_data_functions(data) == data


def test_apply_data_functions_invokes_allowed_function():
data = {"passwords": {"read_from_file": "pw.txt"}}
with patch("nettacker.core.fuzzer.read_from_file", return_value=["a", "b"], create=True):
result = common_utils.apply_data_functions(data)
assert result == {"passwords": ["a", "b"]}


def _fuzzer_arrays(interceptors):
return {
"url": {
"nettacker_fuzzer": {
"input_format": "http://host/{path}",
"prefix": "[",
"suffix": "]",
"interceptors": interceptors,
"data": {"path": ["a", "b"]},
}
}
}


def test_fuzzer_repeater_perform_no_interceptors():
result = common_utils.fuzzer_repeater_perform(_fuzzer_arrays(""))
assert result == {"url": ["[http://host/a]", "[http://host/b]"]}


def test_fuzzer_repeater_perform_allowed_interceptor_runs():
arrays = {
"url": {
"nettacker_fuzzer": {
"input_format": "NETTACKER_MD5_GENERATOR_START{seed}NETTACKER_MD5_GENERATOR_STOP",
"prefix": "",
"suffix": "",
"interceptors": "generate_and_replace_md5",
"data": {"seed": ["abc"]},
}
}
}
# This is basically: md5("abc")
assert common_utils.fuzzer_repeater_perform(arrays) == {
"url": ["900150983cd24fb0d6963f7d28e17f72"]
}


def test_fuzzer_repeater_perform_disallowed_interceptor_raises():
arrays = _fuzzer_arrays("os.system")
with pytest.raises(ValueError, match="os.system"):
common_utils.fuzzer_repeater_perform(arrays)
Comment thread
pUrGe12 marked this conversation as resolved.


def test_fuzzer_repeater_perform_unknown_interceptor_alongside_allowed_raises():
# Disallowed name listed first so we hit the allow-list check before any allowed
# interceptor mutates the value.
arrays = _fuzzer_arrays("__import__,generate_and_replace_md5")
with pytest.raises(ValueError, match="__import__"):
common_utils.fuzzer_repeater_perform(arrays)


def test_allowed_interceptors_registry_is_restricted():
assert set(common_utils.ALLOWED_INTERCEPTORS) == {"generate_and_replace_md5"}
Loading