Skip to content
Open
75 changes: 24 additions & 51 deletions nettacker/core/utils/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,29 +179,20 @@ def find_args_value(args_name):
return None


def re_address_repeaters_key_name(key_name):
return "".join(["['" + _key + "']" for _key in key_name.split("/")[:-1]])
def set_nested_value(d, key_path, value):
keys = [k for k in key_path.split("/") if k]
for key in keys[:-1]:
d = d[key]
d[keys[-1]] = value


def generate_new_sub_steps(sub_steps, data_matrix, arrays):
original_sub_steps = copy.deepcopy(sub_steps)
steps_array = []
array_names = list(arrays.keys())
for array in data_matrix:
array_name_position = 0
for array_name in arrays:
for sub_step in sub_steps:
exec(
"original_sub_steps{key_name} = {matrix_value}".format(
key_name=re_address_repeaters_key_name(array_name),
matrix_value=(
'"' + str(array[array_name_position]) + '"'
if isinstance(array[array_name_position], int)
or isinstance(array[array_name_position], str)
else array[array_name_position]
),
)
)
array_name_position += 1
for i, array_name in enumerate(array_names):
set_nested_value(original_sub_steps, array_name, array[i])
steps_array.append(copy.deepcopy(original_sub_steps))
return steps_array

Expand All @@ -220,6 +211,7 @@ def find_repeaters(sub_content, root, arrays):
isinstance(sub_content, list) or "nettacker_fuzzer" in sub_content
):
arrays[root] = sub_content

return (sub_content, root, arrays) if root != "" else arrays


Expand Down Expand Up @@ -297,34 +289,23 @@ def fuzzer_function_read_file_as_array(filename):


def apply_data_functions(data):
def apply_data_functions_new():
original_data = copy.deepcopy(data)
for item in data:
if item not in AVAILABLE_DATA_FUNCTIONS:
return
continue

for fn_name in data[item]:
if fn_name in AVAILABLE_DATA_FUNCTIONS[item]:
fn = getattr(importlib.import_module("nettacker.core.fuzzer"), fn_name)
if fn is not None:
original_data[item] = fn(data[item][fn_name])

def apply_data_functions_old():
function_results = {}
globals().update(locals())
exec(
"fuzzer_function = {fuzzer_function}".format(fuzzer_function=data[item]),
globals(),
function_results,
)
original_data[item] = function_results["fuzzer_function"]
return original_data

original_data = copy.deepcopy(data)
for item in data:
if isinstance((data[item]), str) and data[item].startswith("fuzzer_function"):
apply_data_functions_old()
else:
apply_data_functions_new()

return original_data
ALLOWED_INTERCEPTORS = {
"generate_and_replace_md5": generate_and_replace_md5,
}


def fuzzer_repeater_perform(arrays):
Expand All @@ -349,24 +330,16 @@ def fuzzer_repeater_perform(arrays):
for value in sub_data:
formatted_data[list(data.keys())[index_input]] = value
index_input += 1
interceptors_function = ""
interceptors_function_processed = ""

interceptors_function_processed = input_format.format(**formatted_data)

if interceptors:
interceptors_function += "interceptors_function_processed = "
for interceptor in interceptors[::-1]:
interceptors_function += "{interceptor}(".format(interceptor=interceptor)
interceptors_function += "input_format.format(**formatted_data)" + str(
")" * interceptors_function.count("(")
)
expected_variables = {}
globals().update(locals())
exec(interceptors_function, globals(), expected_variables)
interceptors_function_processed = expected_variables[
"interceptors_function_processed"
]
else:
interceptors_function_processed = input_format.format(**formatted_data)
for interceptor in interceptors:
if interceptor not in ALLOWED_INTERCEPTORS:
raise ValueError(f"Interceptor '{interceptor}' is not allowed")
interceptors_function_processed = ALLOWED_INTERCEPTORS[interceptor](
interceptors_function_processed
)

processed_sub_data = interceptors_function_processed
if prefix:
Expand Down
Loading