Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-06-26 08:40:24

0001 """Cached system status collection for ePIC production operations."""
0002 
0003 import json
0004 import socket
0005 import subprocess
0006 import urllib.error
0007 import urllib.request
0008 from datetime import timedelta
0009 
0010 from django.db import OperationalError, ProgrammingError, transaction
0011 from django.utils import timezone
0012 
0013 from .models import SystemAgent, SystemStatus, SystemStatusHistory
0014 
0015 
# Minimum age of the stored row before an unchanged status/summary is written
# to history again (see _should_append_history).
HISTORY_MIN_INTERVAL = timedelta(hours=6)
# If the newest check is older than this, status_summary() reports the overall
# state as stale.
STATUS_STALE_AFTER = timedelta(minutes=15)
0018 
0019 
def _status(name, category, status, summary, data=None, checked_at=None):
    """Build the plain-dict status record that collectors hand back.

    ``data`` falls back to an empty dict and ``checked_at`` to the current
    time whenever the caller supplies a falsy value for them.
    """
    payload = data if data else {}
    stamp = checked_at if checked_at else timezone.now()
    return dict(
        name=name,
        category=category,
        status=status,
        summary=summary,
        data=payload,
        checked_at=stamp,
    )
0029 
0030 
def _run_checked(cmd, timeout=5):
    """Run ``cmd`` without a shell and summarise the outcome as a dict.

    Args:
        cmd: Argument list handed to ``subprocess.run``.
        timeout: Seconds to wait before the command is abandoned.

    Returns:
        A dict with the keys ``ok`` (True only for exit code 0),
        ``returncode`` (``None`` when the command timed out or could not be
        started), ``stdout`` and ``stderr`` (stripped text) and
        ``elapsed_ms`` (integer milliseconds spent in this call).
    """
    # Function-scope import: the module header does not import ``time``.
    import time

    # A monotonic clock keeps elapsed_ms meaningful even if the wall clock is
    # stepped (NTP, manual change) while the command is running.
    started = time.monotonic()

    def _result(returncode, stdout, stderr):
        # Single place that shapes the result so all exits stay consistent.
        return {
            'ok': returncode == 0,
            'returncode': returncode,
            'stdout': stdout,
            'stderr': stderr,
            'elapsed_ms': int((time.monotonic() - started) * 1000),
        }

    try:
        # errors='replace' prevents undecodable output from raising
        # UnicodeDecodeError out of a helper meant to always return a dict.
        p = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            errors='replace',
            timeout=timeout,
        )
    except subprocess.TimeoutExpired:
        return _result(None, '', f'timed out after {timeout}s')
    except OSError as exc:
        # Covers a missing or non-executable binary.
        return _result(None, '', str(exc))
    return _result(p.returncode, (p.stdout or '').strip(), (p.stderr or '').strip())
0058 
0059 
def _systemctl_unit(name, unit, category='services'):
    """Probe one systemd unit and wrap the findings in a status record.

    Three ``systemctl`` queries are issued (``is-active``, ``is-enabled`` and
    ``show``). An ``active`` answer maps to ``ok``, ``activating`` or
    ``deactivating`` to ``warning`` and anything else, including no answer
    at all, to ``error``.
    """
    def query(*args):
        return _run_checked(['systemctl', *args], timeout=5)

    active = query('is-active', unit)
    enabled = query('is-enabled', unit)
    show = query('show', unit, '--property=ActiveState,SubState,MainPID,NRestarts')

    # Turn the KEY=VALUE lines printed by ``systemctl show`` into a dict.
    fields = dict(
        line.split('=', 1)
        for line in show.get('stdout', '').splitlines()
        if '=' in line
    )

    active_text = active.get('stdout')
    severity_by_answer = {
        'active': 'ok',
        'activating': 'warning',
        'deactivating': 'warning',
    }
    state = severity_by_answer.get(active_text, 'error')

    parts = [f"{unit} is {active_text or 'unknown'}"]
    if enabled.get('stdout'):
        parts.append(f", {enabled['stdout']}")
    summary = ''.join(parts)

    details = {
        'unit': unit,
        'host': socket.gethostname(),
        'systemctl': {
            'is_active': active,
            'is_enabled': enabled,
            'show': fields,
        },
    }
    return _status(name, category, state, summary, details)
