# File indexing completed on 2026-04-27 07:41:42
0001 """
0002 PanDA database query functions for ePIC production monitoring.
0003
0004 All functions are synchronous — they use django.db.connections['panda']
0005 directly. Callers in async contexts should wrap with sync_to_async.
0006 """
0007
0008 import logging
0009 import re
0010 from datetime import timedelta
0011 from django.utils import timezone
0012 from django.db import connections
0013
0014 from .constants import (
0015 PANDA_SCHEMA, LIST_FIELDS, ERROR_FIELDS, DIAGNOSE_EXTRA_FIELDS,
0016 ERROR_COMPONENTS, FAULTY_STATUSES, TASK_LIST_FIELDS,
0017 STUDY_FIELDS, FILE_FIELDS,
0018 )
0019 from .sql import (
0020 build_union_query, build_count_query,
0021 build_task_query, build_task_count_query,
0022 build_union_query_dt, build_union_count, build_union_count_by_field,
0023 build_task_query_dt, build_task_count, build_task_count_by_field,
0024 build_search_clauses,
0025 row_to_dict, extract_errors, like_or_eq,
0026 )
0027
0028 logger = logging.getLogger(__name__)
0029
# JEDI task statuses considered final — anything else is still in flight.
TERMINAL_TASK_STATUSES = ('done', 'failed', 'aborted', 'broken', 'finished')
# Non-terminal tasks created more than this many days ago are treated as
# stale and excluded by _stale_task_filter().
STALE_TASK_DAYS = 60


# Root of the NERSC portal directory tree that serves per-job log files.
_NERSC_PORTAL_BASE = "https://portal.nersc.gov/cfs/m3763/panda/jobs"
# Matches slurm stdout filenames inside the portal's Apache autoindex HTML.
_NERSC_SLURM_RE = re.compile(r'href="(slurm-\d+-task\d+-panda\d+\.out)"')
0037
0038
def _nersc_portal_log_urls(computingsite, pandaid):
    """Build Perlmutter log URLs by scraping the NERSC portal dir listing.

    The slurm task filename contains a per-allocation task index not stored in
    our DB, so we have to scrape the Apache autoindex to find it. Returns
    ``None`` if the dir is unreachable or empty.
    """
    import requests
    listing_url = f"{_NERSC_PORTAL_BASE}/{computingsite}/{pandaid}/"
    try:
        response = requests.get(listing_url, timeout=5)
    except Exception as exc:
        logger.warning("NERSC portal dir fetch failed for %s: %s", pandaid, exc)
        return None
    if response.status_code != 200:
        # Directory missing or not yet published — no URLs to offer.
        return None
    urls = {
        'nersc_log_dir': listing_url,
        'pilot_stdout': listing_url + 'pilotlog.txt',
    }
    match = _NERSC_SLURM_RE.search(response.text)
    if match:
        urls['slurm_task_stdout'] = listing_url + match.group(1)
    return urls
0063
0064
0065 def _bulk_destinationse(pandaids):
0066 """Look up destinationse (destination storage element) for a batch of jobs.
0067
0068 The destination SE — the Rucio storage element where output files are
0069 written — is stored per-file in filestable4, not in the jobs table.
0070 Returns {pandaid: destinationse} for jobs that have one.
0071 """
0072 if not pandaids:
0073 return {}
0074 conn = connections['panda']
0075 placeholders = ','.join(['%s'] * len(pandaids))
0076 sql = f"""
0077 SELECT DISTINCT "pandaid", "destinationse"
0078 FROM "{PANDA_SCHEMA}"."filestable4"
0079 WHERE "pandaid" IN ({placeholders})
0080 AND "destinationse" IS NOT NULL
0081 """
0082 try:
0083 with conn.cursor() as cursor:
0084 cursor.execute(sql, list(pandaids))
0085 return {row[0]: row[1] for row in cursor.fetchall()}
0086 except Exception:
0087 logger.exception("_bulk_destinationse failed")
0088 return {}
0089
0090
def _stale_task_filter():
    """Exclude non-terminal tasks created more than STALE_TASK_DAYS ago."""
    threshold = timezone.now() - timedelta(days=STALE_TASK_DAYS)
    marks = ', '.join(['%s'] * len(TERMINAL_TASK_STATUSES))
    return {
        'clause': f'NOT ("creationdate" < %s AND "status" NOT IN ({marks}))',
        'params': [threshold] + list(TERMINAL_TASK_STATUSES),
    }
0097
0098
def list_jobs(days=7, status=None, username=None, site=None,
              taskid=None, reqid=None, limit=200, before_id=None):
    """List PanDA jobs with summary statistics and cursor-based pagination.

    Queries the active/archived jobs union (build_union_query) for jobs whose
    modificationtime falls inside the window, plus a per-status count summary
    over the same window.

    Args:
        days: look-back window in days (on modificationtime).
        status: exact jobstatus filter.
        username: produsername filter; '%' in the value enables LIKE matching.
        site: computingsite filter; '%' enables LIKE matching.
        taskid: jeditaskid filter; when set, limit is raised to 100000 so a
            whole task fits in one page.
        reqid: exact reqid filter.
        limit: page size.
        before_id: pagination cursor — only jobs with pandaid below this.

    Returns:
        dict with 'summary' (status -> count), 'total_in_window', 'jobs',
        'count', 'pagination' and 'filters', or {'error': ...} on DB failure.
    """

    if taskid:
        # Whole-task listing: effectively disable pagination.
        limit = 100000
    cutoff = timezone.now() - timedelta(days=days)
    where = ['"modificationtime" >= %s']
    params = [cutoff]

    if status:
        where.append('"jobstatus" = %s')
        params.append(status)
    if username:
        clause, val = like_or_eq('produsername', username)
        where.append(clause)
        params.append(val)
    if site:
        clause, val = like_or_eq('computingsite', site)
        where.append(clause)
        params.append(val)
    if taskid:
        where.append('"jeditaskid" = %s')
        params.append(taskid)
    if reqid:
        where.append('"reqid" = %s')
        params.append(reqid)
    if before_id:
        where.append('"pandaid" < %s')
        params.append(before_id)

    conn = connections['panda']

    # The summary counts deliberately drop the pagination cursor so they
    # describe the whole window, not just the current page. NOTE(review):
    # the index-parallel filtering below relies on every clause in `where`
    # carrying exactly one parameter — true for all clauses built above.
    count_where = [w for w in where if '"pandaid" <' not in w]
    count_params = [p for i, p in enumerate(params) if '"pandaid" <' not in where[i]]
    count_sql, count_full_params = build_count_query(count_where, count_params)

    summary = {}
    total = 0
    try:
        with conn.cursor() as cursor:
            cursor.execute(count_sql, count_full_params)
            for row in cursor.fetchall():
                summary[row[0]] = row[1]
                total += row[1]
    except Exception as e:
        logger.error(f"list_jobs count query failed: {e}")
        return {"error": str(e)}

    # Fetch one row beyond the page size purely to detect another page.
    fetch_limit = limit + 1
    sql, full_params = build_union_query(
        LIST_FIELDS, where, params,
        order_by='"pandaid" DESC',
        limit=fetch_limit,
    )

    jobs = []
    try:
        with conn.cursor() as cursor:
            cursor.execute(sql, full_params)
            rows = cursor.fetchall()
            for row in rows[:limit]:
                jobs.append(row_to_dict(row, LIST_FIELDS))
    except Exception as e:
        logger.error(f"list_jobs query failed: {e}")
        return {"error": str(e)}

    has_more = len(rows) > limit
    next_before_id = jobs[-1]['pandaid'] if jobs and has_more else None

    # destinationse lives in filestable4, not the jobs tables — attach it.
    dest_map = _bulk_destinationse([j['pandaid'] for j in jobs])
    for j in jobs:
        j['destinationse'] = dest_map.get(j['pandaid'])

    return {
        "summary": summary,
        "total_in_window": total,
        "jobs": jobs,
        "count": len(jobs),
        "pagination": {
            "before_id": before_id,
            "has_more": has_more,
            "next_before_id": next_before_id,
            "limit": limit,
        },
        "filters": {
            "days": days,
            "status": status,
            "username": username,
            "site": site,
            "taskid": taskid,
            "reqid": reqid,
        },
    }
