Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-27 07:41:42

0001 """
0002 PanDA Monitor MCP tools — thin wrappers over panda.queries.
0003 
0004 Each tool registers with the MCP server, provides an LLM-oriented docstring,
0005 and delegates to the synchronous query function via sync_to_async.
0006 """
0007 
0008 from asgiref.sync import sync_to_async
0009 from mcp_server import mcp_server as mcp
0010 from monitor_app.panda import queries
0011 
0012 
@mcp.tool()
async def panda_list_jobs(
    days: int = 7,
    status: str | None = None,
    username: str | None = None,
    site: str | None = None,
    taskid: int | None = None,
    reqid: int | None = None,
    limit: int = 200,
    before_id: int | None = None,
) -> dict:
    """
    List PanDA jobs from the ePIC production database with summary statistics.

    Returns jobs in reverse time order (newest first) with cursor-based pagination.
    Use before_id to page through results: pass the last pandaid from the previous
    call to get the next batch.

    For a quick overview without individual records, use panda_get_activity instead.
    For error diagnostics on failed jobs, use panda_diagnose_jobs instead.

    Args:
        days: Time window in days (default 7). Jobs with modificationtime within this window.
        status: Filter by jobstatus (e.g. 'failed', 'finished', 'running', 'activated').
        username: Filter by job owner (produsername). Supports SQL LIKE with %.
        site: Filter by computing site (computingsite). Supports SQL LIKE with %.
        taskid: Filter by JEDI task ID (jeditaskid).
        reqid: Filter by request ID.
        limit: Maximum jobs to return (default 200).
        before_id: Pagination cursor — return jobs with pandaid < this value.

    Returns:
        summary: Job counts by status for the full query (not just this page).
        jobs: List of job records with key fields. Each job includes
            destinationse — the destination storage element (Rucio SE) where
            output files are written, looked up from the files table.
        pagination: {before_id, has_more, next_before_id} for incremental pulling.
        total_in_window: Total jobs matching filters in the time window.
    """
    # Delegate to the synchronous query layer; sync_to_async keeps the
    # blocking DB call off the event loop.
    return await sync_to_async(queries.list_jobs)(
        days=days, status=status, username=username, site=site,
        taskid=taskid, reqid=reqid, limit=limit, before_id=before_id,
    )
0056 
0057 
@mcp.tool()
async def panda_diagnose_jobs(
    days: int = 7,
    username: str | None = None,
    site: str | None = None,
    taskid: int | None = None,
    reqid: int | None = None,
    error_component: str | None = None,
    limit: int = 500,
    before_id: int | None = None,
) -> dict:
    """
    Diagnose failed and faulty PanDA jobs with full error details.

    Pulls only jobs in failed/cancelled/closed status with non-zero error codes.
    Returns all 7 error component fields (pilot, executor, DDM, brokerage,
    dispatcher, supervisor, taskbuffer) plus transformation exit code, distilled
    into a structured errors list per job.

    Use this after panda_list_jobs shows failures you want to understand.

    Args:
        days: Time window in days (default 7).
        username: Filter by job owner (produsername). Supports SQL LIKE with %.
        site: Filter by computing site. Supports SQL LIKE with %.
        taskid: Filter by JEDI task ID.
        reqid: Filter by request ID.
        error_component: Filter to jobs with errors in this component
                         (pilot, executor, ddm, brokerage, dispatcher, supervisor, taskbuffer).
        limit: Maximum jobs to return (default 500).
        before_id: Pagination cursor — return jobs with pandaid < this value.

    Returns:
        error_summary: Counts by error component and top error codes.
        jobs: Failed jobs with full error details and structured errors list.
        pagination: {before_id, has_more, next_before_id} for incremental pulling.
    """
    # Delegate to the synchronous query layer off the event loop.
    return await sync_to_async(queries.diagnose_jobs)(
        days=days, username=username, site=site, taskid=taskid,
        reqid=reqid, error_component=error_component,
        limit=limit, before_id=before_id,
    )
0100 
0101 
@mcp.tool()
async def panda_list_tasks(
    days: int = 7,
    status: str | None = None,
    username: str | None = None,
    taskname: str | None = None,
    reqid: int | None = None,
    workinggroup: str | None = None,
    taskid: int | None = None,
    processingtype: str | None = None,
    limit: int = 25,
    before_id: int | None = None,
) -> dict:
    """
    List JEDI tasks from the ePIC production database with summary statistics.

    Tasks are higher-level units than jobs — each task spawns one or more jobs.
    Returns tasks in reverse ID order (newest first) with cursor-based pagination.

    Args:
        days: Time window in days (default 7). Tasks with modificationtime within this window.
        status: Filter by task status (e.g. 'done', 'failed', 'running', 'ready', 'broken', 'aborted').
        username: Filter by task owner. Supports SQL LIKE with %.
        taskname: Filter by task name. Supports SQL LIKE with %.
        reqid: Filter by request ID.
        workinggroup: Filter by working group (e.g. 'EIC', 'Rubin'). NULL for iDDS automation tasks.
        taskid: Filter by specific JEDI task ID (jeditaskid).
        processingtype: Filter by processing type (e.g. 'epicproduction'). Supports SQL LIKE with %.
        limit: Maximum tasks to return (default 25).
        before_id: Pagination cursor — return tasks with jeditaskid < this value.

    Returns:
        summary: Task counts by status for the full query (not just this page).
        tasks: List of task records with key fields.
        pagination: {before_id, has_more, next_before_id} for incremental pulling.
        total_in_window: Total tasks matching filters in the time window.
    """
    # Delegate to the synchronous query layer off the event loop.
    return await sync_to_async(queries.list_tasks)(
        days=days, status=status, username=username, taskname=taskname,
        reqid=reqid, workinggroup=workinggroup, taskid=taskid,
        processingtype=processingtype, limit=limit, before_id=before_id,
    )
