# EICrecon Managed PODIO Processing Sequence Diagram

```mermaid
sequenceDiagram
    participant Client as Python Client
    participant Listener as Listener Thread
    participant Processor as JEventProcessorManagedPODIO
    participant Source as JEventSourceManagedPODIO
    participant JANA as JANA Event Loop

    Note over Client, JANA: System Initialization
    Processor->>Processor: Init() - Create ZMQ socket, bind to /tmp/eicrecon_managed.sock
    Processor->>Listener: Start listener thread
    Listener->>Listener: ListenForMessages() - Poll ZMQ socket
    Source->>Source: Open() - Wait for file requests
    JANA->>JANA: Start event processing loop

    Note over Client, JANA: File Processing Request
    Client->>Listener: ZMQ REQ: {"input_file": "...", "output_file": "..."}

    Listener->>Listener: Receive ZMQ message
    Listener->>Processor: ProcessFileRequest(json)
    Processor->>Processor: Validate input file exists
    Processor->>Processor: Set m_awaiting_reply = true
    Processor->>Processor: OpenOutputFile() - Create podio::Writer

    Processor->>Source: NotifySourceNewFile(input_file)
    Source->>Source: SetCurrentFile() - Reset state, open input file
    Source->>Source: Create podio::Reader
    Source->>Source: Set m_file_available = true, notify condition variable

    Processor->>Source: GetNeventsInFile()
    Source-->>Processor: Return Nevents_in_file

    alt Zero events in file
        Processor->>Processor: CloseOutputFile() + SendResponse()
        Processor->>Processor: Set m_file_processing_active = false
        Listener->>Client: ZMQ REP: {"status": "completed", "events_processed": 0}
    else File has events
        Note over Client, JANA: Event Processing Loop
        loop For each event in file
            JANA->>Source: Emit(event)
            Source->>Source: Check if file available, read next event
            Source->>Source: Insert collections into JEvent
            Source-->>JANA: Return Success

            JANA->>Processor: Process(event)
            Processor->>Processor: CheckFileCompletion() - Poll source
            Processor->>Source: IsFileProcessingComplete()
            Source-->>Processor: Return completion status

            alt File not complete
                Processor->>Processor: Write event to output file
                Processor->>Processor: Increment m_events_processed
            else File complete
                Processor->>Processor: CloseOutputFile()
                Processor->>Processor: Propagate non-event frames
                Processor->>Processor: Finish writer
                Processor->>Listener: SendResponse(completion_json)
                Listener->>Client: ZMQ REP: {"status": "completed", "events_processed": N}
                Processor->>Processor: Set m_file_processing_active = false
            end
        end
    end

    Note over Source: End of file reached
    Source->>Source: Set m_file_processing_complete = true
    Source->>Source: Set m_file_available = false
    Source-->>JANA: Return FailureTryAgain (wait for next file)

    Note over Client, JANA: Next File or Shutdown
    alt Another file request
        Client->>Listener: ZMQ REQ: {"input_file": "...", "output_file": "..."}
        Note over Client, JANA: Process repeats for new file
    else Shutdown
        Processor->>Listener: Stop listener thread
        Listener->>Listener: Clean up and exit
        Processor->>Processor: Clean up ZMQ socket
        Source->>Source: Close()
    end
```
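
The request and reply shapes in the diagram can be exercised from a client along the following lines. This is a minimal sketch, not the project's actual client: it assumes `pyzmq` is installed, that the socket path from `Init()` is reachable through the `ipc://` transport, and that any reply whose `status` is not `"completed"` should be treated as a failure (the diagram does not describe error replies). The names `build_request`, `parse_reply`, `process_file`, and `DEFAULT_ENDPOINT` are hypothetical.

```python
import json

# Assumption: the diagram only names the path /tmp/eicrecon_managed.sock;
# the ipc:// transport prefix is a guess about how the socket is bound.
DEFAULT_ENDPOINT = "ipc:///tmp/eicrecon_managed.sock"


def build_request(input_file: str, output_file: str) -> str:
    """Serialize a request with the two fields shown in the diagram."""
    return json.dumps({"input_file": input_file, "output_file": output_file})


def parse_reply(raw: str) -> int:
    """Return events_processed from a 'completed' reply, otherwise raise."""
    reply = json.loads(raw)
    if reply.get("status") != "completed":
        raise RuntimeError(f"unexpected reply: {reply!r}")
    return int(reply["events_processed"])


def process_file(input_file, output_file,
                 endpoint=DEFAULT_ENDPOINT, timeout_ms=600_000):
    """Send one request and wait for the reply, up to timeout_ms."""
    import zmq  # pyzmq; imported lazily so the helpers above work without it

    sock = zmq.Context.instance().socket(zmq.REQ)
    sock.setsockopt(zmq.LINGER, 0)  # do not hang on close if the server is gone
    try:
        sock.connect(endpoint)
        sock.send_string(build_request(input_file, output_file))
        if not sock.poll(timeout_ms):  # poll() returns 0 if nothing arrived
            raise TimeoutError(f"no reply within {timeout_ms} ms")
        return parse_reply(sock.recv_string())
    finally:
        sock.close()
```

A call such as `process_file("in.root", "out.root")` (placeholder file names) would block until the processor reports completion. Because the reply is only sent once the whole file has been processed, the timeout should be sized for the largest expected input.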

## Key Points

1. **Initialization**: The processor creates the ZMQ socket and starts the listener thread. The source waits for file requests.

2. **File Request**: The client sends a JSON request containing the `input_file` and `output_file` paths, using the ZMQ REQ/REP pattern.

3. **Coordination**: The processor validates the request, opens the output file, then notifies the source to open the input file. The source receives only the input file path; output file handling is managed entirely by the processor.

4. **Event Processing**: The JANA event loop calls `Source::Emit()` to read each event and `Processor::Process()` to write it. On each `Process()` call, the processor polls the source for completion status.

5. **Completion**: When the source has finished reading all events, the processor closes the output file and sends the completion response to the client.

6. **Zero-Event Fast-Path**: After `NotifySourceNewFile()` returns, the processor queries `GetNeventsInFile()` on the source. If the file has zero events, the processor closes the output file and sends the ZMQ response immediately, without entering the event loop. This is necessary because JANA never calls `Process()` when there are no events to emit, so the completion check in `Process()` would never run and the client would hang. Note: the implementation must also ensure that the event loop is not woken for zero-event files (or must otherwise coordinate with it) to avoid races between `Emit()` and output-file finalization.

7. **Next File**: The source returns `FailureTryAgain` to keep the JANA event loop alive while it waits for the next file request.
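
The two completion paths (the zero-event fast path and the per-event completion check) can be modeled in a few lines. The sketch below is a Python model of the control flow only, not the C++ implementation: `ManagedFileState` and its method names are hypothetical, file I/O is omitted, and the completion status is passed in as an argument rather than obtained from a real source.

```python
from dataclasses import dataclass, field


@dataclass
class ManagedFileState:
    """Toy model of the processor's per-file bookkeeping."""
    n_events_in_file: int
    events_processed: int = 0
    file_processing_active: bool = True
    replies: list = field(default_factory=list)  # stands in for the ZMQ REP

    def _finish(self):
        # Stands in for CloseOutputFile() followed by SendResponse().
        self.replies.append(
            {"status": "completed", "events_processed": self.events_processed}
        )
        self.file_processing_active = False

    def on_file_opened(self):
        # Fast path: with zero events, Process() is never called,
        # so the reply has to be sent here.
        if self.n_events_in_file == 0:
            self._finish()

    def on_process(self, source_complete: bool):
        # Mirrors the "File not complete" / "File complete" branches.
        if not self.file_processing_active:
            return
        if source_complete:
            self._finish()
        else:
            self.events_processed += 1  # stands in for writing the event


# Zero-event file: reply is produced without any Process() call.
empty = ManagedFileState(n_events_in_file=0)
empty.on_file_opened()

# Two-event file: reply is produced once the source reports completion.
full = ManagedFileState(n_events_in_file=2)
full.on_file_opened()
full.on_process(source_complete=False)
full.on_process(source_complete=False)
full.on_process(source_complete=True)
```

In both cases exactly one reply is recorded per file, which is the property a REQ/REP client depends on: a request with no reply leaves the client blocked, and a second reply would violate the socket's send/receive alternation.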

## Communication Patterns

- **ZMQ REQ/REP**: Client ↔ Listener Thread (external communication)
- **Direct Method Calls**: Listener Thread → Processor, Processor → Source (internal coordination)
- **Polling**: Processor polls Source for completion status
- **Condition Variables**: Source uses a condition variable to wait for new files
- **Threading**: Listener Thread runs independently, handling ZMQ communication asynchronously
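
The condition-variable handoff between the listener side and the source can be illustrated with a small model. This is a sketch in Python rather than the C++ used by EICrecon, and `FileMailbox`, `notify_new_file`, and `wait_for_file` are hypothetical names. Returning `None` on timeout plays the role that `FailureTryAgain` plays in the diagram: the caller is expected to try again later.

```python
import threading


class FileMailbox:
    """One-slot handoff of an input file path between two threads."""

    def __init__(self):
        self._cv = threading.Condition()
        self._path = None

    def notify_new_file(self, path):
        # Called from the listener/processor side.
        with self._cv:
            self._path = path
            self._cv.notify_all()

    def wait_for_file(self, timeout):
        # Called from the source side; returns None if no file arrived in time.
        with self._cv:
            arrived = self._cv.wait_for(
                lambda: self._path is not None, timeout=timeout
            )
            if not arrived:
                return None
            path, self._path = self._path, None  # consume the slot
            return path


mailbox = FileMailbox()
first_try = mailbox.wait_for_file(timeout=0.01)  # nothing posted yet

# Post a file from another thread shortly afterwards.
poster = threading.Timer(0.05, mailbox.notify_new_file, args=("in.root",))
poster.start()
second_try = mailbox.wait_for_file(timeout=2.0)
poster.join()
```

Using a predicate with `wait_for` guards against spurious wakeups and against a notification that arrives before the waiter starts waiting, since the path is stored under the same lock that the waiter checks.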