# File indexing completed on 2026-04-27 07:41:42
0001 """
0002 Workflow and data pipeline MCP tools.
0003
0004 Includes: workflow definitions, executions, messages, runs, STF files, TF slices,
0005 workflow control (start/stop), monitoring.
0006 """
0007
0008 import logging
0009 from datetime import timedelta
0010 from django.utils import timezone
0011 from django.db.models import Count
0012 from asgiref.sync import sync_to_async
0013
0014 from mcp_server import mcp_server as mcp
0015
0016 from ..models import Run, StfFile, TFSlice, AppLog, SystemAgent
0017 from ..workflow_models import WorkflowDefinition, WorkflowExecution, WorkflowMessage
0018 from .common import _parse_time, _default_start_time, _monitor_url, _get_testbed_config_path, _get_username
0019
0020 logger = logging.getLogger(__name__)
0021
0022
0023
0024
0025
0026
@mcp.tool()
async def swf_list_workflow_definitions(
    workflow_type: str = None,
    created_by: str = None,
) -> dict:
    """
    List available workflow definitions that can be executed.

    Workflow definitions describe the structure of a workflow (stages, agents needed).
    Common workflows include 'stf_datataking' for streaming data acquisition simulation.

    Args:
        workflow_type: Filter by type (e.g., 'simulation', 'production')
        created_by: Filter by creator username

    Returns:
        dict with 'items' (definitions: workflow_name, version, workflow_type,
        created_by, created_at, execution_count), 'total_count', 'has_more',
        and 'monitor_urls'.
    """
    @sync_to_async
    def fetch():
        # Newest version first within each workflow name; execution_count is
        # annotated so no per-row query is needed.
        qs = WorkflowDefinition.objects.annotate(
            execution_count=Count('executions')
        ).order_by('workflow_name', '-version')

        if workflow_type:
            qs = qs.filter(workflow_type=workflow_type)
        if created_by:
            qs = qs.filter(created_by=created_by)

        MAX_ITEMS = 100  # cap MCP response payload size
        total_count = qs.count()
        items = [
            {
                "workflow_name": w.workflow_name,
                "version": w.version,
                "workflow_type": w.workflow_type,
                "created_by": w.created_by,
                "created_at": w.created_at.isoformat() if w.created_at else None,
                "execution_count": w.execution_count,
            }
            for w in qs[:MAX_ITEMS]
        ]
        return {
            "items": items,
            "total_count": total_count,
            "has_more": total_count > MAX_ITEMS,
            "monitor_urls": [
                {"title": "Workflow Definitions", "url": _monitor_url("/workflow-definitions/")},
            ],
        }

    return await fetch()
0079
0080
0081
0082
0083
0084
@mcp.tool()
async def swf_list_workflow_executions(
    namespace: str = None,
    status: str = None,
    executed_by: str = None,
    workflow_name: str = None,
    currently_running: bool = False,
    start_time: str = None,
    end_time: str = None,
) -> dict:
    """
    List workflow executions with filtering.

    Args:
        namespace: Filter to executions in this namespace
        status: Filter by status: 'pending', 'running', 'completed', 'failed', 'terminated'
        executed_by: Filter by user who started the execution
        workflow_name: Filter by workflow definition name
        currently_running: If True, return all running executions (ignores the
            date range and overrides the status filter)
        start_time: Filter executions started >= this ISO datetime (default: last 24 hours)
        end_time: Filter executions started <= this ISO datetime

    Returns:
        dict with 'items' (executions: execution_id, workflow_name, namespace,
        status, executed_by, start_time, end_time, parameter_values),
        'total_count', 'has_more', and 'monitor_urls'.
    """
    @sync_to_async
    def fetch():
        qs = WorkflowExecution.objects.select_related('workflow_definition').order_by('-start_time')

        if namespace:
            qs = qs.filter(namespace=namespace)
        # currently_running takes precedence over an explicit status filter.
        if currently_running:
            qs = qs.filter(status__iexact='running')
        elif status:
            qs = qs.filter(status__iexact=status)
        if executed_by:
            qs = qs.filter(executed_by=executed_by)
        if workflow_name:
            qs = qs.filter(workflow_definition__workflow_name=workflow_name)

        # Skip the date window for currently-running queries so long-lived
        # executions started before the default 24h window are not hidden.
        if not currently_running:
            start = _parse_time(start_time) or _default_start_time(24)
            end = _parse_time(end_time)
            qs = qs.filter(start_time__gte=start)
            if end:
                qs = qs.filter(start_time__lte=end)

        # Mirror the active filters in the monitor URL for a comparable web view.
        params = []
        if namespace:
            params.append(f"namespace={namespace}")
        if status:
            params.append(f"status={status}")
        if executed_by:
            params.append(f"executed_by={executed_by}")
        query_string = "&".join(params)
        url = _monitor_url(f"/workflow-executions/?{query_string}" if query_string else "/workflow-executions/")

        MAX_ITEMS = 100  # cap MCP response payload size
        total_count = qs.count()
        items = [
            {
                "execution_id": e.execution_id,
                "workflow_name": e.workflow_definition.workflow_name if e.workflow_definition else None,
                "namespace": e.namespace,
                "status": e.status,
                "executed_by": e.executed_by,
                "start_time": e.start_time.isoformat() if e.start_time else None,
                "end_time": e.end_time.isoformat() if e.end_time else None,
                "parameter_values": e.parameter_values,
            }
            for e in qs[:MAX_ITEMS]
        ]
        return {
            "items": items,
            "total_count": total_count,
            "has_more": total_count > MAX_ITEMS,
            "monitor_urls": [
                {"title": "Executions List", "url": url},
            ],
        }

    return await fetch()
