Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-27 07:41:42

0001 """
0002 Workflow and data pipeline MCP tools.
0003 
0004 Includes: workflow definitions, executions, messages, runs, STF files, TF slices,
0005 workflow control (start/stop), monitoring.
0006 """
0007 
0008 import logging
0009 from datetime import timedelta
0010 from django.utils import timezone
0011 from django.db.models import Count
0012 from asgiref.sync import sync_to_async
0013 
0014 from mcp_server import mcp_server as mcp
0015 
0016 from ..models import Run, StfFile, TFSlice, AppLog, SystemAgent
0017 from ..workflow_models import WorkflowDefinition, WorkflowExecution, WorkflowMessage
0018 from .common import _parse_time, _default_start_time, _monitor_url, _get_testbed_config_path, _get_username
0019 
0020 logger = logging.getLogger(__name__)
0021 
0022 
0023 # -----------------------------------------------------------------------------
0024 # Workflow Definitions
0025 # -----------------------------------------------------------------------------
0026 
@mcp.tool()
async def swf_list_workflow_definitions(
    workflow_type: str = None,
    created_by: str = None,
) -> dict:
    """
    List available workflow definitions that can be executed.

    Workflow definitions describe the structure of a workflow (stages, agents needed).
    Common workflows include 'stf_datataking' for streaming data acquisition simulation.

    Args:
        workflow_type: Filter by type (e.g., 'simulation', 'production')
        created_by: Filter by creator username

    Returns a dict with:
        items: up to 100 definitions, each with workflow_name, version,
            workflow_type, created_by, created_at, execution_count
        total_count: number of definitions matching the filters
        has_more: True when total_count exceeds the number of items returned
        monitor_urls: links to the related monitor pages
    """
    @sync_to_async
    def fetch():
        # The blocking ORM queries run through sync_to_async.
        max_items = 100

        definitions = WorkflowDefinition.objects.annotate(
            execution_count=Count('executions')
        ).order_by('workflow_name', '-version')
        if workflow_type:
            definitions = definitions.filter(workflow_type=workflow_type)
        if created_by:
            definitions = definitions.filter(created_by=created_by)

        # Count the full match set before slicing so has_more is accurate.
        total_count = definitions.count()
        items = [
            {
                "workflow_name": definition.workflow_name,
                "version": definition.version,
                "workflow_type": definition.workflow_type,
                "created_by": definition.created_by,
                "created_at": definition.created_at.isoformat() if definition.created_at else None,
                "execution_count": definition.execution_count,
            }
            for definition in definitions[:max_items]
        ]
        return {
            "items": items,
            "total_count": total_count,
            "has_more": total_count > max_items,
            "monitor_urls": [
                {"title": "Workflow Definitions", "url": _monitor_url("/workflow-definitions/")},
            ],
        }

    return await fetch()
0079 
0080 
0081 # -----------------------------------------------------------------------------
0082 # Workflow Executions
0083 # -----------------------------------------------------------------------------
0084 
@mcp.tool()
async def swf_list_workflow_executions(
    namespace: str = None,
    status: str = None,
    executed_by: str = None,
    workflow_name: str = None,
    currently_running: bool = False,
    start_time: str = None,
    end_time: str = None,
) -> dict:
    """
    List workflow executions with filtering.

    Args:
        namespace: Filter to executions in this namespace
        status: Filter by status: 'pending', 'running', 'completed', 'failed', 'terminated'
        executed_by: Filter by user who started the execution
        workflow_name: Filter by workflow definition name
        currently_running: If True, return all running executions (ignores the
            date range and any explicit status filter)
        start_time: Filter executions started >= this ISO datetime (default: last 24 hours)
        end_time: Filter executions started <= this ISO datetime

    Returns a dict with:
        items: up to 100 executions (newest first), each with execution_id,
            workflow_name, namespace, status, executed_by, start_time,
            end_time, parameter_values
        total_count: number of executions matching the filters
        has_more: True when total_count exceeds the number of items returned
        monitor_urls: link to the monitor executions list with matching filters
    """
    from urllib.parse import urlencode

    @sync_to_async
    def fetch():
        max_items = 100

        # currently_running takes precedence over an explicit status, so the
        # same effective value drives both the query and the monitor link.
        effective_status = 'running' if currently_running else status

        qs = WorkflowExecution.objects.select_related('workflow_definition').order_by('-start_time')
        if namespace:
            qs = qs.filter(namespace=namespace)
        if effective_status:
            qs = qs.filter(status__iexact=effective_status)
        if executed_by:
            qs = qs.filter(executed_by=executed_by)
        if workflow_name:
            qs = qs.filter(workflow_definition__workflow_name=workflow_name)

        # The time window is skipped entirely when listing running executions.
        if not currently_running:
            start = _parse_time(start_time) or _default_start_time(24)
            end = _parse_time(end_time)
            qs = qs.filter(start_time__gte=start)
            if end:
                qs = qs.filter(start_time__lte=end)

        # urlencode escapes caller-supplied values ('&', '=', spaces, ...) so
        # they cannot corrupt the query string of the monitor link.
        query_string = urlencode([
            (key, value)
            for key, value in (
                ("namespace", namespace),
                ("status", effective_status),
                ("executed_by", executed_by),
            )
            if value
        ])
        url = _monitor_url(f"/workflow-executions/?{query_string}" if query_string else "/workflow-executions/")

        # Count the full match set before slicing so has_more is accurate.
        total_count = qs.count()
        items = [
            {
                "execution_id": e.execution_id,
                "workflow_name": e.workflow_definition.workflow_name if e.workflow_definition else None,
                "namespace": e.namespace,
                "status": e.status,
                "executed_by": e.executed_by,
                "start_time": e.start_time.isoformat() if e.start_time else None,
                "end_time": e.end_time.isoformat() if e.end_time else None,
                "parameter_values": e.parameter_values,
            }
            for e in qs[:max_items]
        ]
        return {
            "items": items,
            "total_count": total_count,
            "has_more": total_count > max_items,
            "monitor_urls": [
                {"title": "Executions List", "url": url},
            ],
        }

    return await fetch()