0195
0196
def diagnose_jobs(days=7, username=None, site=None, taskid=None,
                  reqid=None, error_component=None, limit=500, before_id=None):
    """Diagnose failed PanDA jobs with full error details.

    Fetches jobs in FAULTY_STATUSES from the active/archived union, attaches
    parsed error info (extract_errors) plus the destination SE, and aggregates
    per-component and per-error-code counts over the returned page.

    Args:
        days: look-back window on modificationtime.
        username / site: filters; '%' in the value enables LIKE matching.
        taskid: jeditaskid filter; also raises limit to 100000.
        reqid: exact reqid filter.
        error_component: restrict to one named component from
            ERROR_COMPONENTS; an unknown name returns an error.
        limit: page size.
        before_id: pagination cursor — only jobs with pandaid below this.

    Returns:
        dict with 'error_summary', 'jobs', 'count', 'pagination', 'filters',
        or {'error': ...} on failure / unknown error_component.
    """

    if taskid:
        # Whole-task diagnosis: effectively disable pagination.
        limit = 100000
    cutoff = timezone.now() - timedelta(days=days)
    where = [
        '"modificationtime" >= %s',
        '"jobstatus" IN %s',
    ]
    params = [cutoff, tuple(FAULTY_STATUSES)]

    if username:
        clause, val = like_or_eq('produsername', username)
        where.append(clause)
        params.append(val)
    if site:
        clause, val = like_or_eq('computingsite', site)
        where.append(clause)
        params.append(val)
    if taskid:
        where.append('"jeditaskid" = %s')
        params.append(taskid)
    if reqid:
        where.append('"reqid" = %s')
        params.append(reqid)
    if error_component:
        comp = next((c for c in ERROR_COMPONENTS if c['name'] == error_component), None)
        if comp is None:
            # FIX: an unknown component used to be silently ignored, returning
            # every faulty job as if unfiltered. Fail loudly instead, matching
            # error_summary's handling of an unknown error_source.
            valid = [c['name'] for c in ERROR_COMPONENTS]
            return {"error": f"Unknown error_component '{error_component}'. Valid: {valid}"}
        where.append(f'"{comp["code"]}" != 0')
    if before_id:
        where.append('"pandaid" < %s')
        params.append(before_id)

    conn = connections['panda']

    # Deduplicate while preserving order: list fields first, then error
    # fields, then diagnose-only extras.
    seen = set()
    fields = []
    for f in LIST_FIELDS + [f for f in ERROR_FIELDS if f not in LIST_FIELDS] + DIAGNOSE_EXTRA_FIELDS:
        if f not in seen:
            seen.add(f)
            fields.append(f)

    fetch_limit = limit + 1  # one extra row just to detect another page
    sql, full_params = build_union_query(
        fields, where, params,
        order_by='"pandaid" DESC',
        limit=fetch_limit,
    )

    jobs = []
    try:
        with conn.cursor() as cursor:
            cursor.execute(sql, full_params)
            rows = cursor.fetchall()
            for row in rows[:limit]:
                job = row_to_dict(row, fields)
                job['errors'] = extract_errors(job)
                jobs.append(job)
    except Exception as e:
        logger.error(f"diagnose_jobs query failed: {e}")
        return {"error": str(e)}

    has_more = len(rows) > limit
    next_before_id = jobs[-1]['pandaid'] if jobs and has_more else None

    # destinationse lives in filestable4, not the jobs tables — attach it.
    dest_map = _bulk_destinationse([j['pandaid'] for j in jobs])
    for j in jobs:
        j['destinationse'] = dest_map.get(j['pandaid'])

    # Aggregate component and (component, code) counts over this page only.
    component_counts = {}
    code_counts = {}
    for job in jobs:
        for err in job['errors']:
            comp = err['component']
            component_counts[comp] = component_counts.get(comp, 0) + 1
            key = f"{comp}:{err['code']}"
            if key not in code_counts:
                code_counts[key] = {'component': comp, 'code': err['code'], 'count': 0, 'sample_diag': err['diag']}
            code_counts[key]['count'] += 1

    top_errors = sorted(code_counts.values(), key=lambda x: x['count'], reverse=True)[:20]

    return {
        "error_summary": {
            "total_faulty_jobs": len(jobs),
            "by_component": component_counts,
            "top_error_codes": top_errors,
        },
        "jobs": jobs,
        "count": len(jobs),
        "pagination": {
            "before_id": before_id,
            "has_more": has_more,
            "next_before_id": next_before_id,
            "limit": limit,
        },
        "filters": {
            "days": days,
            "username": username,
            "site": site,
            "taskid": taskid,
            "reqid": reqid,
            "error_component": error_component,
        },
    }