0144 
0145 
@mcp.tool()
async def panda_error_summary(
    days: int = 10,
    username: str | None = None,
    site: str | None = None,
    destinationse: str | None = None,
    taskid: int | None = None,
    error_source: str | None = None,
    limit: int = 20,
) -> dict:
    """
    Aggregate error summary across failed PanDA jobs, ranked by frequency.

    Extracts non-zero errors from all 7 error components (pilot, executor, DDM,
    brokerage, dispatcher, supervisor, taskbuffer) across failed/cancelled/closed
    jobs, groups by (component, code, diagnostic), and ranks by occurrence count.

    Unlike panda_diagnose_jobs (per-job detail), this tool gives the big picture:
    "What are the most common errors and who do they affect?"

    Args:
        days: Time window in days (default 10).
        username: Filter by job owner (produsername). Supports SQL LIKE with %.
        site: Filter by computing site (computingsite). Supports SQL LIKE with %.
        destinationse: Filter by destination storage element — the Rucio SE where
            output files are written, located at a site. Supports SQL LIKE with %.
        taskid: Filter by JEDI task ID.
        error_source: Filter to errors from one component
                      (pilot, executor, ddm, brokerage, dispatcher, supervisor, taskbuffer).
        limit: Maximum error patterns to return (default 20).

    Returns:
        total_errors: Total error occurrences across all components.
        errors: Ranked list of error patterns, each with:
            error_source, error_code, error_diag, count,
            task_count, users, sites, destination_sites.
    """
    # Delegate to the synchronous query layer off the event loop.
    return await sync_to_async(queries.error_summary)(
        days=days, username=username, site=site,
        destinationse=destinationse,
        taskid=taskid, error_source=error_source, limit=limit,
    )
0188 
0189 
@mcp.tool()
async def panda_get_activity(
    days: int = 1,
    username: str | None = None,
    site: str | None = None,
    workinggroup: str | None = None,
) -> dict:
    """
    Pre-digested overview of PanDA activity. No individual job/task records.

    Use this first to answer "What is PanDA doing?" before drilling into
    panda_list_jobs or panda_list_tasks for individual records.

    Args:
        days: Time window in days (default 1).
        username: Filter by job owner (produsername). Supports SQL LIKE with %.
        site: Filter by computing site (computingsite). Supports SQL LIKE with %.
        workinggroup: Filter tasks by working group (e.g. 'EIC').

    Returns:
        jobs: {total, by_status, by_user, by_site} — aggregate counts only.
        tasks: {total, by_status, by_user} — aggregate counts only.
        filters: Applied filter values.
    """
    # Delegate to the synchronous query layer off the event loop.
    return await sync_to_async(queries.get_activity)(
        days=days, username=username, site=site, workinggroup=workinggroup,
    )
0217 
0218 
@mcp.tool()
async def panda_list_queues(
    vo: str | None = None,
    status: str | None = None,
    state: str | None = None,
    search: str | None = None,
) -> dict:
    """
    List PanDA compute queues with configuration summary.

    Shows available queues from the PanDA schedconfig registry. Each queue
    represents a compute endpoint where jobs can be submitted.

    Args:
        vo: Filter by Virtual Organisation (e.g. 'eic', 'atlas', 'osg', 'lsst').
            Use 'eic' for ePIC experiment queues.
        status: Filter by queue status (e.g. 'online', 'brokeroff', 'offline').
        state: Filter by queue state (e.g. 'ACTIVE').
        search: Search queue name (case-insensitive, supports partial match).
                Example: 'Perlmutter' to find all NERSC Perlmutter queues.

    Returns:
        queues: List of queue summaries with status, VO, resource type, region, etc.
        count: Number of queues matching filters.
    """
    # Delegate to the synchronous query layer off the event loop.
    return await sync_to_async(queries.list_queues)(
        vo=vo, status=status, state=state, search=search,
    )
0247 
0248 
@mcp.tool()
async def panda_get_queue(
    panda_queue: str,
) -> dict:
    """
    Retrieve the complete configuration record for one PanDA queue.

    Looks up the queue in the schedconfig registry and returns everything
    stored for it: container options, copy tools, storage and CE endpoints,
    resource limits, and all other operational parameters.

    Args:
        panda_queue: The queue name (e.g. 'NERSC_Perlmutter_epic').

    Returns:
        queue: Full configuration dict with all parameters.
    """
    # Wrap the blocking query so it runs in a worker thread, then await it.
    fetch = sync_to_async(queries.get_queue)
    return await fetch(panda_queue=panda_queue)