0167 
0168 
@mcp.tool()
async def swf_get_workflow_execution(execution_id: str) -> dict:
    """
    Look up one workflow execution by its ID and return its details.

    Use swf_list_workflow_executions first to find execution IDs if needed.

    Args:
        execution_id: The execution ID (e.g., 'stf_datataking-wenauseic-0042')

    Returns: execution_id, workflow_name, namespace, status, executed_by,
    start_time, end_time, parameter_values, performance_metrics, monitor_urls;
    or a dict with a single 'error' key when no such execution exists.
    """
    def _iso(moment):
        # Timestamps may be unset; serialize those as None.
        return moment.isoformat() if moment else None

    @sync_to_async
    def fetch():
        executions = WorkflowExecution.objects.select_related('workflow_definition')
        try:
            execution = executions.get(execution_id=execution_id)
        except WorkflowExecution.DoesNotExist:
            return {"error": f"Execution '{execution_id}' not found. Use swf_list_workflow_executions to see recent runs."}

        definition = execution.workflow_definition
        detail_url = _monitor_url(f"/workflow-executions/{execution.execution_id}/")
        return {
            "execution_id": execution.execution_id,
            "workflow_name": definition.workflow_name if definition else None,
            "namespace": execution.namespace,
            "status": execution.status,
            "executed_by": execution.executed_by,
            "start_time": _iso(execution.start_time),
            "end_time": _iso(execution.end_time),
            "parameter_values": execution.parameter_values,
            "performance_metrics": execution.performance_metrics,
            "monitor_urls": [
                {"title": "Execution Detail", "url": detail_url},
            ],
        }

    return await fetch()
0206 
0207 
0208 # -----------------------------------------------------------------------------
0209 # Messages
0210 # -----------------------------------------------------------------------------
0211 
@mcp.tool()
async def swf_list_messages(
    namespace: str = None,
    execution_id: str = None,
    agent: str = None,
    message_type: str = None,
    start_time: str = None,
    end_time: str = None,
) -> dict:
    """
    List workflow messages with filtering.

    Messages are sent between agents during workflow execution. They record
    events like STF creation, processing completion, state transitions, etc.

    DIAGNOSTIC USE CASES:
    - Track workflow progress: list_messages(execution_id='stf_datataking-user-0044')
    - See what an agent sent: list_messages(agent='daq_simulator-agent-user-123')
    - Debug message flow: list_messages(namespace='torre1', start_time='2025-01-13T11:00:00')
    - For workflow failures: use list_logs(level='ERROR') instead

    Common message types: run_imminent, start_run, stf_gen, end_run, pause_run, resume_run

    Args:
        namespace: Filter to messages from this namespace
        execution_id: Filter to messages for this workflow execution
        agent: Filter to messages from this sender agent
        message_type: Filter by type (e.g., 'stf_gen', 'start_run')
        start_time: Filter messages sent >= this ISO datetime (default: last 1 hour)
        end_time: Filter messages sent <= this ISO datetime

    Returns a dict with:
        items: up to 200 messages (newest first), each with message_type,
            sender_agent, namespace, sent_at, execution_id, run_id,
            payload_summary (first 200 characters of the message content)
        total_count: number of messages matching the filters
        has_more: True when total_count exceeds the number of items returned
        monitor_urls: link to the monitor messages list with matching filters
    """
    from urllib.parse import urlencode

    @sync_to_async
    def fetch():
        max_items = 200

        qs = WorkflowMessage.objects.order_by('-sent_at')
        if namespace:
            qs = qs.filter(namespace=namespace)
        if execution_id:
            qs = qs.filter(execution_id=execution_id)
        if agent:
            qs = qs.filter(sender_agent=agent)
        if message_type:
            qs = qs.filter(message_type=message_type)

        # A lower time bound is always applied; it defaults to the last hour.
        start = _parse_time(start_time) or _default_start_time(1)
        end = _parse_time(end_time)
        qs = qs.filter(sent_at__gte=start)
        if end:
            qs = qs.filter(sent_at__lte=end)

        # urlencode escapes caller-supplied values ('&', '=', spaces, ...) so
        # they cannot corrupt the query string of the monitor link.
        query_string = urlencode([
            (key, value)
            for key, value in (
                ("namespace", namespace),
                ("execution_id", execution_id),
                ("message_type", message_type),
            )
            if value
        ])
        url = _monitor_url(f"/workflow/messages/?{query_string}" if query_string else "/workflow/messages/")

        # Count the full match set before slicing so has_more is accurate.
        total_count = qs.count()
        items = [
            {
                "message_type": m.message_type,
                "sender_agent": m.sender_agent,
                "namespace": m.namespace,
                "sent_at": m.sent_at.isoformat() if m.sent_at else None,
                "execution_id": m.execution_id,
                "run_id": m.run_id,
                "payload_summary": str(m.message_content)[:200] if m.message_content else None,
            }
            for m in qs[:max_items]
        ]
        return {
            "items": items,
            "total_count": total_count,
            "has_more": total_count > max_items,
            "monitor_urls": [
                {"title": "Messages List", "url": url},
            ],
        }

    return await fetch()
