File indexing completed on 2026-04-25 08:29:11
0001 """
0002 Enhanced Django models for SWF workflow tracking.
0003
0004 This module extends the existing models with workflow-specific fields and adds new models
0005 for tracking the complete STF processing pipeline from DAQ generation through agent processing.
0006 """
0007
0008 import uuid
0009 from django.db import models
0010 from django.utils import timezone
0011
0012
class DAQState(models.TextChoices):
    """
    Main DAQ system states produced by the schedule-based simulation.

    Each member is a ``(stored value, display label)`` pair. The states
    correspond to the detector/accelerator operational states.
    """

    NO_BEAM = ("no_beam", "No Beam")
    BEAM = ("beam", "Beam")
    RUN = ("run", "Run")
    CALIB = ("calib", "Calibration")
    TEST = ("test", "Test")
0023
0024
class DAQSubstate(models.TextChoices):
    """
    DAQ substates that add context within each main DAQ state.

    Each member is a ``(stored value, display label)`` pair.
    """

    NOT_READY = ("not_ready", "Not Ready")
    READY = ("ready", "Ready")
    PHYSICS = ("physics", "Physics")
    STANDBY = ("standby", "Standby")
    LUMI = ("lumi", "Luminosity")
    EIC = ("eic", "EIC")
    EPIC = ("epic", "ePIC")
    DAQ = ("daq", "DAQ")
    CALIB = ("calib", "Calibration")
0038
0039
class WorkflowStatus(models.TextChoices):
    """
    Overall status of an STF as it moves through the processing pipeline.

    The same received / processing / complete triple is defined for each of
    the daqsim, data, processing and fastmon agents, with stored values of
    the form ``<agent>_<phase>``.
    """

    # Initial state: the STF has been produced by the DAQ.
    GENERATED = ("generated", "Generated by DAQ")

    # DAQSIM agent phases.
    DAQSIM_RECEIVED = ("daqsim_received", "Received by DAQSIM Agent")
    DAQSIM_PROCESSING = ("daqsim_processing", "DAQSIM Agent Processing")
    DAQSIM_COMPLETE = ("daqsim_complete", "DAQSIM Agent Complete")

    # Data agent phases.
    DATA_RECEIVED = ("data_received", "Received by Data Agent")
    DATA_PROCESSING = ("data_processing", "Data Agent Processing")
    DATA_COMPLETE = ("data_complete", "Data Agent Complete")

    # Processing agent phases.
    PROCESSING_RECEIVED = ("processing_received", "Received by Processing Agent")
    PROCESSING_PROCESSING = ("processing_processing", "Processing Agent Processing")
    PROCESSING_COMPLETE = ("processing_complete", "Processing Agent Complete")

    # FastMon agent phases.
    FASTMON_RECEIVED = ("fastmon_received", "Received by FastMon Agent")
    FASTMON_PROCESSING = ("fastmon_processing", "FastMon Agent Processing")
    FASTMON_COMPLETE = ("fastmon_complete", "FastMon Agent Complete")

    # Terminal states.
    WORKFLOW_COMPLETE = ("workflow_complete", "Workflow Complete")
    FAILED = ("failed", "Failed")
0070
0071
class AgentType(models.TextChoices):
    """
    Standardized agent type identifiers used across the workflow system.

    Each member is a ``(stored value, display label)`` pair.
    """

    DAQSIM = ("daqsim", "DAQ Simulator")
    DATA = ("data", "Data Agent")
    PROCESSING = ("processing", "Processing Agent")
    FASTMON = ("fastmon", "Fast Monitoring Agent")
    MONITOR = ("monitor", "Monitor System")
