Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-25 08:29:11

0001 """

0002 Enhanced Django models for SWF workflow tracking.

0003 

0004 This module extends the existing models with workflow-specific fields and adds new models

0005 for tracking the complete STF processing pipeline from DAQ generation through agent processing.

0006 """
0007 
0008 import uuid
0009 from django.db import models
0010 from django.utils import timezone
0011 
0012 
class DAQState(models.TextChoices):
    """Main operational state of the DAQ system.

    The states originate from the schedule-based simulation and mirror the
    detector/accelerator operational states.  Every member is declared as a
    ``(stored value, display label)`` pair.
    """

    NO_BEAM = ("no_beam", "No Beam")
    BEAM = ("beam", "Beam")
    RUN = ("run", "Run")
    CALIB = ("calib", "Calibration")
    TEST = ("test", "Test")
0023 
0024 
class DAQSubstate(models.TextChoices):
    """Substate that refines a main DAQ state with additional context.

    Every member is declared as a ``(stored value, display label)`` pair.
    """

    NOT_READY = ("not_ready", "Not Ready")
    READY = ("ready", "Ready")
    PHYSICS = ("physics", "Physics")
    STANDBY = ("standby", "Standby")
    LUMI = ("lumi", "Luminosity")
    EIC = ("eic", "EIC")
    EPIC = ("epic", "ePIC")
    DAQ = ("daq", "DAQ")
    CALIB = ("calib", "Calibration")
0038 
0039 
class WorkflowStatus(models.TextChoices):
    """Status of an STF as it travels through the processing pipeline.

    The four agent types (daqsim, data, processing, fastmon) each get the
    same three statuses -- ``<agent>_received``, ``<agent>_processing`` and
    ``<agent>_complete`` -- next to a few pipeline-wide statuses.
    """

    # Initial status, before any agent has picked the STF up.
    GENERATED = ("generated", "Generated by DAQ")

    # --- DAQSIM agent ---
    DAQSIM_RECEIVED = ("daqsim_received", "Received by DAQSIM Agent")
    DAQSIM_PROCESSING = ("daqsim_processing", "DAQSIM Agent Processing")
    DAQSIM_COMPLETE = ("daqsim_complete", "DAQSIM Agent Complete")

    # --- Data agent ---
    DATA_RECEIVED = ("data_received", "Received by Data Agent")
    DATA_PROCESSING = ("data_processing", "Data Agent Processing")
    DATA_COMPLETE = ("data_complete", "Data Agent Complete")

    # --- Processing agent ---
    PROCESSING_RECEIVED = ("processing_received", "Received by Processing Agent")
    PROCESSING_PROCESSING = ("processing_processing", "Processing Agent Processing")
    PROCESSING_COMPLETE = ("processing_complete", "Processing Agent Complete")

    # --- FastMon agent ---
    FASTMON_RECEIVED = ("fastmon_received", "Received by FastMon Agent")
    FASTMON_PROCESSING = ("fastmon_processing", "FastMon Agent Processing")
    FASTMON_COMPLETE = ("fastmon_complete", "FastMon Agent Complete")

    # --- Pipeline-wide outcomes ---
    WORKFLOW_COMPLETE = ("workflow_complete", "Workflow Complete")
    FAILED = ("failed", "Failed")
0070 
0071 
class AgentType(models.TextChoices):
    """Canonical agent categories used throughout the workflow system.

    Every member is declared as a ``(stored value, display label)`` pair.
    """

    DAQSIM = ("daqsim", "DAQ Simulator")
    DATA = ("data", "Data Agent")
    PROCESSING = ("processing", "Processing Agent")
    FASTMON = ("fastmon", "Fast Monitoring Agent")
    MONITOR = ("monitor", "Monitor System")