0299 
0300 
0301 # -----------------------------------------------------------------------------
0302 # Runs
0303 # -----------------------------------------------------------------------------
0304 
@mcp.tool()
async def swf_list_runs(
    start_time: str = None,
    end_time: str = None,
) -> dict:
    """
    List simulation runs with timing and file counts.

    Runs represent data-taking periods in the ePIC detector system.
    Each run contains multiple STF (Super Time Frame) files.

    Args:
        start_time: Filter runs started >= this ISO datetime (default: last 7 days)
        end_time: Filter runs started <= this ISO datetime

    Returns a dict with:
        items: up to 100 runs (newest first), each with run_number,
            start_time, end_time, duration_seconds, stf_file_count
        total_count: number of runs matching the time window
        has_more: True when total_count exceeds the number of items returned
        monitor_urls: link to the monitor runs list
    """
    @sync_to_async
    def fetch():
        max_items = 100

        qs = Run.objects.annotate(
            stf_file_count=Count('stf_files')
        ).order_by('-start_time')

        # A lower time bound is always applied; 168 hours is the 7-day default.
        start = _parse_time(start_time) or _default_start_time(168)
        end = _parse_time(end_time)
        qs = qs.filter(start_time__gte=start)
        if end:
            qs = qs.filter(start_time__lte=end)

        # Count the full match set before slicing so has_more is accurate.
        total_count = qs.count()
        items = []
        for r in qs[:max_items]:
            # Duration is only defined once the run has both endpoints.
            duration = None
            if r.start_time and r.end_time:
                duration = (r.end_time - r.start_time).total_seconds()

            items.append({
                "run_number": r.run_number,
                "start_time": r.start_time.isoformat() if r.start_time else None,
                "end_time": r.end_time.isoformat() if r.end_time else None,
                "duration_seconds": duration,
                "stf_file_count": r.stf_file_count,
            })

        return {
            "items": items,
            "total_count": total_count,
            "has_more": total_count > max_items,
            "monitor_urls": [
                {"title": "Runs List", "url": _monitor_url("/runs/")},
            ],
        }

    return await fetch()
0361 
0362 
@mcp.tool()
async def swf_get_run(run_number: int) -> dict:
    """
    Get detailed information about a specific run.

    Args:
        run_number: The run number (required)

    Returns: run_number, start_time, end_time, duration_seconds, run_conditions,
    file_stats (STF file counts keyed by each declared status value, zero
    included), total_stf_files (sum of file_stats), monitor_urls; or a dict
    with a single 'error' key when the run does not exist.
    """
    @sync_to_async
    def fetch():
        # Keep the try limited to the lookup that can raise DoesNotExist.
        try:
            r = Run.objects.get(run_number=run_number)
        except Run.DoesNotExist:
            return {"error": f"Run {run_number} not found. Use swf_list_runs to see available runs."}

        duration = None
        if r.start_time and r.end_time:
            duration = (r.end_time - r.start_time).total_seconds()

        # Seed every declared status with 0 so the keys are stable, then fill
        # them from one grouped query instead of one COUNT per status.
        status_choices = StfFile._meta.get_field('status').choices or ()
        file_stats = {choice[0]: 0 for choice in status_choices}
        grouped = (
            StfFile.objects.filter(run=r)
            .order_by()  # drop any ordering so it cannot leak into GROUP BY
            .values('status')
            .annotate(file_count=Count('pk'))
        )
        for row in grouped:
            # Only declared statuses are reported, as before.
            if row['status'] in file_stats:
                file_stats[row['status']] = row['file_count']

        return {
            "run_number": r.run_number,
            "start_time": r.start_time.isoformat() if r.start_time else None,
            "end_time": r.end_time.isoformat() if r.end_time else None,
            "duration_seconds": duration,
            "run_conditions": r.run_conditions,
            "file_stats": file_stats,
            "total_stf_files": sum(file_stats.values()),
            "monitor_urls": [
                {"title": "Run Detail", "url": _monitor_url(f"/runs/{r.run_number}/")},
            ],
        }

    return await fetch()
0405 
0406 
0407 # -----------------------------------------------------------------------------
0408 # STF Files
0409 # -----------------------------------------------------------------------------
0410 
@mcp.tool()
async def swf_list_stf_files(
    run_number: int = None,
    status: str = None,
    machine_state: str = None,
    start_time: str = None,
    end_time: str = None,
) -> dict:
    """
    List STF (Super Time Frame) files with filtering.

    STF files are the primary data units from the ePIC detector DAQ system.
    Each STF represents a time slice of detector data.

    Args:
        run_number: Filter to files from this run
        status: Filter by status: 'registered', 'processing', 'processed', 'done', 'failed'
        machine_state: Filter by detector state (e.g., 'physics', 'cosmics')
        start_time: Filter files created >= this ISO datetime (default: last
            24 hours, or no lower bound when run_number is given)
        end_time: Filter files created <= this ISO datetime

    Returns a dict with:
        items: up to 100 STF files (newest first), each with file_id,
            stf_filename, run_number, status, machine_state, file_size_bytes,
            created_at, tf_file_count
        total_count: number of files matching the filters
        has_more: True when total_count exceeds the number of items returned
        monitor_urls: link to the monitor STF file list with matching filters
    """
    from urllib.parse import urlencode

    @sync_to_async
    def fetch():
        max_items = 100

        qs = StfFile.objects.select_related('run').annotate(
            tf_file_count=Count('tf_files')
        ).order_by('-created_at')

        if run_number:
            qs = qs.filter(run__run_number=run_number)
        if status:
            qs = qs.filter(status__iexact=status)
        if machine_state:
            qs = qs.filter(machine_state__iexact=machine_state)

        # With a run selected, the default 24-hour window is dropped so the
        # whole run is visible; an explicit start_time still applies.
        start = _parse_time(start_time) or (None if run_number else _default_start_time(24))
        end = _parse_time(end_time)
        if start:
            qs = qs.filter(created_at__gte=start)
        if end:
            qs = qs.filter(created_at__lte=end)

        # urlencode escapes caller-supplied values ('&', '=', spaces, ...) so
        # they cannot corrupt the query string of the monitor link.
        query_string = urlencode([
            (key, value)
            for key, value in (
                ("run_number", run_number),
                ("status", status),
            )
            if value
        ])
        url = _monitor_url(f"/stf-files/?{query_string}" if query_string else "/stf-files/")

        # Count the full match set before slicing so has_more is accurate.
        total_count = qs.count()
        items = [
            {
                "file_id": str(f.file_id),
                "stf_filename": f.stf_filename,
                "run_number": f.run.run_number if f.run else None,
                "status": f.status,
                "machine_state": f.machine_state,
                "file_size_bytes": f.file_size_bytes,
                "created_at": f.created_at.isoformat() if f.created_at else None,
                "tf_file_count": f.tf_file_count,
            }
            for f in qs[:max_items]
        ]
        return {
            "items": items,
            "total_count": total_count,
            "has_more": total_count > max_items,
            "monitor_urls": [
                {"title": "STF Files List", "url": url},
            ],
        }

    return await fetch()