0081
0082
class STFWorkflow(models.Model):
    """
    Workflow lifecycle record for a single Super Time Frame (STF).

    Builds on the existing StfFile model concept by adding workflow-specific
    fields, and records where the STF currently is (status and agent) as it
    moves through the pipeline, from generation to completion or failure.
    """

    # Primary key: UUID generated by default, not editable.
    workflow_id = models.UUIDField(
        primary_key=True,
        default=uuid.uuid4,
        editable=False,
    )

    # STF file identification; the filename must be unique.
    filename = models.CharField(max_length=255, unique=True)
    file_id = models.UUIDField(null=True, blank=True)

    # Optional testbed context, each individually indexed.
    namespace = models.CharField(
        max_length=100,
        null=True,
        blank=True,
        db_index=True,
        help_text="Testbed namespace for workflow delineation",
    )
    execution_id = models.CharField(
        max_length=100,
        null=True,
        blank=True,
        db_index=True,
        help_text="Workflow execution instance ID",
    )
    run_id = models.CharField(
        max_length=50,
        null=True,
        blank=True,
        db_index=True,
        help_text="Run number within execution",
    )

    # DAQ state and substate, limited to the DAQState / DAQSubstate choices.
    daq_state = models.CharField(max_length=20, choices=DAQState.choices)
    daq_substate = models.CharField(max_length=20, choices=DAQSubstate.choices)

    # Required timestamps describing the STF itself.
    generated_time = models.DateTimeField()
    stf_start_time = models.DateTimeField()
    stf_end_time = models.DateTimeField()

    # Current position in the pipeline; a new workflow starts as GENERATED.
    current_status = models.CharField(
        max_length=30,
        choices=WorkflowStatus.choices,
        default=WorkflowStatus.GENERATED,
    )

    # Agent currently associated with the STF; defaults to the DAQ simulator.
    current_agent = models.CharField(
        max_length=20,
        choices=AgentType.choices,
        default=AgentType.DAQSIM,
    )

    # Outcome bookkeeping, populated by mark_completed() / mark_failed().
    completed_at = models.DateTimeField(null=True, blank=True)
    failed_at = models.DateTimeField(null=True, blank=True)
    failure_reason = models.TextField(null=True, blank=True)

    # Free-form JSON metadata for the STF.
    stf_metadata = models.JSONField(null=True, blank=True)

    # Row audit timestamps maintained automatically by Django.
    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)

    class Meta:
        db_table = "swf_stf_workflows"
        # Newest generated STFs first by default.
        ordering = ["-generated_time"]
        indexes = [
            models.Index(fields=["current_status", "generated_time"]),
            models.Index(fields=["daq_state", "daq_substate"]),
            models.Index(fields=["current_agent"]),
            models.Index(fields=["namespace", "execution_id"]),
            models.Index(fields=["namespace", "run_id"]),
        ]

    def __str__(self):
        """Return a short label combining the filename and current status."""
        return f"STF Workflow {self.filename} - {self.current_status}"

    def mark_completed(self):
        """Set the status to WORKFLOW_COMPLETE, stamp completed_at, and save."""
        finished = timezone.now()
        self.current_status = WorkflowStatus.WORKFLOW_COMPLETE
        self.completed_at = finished
        self.save()

    def mark_failed(self, reason):
        """
        Set the status to FAILED, stamp failed_at, record the reason, and save.

        Args:
            reason: Text stored verbatim in ``failure_reason``.
        """
        failure_time = timezone.now()
        self.current_status = WorkflowStatus.FAILED
        self.failed_at = failure_time
        self.failure_reason = reason
        self.save()