0093 
0094 
def _latest_agent_snapshot(filters):
    """Summarise the first ``SystemAgent`` row matching ``filters``.

    Rows are ordered by descending ``last_heartbeat`` and then descending
    ``updated_at``. Returns ``None`` when nothing matches and
    ``{'lookup_error': <message>}`` when the query raises; otherwise a dict
    of selected fields from the row.
    """
    try:
        matches = SystemAgent.objects.filter(**filters)
        agent = matches.order_by('-last_heartbeat', '-updated_at').first()
    except Exception as exc:
        return {'lookup_error': str(exc)}
    if not agent:
        return None

    snapshot = {
        field: getattr(agent, field)
        for field in ('instance_name', 'status', 'operational_state')
    }
    heartbeat = agent.last_heartbeat
    # Datetimes are stored as ISO strings so the snapshot stays JSON-friendly.
    snapshot['last_heartbeat'] = heartbeat.isoformat() if heartbeat else None
    snapshot['hostname'] = agent.hostname
    snapshot['pid'] = agent.pid
    snapshot['namespace'] = agent.namespace
    snapshot['metadata'] = agent.metadata or {}
    return snapshot
0112 
0113 
def _ops_agent():
    """Collect status for the ``epicprod-ops-agent`` unit and its heartbeat row.

    The systemd probe supplies the baseline record. The latest matching
    agent snapshot is attached under ``data['agent_row']`` and can only
    downgrade an ``ok`` record to ``warning``; it never upgrades a record or
    masks an ``error``.

    Returns:
        The status dict produced by ``_systemctl_unit``, updated in place.
    """
    item = _systemctl_unit('epicprod-ops-agent', 'epicprod-ops-agent', category='agents')
    agent = _latest_agent_snapshot({'namespace': 'prodops', 'agent_type': 'PRODOPS'})
    item['data']['agent_row'] = agent
    if agent and 'lookup_error' in agent:
        # A failed lookup yields a truthy dict with no status fields; say so
        # explicitly rather than reporting "monitor heartbeat None/None".
        # The error text itself remains available in data['agent_row'].
        item['summary'] += '; monitor heartbeat lookup failed'
        if item['status'] == 'ok':
            item['status'] = 'warning'
    elif agent:
        item['summary'] += f"; monitor heartbeat {agent.get('status')}/{agent.get('operational_state')}"
        if agent.get('status') not in {'OK', 'WARNING'} and item['status'] == 'ok':
            item['status'] = 'warning'
    elif item['status'] == 'ok':
        item['status'] = 'warning'
        item['summary'] += '; no matching monitor heartbeat row'
    return item
0126 
0127 
def _panda_bot():
    """Collect status for the ``swf-panda-bot`` unit and its heartbeat row.

    The systemd probe decides the status. The latest agent row whose
    ``instance_name`` contains "panda" is attached under
    ``data['agent_row']`` and only contributes to the summary text.

    Returns:
        The status dict produced by ``_systemctl_unit``, updated in place.
    """
    item = _systemctl_unit('swf-panda-bot', 'swf-panda-bot', category='agents')
    agent = _latest_agent_snapshot({'instance_name__icontains': 'panda'})
    item['data']['agent_row'] = agent
    if agent and 'lookup_error' in agent:
        # A failed lookup yields a truthy dict with no status fields; say so
        # explicitly rather than reporting "monitor heartbeat None/None".
        item['summary'] += '; monitor heartbeat lookup failed'
    elif agent:
        item['summary'] += f"; monitor heartbeat {agent.get('status')}/{agent.get('operational_state')}"
    return item