0488 
0489 
@mcp.tool()
async def swf_get_stf_file(file_id: str = None, stf_filename: str = None) -> dict:
    """
    Get detailed information about a specific STF file.

    Provide either file_id or stf_filename to identify the file. When both
    are given, file_id is used.

    Args:
        file_id: The UUID file ID
        stf_filename: The STF filename

    Returns: file_id, stf_filename, run_number, status, machine_state,
    file_size_bytes, checksum, created_at, metadata, workflow_id, daq_state,
    daq_substate, workflow_status, monitor_urls; or a dict with a single
    'error' key when no identifier is given, the file_id is malformed, or the
    file is not found.
    """
    from django.core.exceptions import ValidationError

    @sync_to_async
    def fetch():
        if file_id:
            lookup = {"file_id": file_id}
        elif stf_filename:
            lookup = {"stf_filename": stf_filename}
        else:
            return {"error": "Provide either file_id or stf_filename"}

        try:
            f = StfFile.objects.select_related('run').get(**lookup)
        except StfFile.DoesNotExist:
            return {"error": "STF file not found. Use swf_list_stf_files to see available files."}
        except (ValidationError, ValueError):
            # A malformed identifier (e.g. a non-UUID string for a UUID
            # column) is rejected by the ORM before the query runs; report it
            # as a normal tool error instead of letting the exception escape.
            return {"error": f"Invalid file identifier {lookup!r}. Use swf_list_stf_files to see available files."}

        return {
            "file_id": str(f.file_id),
            "stf_filename": f.stf_filename,
            "run_number": f.run.run_number if f.run else None,
            "status": f.status,
            "machine_state": f.machine_state,
            "file_size_bytes": f.file_size_bytes,
            "checksum": f.checksum,
            "created_at": f.created_at.isoformat() if f.created_at else None,
            "metadata": f.metadata,
            "workflow_id": str(f.workflow_id) if f.workflow_id else None,
            "daq_state": f.daq_state,
            "daq_substate": f.daq_substate,
            "workflow_status": f.workflow_status,
            "monitor_urls": [
                {"title": "STF File Detail", "url": _monitor_url(f"/stf-files/{f.file_id}/")},
            ],
        }

    return await fetch()
0537 
0538 
0539 # -----------------------------------------------------------------------------
0540 # TF Slices (Fast Processing)
0541 # -----------------------------------------------------------------------------
0542 
@mcp.tool()
async def swf_list_tf_slices(
    run_number: int = None,
    stf_filename: str = None,
    tf_filename: str = None,
    status: str = None,
    assigned_worker: str = None,
    start_time: str = None,
    end_time: str = None,
) -> dict:
    """
    List TF slices for the fast processing workflow.

    TF slices are small portions of TF samples (~15 per STF) that workers
    process independently in ~30 seconds each.

    Args:
        run_number: Filter to slices from this run
        stf_filename: Filter to slices from this STF file
        tf_filename: Filter to slices from this TF sample
        status: Filter by status: 'queued', 'processing', 'completed', 'failed'
        assigned_worker: Filter by assigned worker ID
        start_time: Filter slices created >= this ISO datetime (default: last
            24 hours, or no lower bound when run_number, stf_filename or
            tf_filename is given)
        end_time: Filter slices created <= this ISO datetime

    Returns a dict with:
        items: up to 200 slices (newest first), each with slice_id,
            tf_filename, stf_filename, run_number, tf_first, tf_last,
            tf_count, status, assigned_worker, created_at, completed_at
        total_count: number of slices matching the filters
        has_more: True when total_count exceeds the number of items returned
        monitor_urls: link to the monitor TF slice list with matching filters
    """
    from urllib.parse import urlencode

    @sync_to_async
    def fetch():
        max_items = 200

        qs = TFSlice.objects.all().order_by('-created_at')

        if run_number:
            qs = qs.filter(run_number=run_number)
        if stf_filename:
            qs = qs.filter(stf_filename=stf_filename)
        if tf_filename:
            qs = qs.filter(tf_filename=tf_filename)
        if status:
            qs = qs.filter(status__iexact=status)
        if assigned_worker:
            qs = qs.filter(assigned_worker=assigned_worker)

        # When the caller already narrowed to a run/STF/TF, the default
        # 24-hour window is dropped; an explicit start_time still applies.
        has_context = run_number or stf_filename or tf_filename
        start = _parse_time(start_time) or (None if has_context else _default_start_time(24))
        end = _parse_time(end_time)
        if start:
            qs = qs.filter(created_at__gte=start)
        if end:
            qs = qs.filter(created_at__lte=end)

        # urlencode escapes caller-supplied values ('&', '=', spaces, ...) so
        # they cannot corrupt the query string of the monitor link.
        query_string = urlencode([
            (key, value)
            for key, value in (
                ("run_number", run_number),
                ("status", status),
            )
            if value
        ])
        url = _monitor_url(f"/tf-slices/?{query_string}" if query_string else "/tf-slices/")

        # Count the full match set before slicing so has_more is accurate.
        total_count = qs.count()
        items = [
            {
                "slice_id": s.slice_id,
                "tf_filename": s.tf_filename,
                "stf_filename": s.stf_filename,
                "run_number": s.run_number,
                "tf_first": s.tf_first,
                "tf_last": s.tf_last,
                "tf_count": s.tf_count,
                "status": s.status,
                "assigned_worker": s.assigned_worker,
                "created_at": s.created_at.isoformat() if s.created_at else None,
                "completed_at": s.completed_at.isoformat() if s.completed_at else None,
            }
            for s in qs[:max_items]
        ]
        return {
            "items": items,
            "total_count": total_count,
            "has_more": total_count > max_items,
            "monitor_urls": [
                {"title": "TF Slices List", "url": url},
            ],
        }

    return await fetch()
