File indexing completed on 2026-04-27 07:41:42
0001 """
0002 PanDA Monitor MCP tools — thin wrappers over panda.queries.
0003
0004 Each tool registers with the MCP server, provides an LLM-oriented docstring,
0005 and delegates to the synchronous query function via sync_to_async.
0006 """
0007
0008 from asgiref.sync import sync_to_async
0009 from mcp_server import mcp_server as mcp
0010 from monitor_app.panda import queries
0011
0012
@mcp.tool()
async def panda_list_jobs(
    days: int = 7,
    status: str | None = None,
    username: str | None = None,
    site: str | None = None,
    taskid: int | None = None,
    reqid: int | None = None,
    limit: int = 200,
    before_id: int | None = None,
) -> dict:
    """
    List PanDA jobs from the ePIC production database with summary statistics.

    Returns jobs in reverse time order (newest first) with cursor-based pagination.
    Use before_id to page through results: pass the last pandaid from the previous
    call to get the next batch.

    For a quick overview without individual records, use panda_get_activity instead.
    For error diagnostics on failed jobs, use panda_diagnose_jobs instead.

    Args:
        days: Time window in days (default 7). Jobs with modificationtime within this window.
        status: Filter by jobstatus (e.g. 'failed', 'finished', 'running', 'activated').
        username: Filter by job owner (produsername). Supports SQL LIKE with %.
        site: Filter by computing site (computingsite). Supports SQL LIKE with %.
        taskid: Filter by JEDI task ID (jeditaskid).
        reqid: Filter by request ID.
        limit: Maximum jobs to return (default 200).
        before_id: Pagination cursor — return jobs with pandaid < this value.

    Returns:
        summary: Job counts by status for the full query (not just this page).
        jobs: List of job records with key fields. Each job includes
            destinationse — the destination storage element (Rucio SE) where
            output files are written, looked up from the files table.
        pagination: {before_id, has_more, next_before_id} for incremental pulling.
        total_in_window: Total jobs matching filters in the time window.
    """
    # Query layer is synchronous (Django ORM); bridge into the async MCP server.
    return await sync_to_async(queries.list_jobs)(
        days=days, status=status, username=username, site=site,
        taskid=taskid, reqid=reqid, limit=limit, before_id=before_id,
    )
0056
0057
@mcp.tool()
async def panda_diagnose_jobs(
    days: int = 7,
    username: str | None = None,
    site: str | None = None,
    taskid: int | None = None,
    reqid: int | None = None,
    error_component: str | None = None,
    limit: int = 500,
    before_id: int | None = None,
) -> dict:
    """
    Diagnose failed and faulty PanDA jobs with full error details.

    Pulls only jobs in failed/cancelled/closed status with non-zero error codes.
    Returns all 7 error component fields (pilot, executor, DDM, brokerage,
    dispatcher, supervisor, taskbuffer) plus transformation exit code, distilled
    into a structured errors list per job.

    Use this after panda_list_jobs shows failures you want to understand.

    Args:
        days: Time window in days (default 7).
        username: Filter by job owner (produsername). Supports SQL LIKE with %.
        site: Filter by computing site. Supports SQL LIKE with %.
        taskid: Filter by JEDI task ID.
        reqid: Filter by request ID.
        error_component: Filter to jobs with errors in this component
            (pilot, executor, ddm, brokerage, dispatcher, supervisor, taskbuffer).
        limit: Maximum jobs to return (default 500).
        before_id: Pagination cursor — return jobs with pandaid < this value.

    Returns:
        error_summary: Counts by error component and top error codes.
        jobs: Failed jobs with full error details and structured errors list.
        pagination: {before_id, has_more, next_before_id} for incremental pulling.
    """
    # Query layer is synchronous (Django ORM); bridge into the async MCP server.
    return await sync_to_async(queries.diagnose_jobs)(
        days=days, username=username, site=site, taskid=taskid,
        reqid=reqid, error_component=error_component,
        limit=limit, before_id=before_id,
    )
0100
0101
@mcp.tool()
async def panda_list_tasks(
    days: int = 7,
    status: str | None = None,
    username: str | None = None,
    taskname: str | None = None,
    reqid: int | None = None,
    workinggroup: str | None = None,
    taskid: int | None = None,
    processingtype: str | None = None,
    limit: int = 25,
    before_id: int | None = None,
) -> dict:
    """
    List JEDI tasks from the ePIC production database with summary statistics.

    Tasks are higher-level units than jobs — each task spawns one or more jobs.
    Returns tasks in reverse ID order (newest first) with cursor-based pagination.

    Args:
        days: Time window in days (default 7). Tasks with modificationtime within this window.
        status: Filter by task status (e.g. 'done', 'failed', 'running', 'ready', 'broken', 'aborted').
        username: Filter by task owner. Supports SQL LIKE with %.
        taskname: Filter by task name. Supports SQL LIKE with %.
        reqid: Filter by request ID.
        workinggroup: Filter by working group (e.g. 'EIC', 'Rubin'). NULL for iDDS automation tasks.
        taskid: Filter by specific JEDI task ID (jeditaskid).
        processingtype: Filter by processing type (e.g. 'epicproduction'). Supports SQL LIKE with %.
        limit: Maximum tasks to return (default 25).
        before_id: Pagination cursor — return tasks with jeditaskid < this value.

    Returns:
        summary: Task counts by status for the full query (not just this page).
        tasks: List of task records with key fields.
        pagination: {before_id, has_more, next_before_id} for incremental pulling.
        total_in_window: Total tasks matching filters in the time window.
    """
    # Query layer is synchronous (Django ORM); bridge into the async MCP server.
    return await sync_to_async(queries.list_tasks)(
        days=days, status=status, username=username, taskname=taskname,
        reqid=reqid, workinggroup=workinggroup, taskid=taskid,
        processingtype=processingtype, limit=limit, before_id=before_id,
    )