0307
0308
def list_tasks(days=7, status=None, username=None, taskname=None,
               reqid=None, workinggroup=None, taskid=None,
               processingtype=None, limit=25, before_id=None):
    """List JEDI tasks with summary statistics and cursor-based pagination.

    Queries jedi_tasks for tasks modified within the window, excluding stale
    non-terminal tasks (see _stale_task_filter), plus a per-status count
    summary over the same window.

    Args:
        days: look-back window on modificationtime.
        status / reqid / workinggroup / taskid: exact-match filters.
        username / taskname / processingtype: filters; '%' enables LIKE.
        limit: page size.
        before_id: pagination cursor — only tasks with jeditaskid below this.

    Returns:
        dict with 'summary', 'total_in_window', 'tasks', 'count',
        'pagination' and 'filters', or {'error': ...} on DB failure.
    """
    cutoff = timezone.now() - timedelta(days=days)
    where = ['"modificationtime" >= %s']
    params = [cutoff]

    # Hide long-abandoned tasks that never reached a terminal status.
    _stale = _stale_task_filter()
    where.append(_stale['clause'])
    params.extend(_stale['params'])

    if status:
        where.append('"status" = %s')
        params.append(status)
    if username:
        clause, val = like_or_eq('username', username)
        where.append(clause)
        params.append(val)
    if taskname:
        clause, val = like_or_eq('taskname', taskname)
        where.append(clause)
        params.append(val)
    if reqid:
        where.append('"reqid" = %s')
        params.append(reqid)
    if workinggroup:
        where.append('"workinggroup" = %s')
        params.append(workinggroup)
    if taskid:
        where.append('"jeditaskid" = %s')
        params.append(taskid)
    if processingtype:
        clause, val = like_or_eq('processingtype', processingtype)
        where.append(clause)
        params.append(val)
    if before_id:
        where.append('"jeditaskid" < %s')
        params.append(before_id)

    conn = connections['panda']

    # The summary drops the pagination cursor (always appended last) so the
    # totals describe the whole window, not just the current page.
    if before_id:
        count_where = where[:-1]
        count_params = params[:-1]
    else:
        count_where = where
        count_params = params
    count_sql, count_full_params = build_task_count_query(count_where, count_params)

    summary = {}
    total = 0
    try:
        with conn.cursor() as cursor:
            cursor.execute(count_sql, count_full_params)
            for row in cursor.fetchall():
                summary[row[0]] = row[1]
                total += row[1]
    except Exception as e:
        logger.error(f"list_tasks count query failed: {e}")
        return {"error": str(e)}

    fetch_limit = limit + 1  # one extra row just to detect another page
    sql, full_params = build_task_query(
        TASK_LIST_FIELDS, where, params,
        order_by='"jeditaskid" DESC',
        limit=fetch_limit,
    )

    tasks = []
    try:
        with conn.cursor() as cursor:
            cursor.execute(sql, full_params)
            rows = cursor.fetchall()
            for row in rows[:limit]:
                tasks.append(row_to_dict(row, TASK_LIST_FIELDS))
    except Exception as e:
        logger.error(f"list_tasks query failed: {e}")
        return {"error": str(e)}

    has_more = len(rows) > limit
    next_before_id = tasks[-1]['jeditaskid'] if tasks and has_more else None

    return {
        "summary": summary,
        "total_in_window": total,
        "tasks": tasks,
        "count": len(tasks),
        "pagination": {
            "before_id": before_id,
            "has_more": has_more,
            "next_before_id": next_before_id,
            "limit": limit,
        },
        "filters": {
            "days": days,
            "status": status,
            "username": username,
            "taskname": taskname,
            "reqid": reqid,
            "workinggroup": workinggroup,
            "taskid": taskid,
            # FIX: this filter was applied but not echoed back before.
            "processingtype": processingtype,
        },
    }
0416
0417
def error_summary(days=10, username=None, site=None, destinationse=None,
                  taskid=None, error_source=None, limit=20):
    """Aggregate error summary across failed PanDA jobs, ranked by frequency.

    Builds one sub-select per (error component, jobs table) pair, unions them,
    then groups by (source, code, truncated diag) counting jobs, distinct
    tasks, users, sites and destination SEs per distinct error.

    Args:
        days: look-back window on modificationtime.
        username / site: filters; '%' in the value enables LIKE matching.
        destinationse: filter on the output destination SE (filestable4);
            '%' enables LIKE. Forces an inner join, so only jobs with a known
            destination SE are counted.
        taskid: restrict to one task; also raises limit to 10000.
        error_source: restrict to one named component from ERROR_COMPONENTS.
        limit: max number of distinct error rows returned.

    Returns:
        dict with 'total_errors', 'errors', 'count' and 'filters', or
        {'error': ...} on failure / unknown error_source.
    """

    if taskid:
        # A single task's errors should all fit in one response.
        limit = 10000
    cutoff = timezone.now() - timedelta(days=days)
    conn = connections['panda']

    extra_params = []
    filters = ''

    if username:
        if '%' in username:
            filters += ' AND "produsername" LIKE %s'
        else:
            filters += ' AND "produsername" = %s'
        extra_params.append(username)
    if site:
        if '%' in site:
            filters += ' AND "computingsite" LIKE %s'
        else:
            filters += ' AND "computingsite" = %s'
        extra_params.append(site)
    # destinationse lives on the joined filestable4 sub-select ("f"), so its
    # clause and params are kept separate and appended after the job filters.
    destse_filter = ''
    destse_params = []
    if destinationse:
        if '%' in destinationse:
            destse_filter = ' AND f."destinationse" LIKE %s'
        else:
            destse_filter = ' AND f."destinationse" = %s'
        destse_params.append(destinationse)
    if taskid:
        filters += ' AND "jeditaskid" = %s'
        extra_params.append(taskid)

    components_to_query = ERROR_COMPONENTS
    if error_source:
        components_to_query = [c for c in ERROR_COMPONENTS if c['name'] == error_source]
        if not components_to_query:
            return {"error": f"Unknown error_source '{error_source}'. Valid: {[c['name'] for c in ERROR_COMPONENTS]}"}

    parts = []
    all_params = []
    # Filtering on destination SE only makes sense when the join matches.
    join_type = 'JOIN' if destinationse else 'LEFT JOIN'
    for comp in components_to_query:
        for table in ['jobsactive4', 'jobsarchived4']:
            parts.append(f"""
                SELECT '{comp['name']}' as error_source,
                       j."{comp['code']}" as error_code,
                       j."{comp['diag']}" as error_diag,
                       j."jeditaskid",
                       j."produsername",
                       j."computingsite",
                       f."destinationse"
                FROM "{PANDA_SCHEMA}"."{table}" j
                {join_type} (
                    SELECT DISTINCT "pandaid", "destinationse"
                    FROM "{PANDA_SCHEMA}"."filestable4"
                    WHERE "destinationse" IS NOT NULL
                ) f ON f."pandaid" = j."pandaid"
                WHERE j."modificationtime" >= %s
                AND j."jobstatus" IN ('failed','cancelled','closed')
                AND j."{comp['code']}" IS NOT NULL
                AND j."{comp['code']}" != 0
                {filters}
                {destse_filter}
            """)
            # Every UNION ALL branch repeats the same parameter sequence.
            all_params.extend([cutoff] + extra_params + destse_params)

    union_sql = ' UNION ALL '.join(parts)
    sql = f"""
        SELECT error_source, error_code,
               LEFT(error_diag, 256) as error_diag,
               COUNT(*) as count,
               COUNT(DISTINCT jeditaskid) as task_count,
               array_agg(DISTINCT produsername) as users,
               array_agg(DISTINCT computingsite) as sites,
               array_agg(DISTINCT destinationse) as destination_sites
        FROM ({union_sql}) errs
        GROUP BY error_source, error_code, LEFT(error_diag, 256)
        ORDER BY count DESC
        LIMIT %s
    """
    all_params.append(limit)

    try:
        with conn.cursor() as cursor:
            cursor.execute(sql, all_params)
            rows = cursor.fetchall()
    except Exception as e:
        logger.error(f"error_summary query failed: {e}")
        return {"error": str(e)}

    errors = []
    total = 0
    for row in rows:
        entry = {
            'error_source': row[0],
            'error_code': row[1],
            'error_diag': row[2] or '',
            'count': row[3],
            'task_count': row[4],
            'users': row[5],
            'sites': row[6],
            'destination_sites': row[7],
        }
        total += row[3]
        errors.append(entry)

    return {
        "total_errors": total,
        "errors": errors,
        "count": len(errors),
        "filters": {
            "days": days,
            "username": username,
            "site": site,
            # FIX: this filter was applied but not echoed back before.
            "destinationse": destinationse,
            "taskid": taskid,
            "error_source": error_source,
        },
    }