0630 
0631 
@mcp.tool()
async def swf_get_tf_slice(tf_filename: str, slice_id: int) -> dict:
    """
    Look up a single TF slice by its TF filename and slice ID.

    Args:
        tf_filename: The TF filename (required)
        slice_id: The slice ID within the TF (required, typically 1-15)

    Returns: slice_id, tf_filename, stf_filename, run_number, tf_first, tf_last,
    tf_count, status, retries, assigned_worker, assigned_at, completed_at, metadata;
    or a dict with a single 'error' key when no matching slice exists.
    """
    def _iso(moment):
        # Timestamps may be unset; serialize those as None.
        return moment.isoformat() if moment else None

    @sync_to_async
    def fetch():
        try:
            tf_slice = TFSlice.objects.get(tf_filename=tf_filename, slice_id=slice_id)
        except TFSlice.DoesNotExist:
            return {"error": f"TF slice {slice_id} for {tf_filename} not found. Use swf_list_tf_slices to see available slices."}

        details = {
            "slice_id": tf_slice.slice_id,
            "tf_filename": tf_slice.tf_filename,
            "stf_filename": tf_slice.stf_filename,
            "run_number": tf_slice.run_number,
            "tf_first": tf_slice.tf_first,
            "tf_last": tf_slice.tf_last,
            "tf_count": tf_slice.tf_count,
            "status": tf_slice.status,
            "retries": tf_slice.retries,
            "assigned_worker": tf_slice.assigned_worker,
            "assigned_at": _iso(tf_slice.assigned_at),
            "completed_at": _iso(tf_slice.completed_at),
            "metadata": tf_slice.metadata,
        }
        return details

    return await fetch()