0144
0145
@mcp.tool()
async def panda_error_summary(
    days: int = 10,
    username: str | None = None,
    site: str | None = None,
    destinationse: str | None = None,
    taskid: int | None = None,
    error_source: str | None = None,
    limit: int = 20,
) -> dict:
    """
    Aggregate error summary across failed PanDA jobs, ranked by frequency.

    Extracts non-zero errors from all 7 error components (pilot, executor, DDM,
    brokerage, dispatcher, supervisor, taskbuffer) across failed/cancelled/closed
    jobs, groups by (component, code, diagnostic), and ranks by occurrence count.

    Unlike panda_diagnose_jobs (per-job detail), this tool gives the big picture:
    "What are the most common errors and who do they affect?"

    Args:
        days: Time window in days (default 10).
        username: Filter by job owner (produsername). Supports SQL LIKE with %.
        site: Filter by computing site (computingsite). Supports SQL LIKE with %.
        destinationse: Filter by destination storage element — the Rucio SE where
            output files are written, located at a site. Supports SQL LIKE with %.
        taskid: Filter by JEDI task ID.
        error_source: Filter to errors from one component
            (pilot, executor, ddm, brokerage, dispatcher, supervisor, taskbuffer).
        limit: Maximum error patterns to return (default 20).

    Returns:
        total_errors: Total error occurrences across all components.
        errors: Ranked list of error patterns, each with:
            error_source, error_code, error_diag, count,
            task_count, users, sites, destination_sites.
    """
    # Query layer is synchronous (Django ORM); bridge into the async MCP server.
    return await sync_to_async(queries.error_summary)(
        days=days, username=username, site=site,
        destinationse=destinationse,
        taskid=taskid, error_source=error_source, limit=limit,
    )
0188
0189
@mcp.tool()
async def panda_get_activity(
    days: int = 1,
    username: str | None = None,
    site: str | None = None,
    workinggroup: str | None = None,
) -> dict:
    """
    Pre-digested overview of PanDA activity. No individual job/task records.

    Use this first to answer "What is PanDA doing?" before drilling into
    panda_list_jobs or panda_list_tasks for individual records.

    Args:
        days: Time window in days (default 1).
        username: Filter by job owner (produsername). Supports SQL LIKE with %.
        site: Filter by computing site (computingsite). Supports SQL LIKE with %.
        workinggroup: Filter tasks by working group (e.g. 'EIC').

    Returns:
        jobs: {total, by_status, by_user, by_site} — aggregate counts only.
        tasks: {total, by_status, by_user} — aggregate counts only.
        filters: Applied filter values.
    """
    # Query layer is synchronous (Django ORM); bridge into the async MCP server.
    return await sync_to_async(queries.get_activity)(
        days=days, username=username, site=site, workinggroup=workinggroup,
    )
0217
0218
@mcp.tool()
async def panda_list_queues(
    vo: str | None = None,
    status: str | None = None,
    state: str | None = None,
    search: str | None = None,
) -> dict:
    """
    List PanDA compute queues with configuration summary.

    Shows available queues from the PanDA schedconfig registry. Each queue
    represents a compute endpoint where jobs can be submitted.

    Args:
        vo: Filter by Virtual Organisation (e.g. 'eic', 'atlas', 'osg', 'lsst').
            Use 'eic' for ePIC experiment queues.
        status: Filter by queue status (e.g. 'online', 'brokeroff', 'offline').
        state: Filter by queue state (e.g. 'ACTIVE').
        search: Search queue name (case-insensitive, supports partial match).
            Example: 'Perlmutter' to find all NERSC Perlmutter queues.

    Returns:
        queues: List of queue summaries with status, VO, resource type, region, etc.
        count: Number of queues matching filters.
    """
    # Query layer is synchronous (Django ORM); bridge into the async MCP server.
    return await sync_to_async(queries.list_queues)(
        vo=vo, status=status, state=state, search=search,
    )
0247
0248
@mcp.tool()
async def panda_get_queue(
    panda_queue: str,
) -> dict:
    """
    Fetch the complete schedconfig record for one PanDA queue.

    The returned configuration covers everything the scheduler knows about the
    queue: container options, copy tools, storage and CE endpoints, resource
    limits, and all other operational parameters.

    Args:
        panda_queue: The queue name (e.g. 'NERSC_Perlmutter_epic').

    Returns:
        queue: Full configuration dict with all parameters.
    """
    # Wrap the synchronous query so it can be awaited from the MCP server.
    fetch = sync_to_async(queries.get_queue)
    return await fetch(panda_queue=panda_queue)