0167
0168
@mcp.tool()
async def swf_get_workflow_execution(execution_id: str) -> dict:
    """
    Get detailed information about a specific workflow execution.

    Use swf_list_workflow_executions first to find execution IDs if needed.

    Args:
        execution_id: The execution ID (e.g., 'stf_datataking-wenauseic-0042')

    Returns: execution_id, workflow_name, namespace, status, executed_by,
    start_time, end_time, parameter_values, performance_metrics
    """
    @sync_to_async
    def fetch():
        # select_related avoids a second query when reading the definition name.
        qs = WorkflowExecution.objects.select_related('workflow_definition')
        try:
            execution = qs.get(execution_id=execution_id)
        except WorkflowExecution.DoesNotExist:
            return {"error": f"Execution '{execution_id}' not found. Use swf_list_workflow_executions to see recent runs."}

        definition = execution.workflow_definition
        detail_url = _monitor_url(f"/workflow-executions/{execution.execution_id}/")
        return {
            "execution_id": execution.execution_id,
            "workflow_name": definition.workflow_name if definition else None,
            "namespace": execution.namespace,
            "status": execution.status,
            "executed_by": execution.executed_by,
            "start_time": execution.start_time.isoformat() if execution.start_time else None,
            "end_time": execution.end_time.isoformat() if execution.end_time else None,
            "parameter_values": execution.parameter_values,
            "performance_metrics": execution.performance_metrics,
            "monitor_urls": [
                {"title": "Execution Detail", "url": detail_url},
            ],
        }

    return await fetch()
0206
0207
0208
0209
0210
0211
@mcp.tool()
async def swf_list_messages(
    namespace: str = None,
    execution_id: str = None,
    agent: str = None,
    message_type: str = None,
    start_time: str = None,
    end_time: str = None,
) -> dict:
    """
    List workflow messages with filtering.

    Messages are sent between agents during workflow execution. They record
    events like STF creation, processing completion, state transitions, etc.

    DIAGNOSTIC USE CASES:
    - Track workflow progress: list_messages(execution_id='stf_datataking-user-0044')
    - See what an agent sent: list_messages(agent='daq_simulator-agent-user-123')
    - Debug message flow: list_messages(namespace='torre1', start_time='2025-01-13T11:00:00')
    - For workflow failures: use list_logs(level='ERROR') instead

    Common message types: run_imminent, start_run, stf_gen, end_run, pause_run, resume_run

    Args:
        namespace: Filter to messages from this namespace
        execution_id: Filter to messages for this workflow execution
        agent: Filter to messages from this sender agent
        message_type: Filter by type (e.g., 'stf_gen', 'start_run')
        start_time: Filter messages sent >= this ISO datetime (default: last 1 hour)
        end_time: Filter messages sent <= this ISO datetime

    Returns:
        dict with 'items' (max 200 messages: message_type, sender_agent,
        namespace, sent_at, execution_id, run_id, payload_summary),
        'total_count', 'has_more', and 'monitor_urls'.
    """
    @sync_to_async
    def fetch():
        qs = WorkflowMessage.objects.order_by('-sent_at')

        if namespace:
            qs = qs.filter(namespace=namespace)
        if execution_id:
            qs = qs.filter(execution_id=execution_id)
        if agent:
            qs = qs.filter(sender_agent=agent)
        if message_type:
            qs = qs.filter(message_type=message_type)

        # Messages are high-volume, so the default window is only 1 hour.
        start = _parse_time(start_time) or _default_start_time(1)
        end = _parse_time(end_time)
        qs = qs.filter(sent_at__gte=start)
        if end:
            qs = qs.filter(sent_at__lte=end)

        # Mirror the active filters in the monitor URL for a comparable web view.
        params = []
        if namespace:
            params.append(f"namespace={namespace}")
        if execution_id:
            params.append(f"execution_id={execution_id}")
        if message_type:
            params.append(f"message_type={message_type}")
        query_string = "&".join(params)
        url = _monitor_url(f"/workflow/messages/?{query_string}" if query_string else "/workflow/messages/")

        MAX_ITEMS = 200  # cap MCP response payload size
        total_count = qs.count()
        items = [
            {
                "message_type": m.message_type,
                "sender_agent": m.sender_agent,
                "namespace": m.namespace,
                "sent_at": m.sent_at.isoformat() if m.sent_at else None,
                "execution_id": m.execution_id,
                "run_id": m.run_id,
                # Truncated preview of the payload; fetch the full message via
                # the monitor UI if more detail is needed.
                "payload_summary": str(m.message_content)[:200] if m.message_content else None,
            }
            for m in qs[:MAX_ITEMS]
        ]
        return {
            "items": items,
            "total_count": total_count,
            "has_more": total_count > MAX_ITEMS,
            "monitor_urls": [
                {"title": "Messages List", "url": url},
            ],
        }

    return await fetch()