0667 
0668 
0669 # -----------------------------------------------------------------------------
0670 # Workflow Control
0671 # -----------------------------------------------------------------------------
0672 
0673 @mcp.tool()
0674 async def swf_start_workflow(
0675     workflow_name: str = None,
0676     namespace: str = None,
0677     config: str = None,
0678     realtime: bool = None,
0679     duration: int = 0,
0680     stf_count: int = None,
0681     physics_period_count: int = None,
0682     physics_period_duration: float = None,
0683     stf_interval: float = None,
0684 ) -> dict:
0685     """
0686     Start a workflow execution by sending a command to the DAQ Simulator agent.
0687 
0688     All parameters are optional - defaults are read from PersistentState 'workflow_defaults'.
0689     Call with no arguments to use configured defaults.
0690 
0691     Args:
0692         workflow_name: Name of the workflow (default: from config, typically 'stf_datataking')
0693         namespace: Testbed namespace (default: from config, e.g., 'torre1')
0694         config: Workflow config name (default: from config, e.g., 'fast_processing_default')
0695         realtime: Run in real-time mode (default: from config, typically True)
0696         duration: Max duration in seconds (0 = run until complete)
0697         stf_count: Number of STF files to generate (overrides config)
0698         physics_period_count: Number of physics periods (overrides config)
0699         physics_period_duration: Duration of each physics period in seconds (overrides config)
0700         stf_interval: Interval between STF generation in seconds (overrides config)
0701 
0702     Returns:
0703         Success/failure status with execution_id if started.
0704 
0705     After starting, ACTIVELY POLL — do not sleep:
0706         get_workflow_monitor(execution_id) every 10-15s until completion
0707         Report progress to user as it evolves
0708         list_logs(level='ERROR') after completion
0709     """
0710     import json
0711     from datetime import datetime
0712 
0713     @sync_to_async
0714     def do_start():
0715         import os
0716         from pathlib import Path
0717         from ..activemq_connection import ActiveMQConnectionManager
0718 
0719         toml_namespace = None
0720         toml_workflow_name = None
0721         toml_config = None
0722         toml_realtime = None
0723         toml_params = {}
0724 
0725         testbed_toml, config_source = _get_testbed_config_path()
0726         if testbed_toml.exists():
0727             try:
0728                 import tomllib
0729                 with open(testbed_toml, 'rb') as f:
0730                     toml_data = tomllib.load(f)
0731                 toml_namespace = toml_data.get('testbed', {}).get('namespace')
0732                 workflow_section = toml_data.get('workflow', {})
0733                 toml_workflow_name = workflow_section.get('name')
0734                 toml_config = workflow_section.get('config')
0735                 toml_realtime = workflow_section.get('realtime')
0736                 toml_params = toml_data.get('parameters', {})
0737                 if config_source == 'SWF_TESTBED_CONFIG':
0738                     logger.info(f"Using config from SWF_TESTBED_CONFIG: {testbed_toml.name}")
0739             except Exception as e:
0740                 logger.warning(f"Failed to read {testbed_toml}: {e}")
0741 
0742         actual_workflow_name = workflow_name or toml_workflow_name or 'stf_datataking'
0743         actual_namespace = namespace or toml_namespace
0744         actual_config = config or toml_config or 'fast_processing_default'
0745 
0746         # If namespace not from caller or TOML, use the running agent manager's
0747         # namespace from DB — it reflects the actual loaded config
0748         if not actual_namespace:
0749             try:
0750                 cutoff = timezone.now() - timedelta(minutes=5)
0751                 am = SystemAgent.objects.filter(
0752                     agent_type='agent_manager',
0753                     last_heartbeat__gte=cutoff
0754                 ).order_by('-last_heartbeat').first()
0755                 if am and am.namespace:
0756                     actual_namespace = am.namespace
0757                     logger.info(f"Using namespace '{am.namespace}' from agent manager '{am.instance_name}'")
0758             except Exception:
0759                 pass
0760         if not actual_namespace:
0761             actual_namespace = 'torre1'
0762         actual_realtime = realtime if realtime is not None else (toml_realtime if toml_realtime is not None else True)
0763 
0764         params = dict(toml_params)
0765         if stf_count is not None:
0766             params['stf_count'] = stf_count
0767         if physics_period_count is not None:
0768             params['physics_period_count'] = physics_period_count
0769         if physics_period_duration is not None:
0770             params['physics_period_duration'] = physics_period_duration
0771         if stf_interval is not None:
0772             params['stf_interval'] = stf_interval
0773 
0774         params['namespace'] = actual_namespace
0775 
0776         msg = {
0777             'msg_type': 'run_workflow',
0778             'namespace': actual_namespace,
0779             'workflow_name': actual_workflow_name,
0780             'config': actual_config,
0781             'realtime': actual_realtime,
0782             'duration': duration,
0783             'params': params,
0784             'timestamp': datetime.now().isoformat(),
0785             'source': 'mcp'
0786         }
0787 
0788         mq = ActiveMQConnectionManager()
0789         if mq.send_message('/queue/workflow_control', json.dumps(msg)):
0790             logger.info(
0791                 f"MCP start_workflow: sent run_workflow command for '{actual_workflow_name}' "
0792                 f"(namespace={actual_namespace}, config={actual_config}, realtime={actual_realtime})"
0793             )
0794             return {
0795                 "success": True,
0796                 "message": f"Workflow '{actual_workflow_name}' start command sent to DAQ Simulator",
0797                 "workflow_name": actual_workflow_name,
0798                 "namespace": actual_namespace,
0799                 "config": actual_config,
0800                 "realtime": actual_realtime,
0801                 "params": params,
0802                 "note": "Workflow runs asynchronously. Use swf_list_workflow_executions to monitor."
0803             }
0804         else:
0805             return {
0806                 "success": False,
0807                 "error": "Failed to send message to ActiveMQ. Is the message broker running?",
0808                 "workflow_name": actual_workflow_name,
0809                 "namespace": actual_namespace,
0810             }
0811 
0812     return await do_start()
0813 
0814 
@mcp.tool()
async def swf_stop_workflow(execution_id: str) -> dict:
    """
    Request that a running workflow execution be stopped.

    Looks up the execution, verifies it is currently in the 'running' state,
    and publishes a 'stop_workflow' command on the '/queue/workflow_control'
    queue. This tool only queues the command; the stop itself happens
    asynchronously (the agent is expected to halt at its next checkpoint).

    To find the execution_id, use
    swf_list_workflow_executions(currently_running=True).

    Args:
        execution_id: The execution ID to stop (e.g., 'stf_datataking-wenauseic-0042')

    Returns:
        Dict with 'success' True plus message, execution_id, namespace and note
        when the command was sent; otherwise 'success' False with an 'error'
        (unknown execution, execution not running, or broker send failure).
    """
    import json
    from datetime import datetime

    def _send_stop_command():
        from ..activemq_connection import ActiveMQConnectionManager

        # Guard 1: the execution must exist.
        try:
            target = WorkflowExecution.objects.get(execution_id=execution_id)
        except WorkflowExecution.DoesNotExist:
            return {
                "success": False,
                "error": f"Execution '{execution_id}' not found",
            }

        # Guard 2: only running executions can be stopped.
        if target.status != 'running':
            return {
                "success": False,
                "error": f"Execution '{execution_id}' is not running (status: {target.status})",
            }

        command = {
            'msg_type': 'stop_workflow',
            'execution_id': execution_id,
            'namespace': target.namespace,
            'timestamp': datetime.now().isoformat(),
            'source': 'mcp'
        }

        broker = ActiveMQConnectionManager()
        delivered = broker.send_message('/queue/workflow_control', json.dumps(command))
        if not delivered:
            return {
                "success": False,
                "error": "Failed to send message to ActiveMQ. Is the message broker running?",
                "execution_id": execution_id,
            }

        logger.info(f"MCP stop_workflow: sent stop command for execution '{execution_id}'")
        return {
            "success": True,
            "message": f"Stop command sent for execution '{execution_id}'",
            "execution_id": execution_id,
            "namespace": target.namespace,
            "note": "Workflow will stop at next checkpoint. Monitor via list_workflow_executions."
        }

    # Django ORM and broker calls are blocking, so run them off the event loop.
    return await sync_to_async(_send_stop_command)()