0161
0162
class AgentWorkflowStage(models.Model):
    """
    One agent's processing stage within an STF workflow.

    Records when an agent received, started, completed, or failed work on an
    STF, together with the messages involved, so that timing and status can
    be inspected per agent.

    The ``mark_received`` / ``mark_processing`` / ``mark_completed`` methods
    derive ``status`` as ``"<agent_type>_<phase>"``.
    NOTE(review): for agent_type "monitor" the derived value (for example
    "monitor_received") has no matching WorkflowStatus member; confirm that
    monitor stages never go through these methods.
    """

    # Primary key: UUID generated by default, not editable.
    stage_id = models.UUIDField(
        primary_key=True,
        default=uuid.uuid4,
        editable=False,
    )

    # Owning workflow (stages are deleted with it) and the agent involved.
    workflow = models.ForeignKey(
        STFWorkflow,
        on_delete=models.CASCADE,
        related_name="stages",
    )
    agent_name = models.CharField(max_length=100)
    agent_type = models.CharField(max_length=20, choices=AgentType.choices)

    # Stage status; shares the WorkflowStatus vocabulary.
    status = models.CharField(
        max_length=30,
        choices=WorkflowStatus.choices,
        default=WorkflowStatus.GENERATED,
    )

    # Phase timestamps, each filled in by the corresponding mark_* method.
    received_at = models.DateTimeField(null=True, blank=True)
    started_at = models.DateTimeField(null=True, blank=True)
    completed_at = models.DateTimeField(null=True, blank=True)
    failed_at = models.DateTimeField(null=True, blank=True)

    # Derived duration (completed_at - started_at) and failure details.
    processing_time_seconds = models.FloatField(null=True, blank=True)
    failure_reason = models.TextField(null=True, blank=True)

    # Message payloads recorded on receipt and on completion.
    input_message = models.JSONField(null=True, blank=True)
    output_message = models.JSONField(null=True, blank=True)

    # Free-form JSON metadata for the stage.
    stage_metadata = models.JSONField(null=True, blank=True)

    # Row audit timestamps maintained automatically by Django.
    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)

    class Meta:
        db_table = "swf_agent_workflow_stages"
        # NOTE(review): ordering on the `workflow` relation follows the
        # related model's default ordering under Django's order_by rules,
        # which implies a join on every default-ordered query; confirm
        # this is intended rather than ordering by the raw foreign key.
        ordering = ["workflow", "created_at"]
        indexes = [
            models.Index(fields=["workflow", "agent_type"]),
            models.Index(fields=["agent_name", "status"]),
            models.Index(fields=["received_at"]),
        ]
        # At most one stage per (workflow, agent name, agent type).
        unique_together = [["workflow", "agent_name", "agent_type"]]

    def __str__(self):
        """Return a short label with the agent name, agent type, and status."""
        return f"Stage {self.agent_name} ({self.agent_type}) - {self.status}"

    def mark_received(self, message=None):
        """
        Stamp received_at, set the ``<agent_type>_received`` status, and save.

        Args:
            message: Optional payload stored in ``input_message``. Falsy
                values (None, empty dict, empty string) are not stored, so
                any existing ``input_message`` is left as it is.
        """
        self.received_at = timezone.now()
        self.status = f"{self.agent_type.lower()}_received"
        if message:
            self.input_message = message
        self.save()

    def mark_processing(self):
        """Stamp started_at, set the ``<agent_type>_processing`` status, and save."""
        self.started_at = timezone.now()
        self.status = f"{self.agent_type.lower()}_processing"
        self.save()

    def mark_completed(self, output_message=None):
        """
        Stamp completed_at, set the ``<agent_type>_complete`` status, and save.

        When ``started_at`` is set, ``processing_time_seconds`` is also
        updated to the elapsed time between ``started_at`` and the new
        ``completed_at``; otherwise it is left unchanged.

        Args:
            output_message: Optional payload stored in ``output_message``.
                Falsy values are not stored.
        """
        self.completed_at = timezone.now()
        self.status = f"{self.agent_type.lower()}_complete"
        if output_message:
            self.output_message = output_message

        # Duration is only computable when mark_processing() recorded a start.
        began = self.started_at
        if began:
            elapsed = self.completed_at - began
            self.processing_time_seconds = elapsed.total_seconds()

        self.save()

    def mark_failed(self, reason):
        """
        Stamp failed_at, set the status to FAILED, record the reason, and save.

        Args:
            reason: Text stored verbatim in ``failure_reason``.
        """
        self.failed_at = timezone.now()
        self.status = WorkflowStatus.FAILED
        self.failure_reason = reason
        self.save()
0251
0252
class WorkflowMessage(models.Model):
    """
    A single message exchanged within the workflow system.

    Stores the message payload together with sender/recipient agent
    identification, a message type, testbed context, and delivery outcome.
    """

    # Primary key: UUID generated by default, not editable.
    message_id = models.UUIDField(
        primary_key=True,
        default=uuid.uuid4,
        editable=False,
    )

    # Optional links to the workflow and stage; the message is deleted
    # together with whichever linked row is deleted.
    workflow = models.ForeignKey(
        STFWorkflow,
        on_delete=models.CASCADE,
        related_name="messages",
        null=True,
        blank=True,
    )
    stage = models.ForeignKey(
        AgentWorkflowStage,
        on_delete=models.CASCADE,
        related_name="messages",
        null=True,
        blank=True,
    )

    # Message classification.
    message_type = models.CharField(max_length=50)
    request_id = models.IntegerField(null=True, blank=True)

    # Sender and recipient identification.
    sender_agent = models.CharField(max_length=100, null=True, blank=True)
    sender_type = models.CharField(
        max_length=20,
        choices=AgentType.choices,
        null=True,
        blank=True,
    )
    recipient_agent = models.CharField(max_length=100, null=True, blank=True)
    recipient_type = models.CharField(
        max_length=20,
        choices=AgentType.choices,
        null=True,
        blank=True,
    )

    # Optional testbed context, each individually indexed.
    namespace = models.CharField(
        max_length=100,
        null=True,
        blank=True,
        db_index=True,
        help_text="Testbed namespace for multi-user message filtering",
    )
    execution_id = models.CharField(
        max_length=100,
        null=True,
        blank=True,
        db_index=True,
        help_text="Workflow execution instance ID",
    )
    run_id = models.CharField(
        max_length=50,
        null=True,
        blank=True,
        db_index=True,
        help_text="Run number within execution",
    )

    # Required message payload.
    message_content = models.JSONField()

    # Optional metadata; defaults to a fresh empty dict per row.
    message_metadata = models.JSONField(
        null=True,
        blank=True,
        default=dict,
        help_text="Extensible metadata for monitoring, debugging, and system tracking",
    )

    # Delivery timeline; sent_at is set automatically on row creation.
    sent_at = models.DateTimeField(auto_now_add=True)
    delivered_at = models.DateTimeField(null=True, blank=True)
    acknowledged_at = models.DateTimeField(null=True, blank=True)

    # Outcome: None until mark_delivered() or mark_failed() sets it.
    is_successful = models.BooleanField(null=True, default=None)
    error_message = models.TextField(null=True, blank=True)

    # Transport details.
    queue_name = models.CharField(max_length=100, null=True, blank=True)
    correlation_id = models.CharField(max_length=100, null=True, blank=True)

    class Meta:
        db_table = "swf_workflow_messages"
        # Most recently sent messages first by default.
        ordering = ["-sent_at"]
        indexes = [
            models.Index(fields=["workflow", "message_type"]),
            models.Index(fields=["sender_agent", "sent_at"]),
            models.Index(fields=["message_type", "sent_at"]),
            models.Index(fields=["namespace", "execution_id"]),
            models.Index(fields=["namespace", "run_id"]),
        ]

    def __str__(self):
        """Return a short label with the message type, sender, and send time."""
        return f"Message {self.message_type} from {self.sender_agent} at {self.sent_at}"

    def mark_delivered(self):
        """Stamp delivered_at, flag the message as successful, and save."""
        delivery_time = timezone.now()
        self.delivered_at = delivery_time
        self.is_successful = True
        self.save()

    def mark_failed(self, error):
        """
        Flag the message as unsuccessful, record the error, and save.

        Args:
            error: Value stored verbatim in ``error_message``.
        """
        self.error_message = error
        self.is_successful = False
        self.save()