0299
0300
0301
0302
0303
0304
@mcp.tool()
async def swf_list_runs(
    start_time: str = None,
    end_time: str = None,
) -> dict:
    """
    List simulation runs with timing and file counts.

    Runs represent data-taking periods in the ePIC detector system.
    Each run contains multiple STF (Super Time Frame) files.

    Args:
        start_time: Filter runs started >= this ISO datetime (default: last 7 days)
        end_time: Filter runs started <= this ISO datetime

    Returns:
        dict with 'items' (runs: run_number, start_time, end_time,
        duration_seconds, stf_file_count), 'total_count', 'has_more',
        and 'monitor_urls'.
    """
    @sync_to_async
    def fetch():
        # stf_file_count is annotated so no per-row query is needed.
        qs = Run.objects.annotate(
            stf_file_count=Count('stf_files')
        ).order_by('-start_time')

        # Default window is 168 hours = 7 days.
        start = _parse_time(start_time) or _default_start_time(168)
        end = _parse_time(end_time)
        qs = qs.filter(start_time__gte=start)
        if end:
            qs = qs.filter(start_time__lte=end)

        MAX_ITEMS = 100  # cap MCP response payload size
        total_count = qs.count()
        items = []
        for r in qs[:MAX_ITEMS]:
            # Duration is only known once the run has both start and end times.
            duration = None
            if r.start_time and r.end_time:
                duration = (r.end_time - r.start_time).total_seconds()

            items.append({
                "run_number": r.run_number,
                "start_time": r.start_time.isoformat() if r.start_time else None,
                "end_time": r.end_time.isoformat() if r.end_time else None,
                "duration_seconds": duration,
                "stf_file_count": r.stf_file_count,
            })

        return {
            "items": items,
            "total_count": total_count,
            "has_more": total_count > MAX_ITEMS,
            "monitor_urls": [
                {"title": "Runs List", "url": _monitor_url("/runs/")},
            ],
        }

    return await fetch()
0361
0362
@mcp.tool()
async def swf_get_run(run_number: int) -> dict:
    """
    Get detailed information about a specific run.

    Args:
        run_number: The run number (required)

    Returns: run_number, start_time, end_time, duration_seconds, run_conditions,
    file_stats (counts by status, including zero counts), total_stf_files
    """
    @sync_to_async
    def fetch():
        try:
            r = Run.objects.get(run_number=run_number)

            duration = None
            if r.start_time and r.end_time:
                duration = (r.end_time - r.start_time).total_seconds()

            # Count files per status in a single aggregation query instead of
            # one query per status choice; statuses with no files report 0.
            observed = {
                row['status']: row['n']
                for row in StfFile.objects.filter(run=r)
                .values('status')
                .annotate(n=Count('status'))
            }
            file_stats = {
                choice[0]: observed.get(choice[0], 0)
                for choice in StfFile._meta.get_field('status').choices
            }

            return {
                "run_number": r.run_number,
                "start_time": r.start_time.isoformat() if r.start_time else None,
                "end_time": r.end_time.isoformat() if r.end_time else None,
                "duration_seconds": duration,
                "run_conditions": r.run_conditions,
                "file_stats": file_stats,
                "total_stf_files": sum(file_stats.values()),
                "monitor_urls": [
                    {"title": "Run Detail", "url": _monitor_url(f"/runs/{r.run_number}/")},
                ],
            }
        except Run.DoesNotExist:
            return {"error": f"Run {run_number} not found. Use swf_list_runs to see available runs."}

    return await fetch()