0879 
0880 
@mcp.tool()
async def swf_end_execution(execution_id: str) -> dict:
    """
    Mark a running workflow execution as 'terminated'.

    Intended for cleaning up stale or stuck executions that are still flagged
    as 'running'. Only the status and end_time of the execution record are
    changed and saved; nothing is deleted. The change is written to the log.

    Args:
        execution_id: The execution ID to end (use swf_list_workflow_executions
                      to find running ones)

    Returns:
        On success: success=True, execution_id, old_status, new_status,
        start_time and end_time (ISO strings or None).
        On failure: success=False with an 'error' message (execution not found
        or not currently running).
    """
    def _terminate():
        try:
            record = WorkflowExecution.objects.get(execution_id=execution_id)
        except WorkflowExecution.DoesNotExist:
            return {
                "success": False,
                "error": f"Execution '{execution_id}' not found",
            }

        previous_status = record.status
        if previous_status != 'running':
            return {
                "success": False,
                "error": f"Execution '{execution_id}' is not running (status: {previous_status})",
            }

        # State change only: flip the status and stamp the end time.
        record.status = 'terminated'
        record.end_time = timezone.now()
        record.save()

        logger.info(
            f"MCP end_execution: '{execution_id}' terminated (was running since {record.start_time})"
        )

        started_iso = record.start_time.isoformat() if record.start_time else None
        ended_iso = record.end_time.isoformat() if record.end_time else None
        return {
            "success": True,
            "execution_id": execution_id,
            "old_status": previous_status,
            "new_status": "terminated",
            "start_time": started_iso,
            "end_time": ended_iso,
        }

    # ORM access is synchronous; hand it to a worker thread.
    return await sync_to_async(_terminate)()
0930 
0931 
0932 # -----------------------------------------------------------------------------
0933 # Workflow Monitoring
0934 # -----------------------------------------------------------------------------
0935 
@mcp.tool()
async def swf_get_workflow_monitor(execution_id: str) -> dict:
    """
    Get the status and accumulated events for a workflow execution.

    This provides a summary of workflow progress without needing to poll
    multiple tools. Aggregates workflow messages, STF file counts and
    error-level log entries for the execution.

    Args:
        execution_id: The execution ID to get monitor status for

    Returns:
        If the execution does not exist: {"error": "..."}.
        Otherwise a dict with:
        - execution_id: The execution being monitored
        - status: Status stored on the execution record (e.g. running/terminated)
        - phase: Phase derived from messages (imminent/running/ended), or
          'unknown' when no phase message has been seen
        - run_id: Run ID from the most recent run_imminent message, if any
        - stf_count: Number of StfFile rows belonging to the execution's runs
        - events: List of key events (run_imminent/start_run/end_run) with timestamps
        - errors: Errors from failure/error messages plus up to 10 error-level logs
        - start_time / end_time: ISO timestamps or None
        - duration_seconds: end_time - start_time, only when both are set
        - monitor_urls: Link to the execution detail page
    """
    @sync_to_async
    def fetch():
        try:
            execution = WorkflowExecution.objects.get(execution_id=execution_id)
        except WorkflowExecution.DoesNotExist:
            return {"error": f"Execution '{execution_id}' not found"}

        db_status = execution.status
        db_start_time = execution.start_time
        db_end_time = execution.end_time

        duration_seconds = None
        if db_start_time and db_end_time:
            duration_seconds = (db_end_time - db_start_time).total_seconds()

        messages = WorkflowMessage.objects.filter(
            execution_id=execution_id
        ).order_by('sent_at')

        events = []
        current_phase = "unknown"
        run_id = None
        run_ids = set()
        errors = []

        # Replay messages in send order so the last phase message wins.
        for msg in messages:
            msg_type = msg.message_type
            timestamp = msg.sent_at.isoformat() if msg.sent_at else None
            content = msg.message_content or {}

            if msg_type == 'run_imminent':
                current_phase = "imminent"
                # Prefer the run_id carried in the payload, fall back to the column.
                run_id = content.get('run_id') or msg.run_id
                if run_id:
                    run_ids.add(run_id)
                events.append({"type": "run_imminent", "time": timestamp, "run_id": run_id})
            elif msg_type == 'start_run':
                current_phase = "running"
                events.append({"type": "start_run", "time": timestamp})
            elif msg_type == 'stf_gen':
                pass  # counted from StfFile table below
            elif msg_type == 'end_run':
                current_phase = "ended"
                events.append({"type": "end_run", "time": timestamp})
            elif msg_type in ('run_workflow_failed', 'error'):
                errors.append({
                    "time": timestamp,
                    "error": content.get('error', str(content)),
                })

        # Count actual STF files from the StfFile table
        if not run_ids:
            # No run_imminent seen: fall back to any run_id recorded on messages.
            run_ids = set(WorkflowMessage.objects.filter(
                execution_id=execution_id, run_id__isnull=False,
            ).values_list('run_id', flat=True).distinct())

        # Run IDs may come from free-form message content; a value that is not
        # numeric must not abort the whole monitor query, so skip it.
        run_numbers = []
        for raw_run_id in run_ids:
            if not raw_run_id:
                continue
            try:
                run_numbers.append(int(raw_run_id))
            except (TypeError, ValueError):
                logger.warning(
                    f"MCP get_workflow_monitor: ignoring non-numeric run_id "
                    f"{raw_run_id!r} for execution '{execution_id}'"
                )
        stf_count = StfFile.objects.filter(run__run_number__in=run_numbers).count()

        # Append the first 10 error-or-worse log records tagged with this execution.
        error_logs = AppLog.objects.filter(
            level__gte=logging.ERROR,
            extra_data__execution_id=execution_id,
        ).order_by('timestamp')[:10]

        for log in error_logs:
            errors.append({
                "time": log.timestamp.isoformat() if log.timestamp else None,
                "error": log.message,
                "source": "log",
            })

        return {
            "execution_id": execution_id,
            "status": db_status,
            "phase": current_phase,
            "run_id": run_id,
            "stf_count": stf_count,
            "events": events,
            "errors": errors,
            "start_time": db_start_time.isoformat() if db_start_time else None,
            "end_time": db_end_time.isoformat() if db_end_time else None,
            "duration_seconds": duration_seconds,
            "monitor_urls": [
                {"title": "Execution Detail", "url": _monitor_url(f"/workflow-executions/{execution_id}/")},
            ],
        }

    return await fetch()