0081 
0082 
class STFWorkflow(models.Model):
    """Workflow lifecycle record for a single Super Time Frame (STF).

    Builds on the existing StfFile model concept by adding workflow-specific
    fields, and follows the STF from generation to completion as it moves
    between the agents of the pipeline.
    """

    workflow_id = models.UUIDField(
        primary_key=True,
        default=uuid.uuid4,
        editable=False,
    )

    # --- STF identification and metadata ---
    filename = models.CharField(max_length=255, unique=True)
    # Link to StfFile if needed
    file_id = models.UUIDField(null=True, blank=True)

    # --- Workflow instance identification ---
    namespace = models.CharField(
        max_length=100,
        null=True,
        blank=True,
        db_index=True,
        help_text="Testbed namespace for workflow delineation",
    )
    execution_id = models.CharField(
        max_length=100,
        null=True,
        blank=True,
        db_index=True,
        help_text="Workflow execution instance ID",
    )
    run_id = models.CharField(
        max_length=50,
        null=True,
        blank=True,
        db_index=True,
        help_text="Run number within execution",
    )

    # --- DAQ state information ---
    daq_state = models.CharField(max_length=20, choices=DAQState.choices)
    daq_substate = models.CharField(max_length=20, choices=DAQSubstate.choices)

    # --- Time tracking ---
    # From STF start time
    generated_time = models.DateTimeField()
    # From STF metadata
    stf_start_time = models.DateTimeField()
    # From STF metadata
    stf_end_time = models.DateTimeField()

    # --- Workflow status ---
    current_status = models.CharField(
        max_length=30,
        choices=WorkflowStatus.choices,
        default=WorkflowStatus.GENERATED,
    )

    # --- Agent tracking ---
    current_agent = models.CharField(
        max_length=20,
        choices=AgentType.choices,
        default=AgentType.DAQSIM,
    )

    # --- Completion tracking ---
    completed_at = models.DateTimeField(null=True, blank=True)
    failed_at = models.DateTimeField(null=True, blank=True)
    failure_reason = models.TextField(null=True, blank=True)

    # --- Metadata storage ---
    stf_metadata = models.JSONField(null=True, blank=True)

    # --- Row bookkeeping timestamps ---
    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)

    class Meta:
        db_table = "swf_stf_workflows"
        # Newest STFs first.
        ordering = ["-generated_time"]
        indexes = [
            models.Index(fields=["current_status", "generated_time"]),
            models.Index(fields=["daq_state", "daq_substate"]),
            models.Index(fields=["current_agent"]),
            models.Index(fields=["namespace", "execution_id"]),
            models.Index(fields=["namespace", "run_id"]),
        ]

    def __str__(self):
        """Return ``"STF Workflow <filename> - <current_status>"``."""
        return f"STF Workflow {self.filename} - {self.current_status}"

    def mark_completed(self):
        """Flag the workflow as complete and persist it.

        Stamps ``completed_at`` with the current time, sets
        ``current_status`` to ``WORKFLOW_COMPLETE`` and saves the instance.
        """
        self.completed_at = timezone.now()
        self.current_status = WorkflowStatus.WORKFLOW_COMPLETE
        self.save()

    def mark_failed(self, reason):
        """Flag the workflow as failed and persist it.

        Args:
            reason: Value stored in ``failure_reason``.

        Stamps ``failed_at`` with the current time, sets ``current_status``
        to ``FAILED`` and saves the instance.
        """
        self.failure_reason = reason
        self.failed_at = timezone.now()
        self.current_status = WorkflowStatus.FAILED
        self.save()