0267
0268
@mcp.tool()
async def panda_resource_usage(
    days: int = 30,
    site: str | None = None,
    username: str | None = None,
    taskid: int | None = None,
) -> dict:
    """
    Aggregate resource usage (core-hours) for finished PanDA jobs.

    Reports two core-hour metrics:
    - allocated_core_hours: cores reserved × wall time (what the facility charges)
    - used_core_hours: CPU time actually consumed by the job

    The gap between allocated and used reflects efficiency — e.g. a job that
    requests 1 core but gets 2 allocated uses ~50% of its allocation.

    Only counts finished jobs with actual runtime (starttime and endtime set).
    Queue/waiting time is excluded.

    Args:
        days: Time window in days (default 30).
        site: Filter by computing site (computingsite). Supports SQL LIKE with %.
            Example: 'NERSC_Perlmutter%' for all Perlmutter queues.
        username: Filter by job owner (produsername). Supports SQL LIKE with %.
        taskid: Filter by JEDI task ID.

    Returns:
        totals: {job_count, allocated_core_hours, used_core_hours, wall_hours}
        by_site: Breakdown by computing site, sorted by allocated_core_hours.
        by_user: Breakdown by job owner, sorted by allocated_core_hours.
    """
    # Query layer is synchronous (Django ORM); bridge into the async MCP server.
    return await sync_to_async(queries.resource_usage)(
        days=days, site=site, username=username, taskid=taskid,
    )
0304
0305
@mcp.tool()
async def panda_study_job(
    pandaid: int,
) -> dict:
    """
    Deep study of a single PanDA job — full record, files, errors, log URLs.

    Collects everything the database holds about one job:
    - Full job record with all error fields and resource usage
    - Associated files from filestable4 (input, output, log)
    - Harvester worker info with condor log URLs
    - Parent task context (name, status, error dialog)
    - Structured error extraction across all 7 components

    Best used after panda_diagnose_jobs has surfaced a failed job worth a
    closer look. Log URLs are returned for manual inspection even when
    programmatic log retrieval is not yet available.

    Args:
        pandaid: The PanDA job ID to study (required).

    Returns:
        job: Full job record (null fields stripped) with structured errors list.
        files: All associated files (log, output, input) with lfn, guid, scope, status.
        log_urls: URLs for pilot stdout, stderr, batch log (require CILogon auth).
        log_file: Log tarball metadata if registered (lfn, guid, scope for rucio retrieval).
        harvester: Condor worker details if available.
        task: Parent JEDI task context.
        monitor_url: Link to PanDA monitoring page.
    """
    # Wrap the synchronous query so it can be awaited from the MCP server.
    study = sync_to_async(queries.study_job)
    return await study(pandaid=pandaid)
0337
0338
@mcp.tool()
async def panda_harvester_workers(
    site: str | None = None,
    hours: int = 1,
) -> dict:
    """
    Live Harvester pilot/worker counts across EIC compute queues.

    Shows how many pilots are running, submitted, finished, etc. at each site.
    Useful for checking if Perlmutter or other sites are actively processing.

    Args:
        site: Filter to a specific queue (e.g. 'NERSC_Perlmutter_epic'). Default: all sites.
        hours: Time window in hours to look back (default 1).

    Returns:
        summary: One-line human-readable pilot total for the window.
        by_status: Counts by worker status (running, submitted, finished, etc.).
        by_site: Counts by computing site.
        by_resourcetype: Counts by resource type.
        time_window: {from, to} ISO-8601 timestamps of the queried window.
        error: Returned alone (instead of the keys above) if the upstream
            fetch failed.
    """
    # Local imports keep these dependencies out of module import time;
    # fetch_worker_stats talks to the PanDA monitor over HTTP.
    from datetime import datetime, timedelta, timezone
    from askpanda_atlas.harvester_worker_impl import fetch_worker_stats
    from decouple import config

    base_url = config('PANDA_BASE_URL', default='https://pandamon01.sdcc.bnl.gov')
    # Timezone-aware UTC window: [now - hours, now].
    now = datetime.now(timezone.utc)
    from_dt = (now - timedelta(hours=hours)).isoformat()
    to_dt = now.isoformat()
    raw = await sync_to_async(fetch_worker_stats)(
        base_url, from_dt, to_dt, site=site,
    )
    # Propagate upstream failures as a minimal error payload.
    if raw.get('error'):
        return {"error": raw['error']}
    return {
        "summary": (
            f"{raw.get('nworkers_total', 0)} pilots total"
            + (f" at {site}" if site else " across all EIC sites")
            + f" (last {hours}h)"
        ),
        "by_status": raw.get('nworkers_by_status', {}),
        "by_site": raw.get('nworkers_by_site', {}),
        "by_resourcetype": raw.get('nworkers_by_resourcetype', {}),
        "time_window": {"from": from_dt, "to": to_dt},
    }
0383 }