1044 
1045 
@mcp.tool()
async def swf_list_workflow_monitors() -> list:
    """
    List recent workflow executions that can be monitored.

    Covers executions started within the last 24 hours, newest first, capped
    at 20 entries, so you can pick one to inspect with
    swf_get_workflow_monitor().

    Returns:
        A dict (not a bare list) with:
        - items: up to 20 entries, each with execution_id, status, start_time,
          end_time, stf_count
        - total_count: number of executions in the 24-hour window
        - has_more: True when total_count exceeds the 20-item cap
        - monitor_urls: link to the executions list page
    """
    # NOTE(review): the `-> list` annotation is kept unchanged for interface
    # stability even though the payload is a dict — confirm whether the MCP
    # layer relies on it before correcting.
    @sync_to_async
    def fetch():
        now = timezone.now()
        qs = WorkflowExecution.objects.filter(
            start_time__gte=now - timedelta(hours=24)
        ).order_by('-start_time')

        MAX_ITEMS = 20
        total_count = qs.count()
        items = []
        for e in qs[:MAX_ITEMS]:
            run_ids = WorkflowMessage.objects.filter(
                execution_id=e.execution_id,
                run_id__isnull=False,
            ).values_list('run_id', flat=True).distinct()

            # A single malformed run_id must not break the listing for every
            # execution, so non-numeric values are skipped with a warning.
            run_numbers = []
            for raw_run_id in run_ids:
                if not raw_run_id:
                    continue
                try:
                    run_numbers.append(int(raw_run_id))
                except (TypeError, ValueError):
                    logger.warning(
                        f"MCP list_workflow_monitors: ignoring non-numeric run_id "
                        f"{raw_run_id!r} for execution '{e.execution_id}'"
                    )
            stf_count = StfFile.objects.filter(run__run_number__in=run_numbers).count()

            items.append({
                "execution_id": e.execution_id,
                "status": e.status,
                "start_time": e.start_time.isoformat() if e.start_time else None,
                "end_time": e.end_time.isoformat() if e.end_time else None,
                "stf_count": stf_count,
            })

        return {
            "items": items,
            "total_count": total_count,
            "has_more": total_count > MAX_ITEMS,
            "monitor_urls": [
                {"title": "Executions List", "url": _monitor_url("/workflow-executions/")},
            ],
        }

    return await fetch()
1092 
1093 
@mcp.tool()
async def swf_send_message(message: str, message_type: str = "announcement", metadata: dict = None) -> dict:
    """
    Send a message to the workflow monitoring stream.

    Use for testing the message pipeline, announcements to colleagues,
    or any other broadcast purpose. The message is published as JSON on
    '/topic/epictopic'.

    The sender is automatically identified as '{username}-personal-agent'.

    Args:
        message: The message text to send
        message_type: Type of message - 'announcement', 'status', 'test', etc.
                      If 'test', no namespace is looked up and the 'namespace'
                      field is sent as null. Otherwise the namespace is read
                      from the testbed TOML config ([testbed].namespace); it is
                      also null if the config is missing or unreadable.
        metadata: Optional additional key-value data to include

    Returns:
        On success: success=True, message, sender, message_type, namespace, content.
        On failure: success=False with an 'error' message.
    """
    import json
    from datetime import datetime

    @sync_to_async
    def do_send():
        from ..activemq_connection import ActiveMQConnectionManager

        username = _get_username()
        sender = f"{username}-personal-agent"

        namespace = None
        if message_type != 'test':
            testbed_toml, _ = _get_testbed_config_path()
            if testbed_toml and testbed_toml.exists():
                try:
                    import tomllib
                    with open(testbed_toml, 'rb') as f:
                        toml_data = tomllib.load(f)
                    namespace = toml_data.get('testbed', {}).get('namespace')
                except Exception as e:
                    # Best-effort lookup: still send the message without a
                    # namespace, but surface the config problem in the log
                    # instead of swallowing it silently.
                    logger.warning(f"Failed to read {testbed_toml}: {e}")

        msg = {
            'msg_type': message_type,
            'sender': sender,
            'namespace': namespace,
            'message': message,
            'timestamp': datetime.now().isoformat(),
            'source': 'mcp_send_message',
        }
        # Only attach metadata when the caller supplied a non-empty value.
        if metadata:
            msg['metadata'] = metadata

        topic = '/topic/epictopic'
        mq = ActiveMQConnectionManager()
        if mq.send_message(topic, json.dumps(msg)):
            logger.info(f"MCP send_message: sent {message_type} from {sender}")
            return {
                "success": True,
                "message": "Message sent to monitoring stream",
                "sender": sender,
                "message_type": message_type,
                "namespace": namespace,
                "content": message,
            }
        else:
            return {
                "success": False,
                "error": "Failed to send message to ActiveMQ. Is the message broker running?",
            }

    return await do_send()
1164     return await do_send()