Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-01-09 09:26:49

0001 from pathlib import Path
0002 import tempfile
0003 import sys
0004 import os
0005 import time
0006 import functools
0007 import warnings
0008 import numpy
0009 
0010 import pytest
0011 
0012 import acts
0013 import acts.examples
0014 from acts.examples import Sequencer
0015 from acts import UnitConstants as u
0016 
0017 ScopedFailureThreshold = acts.logging.ScopedFailureThreshold
0018 
0019 from helpers import (
0020     dd4hepEnabled,
0021     hepmc3Enabled,
0022     geant4Enabled,
0023     AssertCollectionExistsAlg,
0024     isCI,
0025 )
0026 
0027 
0028 def with_pyhepmc(fn):
0029     """
0030     Some tests use pyhepmc to inspect the written output files.
0031     Locally, if pyhepmc is not present, we ignore these checks with a warning.
0032     In CI mode, they become a failure
0033     """
0034 
0035     try:
0036         import pyhepmc
0037 
0038         fn(pyhepmc)
0039     except ImportError:
0040         if isCI:
0041             raise
0042         else:
0043             warnings.warn("pyhepmc not available, skipping checks")
0044             pass
0045 
0046 
0047 pytestmark = [
0048     pytest.mark.hepmc3,
0049     pytest.mark.skipif(not hepmc3Enabled, reason="HepMC3 plugin not available"),
0050 ]
0051 
0052 cm = acts.examples.hepmc3.Compression
0053 from acts.examples.hepmc3 import availableCompressionModes
0054 
0055 _available_compression_modes = availableCompressionModes()
0056 
0057 
0058 compression_modes = pytest.mark.parametrize(
0059     "compression",
0060     acts.examples.hepmc3.availableCompressionModes(),
0061     ids=[
0062         c.name if c != cm.none else "uncompressed"
0063         for c in acts.examples.hepmc3.availableCompressionModes()
0064     ],
0065 )
0066 
0067 
0068 def handle_path(out, compression):
0069     if compression == cm.none:
0070         actual_path = out
0071     else:
0072         assert not out.exists()
0073         actual_path = out.with_suffix(
0074             f".hepmc3{acts.examples.hepmc3.compressionExtension(compression)}"
0075         )
0076         assert actual_path.suffix == acts.examples.hepmc3.compressionExtension(
0077             compression
0078         )
0079     assert actual_path.exists()
0080     return actual_path
0081 
0082 
0083 def check_sidecar(actual_path, expected_events):
0084     """Check that a sidecar metadata file exists and contains the expected event count"""
0085     sidecar_path = Path(str(actual_path) + ".json")
0086     assert sidecar_path.exists(), f"Sidecar file {sidecar_path} does not exist"
0087 
0088     import json
0089 
0090     with sidecar_path.open("r") as f:
0091         metadata = json.load(f)
0092 
0093     assert "num_events" in metadata, "Sidecar file missing 'num_events' field"
0094     assert (
0095         metadata["num_events"] == expected_events
0096     ), f"Expected {expected_events} events, but sidecar says {metadata['num_events']}"
0097 
0098 
0099 all_formats = []
0100 all_format_ids = []
0101 
0102 for compression in acts.examples.hepmc3.availableCompressionModes():
0103     all_formats.append(("hepmc3", compression))
0104     all_format_ids.append(
0105         f"hepmc3-{compression.name if compression != cm.none else 'uncompressed'}"
0106     )
0107 
0108 all_formats.append(("root", cm.none))
0109 all_format_ids.append("root")
0110 
0111 all_formats = pytest.mark.parametrize(
0112     "format,compression", all_formats, ids=all_format_ids
0113 )
0114 
0115 main_formats = []
0116 main_format_ids = []
0117 
0118 for compression in acts.examples.hepmc3.availableCompressionModes():
0119     main_formats.append(("hepmc3", compression))
0120     main_format_ids.append(
0121         f"hepmc3-{compression.name if compression != cm.none else 'uncompressed'}"
0122     )
0123 
0124 main_formats.append(("root", cm.none))
0125 main_format_ids.append("root")
0126 
0127 
0128 main_formats = pytest.mark.parametrize(
0129     "format,compression", main_formats, ids=main_format_ids
0130 )
0131 
0132 
0133 @all_formats
0134 def test_hemc3_writer(tmp_path, rng, compression, format):
0135     from acts.examples.hepmc3 import (
0136         HepMC3Writer,
0137     )
0138 
0139     s = Sequencer(numThreads=10, events=100)
0140 
0141     evGen = acts.examples.EventGenerator(
0142         level=acts.logging.DEBUG,
0143         generators=[
0144             acts.examples.EventGenerator.Generator(
0145                 multiplicity=acts.examples.FixedMultiplicityGenerator(n=2),
0146                 vertex=acts.examples.GaussianVertexGenerator(
0147                     stddev=acts.Vector4(50 * u.um, 50 * u.um, 150 * u.mm, 20 * u.ns),
0148                     mean=acts.Vector4(0, 0, 0, 0),
0149                 ),
0150                 particles=acts.examples.ParametricParticleGenerator(
0151                     p=(100 * u.GeV, 100 * u.GeV),
0152                     eta=(-2, 2),
0153                     phi=(0, 360 * u.degree),
0154                     randomizeCharge=True,
0155                     numParticles=2,
0156                 ),
0157             )
0158         ],
0159         outputEvent="hepmc3_event",
0160         randomNumbers=rng,
0161     )
0162     s.addReader(evGen)
0163 
0164     s.addAlgorithm(
0165         acts.examples.hepmc3.HepMC3InputConverter(
0166             level=acts.logging.DEBUG,
0167             inputEvent=evGen.config.outputEvent,
0168             outputParticles="particles_generated",
0169             outputVertices="vertices_truth",
0170         )
0171     )
0172 
0173     alg = AssertCollectionExistsAlg(
0174         [
0175             "particles_generated",
0176             "vertices_truth",
0177         ],
0178         "check_alg",
0179         acts.logging.WARNING,
0180     )
0181     s.addAlgorithm(alg)
0182 
0183     out = tmp_path / "out" / f"pytest.{format}"
0184     out.parent.mkdir(parents=True, exist_ok=True)
0185 
0186     s.addWriter(
0187         HepMC3Writer(
0188             acts.logging.DEBUG,
0189             inputEvent="hepmc3_event",
0190             outputPath=out,
0191             compression=compression,
0192             # writeEventsInOrder=False,
0193         )
0194     )
0195 
0196     s.run()
0197 
0198     actual_path = handle_path(out, compression)
0199 
0200     # Check sidecar metadata file
0201     check_sidecar(actual_path, s.config.events)
0202 
0203     # pyhepmc does not support zstd
0204     if compression in (cm.none, cm.lzma, cm.bzip2, cm.zlib) and format != "root":
0205 
0206         @with_pyhepmc
0207         def check(pyhepmc):
0208             nevts = 0
0209             event_numbers = []
0210             with pyhepmc.open(actual_path) as f:
0211                 for evt in f:
0212                     nevts += 1
0213                     event_numbers.append(evt.event_number)
0214                     assert len(evt.particles) == 4 + 2  # muons + beam particles
0215             assert nevts == s.config.events
0216             assert event_numbers == list(
0217                 range(s.config.events)
0218             ), "Events are out of order"
0219 
0220 
0221 class StallAlgorithm(acts.examples.IAlgorithm):
0222     sleep: float = 1
0223 
0224     def execute(self, ctx):
0225 
0226         if ctx.eventNumber == 50:
0227             print("BEGIN SLEEP")
0228             time.sleep(self.sleep)
0229             print("END OF SLEEP")
0230         else:
0231             time.sleep(0.01)
0232 
0233         return acts.examples.ProcessCode.SUCCESS
0234 
0235 
0236 @pytest.fixture
0237 def common_evgen(rng):
0238     def func(s: Sequencer):
0239         evGen = acts.examples.EventGenerator(
0240             level=acts.logging.INFO,
0241             generators=[
0242                 acts.examples.EventGenerator.Generator(
0243                     multiplicity=acts.examples.FixedMultiplicityGenerator(n=2),
0244                     vertex=acts.examples.GaussianVertexGenerator(
0245                         stddev=acts.Vector4(
0246                             50 * u.um, 50 * u.um, 150 * u.mm, 20 * u.ns
0247                         ),
0248                         mean=acts.Vector4(0, 0, 0, 0),
0249                     ),
0250                     particles=acts.examples.ParametricParticleGenerator(
0251                         p=(100 * u.GeV, 100 * u.GeV),
0252                         eta=(-2, 2),
0253                         phi=(0, 360 * u.degree),
0254                         randomizeCharge=True,
0255                         numParticles=2,
0256                     ),
0257                 )
0258             ],
0259             outputEvent="hepmc3_event",
0260             randomNumbers=rng,
0261         )
0262         s.addReader(evGen)
0263         return evGen
0264 
0265     return func
0266 
0267 
0268 @pytest.fixture
0269 def common_writer(tmp_path, common_evgen):
0270     def func(s: Sequencer, compression: acts.examples.hepmc3.Compression, format: str):
0271         from acts.examples.hepmc3 import HepMC3Writer
0272 
0273         evGen = common_evgen(s)
0274 
0275         out = tmp_path / "out" / f"events_pytest.{format}"
0276         out.parent.mkdir(parents=True, exist_ok=True)
0277 
0278         s.addWriter(
0279             HepMC3Writer(
0280                 acts.logging.INFO,
0281                 inputEvent=evGen.config.outputEvent,
0282                 outputPath=out,
0283                 compression=compression,
0284             )
0285         )
0286 
0287         return out
0288 
0289     return func
0290 
0291 
0292 @pytest.mark.parametrize(
0293     "bufsize",
0294     [1, 5, 15, 50],
0295     ids=lambda v: f"buf{v}",
0296 )
0297 @pytest.mark.repeat(5)
0298 @pytest.mark.timeout(5, method="thread")
0299 def test_hepmc3_writer_stall(common_evgen, tmp_path, bufsize):
0300     """
0301     This test simulates that one event takes significantly longer than the
0302     other ones. In this case, the HepMC3 writer should keep a buffer of events
0303     and wait until the trailing event comes in.
0304     """
0305     from acts.examples.hepmc3 import (
0306         HepMC3Writer,
0307     )
0308 
0309     s = Sequencer(numThreads=10, events=150, logLevel=acts.logging.VERBOSE)
0310 
0311     evGen = common_evgen(s)
0312 
0313     out = tmp_path / "out" / "pytest.hepmc3"
0314     out.parent.mkdir(parents=True, exist_ok=True)
0315 
0316     stall = StallAlgorithm(name="stall_alg", level=acts.logging.INFO)
0317     s.addAlgorithm(stall)
0318 
0319     s.addWriter(
0320         HepMC3Writer(
0321             acts.logging.VERBOSE,
0322             inputEvent=evGen.config.outputEvent,
0323             outputPath=out,
0324             maxEventsPending=bufsize,
0325         )
0326     )
0327 
0328     s.run()
0329 
0330     # Check sidecar metadata file
0331     check_sidecar(out, s.config.events)
0332 
0333     @with_pyhepmc
0334     def check(pyhepmc):
0335         nevts = 0
0336         event_numbers = []
0337         with pyhepmc.open(out) as f:
0338             for evt in f:
0339                 nevts += 1
0340                 event_numbers.append(evt.event_number)
0341         assert nevts == s.config.events
0342         assert event_numbers == list(range(s.config.events)), "Events are out of order"
0343 
0344 
0345 def test_hepmc3_writer_not_in_order(common_evgen, tmp_path):
0346     """
0347     Bypasses the event ordering. This test mainly checks that this code path completes
0348     """
0349     from acts.examples.hepmc3 import (
0350         HepMC3Writer,
0351     )
0352 
0353     s = Sequencer(numThreads=10, events=100)
0354 
0355     evGen = common_evgen(s)
0356 
0357     out = tmp_path / "out" / "pytest.hepmc3"
0358     out.parent.mkdir(parents=True, exist_ok=True)
0359 
0360     stall = StallAlgorithm(name="stall_alg", level=acts.logging.INFO)
0361     s.addAlgorithm(stall)
0362 
0363     s.addWriter(
0364         HepMC3Writer(
0365             acts.logging.VERBOSE,
0366             inputEvent=evGen.config.outputEvent,
0367             outputPath=out,
0368             maxEventsPending=5,
0369             writeEventsInOrder=False,
0370         )
0371     )
0372 
0373     s.run()
0374 
0375     # Check sidecar metadata file
0376     check_sidecar(out, s.config.events)
0377 
0378     @with_pyhepmc
0379     def check(pyhepmc):
0380         nevts = 0
0381         event_numbers = []
0382         with pyhepmc.open(out) as f:
0383             for evt in f:
0384                 nevts += 1
0385                 event_numbers.append(evt.event_number)
0386         assert nevts == s.config.events
0387         # We don't expect events to be in order, but they should be there
0388         assert set(event_numbers) == set(
0389             range(s.config.events)
0390         ), "Event numbers are different"
0391 
0392 
0393 @main_formats
0394 def test_hepmc3_writer_pythia8(tmp_path, rng, compression, format):
0395     from acts.examples.hepmc3 import (
0396         HepMC3Writer,
0397     )
0398 
0399     from acts.examples.simulation import addPythia8
0400 
0401     s = Sequencer(numThreads=1, events=3)
0402 
0403     vtxGen = acts.examples.GaussianVertexGenerator(
0404         stddev=acts.Vector4(50 * u.um, 50 * u.um, 150 * u.mm, 20 * u.ns),
0405         mean=acts.Vector4(0, 0, 0, 0),
0406     )
0407 
0408     addPythia8(
0409         s,
0410         rnd=rng,
0411         hardProcess=["Top:qqbar2ttbar=on"],
0412         npileup=10,
0413         outputDirCsv=None,
0414         outputDirRoot=None,
0415         logLevel=acts.logging.INFO,
0416         vtxGen=vtxGen,
0417     )
0418 
0419     out = tmp_path / f"events.{format}"
0420 
0421     s.addWriter(
0422         HepMC3Writer(
0423             acts.logging.VERBOSE,
0424             inputEvent="pythia8-event",
0425             outputPath=out,
0426             compression=compression,
0427         )
0428     )
0429 
0430     # Assert particles and vertices are present
0431     alg = AssertCollectionExistsAlg(
0432         [
0433             "particles_generated",
0434             "vertices_truth",
0435             "pythia8-event",
0436         ],
0437         "check_alg",
0438         acts.logging.WARNING,
0439     )
0440     s.addAlgorithm(alg)
0441 
0442     s.run()
0443 
0444     actual_path = handle_path(out, compression)
0445 
0446     assert actual_path.exists(), f"File {actual_path} does not exist"
0447 
0448     # Check sidecar metadata file
0449     check_sidecar(actual_path, s.config.events)
0450 
0451     if compression in (cm.none, cm.lzma, cm.bzip2, cm.zlib) and format != "root":
0452 
0453         @with_pyhepmc
0454         def check(pyhepmc):
0455             nevts = 0
0456             with pyhepmc.open(actual_path) as f:
0457                 for evt in f:
0458                     # we expect the event to be populated but the exact number is random
0459                     assert len(evt.particles) > 2000
0460                     nevts += 1
0461             assert nevts == s.config.events
0462 
0463 
0464 @main_formats
0465 def test_hepmc3_reader(common_writer, rng, compression, format):
0466     from acts.examples.hepmc3 import (
0467         HepMC3Reader,
0468     )
0469 
0470     nevents = 1200
0471 
0472     s = Sequencer(numThreads=10, events=nevents)
0473 
0474     out = common_writer(s, compression, format)
0475 
0476     s.run()
0477 
0478     actual_path = handle_path(out, compression)
0479 
0480     # Without external event number, we need to read all events
0481     # use multiple threads to test if seeking to the right event works
0482     s = Sequencer(numThreads=10)
0483 
0484     with acts.logging.ScopedFailureThreshold(acts.logging.ERROR):
0485         # We expect a warning about missing input event count
0486         s.addReader(
0487             HepMC3Reader(
0488                 inputPath=actual_path,
0489                 level=acts.logging.VERBOSE,
0490                 outputEvent="hepmc3_event",
0491             )
0492         )
0493 
0494     s.addAlgorithm(
0495         acts.examples.hepmc3.HepMC3InputConverter(
0496             level=acts.logging.INFO,
0497             inputEvent="hepmc3_event",
0498             outputParticles="particles_read",
0499             outputVertices="vertices_read",
0500         )
0501     )
0502 
0503     alg = AssertCollectionExistsAlg(
0504         [
0505             "particles_read",
0506             "vertices_read",
0507             "hepmc3_event",
0508         ],
0509         "check_alg",
0510         acts.logging.WARNING,
0511     )
0512     s.addAlgorithm(alg)
0513 
0514     s.run()
0515 
0516     assert alg.events_seen == nevents
0517 
0518 
0519 @main_formats
0520 def test_hepmc3_reader_explicit_num_events(common_writer, rng, compression, format):
0521     """
0522     The HepMC3Reader can be configured to read a specific number of events. If
0523     the explicit number of events is lower or equal to the actual number of
0524     events, reading works as expected.
0525     """
0526     from acts.examples.hepmc3 import (
0527         HepMC3Reader,
0528     )
0529 
0530     nevents = 1200
0531     s = Sequencer(numThreads=10, events=nevents)
0532     out = common_writer(s, compression, format)
0533     s.run()
0534     actual_path = handle_path(out, compression)
0535 
0536     # With external event number, we can read a specific event
0537     # Test 110 and 120 as both a lower number than is available and a higher number
0538     # than is available is valid
0539     for num in (nevents - 10, nevents):
0540         s = Sequencer(numThreads=10)
0541 
0542         s.addReader(
0543             HepMC3Reader(
0544                 acts.logging.DEBUG,
0545                 inputPath=actual_path,
0546                 outputEvent="hepmc3_event",
0547                 numEvents=num,
0548             )
0549         )
0550 
0551         alg = AssertCollectionExistsAlg(
0552             "hepmc3_event", "check_alg", acts.logging.WARNING
0553         )
0554         s.addAlgorithm(alg)
0555 
0556         s.run()
0557 
0558         assert alg.events_seen == num
0559 
0560 
0561 @main_formats
0562 def test_hepmc3_reader_explicit_num_events_too_large(
0563     common_writer, rng, compression, format
0564 ):
0565     """
0566     The HepMC3Reader can be configured to read a specific number of events. If
0567     the explicit number of events is larger than the actual number of events,
0568     the reader should throw an error.
0569     """
0570     from acts.examples.hepmc3 import (
0571         HepMC3Reader,
0572     )
0573 
0574     nevents = 1200
0575     s = Sequencer(numThreads=10, events=nevents)
0576     out = common_writer(s, compression, format)
0577     s.run()
0578     actual_path = handle_path(out, compression)
0579 
0580     s = Sequencer(numThreads=10)
0581 
0582     s.addReader(
0583         HepMC3Reader(
0584             acts.logging.DEBUG,
0585             inputPath=actual_path,
0586             outputEvent="hepmc3_event",
0587             numEvents=nevents + 10,
0588         )
0589     )
0590 
0591     with ScopedFailureThreshold(acts.logging.MAX):
0592         with pytest.raises(RuntimeError) as excinfo:
0593             s.run()
0594 
0595     assert "Failed to process event" in str(excinfo.value)
0596 
0597 
0598 @main_formats
0599 def test_hepmc3_reader_skip_events(common_writer, rng, compression, format):
0600     from acts.examples.hepmc3 import (
0601         HepMC3Reader,
0602     )
0603 
0604     nevents = 1200
0605     s = Sequencer(numThreads=10, events=nevents)
0606     out = common_writer(s, compression, format)
0607     s.run()
0608     actual_path = handle_path(out, compression)
0609 
0610     skip = 100
0611     s = Sequencer(numThreads=5, skip=skip, events=nevents - skip)
0612 
0613     with acts.logging.ScopedFailureThreshold(acts.logging.ERROR):
0614         # We expect a warning about missing input event count
0615         s.addReader(
0616             HepMC3Reader(
0617                 inputPath=actual_path,
0618                 level=acts.logging.DEBUG,
0619                 outputEvent="hepmc3_event",
0620                 maxEventBufferSize=10,
0621             )
0622         )
0623 
0624     class EventNumberCheckerAlg(acts.examples.IAlgorithm):
0625         events_seen = set()
0626 
0627         def __init__(
0628             self,
0629             name="check_alg",
0630             level=acts.logging.INFO,
0631             *args,
0632             **kwargs,
0633         ):
0634             acts.examples.IAlgorithm.__init__(
0635                 self, name=name, level=level, *args, **kwargs
0636             )
0637 
0638         def execute(self, ctx):
0639             self.events_seen.add(ctx.eventNumber)
0640             return acts.examples.ProcessCode.SUCCESS
0641 
0642     alg = EventNumberCheckerAlg()
0643     s.addAlgorithm(alg)
0644 
0645     s.run()
0646 
0647     exp = set(range(skip, nevents))
0648     # We can't assert the correct number assignment without having to expose
0649     # more of the FW The HepMC3Reader internally asserts that the even number
0650     # read from the event is equal to the event number currently being
0651     # processed.
0652     assert alg.events_seen == exp
0653 
0654 
0655 def test_hepmc3_compression_modes():
0656     assert cm.none in acts.examples.hepmc3.availableCompressionModes()
0657 
0658 
0659 @compression_modes
0660 def test_hepmc3_writer_compression_auto_detection(tmp_path, rng, compression):
0661     """Test that compression is automatically detected from the output path"""
0662     from acts.examples.hepmc3 import HepMC3Writer
0663 
0664     s = Sequencer(numThreads=1, events=10)
0665 
0666     evGen = acts.examples.EventGenerator(
0667         level=acts.logging.INFO,
0668         generators=[
0669             acts.examples.EventGenerator.Generator(
0670                 multiplicity=acts.examples.FixedMultiplicityGenerator(n=1),
0671                 vertex=acts.examples.GaussianVertexGenerator(
0672                     stddev=acts.Vector4(50 * u.um, 50 * u.um, 150 * u.mm, 20 * u.ns),
0673                     mean=acts.Vector4(0, 0, 0, 0),
0674                 ),
0675                 particles=acts.examples.ParametricParticleGenerator(
0676                     p=(100 * u.GeV, 100 * u.GeV),
0677                     eta=(-2, 2),
0678                     phi=(0, 360 * u.degree),
0679                     randomizeCharge=True,
0680                     numParticles=2,
0681                 ),
0682             )
0683         ],
0684         outputEvent="hepmc3_event",
0685         randomNumbers=rng,
0686     )
0687     s.addReader(evGen)
0688 
0689     # Create output path WITH compression extension but WITHOUT specifying compression in config
0690     ext = acts.examples.hepmc3.compressionExtension(compression)
0691     out = tmp_path / f"events.hepmc3{ext}"
0692     out.parent.mkdir(parents=True, exist_ok=True)
0693 
0694     # Don't specify compression - it should be auto-detected from the path
0695     s.addWriter(
0696         HepMC3Writer(
0697             acts.logging.INFO,
0698             inputEvent="hepmc3_event",
0699             outputPath=out,
0700             # compression=None (default)
0701         )
0702     )
0703 
0704     s.run()
0705 
0706     # File should exist with the correct extension
0707     assert out.exists(), f"Output file {out} does not exist"
0708 
0709     # Check sidecar
0710     check_sidecar(out, s.config.events)
0711 
0712     # Verify we can read it back
0713     if compression in (cm.none, cm.lzma, cm.bzip2, cm.zlib):
0714 
0715         @with_pyhepmc
0716         def check(pyhepmc):
0717             nevts = 0
0718             with pyhepmc.open(out) as f:
0719                 for evt in f:
0720                     nevts += 1
0721             assert nevts == s.config.events
0722 
0723 
0724 def test_hepmc3_writer_compression_consistency(tmp_path):
0725     """Test that specifying both path extension and config compression must be consistent"""
0726     from acts.examples.hepmc3 import HepMC3Writer
0727 
0728     out = tmp_path / "out" / "events.hepmc3.gz"
0729     out.parent.mkdir(parents=True, exist_ok=True)
0730 
0731     # Path says .gz (zlib) but config says bzip2 - should fail
0732     with acts.logging.ScopedFailureThreshold(acts.logging.MAX), pytest.raises(
0733         ValueError
0734     ) as excinfo:
0735         HepMC3Writer(
0736             acts.logging.INFO,
0737             inputEvent="hepmc3_event",
0738             outputPath=out,
0739             compression=acts.examples.hepmc3.Compression.bzip2,
0740         )
0741 
0742     assert "Compression mismatch" in str(excinfo.value)
0743     assert "zlib" in str(excinfo.value)
0744     assert "bzip2" in str(excinfo.value)
0745 
0746 
0747 def test_hepmc3_writer_compression_explicit_with_path(tmp_path, rng):
0748     """Test that you can specify compression in config and omit it from path"""
0749     from acts.examples.hepmc3 import HepMC3Writer
0750 
0751     s = Sequencer(numThreads=1, events=10)
0752 
0753     evGen = acts.examples.EventGenerator(
0754         level=acts.logging.INFO,
0755         generators=[
0756             acts.examples.EventGenerator.Generator(
0757                 multiplicity=acts.examples.FixedMultiplicityGenerator(n=1),
0758                 vertex=acts.examples.GaussianVertexGenerator(
0759                     stddev=acts.Vector4(50 * u.um, 50 * u.um, 150 * u.mm, 20 * u.ns),
0760                     mean=acts.Vector4(0, 0, 0, 0),
0761                 ),
0762                 particles=acts.examples.ParametricParticleGenerator(
0763                     p=(100 * u.GeV, 100 * u.GeV),
0764                     eta=(-2, 2),
0765                     phi=(0, 360 * u.degree),
0766                     randomizeCharge=True,
0767                     numParticles=2,
0768                 ),
0769             )
0770         ],
0771         outputEvent="hepmc3_event",
0772         randomNumbers=rng,
0773     )
0774     s.addReader(evGen)
0775 
0776     # Specify path WITHOUT extension, but WITH compression in config
0777     out = tmp_path / "events.hepmc3"
0778     out.parent.mkdir(parents=True, exist_ok=True)
0779 
0780     s.addWriter(
0781         HepMC3Writer(
0782             acts.logging.INFO,
0783             inputEvent="hepmc3_event",
0784             outputPath=out,
0785             compression=acts.examples.hepmc3.Compression.zlib,
0786         )
0787     )
0788 
0789     s.run()
0790 
0791     # File should exist with compression extension added
0792     actual_path = tmp_path / "events.hepmc3.gz"
0793     assert actual_path.exists(), f"Output file {actual_path} does not exist"
0794 
0795     # Check sidecar
0796     check_sidecar(actual_path, s.config.events)
0797 
0798 
0799 def test_hepmc3_writer_root_compression_error(tmp_path):
0800     """Test that an error is raised when trying to use compression with ROOT format"""
0801     from acts.examples.hepmc3 import HepMC3Writer
0802 
0803     out = tmp_path / "out" / "events.root"
0804     out.parent.mkdir(parents=True, exist_ok=True)
0805 
0806     with acts.logging.ScopedFailureThreshold(acts.logging.MAX), pytest.raises(
0807         ValueError
0808     ) as excinfo:
0809         HepMC3Writer(
0810             acts.logging.INFO,
0811             inputEvent="hepmc3_event",
0812             outputPath=out,
0813             compression=acts.examples.hepmc3.Compression.zlib,
0814         )
0815 
0816     assert "Compression not supported for ROOT format" in str(excinfo.value)
0817 
0818 
0819 def test_hepmc3_reader_multiple_files(tmp_path, rng):
0820     from acts.examples.hepmc3 import HepMC3Writer, HepMC3Reader
0821 
0822     events = 100
0823     n_pileup = 10
0824 
0825     s = Sequencer(numThreads=10, events=events, logLevel=acts.logging.INFO)
0826 
0827     vtxGenZero = acts.examples.FixedVertexGenerator(
0828         fixed=acts.Vector4(0, 0, 0, 0),
0829     )
0830 
0831     vtxGen = acts.examples.GaussianVertexGenerator(
0832         stddev=acts.Vector4(50 * u.um, 50 * u.um, 150 * u.mm, 20 * u.ns),
0833         mean=acts.Vector4(0, 0, 0, 0),
0834     )
0835 
0836     hard_scatter = acts.examples.EventGenerator(
0837         level=acts.logging.INFO,
0838         generators=[
0839             acts.examples.EventGenerator.Generator(
0840                 multiplicity=acts.examples.FixedMultiplicityGenerator(n=1),
0841                 vertex=vtxGenZero,
0842                 particles=acts.examples.ParametricParticleGenerator(
0843                     p=(100 * u.GeV, 100 * u.GeV),
0844                     eta=(-2, 2),
0845                     phi=(0, 360 * u.degree),
0846                     randomizeCharge=True,
0847                     numParticles=2,
0848                 ),
0849             )
0850         ],
0851         outputEvent="hard_scatter_event",
0852         randomNumbers=rng,
0853     )
0854     s.addReader(hard_scatter)
0855 
0856     out_hs = tmp_path / "out" / "events_pytest_hs.hepmc3"
0857     out_hs.parent.mkdir(parents=True, exist_ok=True)
0858 
0859     compression = acts.examples.hepmc3.Compression.bzip2
0860 
0861     s.addWriter(
0862         HepMC3Writer(
0863             acts.logging.INFO,
0864             inputEvent=hard_scatter.config.outputEvent,
0865             outputPath=out_hs,
0866             compression=compression,
0867         )
0868     )
0869 
0870     s.run()
0871 
0872     s = Sequencer(numThreads=10, events=events * n_pileup, logLevel=acts.logging.INFO)
0873 
0874     pileup = acts.examples.EventGenerator(
0875         level=acts.logging.INFO,
0876         generators=[
0877             acts.examples.EventGenerator.Generator(
0878                 multiplicity=acts.examples.FixedMultiplicityGenerator(n=1),
0879                 vertex=vtxGenZero,
0880                 particles=acts.examples.ParametricParticleGenerator(
0881                     p=(100 * u.GeV, 100 * u.GeV),
0882                     eta=(-2, 2),
0883                     phi=(0, 360 * u.degree),
0884                     randomizeCharge=True,
0885                     numParticles=2,
0886                 ),
0887             )
0888         ],
0889         outputEvent="pileup_event",
0890         randomNumbers=rng,
0891     )
0892     s.addReader(pileup)
0893 
0894     out_pu = tmp_path / "out" / "events_pytest_pu.hepmc3"
0895     out_pu.parent.mkdir(parents=True, exist_ok=True)
0896 
0897     s.addWriter(
0898         HepMC3Writer(
0899             acts.logging.INFO,
0900             inputEvent=pileup.config.outputEvent,
0901             outputPath=out_pu,
0902             compression=compression,
0903         )
0904     )
0905 
0906     s.run()
0907 
0908     act_hs = handle_path(out_hs, compression)
0909     act_pu = handle_path(out_pu, compression)
0910 
0911     # Check sidecar metadata files
0912     check_sidecar(act_hs, events)
0913     check_sidecar(act_pu, events * n_pileup)
0914 
0915     # do reading including merging and write combined file
0916 
0917     s = Sequencer(numThreads=10, logLevel=acts.logging.INFO)
0918 
0919     reader = HepMC3Reader(
0920         inputs=[
0921             HepMC3Reader.Input.Fixed(act_hs, 1),
0922             HepMC3Reader.Input.Fixed(act_pu, 10),
0923         ],
0924         level=acts.logging.VERBOSE,
0925         outputEvent="hepmc3_event",
0926         vertexGenerator=vtxGen,
0927         randomNumbers=rng,
0928     )
0929     s.addReader(reader)
0930 
0931     out_combined = tmp_path / "out" / "events_pytest_combined.hepmc3"
0932     out_combined.parent.mkdir(parents=True, exist_ok=True)
0933 
0934     s.addWriter(
0935         HepMC3Writer(
0936             acts.logging.INFO,
0937             inputEvent=reader.config.outputEvent,
0938             outputPath=out_combined,
0939             compression=compression,
0940         )
0941     )
0942 
0943     s.run()
0944 
0945     act_combined = handle_path(out_combined, compression)
0946 
0947     # Check sidecar metadata file for combined output
0948     check_sidecar(act_combined, events)
0949 
0950     @with_pyhepmc
0951     def check(pyhepmc):
0952         def get_vtx_pos(file: Path):
0953             with pyhepmc.open(file) as f:
0954                 for evt in f:
0955                     for vtx in evt.vertices:
0956                         yield [vtx.position.x, vtx.position.y, vtx.position.z]
0957 
0958         hs = numpy.vstack(list(get_vtx_pos(act_hs))).T
0959         pu = numpy.vstack(list(get_vtx_pos(act_pu))).T
0960         combined = numpy.vstack(list(get_vtx_pos(act_combined))).T
0961 
0962         # NO smearing in hs and pu
0963         for arr in (hs, pu):
0964             vx, vy, vz = arr
0965             std = numpy.std(vx)
0966             assert std < 1 * u.um
0967             std = numpy.std(vy)
0968             assert std < 1 * u.um
0969             std = numpy.std(vz)
0970             assert std < 1 * u.um
0971 
0972         # Configured smearing in combined
0973         # Checked values are a bit lower than configured smearing due to limited stats
0974         vx, vy, vz = combined
0975         std = numpy.std(vx)
0976         assert std > 40 * u.um
0977         std = numpy.std(vy)
0978         assert std > 40 * u.um
0979         std = numpy.std(vz)
0980         assert std > 140 * u.mm