0135 
0136 
def _http_endpoint(name, url):
    """Issue a GET for ``url`` and report its reachability as a status record.

    Args:
        name: Name stored on the resulting status record.
        url: Address to fetch; at most 512 bytes of the body are read.

    Returns:
        A status dict in category ``external``: ``ok`` for a response below
        400, ``warning`` for other non-5xx HTTP errors and ``error`` for 5xx
        responses or any failure to complete the request.
    """
    # Function-scope import: the module header does not import ``time``.
    import time

    # Monotonic timing is unaffected by wall-clock adjustments mid-request.
    started = time.monotonic()

    def _elapsed_ms():
        return int((time.monotonic() - started) * 1000)

    data = {'url': url}
    try:
        # Built inside the try so a malformed URL is reported as a failed
        # check instead of escaping as an unhandled ValueError.
        req = urllib.request.Request(
            url, method='GET', headers={'User-Agent': 'swf-monitor-system-status/1.0'})
        with urllib.request.urlopen(req, timeout=8) as resp:
            body = resp.read(512)
            data.update({
                'http_status': resp.getcode(),
                'final_url': resp.geturl(),
                'elapsed_ms': _elapsed_ms(),
                'sample_bytes': len(body),
            })
    except urllib.error.HTTPError as exc:
        data.update({
            'http_status': exc.code,
            'final_url': exc.geturl(),
            'elapsed_ms': _elapsed_ms(),
            'error': str(exc),
        })
        # HTTPError doubles as the response object; close it so the
        # underlying connection is released promptly.
        exc.close()
        state = 'error' if exc.code >= 500 else 'warning'
        return _status(name, 'external', state, f'{url} returned HTTP {exc.code}', data)
    except Exception as exc:
        # Deliberately broad: DNS, TLS, timeout and socket failures all mean
        # the endpoint check failed.
        data.update({'elapsed_ms': _elapsed_ms(), 'error': str(exc)})
        return _status(name, 'external', 'error', f'{url} check failed: {exc}', data)

    state = 'ok' if data['http_status'] < 400 else 'warning'
    return _status(name, 'external', state, f"{url} returned HTTP {data['http_status']}", data)
0170 
0171 
# Registry of zero-argument collector callables keyed by status name.
# refresh_system_status() looks names up here; each callable returns a status
# dict built by _status().
COLLECTORS = {
    'epicprod-ops-agent': _ops_agent,
    'swf-panda-bot': _panda_bot,
    'swf-monitor-mcp-asgi': lambda: _systemctl_unit(
        'swf-monitor-mcp-asgi', 'swf-monitor-mcp-asgi', category='services'),
    'httpd': lambda: _systemctl_unit('httpd', 'httpd', category='services'),
    'epic-devcloud-prod': lambda: _http_endpoint('epic-devcloud-prod', 'https://epic-devcloud.org/prod/'),
    'epic-devcloud-doc': lambda: _http_endpoint('epic-devcloud-doc', 'https://epic-devcloud.org/doc/'),
}
0181 
0182 
def _should_append_history(old, new):
    """Decide whether ``new`` warrants a history row given stored row ``old``.

    A row is appended when there is no previous row, when the status or the
    summary changed, when the previous row has no ``checked_at``, or when at
    least ``HISTORY_MIN_INTERVAL`` separates the two check times.
    """
    if old is None:
        return True
    unchanged = old.status == new['status'] and old.summary == new['summary']
    if not unchanged or not old.checked_at:
        return True
    age = new['checked_at'] - old.checked_at
    return age >= HISTORY_MIN_INTERVAL
0191 
0192 
@transaction.atomic
def _save_status(item, source='unknown'):
    """Persist one status record and, when warranted, a matching history row.

    ``source`` is stored under ``data['refresh_source']`` on a copy of the
    record, so the caller's dict is left untouched. Returns the
    ``SystemStatus`` object from ``update_or_create``.
    """
    payload = dict(item.get('data') or {})
    payload['refresh_source'] = source
    record = {**item, 'data': payload}
    name = record['name']

    # Fetch the current row with select_for_update() inside the atomic block
    # before deciding whether history is due.
    previous = SystemStatus.objects.select_for_update().filter(name=name).first()
    wants_history = _should_append_history(previous, record)

    columns = {
        key: record[key]
        for key in ('category', 'status', 'summary', 'data', 'checked_at')
    }
    obj, _created = SystemStatus.objects.update_or_create(
        name=name,
        defaults=dict(columns),
    )
    if wants_history:
        SystemStatusHistory.objects.create(name=name, **columns)
    return obj