0405
0406
0407
0408
0409
0410
@mcp.tool()
async def swf_list_stf_files(
    run_number: int = None,
    status: str = None,
    machine_state: str = None,
    start_time: str = None,
    end_time: str = None,
) -> dict:
    """
    List STF (Super Time Frame) files with filtering.

    STF files are the primary data units from the ePIC detector DAQ system.
    Each STF represents a time slice of detector data.

    Args:
        run_number: Filter to files from this run (disables the default time window)
        status: Filter by status: 'registered', 'processing', 'processed', 'done', 'failed'
        machine_state: Filter by detector state (e.g., 'physics', 'cosmics')
        start_time: Filter files created >= this ISO datetime (default: last 24 hours,
            unless run_number is given)
        end_time: Filter files created <= this ISO datetime

    Returns:
        dict with 'items' (STF files: file_id, stf_filename, run_number, status,
        machine_state, file_size_bytes, created_at, tf_file_count),
        'total_count', 'has_more', and 'monitor_urls'.
    """
    @sync_to_async
    def fetch():
        # tf_file_count is annotated so no per-row query is needed.
        qs = StfFile.objects.select_related('run').annotate(
            tf_file_count=Count('tf_files')
        ).order_by('-created_at')

        if run_number:
            qs = qs.filter(run__run_number=run_number)
        if status:
            qs = qs.filter(status__iexact=status)
        if machine_state:
            qs = qs.filter(machine_state__iexact=machine_state)

        # A run is already a bounded context, so the default 24h window only
        # applies when no run_number was given.
        start = _parse_time(start_time) or (None if run_number else _default_start_time(24))
        end = _parse_time(end_time)
        if start:
            qs = qs.filter(created_at__gte=start)
        if end:
            qs = qs.filter(created_at__lte=end)

        # Mirror the active filters in the monitor URL for a comparable web view.
        params = []
        if run_number:
            params.append(f"run_number={run_number}")
        if status:
            params.append(f"status={status}")
        query_string = "&".join(params)
        url = _monitor_url(f"/stf-files/?{query_string}" if query_string else "/stf-files/")

        MAX_ITEMS = 100  # cap MCP response payload size
        total_count = qs.count()
        items = [
            {
                "file_id": str(f.file_id),
                "stf_filename": f.stf_filename,
                "run_number": f.run.run_number if f.run else None,
                "status": f.status,
                "machine_state": f.machine_state,
                "file_size_bytes": f.file_size_bytes,
                "created_at": f.created_at.isoformat() if f.created_at else None,
                "tf_file_count": f.tf_file_count,
            }
            for f in qs[:MAX_ITEMS]
        ]
        return {
            "items": items,
            "total_count": total_count,
            "has_more": total_count > MAX_ITEMS,
            "monitor_urls": [
                {"title": "STF Files List", "url": url},
            ],
        }

    return await fetch()
0488
0489
@mcp.tool()
async def swf_get_stf_file(file_id: str = None, stf_filename: str = None) -> dict:
    """
    Get detailed information about a specific STF file.

    Provide either file_id or stf_filename to identify the file.

    Args:
        file_id: The UUID file ID
        stf_filename: The STF filename

    Returns: file_id, stf_filename, run_number, status, machine_state,
    file_size_bytes, checksum, created_at, metadata, workflow_id, daq_state,
    daq_substate, workflow_status
    """
    @sync_to_async
    def fetch():
        # Build the lookup from whichever identifier was supplied;
        # file_id takes precedence when both are given.
        if file_id:
            lookup = {"file_id": file_id}
        elif stf_filename:
            lookup = {"stf_filename": stf_filename}
        else:
            return {"error": "Provide either file_id or stf_filename"}

        try:
            record = StfFile.objects.select_related('run').get(**lookup)
        except StfFile.DoesNotExist:
            return {"error": "STF file not found. Use swf_list_stf_files to see available files."}

        return {
            "file_id": str(record.file_id),
            "stf_filename": record.stf_filename,
            "run_number": record.run.run_number if record.run else None,
            "status": record.status,
            "machine_state": record.machine_state,
            "file_size_bytes": record.file_size_bytes,
            "checksum": record.checksum,
            "created_at": record.created_at.isoformat() if record.created_at else None,
            "metadata": record.metadata,
            "workflow_id": str(record.workflow_id) if record.workflow_id else None,
            "daq_state": record.daq_state,
            "daq_substate": record.daq_substate,
            "workflow_status": record.workflow_status,
            "monitor_urls": [
                {"title": "STF File Detail", "url": _monitor_url(f"/stf-files/{record.file_id}/")},
            ],
        }

    return await fetch()
0537
0538
0539
0540
0541
0542
@mcp.tool()
async def swf_list_tf_slices(
    run_number: int = None,
    stf_filename: str = None,
    tf_filename: str = None,
    status: str = None,
    assigned_worker: str = None,
    start_time: str = None,
    end_time: str = None,
) -> dict:
    """
    List TF slices for the fast processing workflow.

    TF slices are small portions of TF samples (~15 per STF) that workers
    process independently in ~30 seconds each.

    Args:
        run_number: Filter to slices from this run
        stf_filename: Filter to slices from this STF file
        tf_filename: Filter to slices from this TF sample
        status: Filter by status: 'queued', 'processing', 'completed', 'failed'
        assigned_worker: Filter by assigned worker ID
        start_time: Filter slices created >= this ISO datetime (default: last
            24 hours, unless run_number/stf_filename/tf_filename is given)
        end_time: Filter slices created <= this ISO datetime

    Returns:
        dict with 'items' (slices: slice_id, tf_filename, stf_filename,
        run_number, tf_first, tf_last, tf_count, status, assigned_worker,
        created_at, completed_at), 'total_count', 'has_more', 'monitor_urls'.
    """
    @sync_to_async
    def fetch():
        qs = TFSlice.objects.all().order_by('-created_at')

        if run_number:
            qs = qs.filter(run_number=run_number)
        if stf_filename:
            qs = qs.filter(stf_filename=stf_filename)
        if tf_filename:
            qs = qs.filter(tf_filename=tf_filename)
        if status:
            qs = qs.filter(status__iexact=status)
        if assigned_worker:
            qs = qs.filter(assigned_worker=assigned_worker)

        # A run/STF/TF filter already bounds the result set, so the default
        # 24h window only applies when no such context was given.
        has_context = run_number or stf_filename or tf_filename
        start = _parse_time(start_time) or (None if has_context else _default_start_time(24))
        end = _parse_time(end_time)
        if start:
            qs = qs.filter(created_at__gte=start)
        if end:
            qs = qs.filter(created_at__lte=end)

        # Mirror the active filters in the monitor URL for a comparable web view.
        params = []
        if run_number:
            params.append(f"run_number={run_number}")
        if status:
            params.append(f"status={status}")
        query_string = "&".join(params)
        url = _monitor_url(f"/tf-slices/?{query_string}" if query_string else "/tf-slices/")

        MAX_ITEMS = 200  # cap MCP response payload size
        total_count = qs.count()
        items = [
            {
                "slice_id": s.slice_id,
                "tf_filename": s.tf_filename,
                "stf_filename": s.stf_filename,
                "run_number": s.run_number,
                "tf_first": s.tf_first,
                "tf_last": s.tf_last,
                "tf_count": s.tf_count,
                "status": s.status,
                "assigned_worker": s.assigned_worker,
                "created_at": s.created_at.isoformat() if s.created_at else None,
                "completed_at": s.completed_at.isoformat() if s.completed_at else None,
            }
            for s in qs[:MAX_ITEMS]
        ]
        return {
            "items": items,
            "total_count": total_count,
            "has_more": total_count > MAX_ITEMS,
            "monitor_urls": [
                {"title": "TF Slices List", "url": url},
            ],
        }

    return await fetch()