0540
0541
def get_activity(days=1, username=None, site=None, workinggroup=None):
    """Pre-digested overview of PanDA activity — aggregate counts only.

    Returns job counts (by status, user, site) over the active/archived jobs
    union and task counts (by status, user) over jedi_tasks, for the given
    window. The `site` filter applies only to jobs; `workinggroup` only to
    tasks; `username` applies to both.

    Returns a dict with 'jobs', 'tasks' and 'filters', or {'error': ...}.
    """
    cutoff = timezone.now() - timedelta(days=days)
    conn = connections['panda']

    # ---- Job-side WHERE fragment, shared by all job aggregations. ----
    job_where = '"modificationtime" >= %s'
    job_params = [cutoff]
    job_filters = ''

    if username:
        if '%' in username:
            job_filters += ' AND "produsername" LIKE %s'
        else:
            job_filters += ' AND "produsername" = %s'
        job_params.append(username)
    if site:
        if '%' in site:
            job_filters += ' AND "computingsite" LIKE %s'
        else:
            job_filters += ' AND "computingsite" = %s'
        job_params.append(site)

    base_job_where = f'{job_where}{job_filters}'

    def _job_agg(group_col):
        # Count jobs by (jobstatus, group_col) over the active+archived union.
        # The WHERE appears twice (once per UNION branch), hence the doubled
        # parameter list.
        sql = f"""
            SELECT "jobstatus", "{group_col}", COUNT(*) FROM (
                SELECT "jobstatus", "{group_col}"
                FROM "{PANDA_SCHEMA}"."jobsactive4"
                WHERE {base_job_where}
                UNION ALL
                SELECT "jobstatus", "{group_col}"
                FROM "{PANDA_SCHEMA}"."jobsarchived4"
                WHERE {base_job_where}
            ) combined
            GROUP BY "jobstatus", "{group_col}"
            ORDER BY COUNT(*) DESC
        """
        full_params = job_params + job_params
        with conn.cursor() as cursor:
            cursor.execute(sql, full_params)
            return cursor.fetchall()

    try:
        status_sql = f"""
            SELECT "jobstatus", COUNT(*) FROM (
                SELECT "jobstatus" FROM "{PANDA_SCHEMA}"."jobsactive4"
                WHERE {base_job_where}
                UNION ALL
                SELECT "jobstatus" FROM "{PANDA_SCHEMA}"."jobsarchived4"
                WHERE {base_job_where}
            ) combined
            GROUP BY "jobstatus"
            ORDER BY COUNT(*) DESC
        """
        with conn.cursor() as cursor:
            cursor.execute(status_sql, job_params + job_params)
            job_by_status = {row[0]: row[1] for row in cursor.fetchall()}

        job_total = sum(job_by_status.values())

        # Pivot (status, user, count) rows into one dict per user with a
        # per-status breakdown and a running total.
        user_rows = _job_agg('produsername')
        user_map = {}
        for status_val, user_val, count in user_rows:
            if user_val not in user_map:
                user_map[user_val] = {'user': user_val, 'total': 0}
            user_map[user_val][status_val] = count
            user_map[user_val]['total'] += count
        by_user = sorted(user_map.values(), key=lambda x: x['total'], reverse=True)

        # Same pivot, keyed by computing site.
        site_rows = _job_agg('computingsite')
        site_map = {}
        for status_val, site_val, count in site_rows:
            if site_val not in site_map:
                site_map[site_val] = {'site': site_val, 'total': 0}
            site_map[site_val][status_val] = count
            site_map[site_val]['total'] += count
        by_site = sorted(site_map.values(), key=lambda x: x['total'], reverse=True)

    except Exception as e:
        logger.error(f"get_activity job queries failed: {e}")
        return {"error": str(e)}

    # ---- Task-side aggregations over jedi_tasks. ----
    task_where = ['"modificationtime" >= %s']
    task_params = [cutoff]

    # Hide long-abandoned non-terminal tasks.
    _stale = _stale_task_filter()
    task_where.append(_stale['clause'])
    task_params.extend(_stale['params'])

    if username:
        if '%' in username:
            task_where.append('"username" LIKE %s')
        else:
            task_where.append('"username" = %s')
        task_params.append(username)
    if workinggroup:
        task_where.append('"workinggroup" = %s')
        task_params.append(workinggroup)

    task_where_sql = ' AND '.join(task_where)

    try:
        task_status_sql = f"""
            SELECT "status", COUNT(*)
            FROM "{PANDA_SCHEMA}"."jedi_tasks"
            WHERE {task_where_sql}
            GROUP BY "status"
            ORDER BY COUNT(*) DESC
        """
        with conn.cursor() as cursor:
            cursor.execute(task_status_sql, task_params)
            task_by_status = {row[0]: row[1] for row in cursor.fetchall()}

        task_total = sum(task_by_status.values())

        task_user_sql = f"""
            SELECT "status", "username", COUNT(*)
            FROM "{PANDA_SCHEMA}"."jedi_tasks"
            WHERE {task_where_sql}
            GROUP BY "status", "username"
            ORDER BY COUNT(*) DESC
        """
        with conn.cursor() as cursor:
            cursor.execute(task_user_sql, task_params)
            task_user_rows = cursor.fetchall()

        # Same pivot as the job aggregations above, for tasks per user.
        task_user_map = {}
        for status_val, user_val, count in task_user_rows:
            if user_val not in task_user_map:
                task_user_map[user_val] = {'user': user_val, 'total': 0}
            task_user_map[user_val][status_val] = count
            task_user_map[user_val]['total'] += count
        task_by_user = sorted(task_user_map.values(), key=lambda x: x['total'], reverse=True)

    except Exception as e:
        logger.error(f"get_activity task queries failed: {e}")
        return {"error": str(e)}

    return {
        "jobs": {
            "total": job_total,
            "by_status": job_by_status,
            "by_user": by_user,
            "by_site": by_site,
        },
        "tasks": {
            "total": task_total,
            "by_status": task_by_status,
            "by_user": task_by_user,
        },
        "filters": {
            "days": days,
            "username": username,
            "site": site,
            "workinggroup": workinggroup,
        },
    }
