-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathfile_block_monitor.py
More file actions
417 lines (344 loc) · 17.4 KB
/
file_block_monitor.py
File metadata and controls
417 lines (344 loc) · 17.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
#!/usr/bin/env python3
"""
Block Proposal Monitor - Direct File Monitoring
Monitors Prysm validator log file directly for block proposals
"""
import time
import re
import json
import logging
from datetime import datetime, timedelta
import requests
import sys
import signal
import os
class FileBlockMonitor:
    """Tail a Prysm validator log file and alert on scheduled block proposals.

    The monitor polls the configured log file, recognises the two schedule
    line formats handled by ``parse_proposal_schedule`` and sends one alert
    per proposal slot / epoch, either through Pushover or, when Pushover is
    disabled, as a warning in the monitor's own log.
    """

    # Beacon-chain mainnet genesis as a naive UTC datetime
    # (Unix time 1606824023, i.e. 12:00:23 rather than 12:00:00).
    GENESIS_TIME = datetime(2020, 12, 1, 12, 0, 23)
    SECONDS_PER_SLOT = 12
    SLOTS_PER_EPOCH = 32
    # How far back alerted slots/epochs are remembered (7200 slots = 24 hours).
    ALERT_RETENTION_SLOTS = 7200
    # Epoch-level alerts are still sent this long after the epoch started,
    # to tolerate delayed log lines.
    EPOCH_LATE_TOLERANCE = timedelta(seconds=300)

    PUSHOVER_URL = 'https://api.pushover.net/1/messages.json'
    # Seconds before an HTTP request to Pushover is abandoned.
    REQUEST_TIMEOUT = 10

    _EPOCH_SCHEDULE_RE = re.compile(r'Schedule for epoch (\d+).*proposerCount=(\d+)')
    _PROPOSER_SLOTS_RE = re.compile(r'proposerSlots=\[([0-9,\s]+)\]')

    def __init__(self, config_file: str = 'file_monitor_config.json'):
        """Configure logging, load the configuration and reset monitor state.

        Args:
            config_file: Path of the JSON configuration file; it is created
                with default values when it does not exist.
        """
        # Logging is configured before the config is loaded: load_config()
        # may log, and a logging call on an unconfigured root logger installs
        # a default handler that turns a later basicConfig() into a no-op.
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(message)s',
            handlers=[
                logging.FileHandler('file_block_monitor.log'),
                logging.StreamHandler(sys.stdout),
            ],
        )
        self.config = self.load_config(config_file)
        self.running = False
        # Slots that have already been alerted on.
        self.alerted_proposals = set()
        # Epochs that have already been handled; kept apart from slots
        # because the two number ranges need different pruning cut-offs.
        self.alerted_epochs = set()
        # Byte offset in the log file up to which lines have been consumed.
        self.log_position = 0

    @staticmethod
    def _utcnow() -> datetime:
        """Return the current time as a naive UTC datetime."""
        # Built from the Unix epoch so it needs no extra imports and avoids
        # the deprecated datetime.utcnow().
        return datetime(1970, 1, 1) + timedelta(seconds=time.time())

    @staticmethod
    def _format_duration(total_seconds: float, show_seconds: bool = True) -> str:
        """Format the magnitude of a duration as 'Xh Ym', 'Ym Zs', 'Zs' or 'Ym'."""
        total = abs(total_seconds)
        hours = int(total // 3600)
        minutes = int((total % 3600) // 60)
        seconds = int(total % 60)
        if hours > 0:
            return f"{hours}h {minutes}m"
        if not show_seconds:
            return f"{minutes}m"
        if minutes > 0:
            return f"{minutes}m {seconds}s"
        return f"{seconds}s"

    def _sleep_interruptibly(self, seconds: int):
        """Sleep up to ``seconds`` seconds, waking early once ``running`` is cleared."""
        for _ in range(seconds):
            if not self.running:
                break
            time.sleep(1)

    def load_config(self, config_file: str) -> dict:
        """Load the JSON configuration, layered over built-in defaults.

        Nested sections (such as ``pushover``) are merged key by key, so a
        partial section in the user's file keeps the remaining defaults.
        When the file does not exist it is created with the defaults.

        Args:
            config_file: Path of the JSON configuration file.

        Returns:
            The effective configuration dictionary.

        Raises:
            json.JSONDecodeError: If the file exists but is not valid JSON.
        """
        default_config = {
            "log_file_path": "/path/to/your/prysm-validator-logs/validator.log",
            "pushover": {
                "enabled": False,
                "app_token": "",
                "user_key": "",
                "priority": 0
            },
            "alert_hours_ahead": 2,
            "check_interval": 30  # Check every 30 seconds
        }
        try:
            with open(config_file, 'r') as f:
                user_config = json.load(f)
        except FileNotFoundError:
            logging.info(f"Creating config file: {config_file}")
            with open(config_file, 'w') as f:
                json.dump(default_config, f, indent=2)
            return default_config

        for key, value in user_config.items():
            if isinstance(value, dict) and isinstance(default_config.get(key), dict):
                default_config[key].update(value)
            else:
                default_config[key] = value
        return default_config

    def send_alert(self, message: str):
        """Send ``message`` as a Pushover notification.

        When Pushover is disabled the message is only written to the log as
        a warning. Delivery failures are logged and never raised.

        Args:
            message: Notification body.
        """
        if not self.config.get('pushover', {}).get('enabled', False):
            logging.warning(f"🚨 ALERT (Pushover disabled): {message}")
            return
        try:
            pushover_config = self.config['pushover']
            # Check if credentials are properly configured
            if not pushover_config.get('app_token') or not pushover_config.get('user_key'):
                logging.error("Pushover credentials not configured properly")
                return
            priority = pushover_config.get('priority', 0)
            payload = {
                'token': pushover_config['app_token'],
                'user': pushover_config['user_key'],
                'title': '🚨 BLOCK PROPOSAL ALERT',
                'message': message,
                'priority': priority
            }
            # Add emergency settings for priority 2
            if priority == 2:
                payload['retry'] = 300    # Retry every 5 minutes
                payload['expire'] = 3600  # Stop after 1 hour
            logging.info(f"Sending Pushover notification with priority {priority}")
            # Credentials are left out of the debug output on purpose.
            loggable = {k: v for k, v in payload.items() if k not in ('token', 'user')}
            logging.debug(f"Payload: {loggable}")
            # A timeout keeps a stalled connection from freezing the monitor.
            response = requests.post(self.PUSHOVER_URL, data=payload,
                                     timeout=self.REQUEST_TIMEOUT)
            response.raise_for_status()
            result = response.json()
            if result.get('status') == 1:
                logging.info("🚨 Pushover alert sent successfully!")
            else:
                logging.error(f"Pushover API returned error: {result}")
        except requests.exceptions.RequestException as e:
            logging.error(f"Failed to send Pushover alert - Network error: {e}")
        except Exception as e:
            logging.error(f"Failed to send Pushover alert - Unexpected error: {e}")

    def read_new_log_lines(self) -> list:
        """Return the complete, non-empty log lines appended since the last call.

        Only newline-terminated lines are consumed; a trailing partial line
        is left in place until the writer finishes it. If the file is now
        shorter than the stored position (rotation or truncation), reading
        restarts from the beginning of the file.

        Returns:
            Stripped, non-empty lines; an empty list when there is nothing
            new or the file cannot be read.
        """
        log_file = self.config['log_file_path']
        try:
            if not os.path.exists(log_file):
                logging.error(f"Log file not found: {log_file}")
                return []
            # Binary mode keeps log_position a plain byte offset.
            with open(log_file, 'rb') as f:
                f.seek(0, os.SEEK_END)
                if f.tell() < self.log_position:
                    logging.info("Log file is shorter than last read position; "
                                 "restarting from the beginning")
                    self.log_position = 0
                f.seek(self.log_position)
                data = f.read()
            last_newline = data.rfind(b'\n')
            if last_newline == -1:
                return []
            complete = data[:last_newline + 1]
            self.log_position += len(complete)
            text = complete.decode('utf-8', errors='ignore')
            return [line.strip() for line in text.split('\n') if line.strip()]
        except OSError as e:
            logging.error(f"Error reading log file: {e}")
            return []

    def parse_proposal_schedule(self, line: str) -> "dict | None":
        """Extract block proposal information from one Prysm log line.

        Two formats are recognised:

        * ``Schedule for epoch 393868 attesterCount=1 proposerCount=1``
          yields ``epoch`` and ``proposer_count`` (only when the count is
          greater than zero).
        * ``Duties schedule ... proposerSlots=[12345678]`` yields
          ``proposal_slots``.

        Args:
            line: A single log line.

        Returns:
            A dict that also carries ``timestamp`` (local time of parsing)
            and the original ``line``, or ``None`` when the line contains no
            proposal information.
        """
        if 'Schedule for epoch' in line and 'proposerCount=' in line:
            match = self._EPOCH_SCHEDULE_RE.search(line)
            if match:
                epoch = int(match.group(1))
                proposer_count = int(match.group(2))
                if proposer_count > 0:
                    return {
                        'epoch': epoch,
                        'proposer_count': proposer_count,
                        'timestamp': datetime.now(),
                        'line': line
                    }
        # Also look for more detailed duty information
        if 'Duties schedule' in line and 'proposerSlots=' in line:
            match = self._PROPOSER_SLOTS_RE.search(line)
            if match:
                # The isdigit() filter guarantees int() cannot fail here.
                slots = [int(s.strip()) for s in match.group(1).split(',')
                         if s.strip().isdigit()]
                if slots:
                    return {
                        'proposal_slots': slots,
                        'timestamp': datetime.now(),
                        'line': line
                    }
        return None

    def calculate_slot_time(self, slot: int) -> datetime:
        """Return the start time of ``slot`` as a naive UTC datetime."""
        return self.GENESIS_TIME + timedelta(seconds=slot * self.SECONDS_PER_SLOT)

    def process_proposal(self, proposal_info: dict):
        """Send alerts for the proposals described by ``proposal_info``.

        Slot-level information produces one alert per future slot that has
        not been alerted before. Epoch-level information produces one alert
        per epoch, as long as the epoch has not started more than
        ``EPOCH_LATE_TOLERANCE`` ago.

        Args:
            proposal_info: A dict as returned by ``parse_proposal_schedule``.
        """
        if 'proposal_slots' in proposal_info:
            # Specific slot information - most accurate
            for slot in proposal_info['proposal_slots']:
                if slot in self.alerted_proposals:
                    continue
                slot_time = self.calculate_slot_time(slot)
                time_until = slot_time - self._utcnow()
                # Alert for ANY future proposals, regardless of how far ahead
                if time_until <= timedelta(0):
                    logging.info(f"Proposal slot {slot} is in the past, skipping alert")
                    continue
                time_str = self._format_duration(time_until.total_seconds(),
                                                 show_seconds=False)
                message = "🚨 BLOCK PROPOSAL SCHEDULED!\n\n"
                message += f"Slot: {slot}\n"
                message += f"Time: {slot_time.strftime('%Y-%m-%d %H:%M:%S')} UTC\n"
                message += f"In: {time_str}\n\n"
                message += "⚠️ AVOID MAINTENANCE until after this time!"
                self.send_alert(message)
                self.alerted_proposals.add(slot)
                logging.warning(f"🚨 BLOCK PROPOSAL: Slot {slot} in {time_str}")
        elif 'epoch' in proposal_info:
            # Epoch-level information - less specific but still useful
            epoch = proposal_info['epoch']
            proposer_count = proposal_info['proposer_count']
            if epoch in self.alerted_epochs:
                logging.info(f"Already alerted for epoch {epoch}, skipping")
                return
            epoch_start_time = self.calculate_slot_time(epoch * self.SLOTS_PER_EPOCH)
            now = self._utcnow()
            time_until = epoch_start_time - now
            logging.info(f"Processing epoch {epoch}:")
            logging.info(f" Epoch start time: {epoch_start_time}")
            logging.info(f" Current time: {now}")
            logging.info(f" Time until/since: {time_until}")
            logging.info(f" Time until seconds: {time_until.total_seconds()}")

            if time_until <= -self.EPOCH_LATE_TOLERANCE:
                logging.warning(f"Proposal epoch {epoch} is too old (time_until: {time_until.total_seconds()}s), skipping alert")
                # Still remember it to avoid duplicate processing
                self.alerted_epochs.add(epoch)
                return

            duration = self._format_duration(time_until.total_seconds())
            start_text = epoch_start_time.strftime('%Y-%m-%d %H:%M:%S')
            if time_until < timedelta(0):
                # Epoch has already started (within the tolerance window)
                time_str = f"started {duration} ago"
                message = "🚨 BLOCK PROPOSAL IN PROGRESS!\n\n"
                message += f"Epoch: {epoch}\n"
                message += f"Started: {start_text} UTC\n"
                message += f"Status: {time_str}\n"
                message += f"Proposals: {proposer_count}\n\n"
                message += "⚠️ Your validator is currently proposing!"
            else:
                # Proposal is upcoming
                time_str = duration
                message = "🚨 BLOCK PROPOSAL SCHEDULED!\n\n"
                message += f"Epoch: {epoch}\n"
                message += f"Starts: {start_text} UTC\n"
                message += f"In: {time_str}\n"
                message += f"Proposals: {proposer_count}\n\n"
                message += "⚠️ AVOID MAINTENANCE for the next 6+ minutes!"
            logging.warning(f"🚨 SENDING ALERT for epoch {epoch}, time_until: {time_until}")
            self.send_alert(message)
            self.alerted_epochs.add(epoch)
            logging.warning(f"🚨 BLOCK PROPOSAL: Epoch {epoch} time_until: {time_until} ({proposer_count} proposals)")

    def cleanup_old_alerts(self):
        """Forget alerted slots and epochs older than the retention window.

        Slots and epochs are pruned against their own cut-offs; comparing
        epoch numbers with a slot-number threshold would discard every
        remembered epoch straight away.
        """
        elapsed = (self._utcnow() - self.GENESIS_TIME).total_seconds()
        current_slot = int(elapsed / self.SECONDS_PER_SLOT)
        oldest_slot = current_slot - self.ALERT_RETENTION_SLOTS
        oldest_epoch = oldest_slot // self.SLOTS_PER_EPOCH
        self.alerted_proposals = {
            slot for slot in self.alerted_proposals
            if isinstance(slot, int) and slot > oldest_slot
        }
        self.alerted_epochs = {
            epoch for epoch in self.alerted_epochs
            if isinstance(epoch, int) and epoch > oldest_epoch
        }

    def initialize_log_position(self):
        """Point ``log_position`` at the current end of the log file.

        This makes monitoring start with lines written from now on instead
        of replaying the existing log.
        """
        log_file = self.config['log_file_path']
        try:
            if os.path.exists(log_file):
                self.log_position = os.path.getsize(log_file)
                logging.info(f"Starting monitoring from end of log file (position: {self.log_position})")
            else:
                logging.error(f"Log file not found: {log_file}")
        except OSError as e:
            logging.error(f"Error initializing log position: {e}")

    def start(self):
        """Run the monitoring loop until SIGINT or SIGTERM is received."""
        self.running = True

        def signal_handler(signum, frame):
            logging.info("Shutting down...")
            self.running = False

        signal.signal(signal.SIGINT, signal_handler)
        signal.signal(signal.SIGTERM, signal_handler)

        log_file = self.config['log_file_path']
        # Coerced to a whole number of at least one second: the sleep below
        # counts in one-second steps, and zero would spin the loop.
        check_interval = max(1, int(self.config.get('check_interval', 30)))
        logging.info("🚨 File-based Block Proposal Monitor Started")
        logging.info(f"Monitoring: {log_file}")
        # NOTE: alert_hours_ahead is only reported here; alerts are sent for
        # any future proposal regardless of this value.
        logging.info(f"Alert threshold: {self.config.get('alert_hours_ahead', 2)} hours")
        logging.info(f"Check interval: {check_interval} seconds")

        # Start from end of file to avoid processing old logs
        self.initialize_log_position()

        # Send startup notification if enabled (best effort)
        if self.config.get('pushover', {}).get('enabled', False):
            startup_msg = f"Block Proposal Monitor started.\nMonitoring: {os.path.basename(log_file)}"
            try:
                pushover_config = self.config['pushover']
                payload = {
                    'token': pushover_config['app_token'],
                    'user': pushover_config['user_key'],
                    'title': 'Block Monitor Started',
                    'message': startup_msg,
                    'priority': 0
                }
                requests.post(self.PUSHOVER_URL, data=payload,
                              timeout=self.REQUEST_TIMEOUT)
            except Exception as e:
                # A failed startup notice must not stop the monitor, but it
                # should not disappear silently either.
                logging.warning(f"Startup notification failed: {e}")

        while self.running:
            try:
                for line in self.read_new_log_lines():
                    logging.debug(f"Processing: {line[:100]}...")
                    proposal_info = self.parse_proposal_schedule(line)
                    if proposal_info:
                        logging.info(f"Found proposal info: {proposal_info}")
                        self.process_proposal(proposal_info)
                # Cleanup old alerts periodically
                self.cleanup_old_alerts()
                # Sleep in small intervals for responsive shutdown
                self._sleep_interruptibly(check_interval)
            except KeyboardInterrupt:
                break
            except Exception as e:
                logging.error(f"Error in monitoring loop: {e}")
                self._sleep_interruptibly(30)
        logging.info("Monitor stopped")
def main():
    """Command-line entry point for the block proposal monitor.

    Options:
        --config         Path of the JSON configuration file.
        --test           Check that the log file is readable and print its
                         last lines, then exit.
        --test-pushover  Send a test notification, then exit.
        --tail N         Number of trailing log lines shown by --test.

    Without --test or --test-pushover the monitor runs until interrupted.
    """
    import argparse
    from collections import deque

    parser = argparse.ArgumentParser(description='File-based Block Proposal Monitor')
    parser.add_argument('--config', default='file_monitor_config.json', help='Config file')
    parser.add_argument('--test', action='store_true', help='Test log file access')
    parser.add_argument('--test-pushover', action='store_true', help='Test Pushover notification')
    parser.add_argument('--tail', type=int, default=20, help='Show last N lines of log file')
    args = parser.parse_args()

    monitor = FileBlockMonitor(args.config)

    if args.test_pushover:
        print("Testing Pushover notification...")
        print(f"Using priority level: {monitor.config['pushover'].get('priority', 0)}")
        test_message = (
            "🚨 TEST BLOCK PROPOSAL!\n\n"
            "This is a test notification to verify your Pushover setup is working.\n\n"
            "Epoch: 999999\nTime: Test\n\n"
            "✅ If you received this, alerts are working!"
        )
        monitor.send_alert(test_message)
        return

    if args.test:
        log_file = monitor.config['log_file_path']
        print(f"Testing access to: {log_file}")
        if not os.path.exists(log_file):
            print(f"❌ Log file not found: {log_file}")
            return
        print("✅ Log file exists")
        try:
            # Keep only the requested tail while counting every line, so a
            # large log is never held in memory as a whole. A bounded deque
            # also handles --tail 0 (a [-0:] slice would select every line).
            tail = deque(maxlen=max(args.tail, 0))
            total_lines = 0
            with open(log_file, 'r', encoding='utf-8', errors='ignore') as f:
                for raw_line in f:
                    total_lines += 1
                    tail.append(raw_line)
            print(f"✅ File readable, {total_lines} total lines")
            print(f"\nLast {args.tail} lines:")
            for raw_line in tail:
                line = raw_line.strip()
                if line:
                    print(f" {line}")
                    if 'Schedule for epoch' in line or 'proposerCount=' in line:
                        # NOTE(review): the leading glyph looks mis-encoded in
                        # this copy of the file; the intended symbol could not
                        # be determined, so the literal is left unchanged.
                        print(f" π PROPOSAL SCHEDULE LINE!")
        except Exception as e:
            print(f"❌ Error reading file: {e}")
        return

    monitor.start()


if __name__ == "__main__":
    main()