#!/usr/bin/env python
# When we generate an experiment folder (FOLDER) with the exp.py script on the
# local machine, containing K generated Bash scripts, we can use this
# schedule.py to run all of those scripts with at most N of them in parallel.
# The remaining ones (assuming N << K) are started as soon as a currently
# running job completes.
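#
# Example invocation (hypothetical folder name):
#   python schedule.py 4 ./generated_experiments 60
# This runs the *.sh scripts in ./generated_experiments with at most 4 jobs in
# parallel, applying an extra scheduler-side timeout of 60 minutes.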
import random
import sys
import os
import subprocess
import time
import platform
import pathlib
import psutil
# Note: we force flush=True on ALL prints, otherwise we would end up with messed-up logs
if len(sys.argv) != 4:
    print("Usage:\nschedule.py <N> <FOLDER> <TIMEOUT_MINUTES>", flush=True)
    exit(1)
# The number of jobs to run in parallel
N = int(sys.argv[1])
if N < 1:
    print("Invalid value for N: " + str(N), flush=True)
    exit(1)
# Location of experiment folder
FOLDER = sys.argv[2]
TIMEOUT_MINUTES = int(sys.argv[3])
if TIMEOUT_MINUTES < 1:
    print("Invalid value for TIMEOUT_MINUTES: " + str(TIMEOUT_MINUTES), flush=True)
    exit(1)
# On Windows, run through the shell (presumably needed to resolve 'bash', e.g., from Git Bash)
SHELL = platform.system() == 'Windows'
# Use a POSIX-style path, so it can be handled consistently by Bash on all platforms
SCRIPTS_FOLDER = pathlib.PurePath(FOLDER).as_posix()
def checkDocker():
    try:
        # Run the 'docker info' command to check whether Docker is running
        result = subprocess.run(
            ['docker', 'info'],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True
        )
        # If the command failed, Docker is likely not running
        if result.returncode != 0:
            print("Docker is not running. Error:", result.stderr, file=sys.stderr, flush=True)
            sys.exit(1)
        # Unfortunately, by default Docker allows only a low number of networks,
        # so we must make sure to clean up any unused ones.
        # We had issues where previous experiments did not clean up properly, and
        # all new ones failed for lack of available networks.
        print("Going to prune all unused networks ('docker network prune -f').", file=sys.stderr, flush=True)
        result = subprocess.run(
            ['docker', 'network', 'prune', '-f'],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True
        )
        if result.returncode != 0:
            print("Failed to prune networks. Error:", result.stderr, file=sys.stderr, flush=True)
            sys.exit(1)
        return True
    except FileNotFoundError:
        print("Error: Docker is not installed or not in PATH", file=sys.stderr, flush=True)
        sys.exit(1)
    except Exception as e:
        print(f"Unexpected error checking Docker: {str(e)}", file=sys.stderr, flush=True)
        sys.exit(1)
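# Fail fast if Docker is unavailable: the generated experiment scripts rely on it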
checkDocker()
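# Handles of the currently running jobs (at most N at any time)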
buffer = []
# Collect the names of all Bash script files in the experiment folder
scripts = [f for f in os.listdir(SCRIPTS_FOLDER) if os.path.isfile(os.path.join(SCRIPTS_FOLDER, f)) and f.endswith(".sh")]
print("There are " + str(len(scripts)) + " Bash script files", flush=True)
random.shuffle(scripts)
# Progress counter, used only for logging
k = 1
def runScript(s):
    global k
    print("Running script " + str(k) + "/" + str(len(scripts)) + ": " + s, flush=True)
    k = k + 1
    command = ["bash", s]
    # start_new_session=True detaches the job into its own session, so signals
    # sent to this scheduler do not propagate to the running jobs
    handler = subprocess.Popen(command, shell=SHELL, cwd=SCRIPTS_FOLDER, start_new_session=True)
    buffer.append(handler)
def killProcess(h):
    print("Terminating process.", flush=True)
    parent = psutil.Process(h.pid)
    children = parent.children(recursive=True)
    # Graceful terminate: SIGTERM the whole process tree first
    for p in children:
        p.terminate()
    parent.terminate()
    gone, alive = psutil.wait_procs(children + [parent], timeout=10)
    # Force kill whatever is still alive after the grace period
    for p in alive:
        print(f"Force killing PID {p.pid}", flush=True)
        p.kill()
    h.wait()
########################################################################################################################
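# Main scheduling loop: start jobs until N are running; once the buffer is
# full, poll until a slot frees up, killing a job if the timeout is exceeded.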
last_start = time.time()
for s in scripts:
    if len(buffer) < N:
        last_start = time.time()
        runScript(s)
    else:
        while len(buffer) == N:
            for h in buffer:
                h.poll()
                if h.returncode is not None and h.returncode != 0:
                    print("Process terminated with code: " + str(h.returncode), flush=True)
            # Keep only the jobs still running, i.e., those with no return code set yet
            buffer = [h for h in buffer if h.returncode is None]
            if len(buffer) == N:
                # All N jobs are still running... but has any of them timed out?
                # TODO: for simplicity we only track the start time of the latest
                # added job, so the timeout is not enforced precisely for ALL jobs.
                # Note that the jobs have their own internal timeouts anyway;
                # this is just an extra safety check.
                elapsed_time = time.time() - last_start
                if elapsed_time > TIMEOUT_MINUTES * 60:
                    killProcess(buffer[0])
                # Wait before checking again
                time.sleep(5)
            else:
                last_start = time.time()
                runScript(s)
                break
print("Waiting for last scripts to end", flush=True)
budget = TIMEOUT_MINUTES * 60
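# The remaining jobs run in parallel, so a single shared budget of
# TIMEOUT_MINUTES bounds the total time spent waiting for all of them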
for h in buffer:
    start = time.time()
    try:
        h.wait(budget)
        if h.returncode != 0:
            print("Process terminated with code: " + str(h.returncode), flush=True)
    except subprocess.TimeoutExpired:
        print("Timeout reached.", flush=True)
        killProcess(h)
    elapsed = time.time() - start
    budget = max(0, budget - elapsed)
print("All jobs are completed", flush=True)