Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-27 07:41:42

0001 """
0002 PanDA database query functions for ePIC production monitoring.
0003 
0004 All functions are synchronous — they use django.db.connections['panda']
0005 directly. Callers in async contexts should wrap with sync_to_async.
0006 """
0007 
0008 import logging
0009 import re
0010 from datetime import timedelta
0011 from django.utils import timezone
0012 from django.db import connections
0013 
0014 from .constants import (
0015     PANDA_SCHEMA, LIST_FIELDS, ERROR_FIELDS, DIAGNOSE_EXTRA_FIELDS,
0016     ERROR_COMPONENTS, FAULTY_STATUSES, TASK_LIST_FIELDS,
0017     STUDY_FIELDS, FILE_FIELDS,
0018 )
0019 from .sql import (
0020     build_union_query, build_count_query,
0021     build_task_query, build_task_count_query,
0022     build_union_query_dt, build_union_count, build_union_count_by_field,
0023     build_task_query_dt, build_task_count, build_task_count_by_field,
0024     build_search_clauses,
0025     row_to_dict, extract_errors, like_or_eq,
0026 )
0027 
0028 logger = logging.getLogger(__name__)
0029 
0030 TERMINAL_TASK_STATUSES = ('done', 'failed', 'aborted', 'broken', 'finished')
0031 STALE_TASK_DAYS = 60
0032 
0033 # NERSC Perlmutter jobs publish their per-job pilot & slurm logs here.
0034 # Pattern: <base>/<queue>/<pandaid>/{pilotlog.txt, slurm-<id>-task<N>-panda<pid>.out}
0035 _NERSC_PORTAL_BASE = "https://portal.nersc.gov/cfs/m3763/panda/jobs"
0036 _NERSC_SLURM_RE = re.compile(r'href="(slurm-\d+-task\d+-panda\d+\.out)"')
0037 
0038 
0039 def _nersc_portal_log_urls(computingsite, pandaid):
0040     """Build Perlmutter log URLs by scraping the NERSC portal dir listing.
0041 
0042     The slurm task filename contains a per-allocation task index not stored in
0043     our DB, so we have to scrape the Apache autoindex to find it. Returns
0044     ``None`` if the dir is unreachable or empty.
0045     """
0046     import requests
0047     log_dir = f"{_NERSC_PORTAL_BASE}/{computingsite}/{pandaid}/"
0048     try:
0049         resp = requests.get(log_dir, timeout=5)
0050         if resp.status_code != 200:
0051             return None
0052     except Exception as e:
0053         logger.warning("NERSC portal dir fetch failed for %s: %s", pandaid, e)
0054         return None
0055     result = {
0056         'nersc_log_dir': log_dir,
0057         'pilot_stdout': log_dir + 'pilotlog.txt',
0058     }
0059     m = _NERSC_SLURM_RE.search(resp.text)
0060     if m:
0061         result['slurm_task_stdout'] = log_dir + m.group(1)
0062     return result
0063 
0064 
0065 def _bulk_destinationse(pandaids):
0066     """Look up destinationse (destination storage element) for a batch of jobs.
0067 
0068     The destination SE — the Rucio storage element where output files are
0069     written — is stored per-file in filestable4, not in the jobs table.
0070     Returns {pandaid: destinationse} for jobs that have one.
0071     """
0072     if not pandaids:
0073         return {}
0074     conn = connections['panda']
0075     placeholders = ','.join(['%s'] * len(pandaids))
0076     sql = f"""
0077         SELECT DISTINCT "pandaid", "destinationse"
0078         FROM "{PANDA_SCHEMA}"."filestable4"
0079         WHERE "pandaid" IN ({placeholders})
0080           AND "destinationse" IS NOT NULL
0081     """
0082     try:
0083         with conn.cursor() as cursor:
0084             cursor.execute(sql, list(pandaids))
0085             return {row[0]: row[1] for row in cursor.fetchall()}
0086     except Exception:
0087         logger.exception("_bulk_destinationse failed")
0088         return {}
0089 
0090 
0091 def _stale_task_filter():
0092     """Exclude non-terminal tasks created more than STALE_TASK_DAYS ago."""
0093     cutoff = timezone.now() - timedelta(days=STALE_TASK_DAYS)
0094     placeholders = ', '.join(['%s'] * len(TERMINAL_TASK_STATUSES))
0095     clause = f'NOT ("creationdate" < %s AND "status" NOT IN ({placeholders}))'
0096     return {'clause': clause, 'params': [cutoff, *TERMINAL_TASK_STATUSES]}
0097 
0098 
0099 def list_jobs(days=7, status=None, username=None, site=None,
0100               taskid=None, reqid=None, limit=200, before_id=None):
0101     """List PanDA jobs with summary statistics and cursor-based pagination."""
0102     # When scoped to a specific task, return everything — don't truncate
0103     if taskid:
0104         limit = 100000
0105     cutoff = timezone.now() - timedelta(days=days)
0106     where = ['"modificationtime" >= %s']
0107     params = [cutoff]
0108 
0109     if status:
0110         where.append('"jobstatus" = %s')
0111         params.append(status)
0112     if username:
0113         clause, val = like_or_eq('produsername', username)
0114         where.append(clause)
0115         params.append(val)
0116     if site:
0117         clause, val = like_or_eq('computingsite', site)
0118         where.append(clause)
0119         params.append(val)
0120     if taskid:
0121         where.append('"jeditaskid" = %s')
0122         params.append(taskid)
0123     if reqid:
0124         where.append('"reqid" = %s')
0125         params.append(reqid)
0126     if before_id:
0127         where.append('"pandaid" < %s')
0128         params.append(before_id)
0129 
0130     conn = connections['panda']
0131 
0132     # Summary counts (without pagination cursor)
0133     count_where = [w for w in where if '"pandaid" <' not in w]
0134     count_params = [p for i, p in enumerate(params) if '"pandaid" <' not in where[i]]
0135     count_sql, count_full_params = build_count_query(count_where, count_params)
0136 
0137     summary = {}
0138     total = 0
0139     try:
0140         with conn.cursor() as cursor:
0141             cursor.execute(count_sql, count_full_params)
0142             for row in cursor.fetchall():
0143                 summary[row[0]] = row[1]
0144                 total += row[1]
0145     except Exception as e:
0146         logger.error(f"list_jobs count query failed: {e}")
0147         return {"error": str(e)}
0148 
0149     fetch_limit = limit + 1
0150     sql, full_params = build_union_query(
0151         LIST_FIELDS, where, params,
0152         order_by='"pandaid" DESC',
0153         limit=fetch_limit,
0154     )
0155 
0156     jobs = []
0157     try:
0158         with conn.cursor() as cursor:
0159             cursor.execute(sql, full_params)
0160             rows = cursor.fetchall()
0161             for row in rows[:limit]:
0162                 jobs.append(row_to_dict(row, LIST_FIELDS))
0163     except Exception as e:
0164         logger.error(f"list_jobs query failed: {e}")
0165         return {"error": str(e)}
0166 
0167     has_more = len(rows) > limit
0168     next_before_id = jobs[-1]['pandaid'] if jobs and has_more else None
0169 
0170     # Annotate with destinationse from filestable4
0171     dest_map = _bulk_destinationse([j['pandaid'] for j in jobs])
0172     for j in jobs:
0173         j['destinationse'] = dest_map.get(j['pandaid'])
0174 
0175     return {
0176         "summary": summary,
0177         "total_in_window": total,
0178         "jobs": jobs,
0179         "count": len(jobs),
0180         "pagination": {
0181             "before_id": before_id,
0182             "has_more": has_more,
0183             "next_before_id": next_before_id,
0184             "limit": limit,
0185         },
0186         "filters": {
0187             "days": days,
0188             "status": status,
0189             "username": username,
0190             "site": site,
0191             "taskid": taskid,
0192             "reqid": reqid,
0193         },
0194     }
0195 
0196 
0197 def diagnose_jobs(days=7, username=None, site=None, taskid=None,
0198                   reqid=None, error_component=None, limit=500, before_id=None):
0199     """Diagnose failed PanDA jobs with full error details."""
0200     # When scoped to a specific task, return everything — don't truncate
0201     if taskid:
0202         limit = 100000
0203     cutoff = timezone.now() - timedelta(days=days)
0204     where = [
0205         '"modificationtime" >= %s',
0206         '"jobstatus" IN %s',
0207     ]
0208     params = [cutoff, tuple(FAULTY_STATUSES)]
0209 
0210     if username:
0211         clause, val = like_or_eq('produsername', username)
0212         where.append(clause)
0213         params.append(val)
0214     if site:
0215         clause, val = like_or_eq('computingsite', site)
0216         where.append(clause)
0217         params.append(val)
0218     if taskid:
0219         where.append('"jeditaskid" = %s')
0220         params.append(taskid)
0221     if reqid:
0222         where.append('"reqid" = %s')
0223         params.append(reqid)
0224     if error_component:
0225         comp = next((c for c in ERROR_COMPONENTS if c['name'] == error_component), None)
0226         if comp:
0227             where.append(f'"{comp["code"]}" != 0')
0228     if before_id:
0229         where.append('"pandaid" < %s')
0230         params.append(before_id)
0231 
0232     conn = connections['panda']
0233 
0234     # Build deduplicated field list
0235     seen = set()
0236     fields = []
0237     for f in LIST_FIELDS + [f for f in ERROR_FIELDS if f not in LIST_FIELDS] + DIAGNOSE_EXTRA_FIELDS:
0238         if f not in seen:
0239             seen.add(f)
0240             fields.append(f)
0241 
0242     fetch_limit = limit + 1
0243     sql, full_params = build_union_query(
0244         fields, where, params,
0245         order_by='"pandaid" DESC',
0246         limit=fetch_limit,
0247     )
0248 
0249     jobs = []
0250     try:
0251         with conn.cursor() as cursor:
0252             cursor.execute(sql, full_params)
0253             rows = cursor.fetchall()
0254             for row in rows[:limit]:
0255                 job = row_to_dict(row, fields)
0256                 job['errors'] = extract_errors(job)
0257                 jobs.append(job)
0258     except Exception as e:
0259         logger.error(f"diagnose_jobs query failed: {e}")
0260         return {"error": str(e)}
0261 
0262     has_more = len(rows) > limit
0263     next_before_id = jobs[-1]['pandaid'] if jobs and has_more else None
0264 
0265     # Annotate with destinationse from filestable4
0266     dest_map = _bulk_destinationse([j['pandaid'] for j in jobs])
0267     for j in jobs:
0268         j['destinationse'] = dest_map.get(j['pandaid'])
0269 
0270     # Error summary: count by component and top codes
0271     component_counts = {}
0272     code_counts = {}
0273     for job in jobs:
0274         for err in job['errors']:
0275             comp = err['component']
0276             component_counts[comp] = component_counts.get(comp, 0) + 1
0277             key = f"{comp}:{err['code']}"
0278             if key not in code_counts:
0279                 code_counts[key] = {'component': comp, 'code': err['code'], 'count': 0, 'sample_diag': err['diag']}
0280             code_counts[key]['count'] += 1
0281 
0282     top_errors = sorted(code_counts.values(), key=lambda x: x['count'], reverse=True)[:20]
0283 
0284     return {
0285         "error_summary": {
0286             "total_faulty_jobs": len(jobs),
0287             "by_component": component_counts,
0288             "top_error_codes": top_errors,
0289         },
0290         "jobs": jobs,
0291         "count": len(jobs),
0292         "pagination": {
0293             "before_id": before_id,
0294             "has_more": has_more,
0295             "next_before_id": next_before_id,
0296             "limit": limit,
0297         },
0298         "filters": {
0299             "days": days,
0300             "username": username,
0301             "site": site,
0302             "taskid": taskid,
0303             "reqid": reqid,
0304             "error_component": error_component,
0305         },
0306     }
0307 
0308 
0309 def list_tasks(days=7, status=None, username=None, taskname=None,
0310                reqid=None, workinggroup=None, taskid=None,
0311                processingtype=None, limit=25, before_id=None):
0312     """List JEDI tasks with summary statistics and cursor-based pagination."""
0313     cutoff = timezone.now() - timedelta(days=days)
0314     where = ['"modificationtime" >= %s']
0315     params = [cutoff]
0316 
0317     # Exclude stale non-terminal tasks (created >60 days ago, still pending)
0318     _stale = _stale_task_filter()
0319     where.append(_stale['clause'])
0320     params.extend(_stale['params'])
0321 
0322     if status:
0323         where.append('"status" = %s')
0324         params.append(status)
0325     if username:
0326         clause, val = like_or_eq('username', username)
0327         where.append(clause)
0328         params.append(val)
0329     if taskname:
0330         clause, val = like_or_eq('taskname', taskname)
0331         where.append(clause)
0332         params.append(val)
0333     if reqid:
0334         where.append('"reqid" = %s')
0335         params.append(reqid)
0336     if workinggroup:
0337         where.append('"workinggroup" = %s')
0338         params.append(workinggroup)
0339     if taskid:
0340         where.append('"jeditaskid" = %s')
0341         params.append(taskid)
0342     if processingtype:
0343         clause, val = like_or_eq('processingtype', processingtype)
0344         where.append(clause)
0345         params.append(val)
0346     if before_id:
0347         where.append('"jeditaskid" < %s')
0348         params.append(before_id)
0349 
0350     conn = connections['panda']
0351 
0352     # Summary counts (without pagination cursor)
0353     # Remove the before_id clause and its param (always the last pair if present)
0354     if before_id:
0355         count_where = where[:-1]
0356         count_params = params[:-1]
0357     else:
0358         count_where = where
0359         count_params = params
0360     count_sql, count_full_params = build_task_count_query(count_where, count_params)
0361 
0362     summary = {}
0363     total = 0
0364     try:
0365         with conn.cursor() as cursor:
0366             cursor.execute(count_sql, count_full_params)
0367             for row in cursor.fetchall():
0368                 summary[row[0]] = row[1]
0369                 total += row[1]
0370     except Exception as e:
0371         logger.error(f"list_tasks count query failed: {e}")
0372         return {"error": str(e)}
0373 
0374     fetch_limit = limit + 1
0375     sql, full_params = build_task_query(
0376         TASK_LIST_FIELDS, where, params,
0377         order_by='"jeditaskid" DESC',
0378         limit=fetch_limit,
0379     )
0380 
0381     tasks = []
0382     try:
0383         with conn.cursor() as cursor:
0384             cursor.execute(sql, full_params)
0385             rows = cursor.fetchall()
0386             for row in rows[:limit]:
0387                 tasks.append(row_to_dict(row, TASK_LIST_FIELDS))
0388     except Exception as e:
0389         logger.error(f"list_tasks query failed: {e}")
0390         return {"error": str(e)}
0391 
0392     has_more = len(rows) > limit
0393     next_before_id = tasks[-1]['jeditaskid'] if tasks and has_more else None
0394 
0395     return {
0396         "summary": summary,
0397         "total_in_window": total,
0398         "tasks": tasks,
0399         "count": len(tasks),
0400         "pagination": {
0401             "before_id": before_id,
0402             "has_more": has_more,
0403             "next_before_id": next_before_id,
0404             "limit": limit,
0405         },
0406         "filters": {
0407             "days": days,
0408             "status": status,
0409             "username": username,
0410             "taskname": taskname,
0411             "reqid": reqid,
0412             "workinggroup": workinggroup,
0413             "taskid": taskid,
0414         },
0415     }
0416 
0417 
0418 def error_summary(days=10, username=None, site=None, destinationse=None,
0419                   taskid=None, error_source=None, limit=20):
0420     """Aggregate error summary across failed PanDA jobs, ranked by frequency."""
0421     # When scoped to a specific task, return all error patterns
0422     if taskid:
0423         limit = 10000
0424     cutoff = timezone.now() - timedelta(days=days)
0425     conn = connections['panda']
0426 
0427     extra_params = []
0428     filters = ''
0429 
0430     if username:
0431         if '%' in username:
0432             filters += ' AND "produsername" LIKE %s'
0433         else:
0434             filters += ' AND "produsername" = %s'
0435         extra_params.append(username)
0436     if site:
0437         if '%' in site:
0438             filters += ' AND "computingsite" LIKE %s'
0439         else:
0440             filters += ' AND "computingsite" = %s'
0441         extra_params.append(site)
0442     destse_filter = ''
0443     destse_params = []
0444     if destinationse:
0445         if '%' in destinationse:
0446             destse_filter = ' AND f."destinationse" LIKE %s'
0447         else:
0448             destse_filter = ' AND f."destinationse" = %s'
0449         destse_params.append(destinationse)
0450     if taskid:
0451         filters += ' AND "jeditaskid" = %s'
0452         extra_params.append(taskid)
0453 
0454     components_to_query = ERROR_COMPONENTS
0455     if error_source:
0456         components_to_query = [c for c in ERROR_COMPONENTS if c['name'] == error_source]
0457         if not components_to_query:
0458             return {"error": f"Unknown error_source '{error_source}'. Valid: {[c['name'] for c in ERROR_COMPONENTS]}"}
0459 
0460     parts = []
0461     all_params = []
0462     join_type = 'JOIN' if destinationse else 'LEFT JOIN'
0463     for comp in components_to_query:
0464         for table in ['jobsactive4', 'jobsarchived4']:
0465             parts.append(f"""
0466                 SELECT '{comp['name']}' as error_source,
0467                        j."{comp['code']}" as error_code,
0468                        j."{comp['diag']}" as error_diag,
0469                        j."jeditaskid",
0470                        j."produsername",
0471                        j."computingsite",
0472                        f."destinationse"
0473                 FROM "{PANDA_SCHEMA}"."{table}" j
0474                 {join_type} (
0475                     SELECT DISTINCT "pandaid", "destinationse"
0476                     FROM "{PANDA_SCHEMA}"."filestable4"
0477                     WHERE "destinationse" IS NOT NULL
0478                 ) f ON f."pandaid" = j."pandaid"
0479                 WHERE j."modificationtime" >= %s
0480                   AND j."jobstatus" IN ('failed','cancelled','closed')
0481                   AND j."{comp['code']}" IS NOT NULL
0482                   AND j."{comp['code']}" != 0
0483                   {filters}
0484                   {destse_filter}
0485             """)
0486             all_params.extend([cutoff] + extra_params + destse_params)
0487 
0488     union_sql = ' UNION ALL '.join(parts)
0489     sql = f"""
0490         SELECT error_source, error_code,
0491                LEFT(error_diag, 256) as error_diag,
0492                COUNT(*) as count,
0493                COUNT(DISTINCT jeditaskid) as task_count,
0494                array_agg(DISTINCT produsername) as users,
0495                array_agg(DISTINCT computingsite) as sites,
0496                array_agg(DISTINCT destinationse) as destination_sites
0497         FROM ({union_sql}) errs
0498         GROUP BY error_source, error_code, LEFT(error_diag, 256)
0499         ORDER BY count DESC
0500         LIMIT %s
0501     """
0502     all_params.append(limit)
0503 
0504     try:
0505         with conn.cursor() as cursor:
0506             cursor.execute(sql, all_params)
0507             rows = cursor.fetchall()
0508     except Exception as e:
0509         logger.error(f"error_summary query failed: {e}")
0510         return {"error": str(e)}
0511 
0512     errors = []
0513     total = 0
0514     for row in rows:
0515         entry = {
0516             'error_source': row[0],
0517             'error_code': row[1],
0518             'error_diag': row[2] or '',
0519             'count': row[3],
0520             'task_count': row[4],
0521             'users': row[5],
0522             'sites': row[6],
0523             'destination_sites': row[7],
0524         }
0525         total += row[3]
0526         errors.append(entry)
0527 
0528     return {
0529         "total_errors": total,
0530         "errors": errors,
0531         "count": len(errors),
0532         "filters": {
0533             "days": days,
0534             "username": username,
0535             "site": site,
0536             "taskid": taskid,
0537             "error_source": error_source,
0538         },
0539     }
0540 
0541 
0542 def get_activity(days=1, username=None, site=None, workinggroup=None):
0543     """Pre-digested overview of PanDA activity — aggregate counts only."""
0544     cutoff = timezone.now() - timedelta(days=days)
0545     conn = connections['panda']
0546 
0547     # ── Job filters ──
0548     job_where = '"modificationtime" >= %s'
0549     job_params = [cutoff]
0550     job_filters = ''
0551 
0552     if username:
0553         if '%' in username:
0554             job_filters += ' AND "produsername" LIKE %s'
0555         else:
0556             job_filters += ' AND "produsername" = %s'
0557         job_params.append(username)
0558     if site:
0559         if '%' in site:
0560             job_filters += ' AND "computingsite" LIKE %s'
0561         else:
0562             job_filters += ' AND "computingsite" = %s'
0563         job_params.append(site)
0564 
0565     base_job_where = f'{job_where}{job_filters}'
0566 
0567     def _job_agg(group_col):
0568         sql = f"""
0569             SELECT "jobstatus", "{group_col}", COUNT(*) FROM (
0570                 SELECT "jobstatus", "{group_col}"
0571                 FROM "{PANDA_SCHEMA}"."jobsactive4"
0572                 WHERE {base_job_where}
0573                 UNION ALL
0574                 SELECT "jobstatus", "{group_col}"
0575                 FROM "{PANDA_SCHEMA}"."jobsarchived4"
0576                 WHERE {base_job_where}
0577             ) combined
0578             GROUP BY "jobstatus", "{group_col}"
0579             ORDER BY COUNT(*) DESC
0580         """
0581         full_params = job_params + job_params
0582         with conn.cursor() as cursor:
0583             cursor.execute(sql, full_params)
0584             return cursor.fetchall()
0585 
0586     try:
0587         status_sql = f"""
0588             SELECT "jobstatus", COUNT(*) FROM (
0589                 SELECT "jobstatus" FROM "{PANDA_SCHEMA}"."jobsactive4"
0590                 WHERE {base_job_where}
0591                 UNION ALL
0592                 SELECT "jobstatus" FROM "{PANDA_SCHEMA}"."jobsarchived4"
0593                 WHERE {base_job_where}
0594             ) combined
0595             GROUP BY "jobstatus"
0596             ORDER BY COUNT(*) DESC
0597         """
0598         with conn.cursor() as cursor:
0599             cursor.execute(status_sql, job_params + job_params)
0600             job_by_status = {row[0]: row[1] for row in cursor.fetchall()}
0601 
0602         job_total = sum(job_by_status.values())
0603 
0604         user_rows = _job_agg('produsername')
0605         user_map = {}
0606         for status_val, user_val, count in user_rows:
0607             if user_val not in user_map:
0608                 user_map[user_val] = {'user': user_val, 'total': 0}
0609             user_map[user_val][status_val] = count
0610             user_map[user_val]['total'] += count
0611         by_user = sorted(user_map.values(), key=lambda x: x['total'], reverse=True)
0612 
0613         site_rows = _job_agg('computingsite')
0614         site_map = {}
0615         for status_val, site_val, count in site_rows:
0616             if site_val not in site_map:
0617                 site_map[site_val] = {'site': site_val, 'total': 0}
0618             site_map[site_val][status_val] = count
0619             site_map[site_val]['total'] += count
0620         by_site = sorted(site_map.values(), key=lambda x: x['total'], reverse=True)
0621 
0622     except Exception as e:
0623         logger.error(f"get_activity job queries failed: {e}")
0624         return {"error": str(e)}
0625 
0626     # ── Task aggregation ──
0627     task_where = ['"modificationtime" >= %s']
0628     task_params = [cutoff]
0629 
0630     _stale = _stale_task_filter()
0631     task_where.append(_stale['clause'])
0632     task_params.extend(_stale['params'])
0633 
0634     if username:
0635         if '%' in username:
0636             task_where.append('"username" LIKE %s')
0637         else:
0638             task_where.append('"username" = %s')
0639         task_params.append(username)
0640     if workinggroup:
0641         task_where.append('"workinggroup" = %s')
0642         task_params.append(workinggroup)
0643 
0644     task_where_sql = ' AND '.join(task_where)
0645 
0646     try:
0647         task_status_sql = f"""
0648             SELECT "status", COUNT(*)
0649             FROM "{PANDA_SCHEMA}"."jedi_tasks"
0650             WHERE {task_where_sql}
0651             GROUP BY "status"
0652             ORDER BY COUNT(*) DESC
0653         """
0654         with conn.cursor() as cursor:
0655             cursor.execute(task_status_sql, task_params)
0656             task_by_status = {row[0]: row[1] for row in cursor.fetchall()}
0657 
0658         task_total = sum(task_by_status.values())
0659 
0660         task_user_sql = f"""
0661             SELECT "status", "username", COUNT(*)
0662             FROM "{PANDA_SCHEMA}"."jedi_tasks"
0663             WHERE {task_where_sql}
0664             GROUP BY "status", "username"
0665             ORDER BY COUNT(*) DESC
0666         """
0667         with conn.cursor() as cursor:
0668             cursor.execute(task_user_sql, task_params)
0669             task_user_rows = cursor.fetchall()
0670 
0671         task_user_map = {}
0672         for status_val, user_val, count in task_user_rows:
0673             if user_val not in task_user_map:
0674                 task_user_map[user_val] = {'user': user_val, 'total': 0}
0675             task_user_map[user_val][status_val] = count
0676             task_user_map[user_val]['total'] += count
0677         task_by_user = sorted(task_user_map.values(), key=lambda x: x['total'], reverse=True)
0678 
0679     except Exception as e:
0680         logger.error(f"get_activity task queries failed: {e}")
0681         return {"error": str(e)}
0682 
0683     return {
0684         "jobs": {
0685             "total": job_total,
0686             "by_status": job_by_status,
0687             "by_user": by_user,
0688             "by_site": by_site,
0689         },
0690         "tasks": {
0691             "total": task_total,
0692             "by_status": task_by_status,
0693             "by_user": task_by_user,
0694         },
0695         "filters": {
0696             "days": days,
0697             "username": username,
0698             "site": site,
0699             "workinggroup": workinggroup,
0700         },
0701     }
0702 
0703 
0704 QUEUE_SUMMARY_FIELDS = [
0705     'status', 'state', 'vo_name', 'resource_type', 'type', 'capability',
0706     'corepower', 'atlas_site', 'region', 'country', 'tier', 'cloud',
0707     'container_type', 'pilot_version', 'maxrss', 'maxtime', 'maxwdir',
0708 ]
0709 
0710 
0711 def list_queues(vo=None, status=None, state=None, search=None):
0712     """List PanDA queues from schedconfig_json with summary fields."""
0713     conn = connections['panda']
0714     where = []
0715     params = []
0716 
0717     if vo:
0718         where.append(""""data"->>'vo_name' = %s""")
0719         params.append(vo)
0720     if status:
0721         where.append(""""data"->>'status' = %s""")
0722         params.append(status)
0723     if state:
0724         where.append(""""data"->>'state' = %s""")
0725         params.append(state)
0726     if search:
0727         where.append(""""panda_queue" ILIKE %s""")
0728         params.append(f'%{search}%')
0729 
0730     where_sql = (' WHERE ' + ' AND '.join(where)) if where else ''
0731 
0732     sql = f"""
0733         SELECT "panda_queue", "data", "last_update"
0734         FROM "{PANDA_SCHEMA}"."schedconfig_json"
0735         {where_sql}
0736         ORDER BY "panda_queue"
0737     """
0738 
0739     try:
0740         with conn.cursor() as cursor:
0741             cursor.execute(sql, params)
0742             rows = cursor.fetchall()
0743     except Exception as e:
0744         logger.error(f"list_queues failed: {e}")
0745         return {"error": str(e)}
0746 
0747     queues = []
0748     for row in rows:
0749         data = row[1] or {}
0750         summary = {'panda_queue': row[0]}
0751         for f in QUEUE_SUMMARY_FIELDS:
0752             val = data.get(f)
0753             if val is not None:
0754                 summary[f] = val
0755         summary['last_update'] = row[2].isoformat() if row[2] else None
0756         queues.append(summary)
0757 
0758     return {
0759         "queues": queues,
0760         "count": len(queues),
0761         "filters": {"vo": vo, "status": status, "state": state, "search": search},
0762     }
0763 
0764 
0765 def get_queue(panda_queue):
0766     """Get full configuration for a single PanDA queue."""
0767     conn = connections['panda']
0768 
0769     sql = f"""
0770         SELECT "panda_queue", "data", "last_update"
0771         FROM "{PANDA_SCHEMA}"."schedconfig_json"
0772         WHERE "panda_queue" = %s
0773     """
0774 
0775     try:
0776         with conn.cursor() as cursor:
0777             cursor.execute(sql, [panda_queue])
0778             row = cursor.fetchone()
0779     except Exception as e:
0780         logger.error(f"get_queue failed: {e}")
0781         return {"error": str(e)}
0782 
0783     if not row:
0784         return {"error": f"Queue '{panda_queue}' not found"}
0785 
0786     data = row[1] or {}
0787     # Strip None values for readability
0788     config = {k: v for k, v in data.items() if v is not None}
0789     config['panda_queue'] = row[0]
0790     config['last_update'] = row[2].isoformat() if row[2] else None
0791 
0792     return {"queue": config}
0793 
0794 
0795 def resource_usage(days=30, site=None, username=None, taskid=None):
0796     """Aggregate resource usage for finished jobs.
0797 
0798     Reports two core-hour metrics:
0799     - allocated: actualcorecount × wall time (cores reserved by the facility)
0800     - used: cpuconsumptiontime (CPU time the job actually consumed)
0801 
0802     Only counts jobs that actually ran: jobstatus='finished' with both
0803     starttime and endtime set. Pre-running queue time is excluded.
0804     """
0805     cutoff = timezone.now() - timedelta(days=days)
0806     conn = connections['panda']
0807 
0808     filters = ''
0809     extra_params = []
0810 
0811     if site:
0812         clause, val = like_or_eq('computingsite', site)
0813         filters += f' AND {clause}'
0814         extra_params.append(val)
0815     if username:
0816         clause, val = like_or_eq('produsername', username)
0817         filters += f' AND {clause}'
0818         extra_params.append(val)
0819     if taskid:
0820         filters += ' AND "jeditaskid" = %s'
0821         extra_params.append(taskid)
0822 
0823     base_where = (
0824         '"modificationtime" >= %s'
0825         ' AND "jobstatus" = \'finished\''
0826         ' AND "starttime" IS NOT NULL'
0827         ' AND "endtime" IS NOT NULL'
0828         + filters
0829     )
0830     base_params = [cutoff] + extra_params
0831 
0832     inner_fields = (
0833         '"computingsite", "produsername", '
0834         '"cpuconsumptiontime", "actualcorecount", "corecount", '
0835         '"starttime", "endtime"'
0836     )
0837 
0838     agg_cols = """
0839         COUNT(*) as job_count,
0840         COALESCE(SUM(
0841             EXTRACT(EPOCH FROM ("endtime" - "starttime"))
0842             * COALESCE("actualcorecount", "corecount", 1)
0843         ), 0) / 3600.0 as allocated_core_hours,
0844         COALESCE(SUM("cpuconsumptiontime"), 0) / 3600.0 as used_core_hours,
0845         COALESCE(SUM(
0846             EXTRACT(EPOCH FROM ("endtime" - "starttime"))
0847         ), 0) / 3600.0 as wall_hours
0848     """
0849 
0850     def _run(group_col=None):
0851         select = f'"{group_col}", {agg_cols}' if group_col else agg_cols
0852         group = f'GROUP BY "{group_col}" ORDER BY allocated_core_hours DESC' if group_col else ''
0853         sql = f"""
0854             SELECT {select} FROM (
0855                 SELECT {inner_fields}
0856                 FROM "{PANDA_SCHEMA}"."jobsactive4" WHERE {base_where}
0857                 UNION ALL
0858                 SELECT {inner_fields}
0859                 FROM "{PANDA_SCHEMA}"."jobsarchived4" WHERE {base_where}
0860             ) combined
0861             {group}
0862         """
0863         with conn.cursor() as cursor:
0864             cursor.execute(sql, base_params + base_params)
0865             return cursor.fetchall()
0866 
0867     def _parse(row, offset=0):
0868         return {
0869             'job_count': row[offset],
0870             'allocated_core_hours': round(float(row[offset + 1]), 1),
0871             'used_core_hours': round(float(row[offset + 2]), 1),
0872             'wall_hours': round(float(row[offset + 3]), 1),
0873         }
0874 
0875     try:
0876         rows = _run()
0877         totals = _parse(rows[0]) if rows else {
0878             'job_count': 0, 'allocated_core_hours': 0,
0879             'used_core_hours': 0, 'wall_hours': 0,
0880         }
0881 
0882         by_site = []
0883         for row in _run('computingsite'):
0884             entry = _parse(row, offset=1)
0885             entry['site'] = row[0]
0886             by_site.append(entry)
0887 
0888         by_user = []
0889         for row in _run('produsername'):
0890             entry = _parse(row, offset=1)
0891             entry['user'] = row[0]
0892             by_user.append(entry)
0893 
0894     except Exception as e:
0895         logger.error(f"resource_usage query failed: {e}")
0896         return {"error": str(e)}
0897 
0898     return {
0899         "totals": totals,
0900         "by_site": by_site,
0901         "by_user": by_user,
0902         "filters": {
0903             "days": days,
0904             "site": site,
0905             "username": username,
0906             "taskid": taskid,
0907         },
0908     }
0909 
0910 
0911 def study_job(pandaid):
0912     """Deep study of a single PanDA job — full record, files, harvester logs, errors."""
0913     conn = connections['panda']
0914 
0915     # 1. Full job record from both tables
0916     field_list = ', '.join(f'"{f}"' for f in STUDY_FIELDS)
0917     job_sql = f"""
0918         SELECT {field_list} FROM "{PANDA_SCHEMA}"."jobsactive4" WHERE "pandaid" = %s
0919         UNION ALL
0920         SELECT {field_list} FROM "{PANDA_SCHEMA}"."jobsarchived4" WHERE "pandaid" = %s
0921     """
0922 
0923     try:
0924         with conn.cursor() as cursor:
0925             cursor.execute(job_sql, [pandaid, pandaid])
0926             row = cursor.fetchone()
0927     except Exception as e:
0928         logger.error(f"study_job query failed: {e}")
0929         return {"error": str(e)}
0930 
0931     if not row:
0932         return {"error": f"Job {pandaid} not found"}
0933 
0934     job = row_to_dict(row, STUDY_FIELDS)
0935     job['errors'] = extract_errors(job)
0936 
0937     # Strip null fields for readability
0938     job = {k: v for k, v in job.items() if v is not None}
0939 
0940     # Parse pilotid for log URLs
0941     log_urls = {}
0942     pilotid = job.get('pilotid', '')
0943     if pilotid and '|' in pilotid:
0944         parts = pilotid.split('|')
0945         stdout_url = parts[0]
0946         log_urls['pilot_stdout'] = stdout_url
0947         log_urls['pilot_stderr'] = stdout_url.replace('.out', '.err')
0948         log_urls['batch_log'] = stdout_url.replace('.out', '.log')
0949         if len(parts) >= 4:
0950             job['pilot_type'] = parts[1]
0951             job['pilot_version'] = parts[3]
0952 
0953     # NERSC Perlmutter pilotid ends in literal 'None' so the synthesized URLs
0954     # 404. The NERSC portal exposes per-job log dirs instead.
0955     site = job.get('computingsite') or ''
0956     if site.startswith('NERSC_Perlmutter'):
0957         portal_urls = _nersc_portal_log_urls(site, pandaid)
0958         if portal_urls:
0959             # Drop the broken stderr/batch entries; Perlmutter has a single
0960             # combined pilot log.
0961             log_urls.pop('pilot_stderr', None)
0962             log_urls.pop('batch_log', None)
0963             log_urls.update(portal_urls)
0964 
0965     # 2. Files from filestable4
0966     file_field_list = ', '.join(f'"{f}"' for f in FILE_FIELDS)
0967     files_sql = f"""
0968         SELECT {file_field_list}
0969         FROM "{PANDA_SCHEMA}"."filestable4"
0970         WHERE "pandaid" = %s
0971         ORDER BY "type", "lfn"
0972     """
0973 
0974     files = []
0975     log_file = None
0976     try:
0977         with conn.cursor() as cursor:
0978             cursor.execute(files_sql, [pandaid])
0979             for frow in cursor.fetchall():
0980                 fd = row_to_dict(frow, FILE_FIELDS)
0981                 fd = {k: v for k, v in fd.items() if v is not None}
0982                 files.append(fd)
0983                 if fd.get('type') == 'log':
0984                     log_file = fd
0985     except Exception as e:
0986         logger.error(f"study_job files query failed: {e}")
0987         # Non-fatal — continue with what we have
0988 
0989     # 3. Harvester worker info (condor log URLs)
0990     harvester = None
0991     harvester_sql = f"""
0992         SELECT w."workerid", w."harvesterid", w."stdout", w."stderr", w."batchlog",
0993                w."errorcode", w."diagmessage", w."status"
0994         FROM "{PANDA_SCHEMA}"."harvester_workers" w
0995         JOIN "{PANDA_SCHEMA}"."harvester_rel_jobs_workers" r
0996             ON w."workerid" = r."workerid" AND w."harvesterid" = r."harvesterid"
0997         WHERE r."pandaid" = %s
0998     """
0999 
1000     try:
1001         with conn.cursor() as cursor:
1002             cursor.execute(harvester_sql, [pandaid])
1003             hrow = cursor.fetchone()
1004             if hrow:
1005                 hcols = ['workerid', 'harvesterid', 'stdout', 'stderr', 'batchlog',
1006                          'errorcode', 'diagmessage', 'status']
1007                 harvester = row_to_dict(hrow, hcols)
1008                 harvester = {k: v for k, v in harvester.items() if v is not None}
1009                 # Use harvester URLs if available (more authoritative than parsed pilotid)
1010                 if harvester.get('stdout'):
1011                     log_urls['pilot_stdout'] = harvester['stdout']
1012                 if harvester.get('stderr'):
1013                     log_urls['pilot_stderr'] = harvester['stderr']
1014                 if harvester.get('batchlog'):
1015                     log_urls['batch_log'] = harvester['batchlog']
1016     except Exception as e:
1017         logger.error(f"study_job harvester query failed: {e}")
1018         # Non-fatal
1019 
1020     # 4. Task context (parent task name and status)
1021     task_info = None
1022     jeditaskid = job.get('jeditaskid')
1023     if jeditaskid:
1024         task_sql = f"""
1025             SELECT "jeditaskid", "taskname", "status", "username", "errordialog",
1026                    "failurerate", "workinggroup"
1027             FROM "{PANDA_SCHEMA}"."jedi_tasks"
1028             WHERE "jeditaskid" = %s
1029         """
1030         try:
1031             with conn.cursor() as cursor:
1032                 cursor.execute(task_sql, [jeditaskid])
1033                 trow = cursor.fetchone()
1034                 if trow:
1035                     tcols = ['jeditaskid', 'taskname', 'status', 'username',
1036                              'errordialog', 'failurerate', 'workinggroup']
1037                     task_info = row_to_dict(trow, tcols)
1038                     task_info = {k: v for k, v in task_info.items() if v is not None}
1039         except Exception as e:
1040             logger.error(f"study_job task query failed: {e}")
1041 
1042     # Assemble result
1043     result = {
1044         "pandaid": pandaid,
1045         "job": job,
1046         "files": files,
1047         "log_urls": log_urls,
1048     }
1049 
1050     if log_file:
1051         result["log_file"] = log_file
1052 
1053     if harvester:
1054         result["harvester"] = harvester
1055 
1056     if task_info:
1057         result["task"] = task_info
1058 
1059     # Monitoring page URL
1060     result["monitor_url"] = f"https://epic-devcloud.org/panda/jobs/{pandaid}/"
1061 
1062     # 5. Log analysis for failure-adjacent statuses. 'closed' covers
1063     # lost-heartbeat (pilot killed at slot boundary before reporting back);
1064     # its pilot log on NERSC CFS is the only window into what happened.
1065     jobstatus = job.get('jobstatus', '')
1066     if jobstatus in ('failed', 'holding', 'cancelled', 'closed'):
1067         try:
1068             from askpanda_atlas.log_analysis_impl import (
1069                 _select_log_filename, _fetch_log_text,
1070                 extract_log_excerpt, classify_failure,
1071             )
1072             from decouple import config
1073             base_url = config('PANDA_BASE_URL', default='https://pandamon01.sdcc.bnl.gov')
1074             log_filename = _select_log_filename(job)
1075             log_text = _fetch_log_text(pandaid, log_filename, base_url, timeout=30)
1076             log_source = 'filebrowser'
1077 
1078             # Fallback: fetch pilot log directly from its URL (NERSC portal for
1079             # Perlmutter, Harvester-published URL elsewhere).
1080             if not log_text:
1081                 direct_url = (
1082                     log_urls.get('pilot_stdout')
1083                     or (harvester or {}).get('stdout')
1084                 )
1085                 if direct_url:
1086                     import requests as _requests
1087                     try:
1088                         resp = _requests.get(
1089                             direct_url, timeout=30, verify=False,
1090                         )
1091                         if resp.status_code == 200 and resp.text:
1092                             log_text = resp.text
1093                             log_source = (
1094                                 'nersc_portal'
1095                                 if 'portal.nersc.gov' in direct_url
1096                                 else 'harvester'
1097                             )
1098                     except Exception as exc:
1099                         logger.warning("Direct log fetch failed: %s", exc)
1100 
1101             if log_text:
1102                 piloterrorcode = int(job.get('piloterrorcode') or 0)
1103                 piloterrordiag = str(job.get('piloterrordiag') or '')
1104                 excerpt = extract_log_excerpt(
1105                     log_text, log_filename, piloterrorcode, piloterrordiag
1106                 )
1107                 failure_type = classify_failure(job, excerpt)
1108                 result['log_analysis'] = {
1109                     'failure_type': failure_type,
1110                     'log_filename': log_filename,
1111                     'log_source': log_source,
1112                     'log_excerpt': excerpt,
1113                     'log_available': True,
1114                 }
1115             else:
1116                 result['log_analysis'] = {
1117                     'log_available': False,
1118                     'log_filename': log_filename,
1119                 }
1120         except Exception as e:
1121             logger.error(f"study_job log analysis failed: {e}")
1122             result['log_analysis'] = {'error': str(e)}
1123 
1124     return result
1125 
1126 
1127 # ── DataTables query functions ───────────────────────────────────────────────
1128 
1129 # Orderable columns for jobs and tasks (maps column name to SQL expression)
1130 JOB_ORDER_COLUMNS = {f: f'"{f}"' for f in LIST_FIELDS}
1131 TASK_ORDER_COLUMNS = {f: f'"{f}"' for f in TASK_LIST_FIELDS}
1132 
1133 # Searchable columns for DataTables global search
1134 JOB_SEARCH_FIELDS = ['pandaid', 'jeditaskid', 'produsername', 'jobstatus',
1135                      'computingsite', 'transformation']
1136 TASK_SEARCH_FIELDS = ['jeditaskid', 'taskname', 'status', 'username',
1137                       'workinggroup', 'transpath']
1138 
1139 
1140 def list_jobs_dt(days=7, status=None, username=None, site=None,
1141                  taskid=None, reqid=None,
1142                  order_by='"pandaid" DESC', limit=100, offset=0, search=None):
1143     """List PanDA jobs for DataTables (returns rows, total, filtered counts)."""
1144     cutoff = timezone.now() - timedelta(days=days)
1145     where = ['"modificationtime" >= %s']
1146     params = [cutoff]
1147 
1148     if status:
1149         where.append('"jobstatus" = %s')
1150         params.append(status)
1151     if username:
1152         clause, val = like_or_eq('produsername', username)
1153         where.append(clause)
1154         params.append(val)
1155     if site:
1156         clause, val = like_or_eq('computingsite', site)
1157         where.append(clause)
1158         params.append(val)
1159     if taskid:
1160         where.append('"jeditaskid" = %s')
1161         params.append(int(taskid))
1162     if reqid:
1163         where.append('"reqid" = %s')
1164         params.append(int(reqid))
1165 
1166     conn = connections['panda']
1167 
1168     # Total count (no search filter)
1169     count_sql, count_params = build_union_count(where, params)
1170     try:
1171         with conn.cursor() as cursor:
1172             cursor.execute(count_sql, count_params)
1173             total = cursor.fetchone()[0]
1174     except Exception as e:
1175         logger.error(f"list_jobs_dt count failed: {e}")
1176         return [], 0, 0
1177 
1178     # Apply search filter
1179     filtered_where = list(where)
1180     filtered_params = list(params)
1181     if search:
1182         search_clause, search_params = build_search_clauses(JOB_SEARCH_FIELDS, search)
1183         filtered_where.append(search_clause)
1184         filtered_params.extend(search_params)
1185 
1186     # Filtered count
1187     if search:
1188         fcount_sql, fcount_params = build_union_count(filtered_where, filtered_params)
1189         try:
1190             with conn.cursor() as cursor:
1191                 cursor.execute(fcount_sql, fcount_params)
1192                 filtered = cursor.fetchone()[0]
1193         except Exception as e:
1194             logger.error(f"list_jobs_dt filtered count failed: {e}")
1195             return [], total, 0
1196     else:
1197         filtered = total
1198 
1199     # Data query
1200     sql, full_params = build_union_query_dt(
1201         LIST_FIELDS, filtered_where, filtered_params,
1202         order_by=order_by, limit=limit, offset=offset,
1203     )
1204 
1205     rows = []
1206     try:
1207         with conn.cursor() as cursor:
1208             cursor.execute(sql, full_params)
1209             for row in cursor.fetchall():
1210                 rows.append(row_to_dict(row, LIST_FIELDS))
1211     except Exception as e:
1212         logger.error(f"list_jobs_dt query failed: {e}")
1213         return [], total, filtered
1214 
1215     return rows, total, filtered
1216 
1217 
1218 def list_tasks_dt(days=7, status=None, username=None, taskname=None,
1219                   workinggroup=None, order_by='"jeditaskid" DESC',
1220                   limit=100, offset=0, search=None):
1221     """List JEDI tasks for DataTables (returns rows, total, filtered counts)."""
1222     cutoff = timezone.now() - timedelta(days=days)
1223     where = ['"modificationtime" >= %s']
1224     params = [cutoff]
1225 
1226     _stale = _stale_task_filter()
1227     where.append(_stale['clause'])
1228     params.extend(_stale['params'])
1229 
1230     if status:
1231         where.append('"status" = %s')
1232         params.append(status)
1233     if username:
1234         clause, val = like_or_eq('username', username)
1235         where.append(clause)
1236         params.append(val)
1237     if taskname:
1238         clause, val = like_or_eq('taskname', taskname)
1239         where.append(clause)
1240         params.append(val)
1241     if workinggroup:
1242         where.append('"workinggroup" = %s')
1243         params.append(workinggroup)
1244 
1245     conn = connections['panda']
1246 
1247     # Total count
1248     count_sql, count_params = build_task_count(where, params)
1249     try:
1250         with conn.cursor() as cursor:
1251             cursor.execute(count_sql, count_params)
1252             total = cursor.fetchone()[0]
1253     except Exception as e:
1254         logger.error(f"list_tasks_dt count failed: {e}")
1255         return [], 0, 0
1256 
1257     # Apply search filter
1258     filtered_where = list(where)
1259     filtered_params = list(params)
1260     if search:
1261         search_clause, search_params = build_search_clauses(TASK_SEARCH_FIELDS, search)
1262         filtered_where.append(search_clause)
1263         filtered_params.extend(search_params)
1264 
1265     # Filtered count
1266     if search:
1267         fcount_sql, fcount_params = build_task_count(filtered_where, filtered_params)
1268         try:
1269             with conn.cursor() as cursor:
1270                 cursor.execute(fcount_sql, fcount_params)
1271                 filtered = cursor.fetchone()[0]
1272         except Exception as e:
1273             logger.error(f"list_tasks_dt filtered count failed: {e}")
1274             return [], total, 0
1275     else:
1276         filtered = total
1277 
1278     # Data query
1279     sql, full_params = build_task_query_dt(
1280         TASK_LIST_FIELDS, filtered_where, filtered_params,
1281         order_by=order_by, limit=limit, offset=offset,
1282     )
1283 
1284     rows = []
1285     try:
1286         with conn.cursor() as cursor:
1287             cursor.execute(sql, full_params)
1288             for row in cursor.fetchall():
1289                 rows.append(row_to_dict(row, TASK_LIST_FIELDS))
1290     except Exception as e:
1291         logger.error(f"list_tasks_dt query failed: {e}")
1292         return [], total, filtered
1293 
1294     return rows, total, filtered
1295 
1296 
1297 def job_filter_counts(days=7, status=None, username=None, site=None,
1298                       taskid=None, reqid=None):
1299     """Get filter option counts for job list (status, user, site)."""
1300     cutoff = timezone.now() - timedelta(days=days)
1301     base_where = ['"modificationtime" >= %s']
1302     base_params = [cutoff]
1303 
1304     if taskid:
1305         base_where.append('"jeditaskid" = %s')
1306         base_params.append(int(taskid))
1307     if reqid:
1308         base_where.append('"reqid" = %s')
1309         base_params.append(int(reqid))
1310 
1311     conn = connections['panda']
1312     result = {}
1313 
1314     filter_config = [
1315         ('jobstatus', 'status', status),
1316         ('produsername', 'username', username),
1317         ('computingsite', 'site', site),
1318     ]
1319 
1320     for db_field, filter_name, current_value in filter_config:
1321         # Apply all other filters except this one
1322         where = list(base_where)
1323         params = list(base_params)
1324         for other_db_field, other_name, other_value in filter_config:
1325             if other_name != filter_name and other_value:
1326                 clause, val = like_or_eq(other_db_field, other_value)
1327                 where.append(clause)
1328                 params.append(val)
1329 
1330         sql, full_params = build_union_count_by_field(db_field, where, params)
1331         try:
1332             with conn.cursor() as cursor:
1333                 cursor.execute(sql, full_params)
1334                 result[filter_name] = [(row[0], row[1]) for row in cursor.fetchall()]
1335         except Exception as e:
1336             logger.error(f"job_filter_counts {filter_name} failed: {e}")
1337             result[filter_name] = []
1338 
1339     return result
1340 
1341 
1342 def task_filter_counts(days=7, status=None, username=None, workinggroup=None):
1343     """Get filter option counts for task list (status, username, workinggroup)."""
1344     cutoff = timezone.now() - timedelta(days=days)
1345     base_where = ['"modificationtime" >= %s']
1346     base_params = [cutoff]
1347 
1348     _stale = _stale_task_filter()
1349     base_where.append(_stale['clause'])
1350     base_params.extend(_stale['params'])
1351 
1352     conn = connections['panda']
1353     result = {}
1354 
1355     filter_config = [
1356         ('status', 'status', status),
1357         ('username', 'username', username),
1358         ('workinggroup', 'workinggroup', workinggroup),
1359     ]
1360 
1361     for db_field, filter_name, current_value in filter_config:
1362         where = list(base_where)
1363         params = list(base_params)
1364         for other_db_field, other_name, other_value in filter_config:
1365             if other_name != filter_name and other_value:
1366                 where.append(f'"{other_db_field}" = %s')
1367                 params.append(other_value)
1368 
1369         sql, full_params = build_task_count_by_field(db_field, where, params)
1370         try:
1371             with conn.cursor() as cursor:
1372                 cursor.execute(sql, full_params)
1373                 result[filter_name] = [(row[0], row[1]) for row in cursor.fetchall()]
1374         except Exception as e:
1375             logger.error(f"task_filter_counts {filter_name} failed: {e}")
1376             result[filter_name] = []
1377 
1378     return result
1379 
1380 
1381 def get_task(jeditaskid):
1382     """Get a single JEDI task record."""
1383     conn = connections['panda']
1384     field_list = ', '.join(f'"{f}"' for f in TASK_LIST_FIELDS)
1385     sql = f"""
1386         SELECT {field_list}
1387         FROM "{PANDA_SCHEMA}"."jedi_tasks"
1388         WHERE "jeditaskid" = %s
1389     """
1390     try:
1391         with conn.cursor() as cursor:
1392             cursor.execute(sql, [jeditaskid])
1393             row = cursor.fetchone()
1394     except Exception as e:
1395         logger.error(f"get_task query failed: {e}")
1396         return {"error": str(e)}
1397 
1398     if not row:
1399         return {"error": f"Task {jeditaskid} not found"}
1400 
1401     task = row_to_dict(row, TASK_LIST_FIELDS)
1402     return task