Skip to content

Commit 9611ad4

Browse files
committed
Separate CPU-bound thread pool from I/O executor
1 parent 9b87b2c commit 9611ad4

6 files changed

Lines changed: 30 additions & 16 deletions

File tree

bbot/core/helpers/helper.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -212,6 +212,8 @@ def loop(self):
212212
# during heavy scans where YARA, regex, DNS, and HTTP all compete for threads
213213
thread_pool_size = max(32, (os.cpu_count() or 1) * 4)
214214
self._loop.set_default_executor(ThreadPoolExecutor(max_workers=thread_pool_size))
215+
# separate pool for CPU-bound work (YARA, regex) so it never queues behind I/O
216+
self._cpu_executor = ThreadPoolExecutor(max_workers=max(8, os.cpu_count() or 4))
215217
return self._loop
216218

217219
def run_in_executor(self, callback, *args, **kwargs):
@@ -225,6 +227,18 @@ def run_in_executor(self, callback, *args, **kwargs):
225227
callback = partial(callback, **kwargs)
226228
return self.loop.run_in_executor(None, callback, *args)
227229

230+
def run_in_executor_cpu(self, callback, *args, **kwargs):
231+
"""
232+
Run short, GIL-releasing CPU-bound work in a dedicated thread pool that is
233+
separate from the default I/O executor, so it never queues behind long-running network calls.
234+
235+
Examples:
236+
Execute a callback in the CPU-bound thread pool:
237+
>>> result = await self.helpers.run_in_executor_cpu(callback_fn, arg1, arg2)
238+
"""
239+
callback = partial(callback, **kwargs)
240+
return self.loop.run_in_executor(self._cpu_executor, callback, *args)
241+
228242
def run_in_executor_mp(self, callback, *args, **kwargs):
229243
"""
230244
Same as run_in_executor() except with a process pool executor

bbot/core/helpers/regex.py

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -29,19 +29,19 @@ def compile(self, *args, **kwargs):
2929

3030
async def search(self, compiled_regex, *args, **kwargs):
3131
self.ensure_compiled_regex(compiled_regex)
32-
return await self.parent_helper.run_in_executor(compiled_regex.search, *args, **kwargs)
32+
return await self.parent_helper.run_in_executor_cpu(compiled_regex.search, *args, **kwargs)
3333

3434
async def match(self, compiled_regex, *args, **kwargs):
3535
self.ensure_compiled_regex(compiled_regex)
36-
return await self.parent_helper.run_in_executor(compiled_regex.match, *args, **kwargs)
36+
return await self.parent_helper.run_in_executor_cpu(compiled_regex.match, *args, **kwargs)
3737

3838
async def sub(self, compiled_regex, *args, **kwargs):
3939
self.ensure_compiled_regex(compiled_regex)
40-
return await self.parent_helper.run_in_executor(compiled_regex.sub, *args, **kwargs)
40+
return await self.parent_helper.run_in_executor_cpu(compiled_regex.sub, *args, **kwargs)
4141

4242
async def findall(self, compiled_regex, *args, **kwargs):
4343
self.ensure_compiled_regex(compiled_regex)
44-
return await self.parent_helper.run_in_executor(compiled_regex.findall, *args, **kwargs)
44+
return await self.parent_helper.run_in_executor_cpu(compiled_regex.findall, *args, **kwargs)
4545

4646
async def findall_multi(self, compiled_regexes, *args, threads=10, **kwargs):
4747
"""
@@ -55,7 +55,7 @@ async def findall_multi(self, compiled_regexes, *args, threads=10, **kwargs):
5555
tasks = {}
5656

5757
def new_task(regex_name, r):
58-
task = self.parent_helper.run_in_executor(r.findall, *args, **kwargs)
58+
task = self.parent_helper.run_in_executor_cpu(r.findall, *args, **kwargs)
5959
tasks[task] = regex_name
6060

6161
compiled_regexes = dict(compiled_regexes)
@@ -77,15 +77,15 @@ def new_task(regex_name, r):
7777

7878
async def finditer(self, compiled_regex, *args, **kwargs):
7979
self.ensure_compiled_regex(compiled_regex)
80-
return await self.parent_helper.run_in_executor(self._finditer, compiled_regex, *args, **kwargs)
80+
return await self.parent_helper.run_in_executor_cpu(self._finditer, compiled_regex, *args, **kwargs)
8181