0630
0631
@mcp.tool()
async def swf_get_tf_slice(tf_filename: str, slice_id: int) -> dict:
    """
    Get detailed information about a specific TF slice.

    Args:
        tf_filename: The TF filename (required)
        slice_id: The slice ID within the TF (required, typically 1-15)

    Returns: slice_id, tf_filename, stf_filename, run_number, tf_first, tf_last,
    tf_count, status, retries, assigned_worker, assigned_at, completed_at, metadata
    """
    @sync_to_async
    def fetch():
        def iso(dt):
            # Render a datetime as ISO-8601, passing None through unchanged.
            return dt.isoformat() if dt else None

        try:
            record = TFSlice.objects.get(tf_filename=tf_filename, slice_id=slice_id)
        except TFSlice.DoesNotExist:
            return {"error": f"TF slice {slice_id} for {tf_filename} not found. Use swf_list_tf_slices to see available slices."}

        return {
            "slice_id": record.slice_id,
            "tf_filename": record.tf_filename,
            "stf_filename": record.stf_filename,
            "run_number": record.run_number,
            "tf_first": record.tf_first,
            "tf_last": record.tf_last,
            "tf_count": record.tf_count,
            "status": record.status,
            "retries": record.retries,
            "assigned_worker": record.assigned_worker,
            "assigned_at": iso(record.assigned_at),
            "completed_at": iso(record.completed_at),
            "metadata": record.metadata,
        }

    return await fetch()