0267 
0268 
@mcp.tool()
async def panda_resource_usage(
    days: int = 30,
    site: str | None = None,
    username: str | None = None,
    taskid: int | None = None,
) -> dict:
    """
    Aggregate resource usage (core-hours) for finished PanDA jobs.

    Reports two core-hour metrics:
    - allocated_core_hours: cores reserved × wall time (what the facility charges)
    - used_core_hours: CPU time actually consumed by the job

    The gap between allocated and used reflects efficiency — e.g. a job that
    requests 1 core but gets 2 allocated uses ~50% of its allocation.

    Only counts finished jobs with actual runtime (starttime and endtime set).
    Queue/waiting time is excluded.

    Args:
        days: Time window in days (default 30).
        site: Filter by computing site (computingsite). Supports SQL LIKE with %.
              Example: 'NERSC_Perlmutter%' for all Perlmutter queues.
        username: Filter by job owner (produsername). Supports SQL LIKE with %.
        taskid: Filter by JEDI task ID.

    Returns:
        totals: {job_count, allocated_core_hours, used_core_hours, wall_hours}
        by_site: Breakdown by computing site, sorted by allocated_core_hours.
        by_user: Breakdown by job owner, sorted by allocated_core_hours.
    """
    # Delegate to the synchronous query layer off the event loop.
    return await sync_to_async(queries.resource_usage)(
        days=days, site=site, username=username, taskid=taskid,
    )
0304 
0305 
@mcp.tool()
async def panda_study_job(
    pandaid: int,
) -> dict:
    """
    In-depth look at one PanDA job: full record, files, errors, log URLs.

    Collects every piece of information the database holds for the job:
    - The complete job record, including all error fields and resource usage
    - Files linked to it in filestable4 (input, output, log)
    - Harvester worker details with condor log URLs
    - The parent task's context (name, status, error dialog)
    - A structured extraction of errors across all 7 components

    Reach for this once panda_diagnose_jobs has surfaced a failed job worth
    a closer look. Log URLs are returned for manual inspection even when
    programmatic log retrieval is not yet available.

    Args:
        pandaid: The PanDA job ID to study (required).

    Returns:
        job: Full job record (null fields stripped) with structured errors list.
        files: All associated files (log, output, input) with lfn, guid, scope, status.
        log_urls: URLs for pilot stdout, stderr, batch log (require CILogon auth).
        log_file: Log tarball metadata if registered (lfn, guid, scope for rucio retrieval).
        harvester: Condor worker details if available.
        task: Parent JEDI task context.
        monitor_url: Link to PanDA monitoring page.
    """
    # Run the blocking study query in a worker thread and await the result.
    study = sync_to_async(queries.study_job)
    return await study(pandaid=pandaid)
0337 
0338 
@mcp.tool()
async def panda_harvester_workers(
    site: str | None = None,
    hours: int = 1,
) -> dict:
    """
    Live Harvester pilot/worker counts across EIC compute queues.

    Shows how many pilots are running, submitted, finished, etc. at each site.
    Useful for checking if Perlmutter or other sites are actively processing.

    Args:
        site: Filter to a specific queue (e.g. 'NERSC_Perlmutter_epic'). Default: all sites.
        hours: Time window in hours to look back (default 1).

    Returns:
        summary: One-line human-readable summary of total pilots, site, and window.
        by_status: Counts by worker status (running, submitted, finished, etc.).
        by_site: Counts by computing site.
        by_resourcetype: Counts by resource type.
        time_window: {from, to} ISO-8601 UTC bounds of the queried window.
        error: Returned alone instead of the above if the upstream fetch failed.
    """
    # Local imports keep optional/heavy dependencies off the module import path.
    from datetime import datetime, timedelta, timezone
    from askpanda_atlas.harvester_worker_impl import fetch_worker_stats
    from decouple import config

    base_url = config('PANDA_BASE_URL', default='https://pandamon01.sdcc.bnl.gov')
    now = datetime.now(timezone.utc)
    from_dt = (now - timedelta(hours=hours)).isoformat()
    to_dt = now.isoformat()
    raw = await sync_to_async(fetch_worker_stats)(
        base_url, from_dt, to_dt, site=site,
    )
    # Propagate upstream failure as a minimal error payload.
    if raw.get('error'):
        return {"error": raw['error']}
    return {
        "summary": (
            f"{raw.get('nworkers_total', 0)} pilots total"
            + (f" at {site}" if site else " across all EIC sites")
            + f" (last {hours}h)"
        ),
        "by_status": raw.get('nworkers_by_status', {}),
        "by_site": raw.get('nworkers_by_site', {}),
        "by_resourcetype": raw.get('nworkers_by_resourcetype', {}),
        "time_window": {"from": from_dt, "to": to_dt},
    }