0702
0703
# Subset of keys from the schedconfig_json "data" document surfaced by
# list_queues() summaries; get_queue() returns the full document instead.
QUEUE_SUMMARY_FIELDS = [
    'status', 'state', 'vo_name', 'resource_type', 'type', 'capability',
    'corepower', 'atlas_site', 'region', 'country', 'tier', 'cloud',
    'container_type', 'pilot_version', 'maxrss', 'maxtime', 'maxwdir',
]
0709
0710
def list_queues(vo=None, status=None, state=None, search=None):
    """List PanDA queues from schedconfig_json with summary fields.

    Optional filters narrow on JSON attributes of the queue document;
    *search* does a case-insensitive substring match on the queue name.
    Returns summaries plus the applied filters, or {'error': ...}.
    """
    clauses = []
    args = []

    if vo:
        clauses.append(""""data"->>'vo_name' = %s""")
        args.append(vo)
    if status:
        clauses.append(""""data"->>'status' = %s""")
        args.append(status)
    if state:
        clauses.append(""""data"->>'state' = %s""")
        args.append(state)
    if search:
        clauses.append(""""panda_queue" ILIKE %s""")
        args.append(f'%{search}%')

    where_sql = ' WHERE ' + ' AND '.join(clauses) if clauses else ''

    sql = f"""
        SELECT "panda_queue", "data", "last_update"
        FROM "{PANDA_SCHEMA}"."schedconfig_json"
        {where_sql}
        ORDER BY "panda_queue"
    """

    try:
        with connections['panda'].cursor() as cursor:
            cursor.execute(sql, args)
            rows = cursor.fetchall()
    except Exception as e:
        logger.error(f"list_queues failed: {e}")
        return {"error": str(e)}

    queues = []
    for queue_name, data, last_update in rows:
        data = data or {}
        entry = {'panda_queue': queue_name}
        # Only expose the curated summary fields that are actually set.
        for field in QUEUE_SUMMARY_FIELDS:
            value = data.get(field)
            if value is not None:
                entry[field] = value
        entry['last_update'] = last_update.isoformat() if last_update else None
        queues.append(entry)

    return {
        "queues": queues,
        "count": len(queues),
        "filters": {"vo": vo, "status": status, "state": state, "search": search},
    }
0763
0764
def get_queue(panda_queue):
    """Get full configuration for a single PanDA queue."""
    sql = f"""
        SELECT "panda_queue", "data", "last_update"
        FROM "{PANDA_SCHEMA}"."schedconfig_json"
        WHERE "panda_queue" = %s
    """

    try:
        with connections['panda'].cursor() as cursor:
            cursor.execute(sql, [panda_queue])
            row = cursor.fetchone()
    except Exception as e:
        logger.error(f"get_queue failed: {e}")
        return {"error": str(e)}

    if not row:
        return {"error": f"Queue '{panda_queue}' not found"}

    payload = row[1] or {}

    # Strip NULL-valued attributes, then tack on the identifying metadata.
    config = {key: value for key, value in payload.items() if value is not None}
    config['panda_queue'] = row[0]
    config['last_update'] = row[2].isoformat() if row[2] else None

    return {"queue": config}
0793
0794
def resource_usage(days=30, site=None, username=None, taskid=None):
    """Aggregate resource usage for finished jobs.

    Reports two core-hour metrics:
    - allocated: actualcorecount × wall time (cores reserved by the facility)
    - used: cpuconsumptiontime (CPU time the job actually consumed)

    Only counts jobs that actually ran: jobstatus='finished' with both
    starttime and endtime set. Pre-running queue time is excluded.

    Args:
        days: look-back window on modificationtime.
        site / username: filters; '%' in the value enables LIKE matching.
        taskid: restrict to one JEDI task.

    Returns:
        dict with 'totals', 'by_site', 'by_user' and 'filters',
        or {'error': ...} on DB failure.
    """
    cutoff = timezone.now() - timedelta(days=days)
    conn = connections['panda']

    filters = ''
    extra_params = []

    if site:
        clause, val = like_or_eq('computingsite', site)
        filters += f' AND {clause}'
        extra_params.append(val)
    if username:
        clause, val = like_or_eq('produsername', username)
        filters += f' AND {clause}'
        extra_params.append(val)
    if taskid:
        filters += ' AND "jeditaskid" = %s'
        extra_params.append(taskid)

    base_where = (
        '"modificationtime" >= %s'
        ' AND "jobstatus" = \'finished\''
        ' AND "starttime" IS NOT NULL'
        ' AND "endtime" IS NOT NULL'
        + filters
    )
    base_params = [cutoff] + extra_params

    # Columns pulled from each jobs table before aggregation.
    inner_fields = (
        '"computingsite", "produsername", '
        '"cpuconsumptiontime", "actualcorecount", "corecount", '
        '"starttime", "endtime"'
    )

    # Shared aggregate expressions; core count falls back
    # actualcorecount -> corecount -> 1 when unset.
    agg_cols = """
        COUNT(*) as job_count,
        COALESCE(SUM(
            EXTRACT(EPOCH FROM ("endtime" - "starttime"))
            * COALESCE("actualcorecount", "corecount", 1)
        ), 0) / 3600.0 as allocated_core_hours,
        COALESCE(SUM("cpuconsumptiontime"), 0) / 3600.0 as used_core_hours,
        COALESCE(SUM(
            EXTRACT(EPOCH FROM ("endtime" - "starttime"))
        ), 0) / 3600.0 as wall_hours
    """

    def _run(group_col=None):
        # One query for overall totals (no grouping) or grouped by a single
        # column. The WHERE appears in both UNION branches, so base_params
        # is passed twice.
        select = f'"{group_col}", {agg_cols}' if group_col else agg_cols
        group = f'GROUP BY "{group_col}" ORDER BY allocated_core_hours DESC' if group_col else ''
        sql = f"""
            SELECT {select} FROM (
                SELECT {inner_fields}
                FROM "{PANDA_SCHEMA}"."jobsactive4" WHERE {base_where}
                UNION ALL
                SELECT {inner_fields}
                FROM "{PANDA_SCHEMA}"."jobsarchived4" WHERE {base_where}
            ) combined
            {group}
        """
        with conn.cursor() as cursor:
            cursor.execute(sql, base_params + base_params)
            return cursor.fetchall()

    def _parse(row, offset=0):
        # offset=1 skips the leading group-by column in grouped rows.
        return {
            'job_count': row[offset],
            'allocated_core_hours': round(float(row[offset + 1]), 1),
            'used_core_hours': round(float(row[offset + 2]), 1),
            'wall_hours': round(float(row[offset + 3]), 1),
        }

    try:
        rows = _run()
        totals = _parse(rows[0]) if rows else {
            'job_count': 0, 'allocated_core_hours': 0,
            'used_core_hours': 0, 'wall_hours': 0,
        }

        by_site = []
        for row in _run('computingsite'):
            entry = _parse(row, offset=1)
            entry['site'] = row[0]
            by_site.append(entry)

        by_user = []
        for row in _run('produsername'):
            entry = _parse(row, offset=1)
            entry['user'] = row[0]
            by_user.append(entry)

    except Exception as e:
        logger.error(f"resource_usage query failed: {e}")
        return {"error": str(e)}

    return {
        "totals": totals,
        "by_site": by_site,
        "by_user": by_user,
        "filters": {
            "days": days,
            "site": site,
            "username": username,
            "taskid": taskid,
        },
    }