0667
0668
0669
0670
0671
0672
@mcp.tool()
async def swf_start_workflow(
    workflow_name: str = None,
    namespace: str = None,
    config: str = None,
    realtime: bool = None,
    duration: int = 0,
    stf_count: int = None,
    physics_period_count: int = None,
    physics_period_duration: float = None,
    stf_interval: float = None,
) -> dict:
    """
    Start a workflow execution by sending a command to the DAQ Simulator agent.

    All parameters are optional - defaults are read from PersistentState 'workflow_defaults'.
    Call with no arguments to use configured defaults.

    Precedence for each setting: explicit argument > testbed TOML config >
    built-in fallback ('stf_datataking' / 'fast_processing_default' / realtime=True).
    The namespace additionally falls back to the most recently heartbeating
    agent manager's namespace, then to 'torre1'.

    Args:
        workflow_name: Name of the workflow (default: from config, typically 'stf_datataking')
        namespace: Testbed namespace (default: from config, e.g., 'torre1')
        config: Workflow config name (default: from config, e.g., 'fast_processing_default')
        realtime: Run in real-time mode (default: from config, typically True)
        duration: Max duration in seconds (0 = run until complete)
        stf_count: Number of STF files to generate (overrides config)
        physics_period_count: Number of physics periods (overrides config)
        physics_period_duration: Duration of each physics period in seconds (overrides config)
        stf_interval: Interval between STF generation in seconds (overrides config)

    Returns:
        Success/failure status with execution_id if started.

    After starting, ACTIVELY POLL — do not sleep:
    get_workflow_monitor(execution_id) every 10-15s until completion
    Report progress to user as it evolves
    list_logs(level='ERROR') after completion
    """
    import json
    from datetime import datetime

    @sync_to_async
    def do_start():
        import os
        from pathlib import Path
        from ..activemq_connection import ActiveMQConnectionManager

        # Defaults read from the testbed TOML; remain None/{} if the file is
        # missing or unreadable so the fallbacks below kick in.
        toml_namespace = None
        toml_workflow_name = None
        toml_config = None
        toml_realtime = None
        toml_params = {}

        # config_source distinguishes the SWF_TESTBED_CONFIG env override from
        # the default config location (see _get_testbed_config_path).
        testbed_toml, config_source = _get_testbed_config_path()
        if testbed_toml.exists():
            try:
                import tomllib
                with open(testbed_toml, 'rb') as f:
                    toml_data = tomllib.load(f)
                toml_namespace = toml_data.get('testbed', {}).get('namespace')
                workflow_section = toml_data.get('workflow', {})
                toml_workflow_name = workflow_section.get('name')
                toml_config = workflow_section.get('config')
                toml_realtime = workflow_section.get('realtime')
                toml_params = toml_data.get('parameters', {})
                if config_source == 'SWF_TESTBED_CONFIG':
                    logger.info(f"Using config from SWF_TESTBED_CONFIG: {testbed_toml.name}")
            except Exception as e:
                # Best-effort: a broken TOML must not block starting a workflow.
                logger.warning(f"Failed to read {testbed_toml}: {e}")

        # Resolve each setting: explicit argument wins, then TOML, then fallback.
        actual_workflow_name = workflow_name or toml_workflow_name or 'stf_datataking'
        actual_namespace = namespace or toml_namespace
        actual_config = config or toml_config or 'fast_processing_default'



        # Namespace fallback: borrow it from the most recently heartbeating
        # agent manager (seen within the last 5 minutes).
        if not actual_namespace:
            try:
                cutoff = timezone.now() - timedelta(minutes=5)
                am = SystemAgent.objects.filter(
                    agent_type='agent_manager',
                    last_heartbeat__gte=cutoff
                ).order_by('-last_heartbeat').first()
                if am and am.namespace:
                    actual_namespace = am.namespace
                    logger.info(f"Using namespace '{am.namespace}' from agent manager '{am.instance_name}'")
            except Exception:
                pass
        # Last-resort namespace default.
        if not actual_namespace:
            actual_namespace = 'torre1'
        # realtime is a bool, so compare against None explicitly (False is valid).
        actual_realtime = realtime if realtime is not None else (toml_realtime if toml_realtime is not None else True)

        # Start from TOML parameters, then layer explicit overrides on top.
        params = dict(toml_params)
        if stf_count is not None:
            params['stf_count'] = stf_count
        if physics_period_count is not None:
            params['physics_period_count'] = physics_period_count
        if physics_period_duration is not None:
            params['physics_period_duration'] = physics_period_duration
        if stf_interval is not None:
            params['stf_interval'] = stf_interval

        params['namespace'] = actual_namespace

        # Command message consumed by the DAQ Simulator agent.
        msg = {
            'msg_type': 'run_workflow',
            'namespace': actual_namespace,
            'workflow_name': actual_workflow_name,
            'config': actual_config,
            'realtime': actual_realtime,
            'duration': duration,
            'params': params,
            'timestamp': datetime.now().isoformat(),
            'source': 'mcp'
        }

        mq = ActiveMQConnectionManager()
        if mq.send_message('/queue/workflow_control', json.dumps(msg)):
            logger.info(
                f"MCP start_workflow: sent run_workflow command for '{actual_workflow_name}' "
                f"(namespace={actual_namespace}, config={actual_config}, realtime={actual_realtime})"
            )
            return {
                "success": True,
                "message": f"Workflow '{actual_workflow_name}' start command sent to DAQ Simulator",
                "workflow_name": actual_workflow_name,
                "namespace": actual_namespace,
                "config": actual_config,
                "realtime": actual_realtime,
                "params": params,
                "note": "Workflow runs asynchronously. Use swf_list_workflow_executions to monitor."
            }
        else:
            return {
                "success": False,
                "error": "Failed to send message to ActiveMQ. Is the message broker running?",
                "workflow_name": actual_workflow_name,
                "namespace": actual_namespace,
            }

    return await do_start()
0813
0814
@mcp.tool()
async def swf_stop_workflow(execution_id: str) -> dict:
    """
    Stop a running workflow by sending a stop command to the DAQ Simulator agent.

    Sends a stop_workflow command that the agent checks between simulation events.
    The workflow stops gracefully at the next checkpoint.

    To find the execution_id, use list_workflow_executions(currently_running=True).

    Args:
        execution_id: The execution ID to stop (e.g., 'stf_datataking-wenauseic-0042')

    Returns:
        Success/failure status. The actual stop is asynchronous - monitor via
        list_workflow_executions to confirm termination.
    """
    import json
    from datetime import datetime

    @sync_to_async
    def do_stop():
        from ..activemq_connection import ActiveMQConnectionManager

        # Validate first: the execution must exist and still be running.
        try:
            execution = WorkflowExecution.objects.get(execution_id=execution_id)
        except WorkflowExecution.DoesNotExist:
            return {
                "success": False,
                "error": f"Execution '{execution_id}' not found",
            }
        if execution.status != 'running':
            return {
                "success": False,
                "error": f"Execution '{execution_id}' is not running (status: {execution.status})",
            }

        # Command message consumed by the DAQ Simulator agent.
        command = {
            'msg_type': 'stop_workflow',
            'execution_id': execution_id,
            'namespace': execution.namespace,
            'timestamp': datetime.now().isoformat(),
            'source': 'mcp'
        }

        broker = ActiveMQConnectionManager()
        sent = broker.send_message('/queue/workflow_control', json.dumps(command))
        if not sent:
            return {
                "success": False,
                "error": "Failed to send message to ActiveMQ. Is the message broker running?",
                "execution_id": execution_id,
            }

        logger.info(f"MCP stop_workflow: sent stop command for execution '{execution_id}'")
        return {
            "success": True,
            "message": f"Stop command sent for execution '{execution_id}'",
            "execution_id": execution_id,
            "namespace": execution.namespace,
            "note": "Workflow will stop at next checkpoint. Monitor via list_workflow_executions."
        }

    return await do_stop()