0161 
0162 
class AgentWorkflowStage(models.Model):
    """
    Tracks individual agent processing stages within an STF workflow.

    This model records when each agent receives, processes, and completes work on an STF,
    providing detailed timing and status information for performance monitoring.

    The ``mark_*`` helpers each update the relevant timestamp/status fields
    and immediately call ``save()``.

    NOTE(review): the received/processing/complete statuses are derived as
    ``"<agent_type>_<suffix>"``.  ``WorkflowStatus`` declares such values for
    daqsim, data, processing and fastmon only, so a stage whose agent_type is
    ``"monitor"`` ends up with a status outside the declared choices (which
    would be rejected by choices validation, e.g. ``full_clean()``) --
    confirm whether monitor stages are expected to use these helpers.
    """
    stage_id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)

    # Relationships
    workflow = models.ForeignKey(STFWorkflow, on_delete=models.CASCADE, related_name='stages')
    agent_name = models.CharField(max_length=100)  # Instance name of the agent
    agent_type = models.CharField(max_length=20, choices=AgentType.choices)

    # Status tracking
    status = models.CharField(
        max_length=30,
        choices=WorkflowStatus.choices,
        default=WorkflowStatus.GENERATED
    )

    # Timing
    received_at = models.DateTimeField(null=True, blank=True)
    started_at = models.DateTimeField(null=True, blank=True)
    completed_at = models.DateTimeField(null=True, blank=True)
    failed_at = models.DateTimeField(null=True, blank=True)

    # Processing details
    processing_time_seconds = models.FloatField(null=True, blank=True)
    failure_reason = models.TextField(null=True, blank=True)

    # Message tracking
    input_message = models.JSONField(null=True, blank=True)
    output_message = models.JSONField(null=True, blank=True)

    # Metadata
    stage_metadata = models.JSONField(null=True, blank=True)

    # Timestamps
    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)

    class Meta:
        db_table = 'swf_agent_workflow_stages'
        ordering = ['workflow', 'created_at']
        indexes = [
            models.Index(fields=['workflow', 'agent_type']),
            models.Index(fields=['agent_name', 'status']),
            models.Index(fields=['received_at']),
        ]
        # At most one stage row per (workflow, agent instance, agent type).
        unique_together = [['workflow', 'agent_name', 'agent_type']]

    def __str__(self):
        return f"Stage {self.agent_name} ({self.agent_type}) - {self.status}"

    def _agent_status(self, suffix):
        """Return the per-agent status string ``"<agent_type>_<suffix>"``."""
        return f"{self.agent_type.lower()}_{suffix}"

    def mark_received(self, message=None):
        """Mark the stage as received by the agent.

        Args:
            message: Optional payload stored in ``input_message``.  Only
                ``None`` means "no payload"; an empty dict or list is stored
                as given rather than being discarded.
        """
        self.received_at = timezone.now()
        self.status = self._agent_status("received")
        if message is not None:
            self.input_message = message
        self.save()

    def mark_processing(self):
        """Mark the stage as being processed and record ``started_at``."""
        self.started_at = timezone.now()
        self.status = self._agent_status("processing")
        self.save()

    def mark_completed(self, output_message=None):
        """Mark the stage as completed.

        Args:
            output_message: Optional payload stored in ``output_message``.
                Only ``None`` means "no payload"; an empty dict or list is
                stored as given rather than being discarded.

        ``processing_time_seconds`` is computed only when ``started_at`` is
        set, i.e. the stage was previously marked as processing.
        """
        self.completed_at = timezone.now()
        self.status = self._agent_status("complete")
        if output_message is not None:
            self.output_message = output_message

        # Elapsed wall-clock time between start and completion.
        if self.started_at is not None:
            self.processing_time_seconds = (self.completed_at - self.started_at).total_seconds()

        self.save()

    def mark_failed(self, reason):
        """Mark the stage as failed.

        Args:
            reason: Value stored in ``failure_reason``.
        """
        self.failed_at = timezone.now()
        self.status = WorkflowStatus.FAILED
        self.failure_reason = reason
        self.save()
0251 
0252 
class WorkflowMessage(models.Model):
    """Record of a message exchanged in the workflow system.

    Adds workflow-specific tracking on top of the raw message: the agents
    involved (by name and type) and a message type used for categorization.
    """

    message_id = models.UUIDField(
        primary_key=True,
        default=uuid.uuid4,
        editable=False,
    )

    # --- Relationships (both optional) ---
    workflow = models.ForeignKey(
        STFWorkflow,
        on_delete=models.CASCADE,
        related_name="messages",
        null=True,
        blank=True,
    )
    stage = models.ForeignKey(
        AgentWorkflowStage,
        on_delete=models.CASCADE,
        related_name="messages",
        null=True,
        blank=True,
    )

    # --- Message identification ---
    # e.g., 'stf_gen', 'data_ready', 'proc_complete'
    message_type = models.CharField(max_length=50)
    request_id = models.IntegerField(null=True, blank=True)

    # --- Agent information ---
    sender_agent = models.CharField(max_length=100, null=True, blank=True)
    sender_type = models.CharField(
        max_length=20,
        choices=AgentType.choices,
        null=True,
        blank=True,
    )
    recipient_agent = models.CharField(max_length=100, null=True, blank=True)
    recipient_type = models.CharField(
        max_length=20,
        choices=AgentType.choices,
        null=True,
        blank=True,
    )

    # --- Namespace for multi-user disambiguation ---
    namespace = models.CharField(
        max_length=100,
        null=True,
        blank=True,
        db_index=True,
        help_text="Testbed namespace for multi-user message filtering",
    )

    # --- Workflow instance identification (extracted from message content) ---
    execution_id = models.CharField(
        max_length=100,
        null=True,
        blank=True,
        db_index=True,
        help_text="Workflow execution instance ID",
    )
    run_id = models.CharField(
        max_length=50,
        null=True,
        blank=True,
        db_index=True,
        help_text="Run number within execution",
    )

    # --- Message content (required) ---
    message_content = models.JSONField()

    # --- Extensible metadata for monitoring and debugging ---
    message_metadata = models.JSONField(
        null=True,
        blank=True,
        default=dict,
        help_text="Extensible metadata for monitoring, debugging, and system tracking",
    )

    # --- Delivery tracking ---
    sent_at = models.DateTimeField(auto_now_add=True)
    delivered_at = models.DateTimeField(null=True, blank=True)
    acknowledged_at = models.DateTimeField(null=True, blank=True)

    # --- Status ---
    # Tri-state: None until mark_delivered() / mark_failed() sets it.
    is_successful = models.BooleanField(null=True, default=None)
    error_message = models.TextField(null=True, blank=True)

    # --- ActiveMQ specific fields ---
    queue_name = models.CharField(max_length=100, null=True, blank=True)
    correlation_id = models.CharField(max_length=100, null=True, blank=True)

    class Meta:
        db_table = "swf_workflow_messages"
        # Most recently sent messages first.
        ordering = ["-sent_at"]
        indexes = [
            models.Index(fields=["workflow", "message_type"]),
            models.Index(fields=["sender_agent", "sent_at"]),
            models.Index(fields=["message_type", "sent_at"]),
            models.Index(fields=["namespace", "execution_id"]),
            models.Index(fields=["namespace", "run_id"]),
        ]

    def __str__(self):
        """Return ``"Message <type> from <sender> at <sent_at>"``."""
        return f"Message {self.message_type} from {self.sender_agent} at {self.sent_at}"

    def mark_delivered(self):
        """Record a successful delivery and persist it.

        Sets ``is_successful`` to True, stamps ``delivered_at`` with the
        current time and saves the instance.
        """
        self.is_successful = True
        self.delivered_at = timezone.now()
        self.save()

    def mark_failed(self, error):
        """Record a failed message and persist it.

        Args:
            error: Value stored in ``error_message``.
        """
        self.error_message = error
        self.is_successful = False
        self.save()