0909
0910
def study_job(pandaid):
    """Deep study of a single PanDA job — full record, files, harvester logs, errors.

    Gathers, best-effort, everything known about one job: the full job row,
    its files, harvester worker info, parent task context, derived log URLs
    and (for badly-ended jobs) a fetched-and-classified log excerpt. Each
    auxiliary lookup failure is logged and skipped rather than aborting.

    Args:
        pandaid: the PanDA job id.

    Returns:
        dict keyed 'pandaid', 'job', 'files', 'log_urls', 'monitor_url' plus
        optional 'log_file', 'harvester', 'task', 'log_analysis'; or
        {'error': ...} if the job is missing or the main query fails.
    """
    conn = connections['panda']

    # Full job record from whichever table still holds it (active or archived).
    field_list = ', '.join(f'"{f}"' for f in STUDY_FIELDS)
    job_sql = f"""
        SELECT {field_list} FROM "{PANDA_SCHEMA}"."jobsactive4" WHERE "pandaid" = %s
        UNION ALL
        SELECT {field_list} FROM "{PANDA_SCHEMA}"."jobsarchived4" WHERE "pandaid" = %s
    """

    try:
        with conn.cursor() as cursor:
            cursor.execute(job_sql, [pandaid, pandaid])
            row = cursor.fetchone()
    except Exception as e:
        logger.error(f"study_job query failed: {e}")
        return {"error": str(e)}

    if not row:
        return {"error": f"Job {pandaid} not found"}

    job = row_to_dict(row, STUDY_FIELDS)
    job['errors'] = extract_errors(job)

    # Drop NULL columns to keep the payload compact.
    job = {k: v for k, v in job.items() if v is not None}

    # pilotid appears to be pipe-delimited with the stdout URL first and the
    # pilot version fourth — TODO confirm against the pilot's format.
    log_urls = {}
    pilotid = job.get('pilotid', '')
    if pilotid and '|' in pilotid:
        parts = pilotid.split('|')
        stdout_url = parts[0]
        log_urls['pilot_stdout'] = stdout_url
        # stderr/batch-log URLs are guessed by swapping the file extension.
        log_urls['pilot_stderr'] = stdout_url.replace('.out', '.err')
        log_urls['batch_log'] = stdout_url.replace('.out', '.log')
        if len(parts) >= 4:
            job['pilot_type'] = parts[1]
            job['pilot_version'] = parts[3]

    # Perlmutter jobs publish logs on the NERSC portal instead; prefer those.
    site = job.get('computingsite') or ''
    if site.startswith('NERSC_Perlmutter'):
        portal_urls = _nersc_portal_log_urls(site, pandaid)
        if portal_urls:
            # The extension-swapped guesses above don't exist on the portal.
            log_urls.pop('pilot_stderr', None)
            log_urls.pop('batch_log', None)
            log_urls.update(portal_urls)

    # All files attached to the job; remember the 'log' file if present.
    file_field_list = ', '.join(f'"{f}"' for f in FILE_FIELDS)
    files_sql = f"""
        SELECT {file_field_list}
        FROM "{PANDA_SCHEMA}"."filestable4"
        WHERE "pandaid" = %s
        ORDER BY "type", "lfn"
    """

    files = []
    log_file = None
    try:
        with conn.cursor() as cursor:
            cursor.execute(files_sql, [pandaid])
            for frow in cursor.fetchall():
                fd = row_to_dict(frow, FILE_FIELDS)
                fd = {k: v for k, v in fd.items() if v is not None}
                files.append(fd)
                if fd.get('type') == 'log':
                    log_file = fd
    except Exception as e:
        logger.error(f"study_job files query failed: {e}")

    # Harvester worker record; its log URLs override the pilotid-derived
    # guesses when present.
    harvester = None
    harvester_sql = f"""
        SELECT w."workerid", w."harvesterid", w."stdout", w."stderr", w."batchlog",
               w."errorcode", w."diagmessage", w."status"
        FROM "{PANDA_SCHEMA}"."harvester_workers" w
        JOIN "{PANDA_SCHEMA}"."harvester_rel_jobs_workers" r
        ON w."workerid" = r."workerid" AND w."harvesterid" = r."harvesterid"
        WHERE r."pandaid" = %s
    """

    try:
        with conn.cursor() as cursor:
            cursor.execute(harvester_sql, [pandaid])
            hrow = cursor.fetchone()
            if hrow:
                hcols = ['workerid', 'harvesterid', 'stdout', 'stderr', 'batchlog',
                         'errorcode', 'diagmessage', 'status']
                harvester = row_to_dict(hrow, hcols)
                harvester = {k: v for k, v in harvester.items() if v is not None}

                if harvester.get('stdout'):
                    log_urls['pilot_stdout'] = harvester['stdout']
                if harvester.get('stderr'):
                    log_urls['pilot_stderr'] = harvester['stderr']
                if harvester.get('batchlog'):
                    log_urls['batch_log'] = harvester['batchlog']
    except Exception as e:
        logger.error(f"study_job harvester query failed: {e}")

    # Parent JEDI task context, when the job belongs to a task.
    task_info = None
    jeditaskid = job.get('jeditaskid')
    if jeditaskid:
        task_sql = f"""
            SELECT "jeditaskid", "taskname", "status", "username", "errordialog",
                   "failurerate", "workinggroup"
            FROM "{PANDA_SCHEMA}"."jedi_tasks"
            WHERE "jeditaskid" = %s
        """
        try:
            with conn.cursor() as cursor:
                cursor.execute(task_sql, [jeditaskid])
                trow = cursor.fetchone()
                if trow:
                    tcols = ['jeditaskid', 'taskname', 'status', 'username',
                             'errordialog', 'failurerate', 'workinggroup']
                    task_info = row_to_dict(trow, tcols)
                    task_info = {k: v for k, v in task_info.items() if v is not None}
        except Exception as e:
            logger.error(f"study_job task query failed: {e}")

    # Assemble the response; optional sections only appear when populated.
    result = {
        "pandaid": pandaid,
        "job": job,
        "files": files,
        "log_urls": log_urls,
    }

    if log_file:
        result["log_file"] = log_file

    if harvester:
        result["harvester"] = harvester

    if task_info:
        result["task"] = task_info

    result["monitor_url"] = f"https://epic-devcloud.org/panda/jobs/{pandaid}/"

    # For jobs that ended badly, try to fetch the pilot log and classify the
    # failure. Everything here is best-effort behind one try/except.
    jobstatus = job.get('jobstatus', '')
    if jobstatus in ('failed', 'holding', 'cancelled', 'closed'):
        try:
            from askpanda_atlas.log_analysis_impl import (
                _select_log_filename, _fetch_log_text,
                extract_log_excerpt, classify_failure,
            )
            from decouple import config
            base_url = config('PANDA_BASE_URL', default='https://pandamon01.sdcc.bnl.gov')
            log_filename = _select_log_filename(job)
            log_text = _fetch_log_text(pandaid, log_filename, base_url, timeout=30)
            log_source = 'filebrowser'

            # Fallback: fetch the pilot stdout URL directly when the PanDA
            # filebrowser has nothing for this job.
            if not log_text:
                direct_url = (
                    log_urls.get('pilot_stdout')
                    or (harvester or {}).get('stdout')
                )
                if direct_url:
                    import requests as _requests
                    try:
                        # NOTE(review): verify=False disables TLS certificate
                        # checking for this fetch — presumably for internal
                        # log hosts with self-signed certs; confirm.
                        resp = _requests.get(
                            direct_url, timeout=30, verify=False,
                        )
                        if resp.status_code == 200 and resp.text:
                            log_text = resp.text
                            log_source = (
                                'nersc_portal'
                                if 'portal.nersc.gov' in direct_url
                                else 'harvester'
                            )
                    except Exception as exc:
                        logger.warning("Direct log fetch failed: %s", exc)

            if log_text:
                piloterrorcode = int(job.get('piloterrorcode') or 0)
                piloterrordiag = str(job.get('piloterrordiag') or '')
                excerpt = extract_log_excerpt(
                    log_text, log_filename, piloterrorcode, piloterrordiag
                )
                failure_type = classify_failure(job, excerpt)
                result['log_analysis'] = {
                    'failure_type': failure_type,
                    'log_filename': log_filename,
                    'log_source': log_source,
                    'log_excerpt': excerpt,
                    'log_available': True,
                }
            else:
                result['log_analysis'] = {
                    'log_available': False,
                    'log_filename': log_filename,
                }
        except Exception as e:
            logger.error(f"study_job log analysis failed: {e}")
            result['log_analysis'] = {'error': str(e)}

    return result