0879
0880
@mcp.tool()
async def swf_end_execution(execution_id: str) -> dict:
    """
    End a running workflow execution by setting its status to 'terminated'.

    Use this to clean up stale or stuck executions that are still marked as 'running'.
    This is a state change only - no data is deleted. The action is logged.

    Args:
        execution_id: The execution ID to end (use list_workflow_executions to find running ones)

    Returns:
        Success/failure status with details
    """
    @sync_to_async
    def do_end():
        try:
            execution = WorkflowExecution.objects.get(execution_id=execution_id)
        except WorkflowExecution.DoesNotExist:
            return {
                "success": False,
                "error": f"Execution '{execution_id}' not found",
            }

        # Only running executions may be terminated; anything else is a no-op.
        old_status = execution.status
        if old_status != 'running':
            return {
                "success": False,
                "error": f"Execution '{execution_id}' is not running (status: {old_status})",
            }

        execution.status = 'terminated'
        execution.end_time = timezone.now()
        # Persist only the columns we changed so concurrent writers' updates
        # to other fields (e.g. performance_metrics) are not clobbered.
        execution.save(update_fields=['status', 'end_time'])

        logger.info(
            f"MCP end_execution: '{execution_id}' terminated (was running since {execution.start_time})"
        )

        return {
            "success": True,
            "execution_id": execution_id,
            "old_status": old_status,
            "new_status": "terminated",
            "start_time": execution.start_time.isoformat() if execution.start_time else None,
            "end_time": execution.end_time.isoformat() if execution.end_time else None,
        }

    return await do_end()
0930
0931
0932
0933
0934
0935
@mcp.tool()
async def swf_get_workflow_monitor(execution_id: str) -> dict:
    """
    Get the status and accumulated events for a workflow execution.

    This provides a summary of workflow progress without needing to poll
    multiple tools. Aggregates messages and logs for the execution.

    Args:
        execution_id: The execution ID to get monitor status for

    Returns:
        - execution_id: The execution being monitored
        - status: Current workflow status (running/completed/failed/terminated)
        - phase: Current phase (imminent/running/ended)
        - events: List of key events with timestamps
        - stf_count: Number of STF files generated
        - errors: List of any errors encountered
        - duration_seconds: How long the workflow ran (if completed)
    """
    @sync_to_async
    def fetch():
        try:
            execution = WorkflowExecution.objects.get(execution_id=execution_id)
            db_status = execution.status
            db_start_time = execution.start_time
            db_end_time = execution.end_time
        except WorkflowExecution.DoesNotExist:
            return {"error": f"Execution '{execution_id}' not found"}

        duration_seconds = None
        if db_start_time and db_end_time:
            duration_seconds = (db_end_time - db_start_time).total_seconds()

        # Replay messages in send order to reconstruct the phase timeline.
        messages = WorkflowMessage.objects.filter(
            execution_id=execution_id
        ).order_by('sent_at')

        events = []
        current_phase = "unknown"
        run_id = None
        run_ids = set()
        errors = []

        for msg in messages:
            msg_type = msg.message_type
            timestamp = msg.sent_at.isoformat() if msg.sent_at else None
            content = msg.message_content or {}

            if msg_type == 'run_imminent':
                current_phase = "imminent"
                run_id = content.get('run_id') or msg.run_id
                if run_id:
                    run_ids.add(run_id)
                events.append({"type": "run_imminent", "time": timestamp, "run_id": run_id})
            elif msg_type == 'start_run':
                current_phase = "running"
                events.append({"type": "start_run", "time": timestamp})
            elif msg_type == 'stf_gen':
                # STF activity is summarized via stf_count below; individual
                # stf_gen messages are too noisy to surface as events.
                pass
            elif msg_type == 'end_run':
                current_phase = "ended"
                events.append({"type": "end_run", "time": timestamp})
            elif msg_type in ('run_workflow_failed', 'error'):
                errors.append({
                    "time": timestamp,
                    "error": content.get('error', str(content)),
                })

        # Fall back to run ids recorded directly on messages when no
        # run_imminent message carried one.
        if not run_ids:
            run_ids = set(WorkflowMessage.objects.filter(
                execution_id=execution_id, run_id__isnull=False,
            ).values_list('run_id', flat=True).distinct())

        # Skip malformed (non-numeric) run ids rather than letting a single
        # bad message fail the whole monitor call.
        run_numbers = []
        for r in run_ids:
            try:
                run_numbers.append(int(r))
            except (TypeError, ValueError):
                continue
        stf_count = StfFile.objects.filter(run__run_number__in=run_numbers).count()

        # Also surface recent ERROR-level application logs tagged with this
        # execution (module-level `logging` provides the level constant).
        error_logs = AppLog.objects.filter(
            level__gte=logging.ERROR,
            extra_data__execution_id=execution_id,
        ).order_by('timestamp')[:10]

        for log in error_logs:
            errors.append({
                "time": log.timestamp.isoformat() if log.timestamp else None,
                "error": log.message,
                "source": "log",
            })

        return {
            "execution_id": execution_id,
            "status": db_status,
            "phase": current_phase,
            "run_id": run_id,
            "stf_count": stf_count,
            "events": events,
            "errors": errors,
            "start_time": db_start_time.isoformat() if db_start_time else None,
            "end_time": db_end_time.isoformat() if db_end_time else None,
            "duration_seconds": duration_seconds,
            "monitor_urls": [
                {"title": "Execution Detail", "url": _monitor_url(f"/workflow-executions/{execution_id}/")},
            ],
        }

    return await fetch()