0329
0330
class WorkflowDefinition(models.Model):
    """
    Reusable workflow template: code content plus default parameters.

    A definition is identified by its (workflow_name, version) pair, which
    must be unique.
    """

    workflow_name = models.CharField(
        max_length=200,
        help_text="Unique workflow name",
    )
    version = models.CharField(
        max_length=50,
        help_text="Version string",
    )
    workflow_type = models.CharField(
        max_length=100,
        help_text="Flexible workflow type classification",
    )
    definition = models.TextField(
        help_text="Python workflow code content",
    )
    # Defaults to a fresh empty dict per row.
    parameter_values = models.JSONField(
        default=dict,
        help_text="Default parameter values and schema",
    )
    created_by = models.CharField(
        max_length=100,
        help_text="Username who created this workflow",
    )

    # Row audit timestamps maintained automatically by Django.
    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)

    class Meta:
        db_table = "swf_workflow_definitions"
        unique_together = [["workflow_name", "version"]]

    def __str__(self):
        """Return the workflow name followed by its version."""
        return f"{self.workflow_name} v{self.version}"
0350
0351
class WorkflowExecution(models.Model):
    """
    One execution instance of a WorkflowDefinition.

    Keyed by a human-readable execution ID; executions are deleted together
    with the definition they reference.
    """

    execution_id = models.CharField(
        primary_key=True,
        max_length=100,
        help_text="Human-readable execution ID",
    )
    workflow_definition = models.ForeignKey(
        WorkflowDefinition,
        on_delete=models.CASCADE,
        related_name="executions",
    )
    namespace = models.CharField(
        max_length=100,
        null=True,
        blank=True,
        db_index=True,
        help_text="Testbed namespace for workflow delineation",
    )
    parameter_values = models.JSONField(
        help_text="Actual parameter values used for this execution",
    )
    performance_metrics = models.JSONField(
        null=True,
        blank=True,
        help_text="Performance metrics and results",
    )
    # Free-form status string; new executions start as 'pending'.
    status = models.CharField(
        max_length=50,
        default='pending',
        help_text="Flexible execution status",
    )
    start_time = models.DateTimeField(
        help_text="Execution start timestamp",
    )
    end_time = models.DateTimeField(
        null=True,
        blank=True,
        help_text="Execution completion timestamp",
    )
    executed_by = models.CharField(
        max_length=100,
        help_text="Username who executed this workflow",
    )

    class Meta:
        db_table = "swf_workflow_executions"
        # Most recently started executions first by default.
        ordering = ["-start_time"]

    def __str__(self):
        """Return the execution ID together with its status."""
        return f"Execution {self.execution_id} ({self.status})"
0373
0374
class Namespace(models.Model):
    """
    Testbed namespace used for workflow isolation in multi-user environments.

    A namespace groups agents, executions, and messages belonging to a
    particular user or purpose. The namespace name is the primary key.
    """

    name = models.CharField(primary_key=True, max_length=100)
    owner = models.CharField(
        max_length=100,
        help_text="Username of namespace owner",
    )
    description = models.TextField(blank=True)
    metadata = models.JSONField(null=True, blank=True)

    # Row audit timestamps maintained automatically by Django.
    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)

    class Meta:
        db_table = "swf_namespace"
        # Alphabetical by name by default.
        ordering = ["name"]

    def __str__(self):
        """Return the namespace name."""
        return self.name