1125
1126
1127
1128
1129
# Map each list field to its double-quoted SQL column form, for safe use in
# ORDER BY clauses (only whitelisted field names can ever reach the SQL text).
JOB_ORDER_COLUMNS = {f: f'"{f}"' for f in LIST_FIELDS}
TASK_ORDER_COLUMNS = {f: f'"{f}"' for f in TASK_LIST_FIELDS}


# Columns matched by the DataTables global-search box (via build_search_clauses).
JOB_SEARCH_FIELDS = ['pandaid', 'jeditaskid', 'produsername', 'jobstatus',
                     'computingsite', 'transformation']
TASK_SEARCH_FIELDS = ['jeditaskid', 'taskname', 'status', 'username',
                      'workinggroup', 'transpath']
1138
1139
def list_jobs_dt(days=7, status=None, username=None, site=None,
                 taskid=None, reqid=None,
                 order_by='"pandaid" DESC', limit=100, offset=0, search=None):
    """List PanDA jobs for DataTables (returns rows, total, filtered counts).

    ``total`` counts rows matching the explicit filters only; ``filtered``
    additionally applies the DataTables global ``search`` term. On query
    failure the counts computed so far are returned with an empty row list.
    """
    since = timezone.now() - timedelta(days=days)
    conditions = ['"modificationtime" >= %s']
    values = [since]

    if status:
        conditions.append('"jobstatus" = %s')
        values.append(status)
    # Substring-or-exact match for the free-text filters.
    for column, raw in (('produsername', username), ('computingsite', site)):
        if raw:
            clause, val = like_or_eq(column, raw)
            conditions.append(clause)
            values.append(val)
    if taskid:
        conditions.append('"jeditaskid" = %s')
        values.append(int(taskid))
    if reqid:
        conditions.append('"reqid" = %s')
        values.append(int(reqid))

    conn = connections['panda']

    # Unfiltered total (search term not applied).
    count_sql, count_params = build_union_count(conditions, values)
    try:
        with conn.cursor() as cursor:
            cursor.execute(count_sql, count_params)
            total = cursor.fetchone()[0]
    except Exception as e:
        logger.error(f"list_jobs_dt count failed: {e}")
        return [], 0, 0

    # Search-filtered count; falls back to total when no search term is set.
    filtered_where = list(conditions)
    filtered_params = list(values)
    if search:
        search_clause, search_params = build_search_clauses(JOB_SEARCH_FIELDS, search)
        filtered_where.append(search_clause)
        filtered_params.extend(search_params)
        fcount_sql, fcount_params = build_union_count(filtered_where, filtered_params)
        try:
            with conn.cursor() as cursor:
                cursor.execute(fcount_sql, fcount_params)
                filtered = cursor.fetchone()[0]
        except Exception as e:
            logger.error(f"list_jobs_dt filtered count failed: {e}")
            return [], total, 0
    else:
        filtered = total

    # Page of rows for the current DataTables request.
    sql, full_params = build_union_query_dt(
        LIST_FIELDS, filtered_where, filtered_params,
        order_by=order_by, limit=limit, offset=offset,
    )
    try:
        with conn.cursor() as cursor:
            cursor.execute(sql, full_params)
            rows = [row_to_dict(r, LIST_FIELDS) for r in cursor.fetchall()]
    except Exception as e:
        logger.error(f"list_jobs_dt query failed: {e}")
        return [], total, filtered

    return rows, total, filtered