1044
1045
@mcp.tool()
async def swf_list_workflow_monitors() -> dict:
    """
    List recent workflow executions that can be monitored.

    Returns executions from the last 24 hours with their current status,
    allowing you to pick one to monitor with get_workflow_monitor().

    Returns:
        dict with:
        - items: list of executions, each with execution_id, status,
          start_time, end_time, stf_count
        - total_count: number of executions in the 24h window
        - has_more: True if more than MAX_ITEMS matched
        - monitor_urls: links to the web monitor
    """
    @sync_to_async
    def fetch():
        now = timezone.now()
        qs = WorkflowExecution.objects.filter(
            start_time__gte=now - timedelta(hours=24)
        ).order_by('-start_time')

        MAX_ITEMS = 20
        total_count = qs.count()
        items = []
        for e in qs[:MAX_ITEMS]:
            # Run numbers come from messages tied to this execution.
            run_ids = WorkflowMessage.objects.filter(
                execution_id=e.execution_id,
                run_id__isnull=False,
            ).values_list('run_id', flat=True).distinct()

            # Skip malformed (non-numeric) run ids instead of failing the
            # whole listing on one bad message.
            run_numbers = []
            for r in run_ids:
                try:
                    run_numbers.append(int(r))
                except (TypeError, ValueError):
                    continue
            stf_count = StfFile.objects.filter(run__run_number__in=run_numbers).count()

            items.append({
                "execution_id": e.execution_id,
                "status": e.status,
                "start_time": e.start_time.isoformat() if e.start_time else None,
                "end_time": e.end_time.isoformat() if e.end_time else None,
                "stf_count": stf_count,
            })

        return {
            "items": items,
            "total_count": total_count,
            "has_more": total_count > MAX_ITEMS,
            "monitor_urls": [
                {"title": "Executions List", "url": _monitor_url("/workflow-executions/")},
            ],
        }

    return await fetch()
1092
1093
@mcp.tool()
async def swf_send_message(message: str, message_type: str = "announcement", metadata: dict = None) -> dict:
    """
    Send a message to the workflow monitoring stream.

    Use for testing the message pipeline, announcements to colleagues,
    or any other broadcast purpose.

    The sender is automatically identified as '{username}-personal-agent'.

    Args:
        message: The message text to send
        message_type: Type of message - 'announcement', 'status', 'test', etc.
                      If 'test', namespace is omitted. Otherwise uses configured namespace.
        metadata: Optional additional key-value data to include

    Returns:
        Success/failure status with message details
    """
    import json

    @sync_to_async
    def do_send():
        from ..activemq_connection import ActiveMQConnectionManager

        username = _get_username()
        sender = f"{username}-personal-agent"

        # Test messages carry no namespace; everything else is tagged with
        # the namespace from the testbed config, best effort.
        namespace = None
        if message_type != 'test':
            testbed_toml, _ = _get_testbed_config_path()
            if testbed_toml and testbed_toml.exists():
                try:
                    import tomllib
                    with open(testbed_toml, 'rb') as f:
                        toml_data = tomllib.load(f)
                    namespace = toml_data.get('testbed', {}).get('namespace')
                except Exception as exc:
                    # Best effort: a missing/bad config just leaves the
                    # namespace unset, but record why for debugging.
                    logger.debug(
                        f"MCP send_message: could not read namespace from {testbed_toml}: {exc}"
                    )

        msg = {
            'msg_type': message_type,
            'sender': sender,
            'namespace': namespace,
            'message': message,
            # Timezone-aware timestamp (Django convention) rather than
            # naive local time from datetime.now().
            'timestamp': timezone.now().isoformat(),
            'source': 'mcp_send_message',
        }
        if metadata:
            msg['metadata'] = metadata

        topic = '/topic/epictopic'
        mq = ActiveMQConnectionManager()
        if mq.send_message(topic, json.dumps(msg)):
            logger.info(f"MCP send_message: sent {message_type} from {sender}")
            return {
                "success": True,
                "message": "Message sent to monitoring stream",
                "sender": sender,
                "message_type": message_type,
                "namespace": namespace,
                "content": message,
            }
        else:
            return {
                "success": False,
                "error": "Failed to send message to ActiveMQ. Is the message broker running?",
            }

    return await do_send()