0329 
0330 
class WorkflowDefinition(models.Model):
    """Reusable workflow template: code, default parameters and provenance.

    A definition is identified by its ``(workflow_name, version)`` pair,
    which ``Meta.unique_together`` enforces.
    """

    workflow_name = models.CharField(
        max_length=200,
        help_text="Unique workflow name",
    )
    version = models.CharField(
        max_length=50,
        help_text="Version string",
    )
    workflow_type = models.CharField(
        max_length=100,
        help_text="Flexible workflow type classification",
    )
    definition = models.TextField(
        help_text="Python workflow code content",
    )
    parameter_values = models.JSONField(
        default=dict,
        help_text="Default parameter values and schema",
    )
    created_by = models.CharField(
        max_length=100,
        help_text="Username who created this workflow",
    )
    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)

    class Meta:
        db_table = "swf_workflow_definitions"
        unique_together = [["workflow_name", "version"]]

    def __str__(self):
        """Return ``"<workflow_name> v<version>"``."""
        return f"{self.workflow_name} v{self.version}"
0350 
0351 
class WorkflowExecution(models.Model):
    """One concrete run of a ``WorkflowDefinition``.

    The execution ID is the primary key; deleting the referenced definition
    cascades to its executions.
    """

    execution_id = models.CharField(
        primary_key=True,
        max_length=100,
        help_text="Human-readable execution ID",
    )
    workflow_definition = models.ForeignKey(
        WorkflowDefinition,
        on_delete=models.CASCADE,
        related_name="executions",
    )
    namespace = models.CharField(
        max_length=100,
        null=True,
        blank=True,
        db_index=True,
        help_text="Testbed namespace for workflow delineation",
    )
    parameter_values = models.JSONField(
        help_text="Actual parameter values used for this execution",
    )
    performance_metrics = models.JSONField(
        null=True,
        blank=True,
        help_text="Performance metrics and results",
    )
    # Free-form string (no choices); new executions start as 'pending'.
    status = models.CharField(
        max_length=50,
        default="pending",
        help_text="Flexible execution status",
    )
    start_time = models.DateTimeField(
        help_text="Execution start timestamp",
    )
    end_time = models.DateTimeField(
        null=True,
        blank=True,
        help_text="Execution completion timestamp",
    )
    executed_by = models.CharField(
        max_length=100,
        help_text="Username who executed this workflow",
    )

    class Meta:
        db_table = "swf_workflow_executions"
        # Most recently started executions first.
        ordering = ["-start_time"]

    def __str__(self):
        """Return ``"Execution <execution_id> (<status>)"``."""
        return f"Execution {self.execution_id} ({self.status})"
0373 
0374 
class Namespace(models.Model):
    """Testbed namespace used to isolate workflows in multi-user setups.

    A namespace groups the agents, executions and messages that belong to a
    particular user or purpose.  The namespace name is the primary key.
    """

    name = models.CharField(primary_key=True, max_length=100)
    owner = models.CharField(
        max_length=100,
        help_text="Username of namespace owner",
    )
    description = models.TextField(blank=True)
    metadata = models.JSONField(null=True, blank=True)
    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)

    class Meta:
        db_table = "swf_namespace"
        # Alphabetical by namespace name.
        ordering = ["name"]

    def __str__(self):
        """Return the namespace name."""
        return self.name