1216
1217
def list_tasks_dt(days=7, status=None, username=None, taskname=None,
                  workinggroup=None, order_by='"jeditaskid" DESC',
                  limit=100, offset=0, search=None):
    """List JEDI tasks for DataTables (returns rows, total, filtered counts).

    ``total`` counts rows matching the explicit filters; ``filtered`` also
    applies the DataTables global ``search`` term. Stale tasks are excluded
    via the shared ``_stale_task_filter`` clause.
    """
    since = timezone.now() - timedelta(days=days)
    conditions = ['"modificationtime" >= %s']
    values = [since]

    # Shared clause hiding long-dead tasks from the listing.
    stale = _stale_task_filter()
    conditions.append(stale['clause'])
    values.extend(stale['params'])

    if status:
        conditions.append('"status" = %s')
        values.append(status)
    # Substring-or-exact match for the free-text filters.
    for column, raw in (('username', username), ('taskname', taskname)):
        if raw:
            clause, val = like_or_eq(column, raw)
            conditions.append(clause)
            values.append(val)
    if workinggroup:
        conditions.append('"workinggroup" = %s')
        values.append(workinggroup)

    conn = connections['panda']

    # Unfiltered total (search term not applied).
    count_sql, count_params = build_task_count(conditions, values)
    try:
        with conn.cursor() as cursor:
            cursor.execute(count_sql, count_params)
            total = cursor.fetchone()[0]
    except Exception as e:
        logger.error(f"list_tasks_dt count failed: {e}")
        return [], 0, 0

    # Search-filtered count; falls back to total when no search term is set.
    filtered_where = list(conditions)
    filtered_params = list(values)
    if search:
        search_clause, search_params = build_search_clauses(TASK_SEARCH_FIELDS, search)
        filtered_where.append(search_clause)
        filtered_params.extend(search_params)
        fcount_sql, fcount_params = build_task_count(filtered_where, filtered_params)
        try:
            with conn.cursor() as cursor:
                cursor.execute(fcount_sql, fcount_params)
                filtered = cursor.fetchone()[0]
        except Exception as e:
            logger.error(f"list_tasks_dt filtered count failed: {e}")
            return [], total, 0
    else:
        filtered = total

    # Page of rows for the current DataTables request.
    sql, full_params = build_task_query_dt(
        TASK_LIST_FIELDS, filtered_where, filtered_params,
        order_by=order_by, limit=limit, offset=offset,
    )
    try:
        with conn.cursor() as cursor:
            cursor.execute(sql, full_params)
            rows = [row_to_dict(r, TASK_LIST_FIELDS) for r in cursor.fetchall()]
    except Exception as e:
        logger.error(f"list_tasks_dt query failed: {e}")
        return [], total, filtered

    return rows, total, filtered
1295
1296
def job_filter_counts(days=7, status=None, username=None, site=None,
                      taskid=None, reqid=None):
    """Get filter option counts for job list (status, user, site).

    For each facet, counts are computed with every OTHER active filter
    applied but NOT the facet's own current value, so the dropdown still
    shows the alternatives the user could switch to. Returns a dict of
    ``{facet_name: [(value, count), ...]}``; a facet whose query fails
    maps to an empty list.
    """
    cutoff = timezone.now() - timedelta(days=days)
    base_where = ['"modificationtime" >= %s']
    base_params = [cutoff]

    if taskid:
        base_where.append('"jeditaskid" = %s')
        base_params.append(int(taskid))
    if reqid:
        base_where.append('"reqid" = %s')
        base_params.append(int(reqid))

    conn = connections['panda']
    result = {}

    # (db column, facet key in the result, currently-selected value)
    filter_config = [
        ('jobstatus', 'status', status),
        ('produsername', 'username', username),
        ('computingsite', 'site', site),
    ]

    # The facet's own current value is intentionally unused here (renamed to _):
    # we only apply the OTHER facets' filters when counting this facet.
    for db_field, filter_name, _ in filter_config:
        where = list(base_where)
        params = list(base_params)
        for other_db_field, other_name, other_value in filter_config:
            if other_name != filter_name and other_value:
                clause, val = like_or_eq(other_db_field, other_value)
                where.append(clause)
                params.append(val)

        sql, full_params = build_union_count_by_field(db_field, where, params)
        try:
            with conn.cursor() as cursor:
                cursor.execute(sql, full_params)
                result[filter_name] = [(row[0], row[1]) for row in cursor.fetchall()]
        except Exception as e:
            logger.error(f"job_filter_counts {filter_name} failed: {e}")
            result[filter_name] = []

    return result
1340
1341
def task_filter_counts(days=7, status=None, username=None, workinggroup=None):
    """Get filter option counts for task list (status, username, workinggroup).

    For each facet, counts are computed with every OTHER active filter
    applied but NOT the facet's own current value, mirroring
    ``job_filter_counts``. Returns ``{facet_name: [(value, count), ...]}``;
    a facet whose query fails maps to an empty list.
    """
    cutoff = timezone.now() - timedelta(days=days)
    base_where = ['"modificationtime" >= %s']
    base_params = [cutoff]

    # Same stale-task exclusion as list_tasks_dt so counts match the listing.
    _stale = _stale_task_filter()
    base_where.append(_stale['clause'])
    base_params.extend(_stale['params'])

    conn = connections['panda']
    result = {}

    # (db column, facet key in the result, currently-selected value)
    filter_config = [
        ('status', 'status', status),
        ('username', 'username', username),
        ('workinggroup', 'workinggroup', workinggroup),
    ]

    # The facet's own current value is intentionally unused here (renamed to _):
    # we only apply the OTHER facets' filters when counting this facet.
    for db_field, filter_name, _ in filter_config:
        where = list(base_where)
        params = list(base_params)
        for other_db_field, other_name, other_value in filter_config:
            if other_name != filter_name and other_value:
                # NOTE(review): cross-filters use exact "=" here, whereas
                # list_tasks_dt matches username via like_or_eq — counts may
                # diverge from the listing for partial usernames; confirm intent.
                where.append(f'"{other_db_field}" = %s')
                params.append(other_value)

        sql, full_params = build_task_count_by_field(db_field, where, params)
        try:
            with conn.cursor() as cursor:
                cursor.execute(sql, full_params)
                result[filter_name] = [(row[0], row[1]) for row in cursor.fetchall()]
        except Exception as e:
            logger.error(f"task_filter_counts {filter_name} failed: {e}")
            result[filter_name] = []

    return result
1379
1380
def get_task(jeditaskid):
    """Get a single JEDI task record.

    Returns the task as a dict of TASK_LIST_FIELDS, or a dict with an
    ``"error"`` key when the query fails or no row matches.
    """
    columns = ', '.join(f'"{f}"' for f in TASK_LIST_FIELDS)
    sql = f"""
        SELECT {columns}
        FROM "{PANDA_SCHEMA}"."jedi_tasks"
        WHERE "jeditaskid" = %s
    """
    try:
        with connections['panda'].cursor() as cursor:
            cursor.execute(sql, [jeditaskid])
            row = cursor.fetchone()
    except Exception as e:
        logger.error(f"get_task query failed: {e}")
        return {"error": str(e)}

    if row is None:
        return {"error": f"Task {jeditaskid} not found"}

    return row_to_dict(row, TASK_LIST_FIELDS)