0220 
0221 
def refresh_system_status(selected=None, source='unknown'):
    """Run selected collectors and update current/history status rows.

    Args:
        selected: Collector names to run: an iterable of names, a single
            name given as a string, or a falsy value to run every collector
            in ``COLLECTORS``.
        source: Label passed to ``_save_status`` to record what triggered
            the refresh.

    Returns:
        The objects returned by ``_save_status``, one per requested name, in
        request order. Unknown names and collectors that raise are saved as
        ``unknown`` / ``error`` records rather than aborting the run.
    """
    if isinstance(selected, str):
        # Iterating a bare string would produce one bogus collector name per
        # character; treat it as a single name. An empty string stays falsy
        # and so still means "run everything".
        selected = [selected] if selected else None
    names = list(selected or COLLECTORS)
    results = []
    for name in names:
        collector = COLLECTORS.get(name)
        if collector is None:
            item = _status(name, 'unknown', 'unknown', f'No collector named {name}', {
                'available_collectors': sorted(COLLECTORS),
            })
        else:
            try:
                item = collector()
            except Exception as exc:
                # Boundary catch: one broken collector must not prevent the
                # remaining ones from being refreshed.
                item = _status(name, 'unknown', 'error', f'Collector failed: {exc}', {
                    'collector': name,
                    'error': str(exc),
                })
        results.append(_save_status(item, source=source))
    return results
0242 
0243 
def grouped_current_status():
    """Return current rows bucketed by category for the System page.

    Rows are read ordered by category and name. A database that raises
    ``OperationalError`` or ``ProgrammingError`` (e.g. before migrations have
    run) yields an empty list instead of an exception.
    """
    try:
        rows = list(SystemStatus.objects.order_by('category', 'name'))
    except (OperationalError, ProgrammingError):
        return []

    buckets = {}
    for row in rows:
        if row.category not in buckets:
            buckets[row.category] = []
        buckets[row.category].append(row)
    return [
        {'category': category, 'items': members}
        for category, members in buckets.items()
    ]
0257 
0258 
def status_summary():
    """Aggregate the current status rows into counts plus an overall verdict.

    The returned dict holds per-status counts (``ok``, ``warning``,
    ``error``, ``unknown``), ``total``, the newest and oldest ``checked_at``
    values, and ``overall_status`` / ``overall_reason``. Rows with an
    unrecognised status are counted as ``unknown``. If the tables cannot be
    queried, an all-zero summary with an ``unknown`` verdict is returned.
    """
    now = timezone.now()
    summary = dict.fromkeys(('ok', 'warning', 'error', 'unknown'), 0)
    try:
        rows = list(SystemStatus.objects.all())
    except (OperationalError, ProgrammingError):
        summary.update(
            total=0,
            latest_checked_at=None,
            oldest_checked_at=None,
            overall_status='unknown',
            overall_reason='System status tables are not available yet.',
        )
        return summary

    for row in rows:
        level = row.status if row.status in summary else 'unknown'
        summary[level] += 1
    stamps = [row.checked_at for row in rows if row.checked_at]
    latest = max(stamps) if stamps else None
    oldest = min(stamps) if stamps else None
    summary['total'] = len(rows)
    summary['latest_checked_at'] = latest
    summary['oldest_checked_at'] = oldest

    # Precedence: nothing collected, any error, stale data, warning/unknown, ok.
    if not rows:
        verdict = ('unknown', 'No system status has been collected yet.')
    elif summary['error']:
        verdict = ('error', f"{summary['error']} check(s) are red.")
    elif latest and now - latest > STATUS_STALE_AFTER:
        stale_minutes = int(STATUS_STALE_AFTER.total_seconds() // 60)
        verdict = ('error', f"System status is stale by more than {stale_minutes} minutes.")
    elif summary['warning'] or summary['unknown']:
        verdict = ('warning', 'One or more checks are warning or unknown.')
    else:
        verdict = ('ok', 'All current checks are OK.')
    summary['overall_status'], summary['overall_reason'] = verdict
    return summary
0302 
0303 
def compact_refresh_report(rows):
    """Serialise refreshed rows as an indented, key-sorted JSON array.

    Each entry carries ``name``, ``category``, ``status``, ``summary`` and
    ``checked_at`` (ISO 8601 text, or ``null`` when the row has none).
    """
    entries = []
    for row in rows:
        entry = {
            field: getattr(row, field)
            for field in ('name', 'category', 'status', 'summary')
        }
        stamp = row.checked_at
        entry['checked_at'] = stamp.isoformat() if stamp else None
        entries.append(entry)
    return json.dumps(entries, indent=2, sort_keys=True)