8282
async def finditer_multi(self, compiled_regexes, *args, **kwargs):
8383
"""
8484
Same as finditer() but with multiple regexes
8585
"""
8686
for r in compiled_regexes:
8787
self.ensure_compiled_regex(r)
88-
return await self.parent_helper.run_in_executor(self._finditer_multi, compiled_regexes, *args, **kwargs)
88+
return await self.parent_helper.run_in_executor_cpu(self._finditer_multi, compiled_regexes, *args, **kwargs)
8989

9090
def _finditer_multi(self, compiled_regexes, *args, **kwargs):
9191
matches = []
@@ -98,16 +98,16 @@ def _finditer(self, compiled_regex, *args, **kwargs):
9898
return list(compiled_regex.finditer(*args, **kwargs))
9999

100100
async def extract_params_html(self, *args, **kwargs):
101-
return await self.parent_helper.run_in_executor(misc.extract_params_html, *args, **kwargs)
101+
return await self.parent_helper.run_in_executor_cpu(misc.extract_params_html, *args, **kwargs)
102102

103103
async def extract_emails(self, *args, **kwargs):
104-
return await self.parent_helper.run_in_executor(misc.extract_emails, *args, **kwargs)
104+
return await self.parent_helper.run_in_executor_cpu(misc.extract_emails, *args, **kwargs)
105105

106106
async def search_dict_values(self, *args, **kwargs):
107107
def _search_dict_values(*_args, **_kwargs):
108108
return list(misc.search_dict_values(*_args, **_kwargs))
109109

110-
return await self.parent_helper.run_in_executor(_search_dict_values, *args, **kwargs)
110+
return await self.parent_helper.run_in_executor_cpu(_search_dict_values, *args, **kwargs)
111111

112112
async def recursive_decode(self, *args, **kwargs):
113-
return await self.parent_helper.run_in_executor(misc.recursive_decode, *args, **kwargs)
113+
return await self.parent_helper.run_in_executor_cpu(misc.recursive_decode, *args, **kwargs)

bbot/core/helpers/yara_helper.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ async def match(self, compiled_rules, text):
4040
Given a compiled YARA rule and a body of text, return a list of strings that match the rule
4141
"""
4242
matched_strings = []
43-
matches = await self.parent_helper.run_in_executor(compiled_rules.match, data=text)
43+
matches = await self.parent_helper.run_in_executor_cpu(compiled_rules.match, data=text)
4444
if matches:
4545
for match in matches:
4646
for string_match in match.strings:

bbot/modules/dnsbrute_mutations.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ async def handle_event(self, event):
4444

4545
async def get_parent_event(self, subdomain):
4646
start = time.time()
47-
parent_host = await self.helpers.run_in_executor(self.helpers.closest_match, subdomain, self.parent_events)
47+
parent_host = await self.helpers.run_in_executor_cpu(self.helpers.closest_match, subdomain, self.parent_events)
4848
elapsed = time.time() - start
4949
self.trace(f"{subdomain}: got closest match among {len(self.parent_events):,} parent events in {elapsed:.2f}s")
5050
return self.parent_events[parent_host]

bbot/modules/internal/excavate.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1145,7 +1145,7 @@ async def search(self, data, event, content_type, discovery_context="HTTP respon
11451145

11461146
for label, data_instance in data_items:
11471147
# Run the compiled YARA rules against each data item and process every match
1148-
for result in await self.helpers.run_in_executor(self.yara_rules.match, data=f"{data_instance}"):
1148+
for result in await self.helpers.run_in_executor_cpu(self.yara_rules.match, data=f"{data_instance}"):
11491149
rule_name = result.rule
11501150

11511151
# Skip specific operations for 'parameter_extraction' rule on decoded_data

bbot/scanner/scanner.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1193,7 +1193,7 @@ async def dns_yara_rules(self):
11931193
if self.dns_yara_rules_uncompiled is not None:
11941194
import yara
11951195

1196-
self._dns_yara_rules = await self.helpers.run_in_executor(
1196+
self._dns_yara_rules = await self.helpers.run_in_executor_cpu(
11971197
yara.compile, source="\n".join(self.dns_yara_rules_uncompiled.values())
11981198
)
11991199
return self._dns_yara_rules
@@ -1209,7 +1209,7 @@ async def extract_in_scope_hostnames(self, s):
12091209
matches = set()
12101210
dns_yara_rules = await self.dns_yara_rules()
12111211
if dns_yara_rules is not None:
1212-
for match in await self.helpers.run_in_executor(dns_yara_rules.match, data=s):
1212+
for match in await self.helpers.run_in_executor_cpu(dns_yara_rules.match, data=s):
12131213
for string in match.strings:
12141214
for instance in string.instances:
12151215
matches.add(str(instance))

0 commit comments

Comments
 (0)