from pathlib import Path
import os
import json
import functools
import tarfile
import urllib.request
import subprocess
import sys
import re
import collections

import pytest

from helpers import (
    geant4Enabled,
    dd4hepEnabled,
    hepmc3Enabled,
    pythia8Enabled,
    exatrkxEnabled,
    onnxEnabled,
    hashingSeedingEnabled,
    AssertCollectionExistsAlg,
    failure_threshold,
)

import acts
from acts.examples import (
    Sequencer,
    GenericDetector,
    AlignedDetector,
)
from acts.examples.odd import getOpenDataDetector, getOpenDataDetectorDirectory


u = acts.UnitConstants


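# Shared fixtures: a constant 2 T magnetic field along z and a short
# single-threaded Sequencer, reused by several tests below.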
@pytest.fixture
def field():
    return acts.ConstantBField(acts.Vector3(0, 0, 2 * u.T))


@pytest.fixture
def seq():
    return Sequencer(events=10, numThreads=1)


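# Check that at least one CSV file ending in "<stem>.csv" was written to
# csv_path and that no matching file is trivially small.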
def assert_csv_output(csv_path, stem):
    __tracebackhide__ = True
    # print(list(csv_path.iterdir()))
    assert len([f for f in csv_path.iterdir() if f.name.endswith(stem + ".csv")]) > 0
    assert all(
        [
            f.stat().st_size > 100
            for f in csv_path.iterdir()
            if f.name.endswith(stem + ".csv")
        ]
    )


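# Open a ROOT file (ROOT is imported lazily, so the module can be imported
# without it) and assert that the named tree exists; optionally also check
# for a non-zero or an exact number of entries.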
def assert_entries(root_file, tree_name, exp=None, non_zero=False):
    __tracebackhide__ = True
    import ROOT

    ROOT.PyConfig.IgnoreCommandLineOptions = True
    ROOT.gROOT.SetBatch(True)

    rf = ROOT.TFile.Open(str(root_file))
    keys = [k.GetName() for k in rf.GetListOfKeys()]
    assert tree_name in keys
    print("Entries:", rf.Get(tree_name).GetEntries())
    if non_zero:
        assert rf.Get(tree_name).GetEntries() > 0, f"{root_file}:{tree_name}"
    if exp is not None:
        assert rf.Get(tree_name).GetEntries() == exp, f"{root_file}:{tree_name}"


def assert_has_entries(root_file, tree_name):
    __tracebackhide__ = True
    assert_entries(root_file, tree_name, non_zero=True)


@pytest.mark.slow
@pytest.mark.skipif(not pythia8Enabled, reason="Pythia8 not set up")
def test_pythia8(tmp_path, seq, assert_root_hash):
    from pythia8 import runPythia8

    (tmp_path / "csv").mkdir()

    assert not (tmp_path / "particles.root").exists()
    assert len(list((tmp_path / "csv").iterdir())) == 0

    events = seq.config.events

    runPythia8(str(tmp_path), outputRoot=True, outputCsv=True, s=seq).run()

    fp = tmp_path / "particles.root"
    assert fp.exists()
    assert fp.stat().st_size > 2**10 * 50
    assert_entries(fp, "particles", events)
    assert_root_hash(fp.name, fp)

    assert len(list((tmp_path / "csv").iterdir())) > 0
    assert_csv_output(tmp_path / "csv", "particles")


def test_fatras(trk_geo, tmp_path, field, assert_root_hash):
    from fatras import runFatras

    csv = tmp_path / "csv"
    csv.mkdir()

    nevents = 10

    root_files = [
        (
            "particles_simulation.root",
            "particles",
        ),
        (
            "hits.root",
            "hits",
        ),
    ]

    assert len(list(csv.iterdir())) == 0
    for rf, _ in root_files:
        assert not (tmp_path / rf).exists()

    seq = Sequencer(events=nevents)
    runFatras(trk_geo, field, str(tmp_path), s=seq).run()

    assert_csv_output(csv, "particles_simulated")
    assert_csv_output(csv, "hits")
    for f, tn in root_files:
        rfp = tmp_path / f
        assert rfp.exists()
        assert rfp.stat().st_size > 2**10 * 10

        assert_has_entries(rfp, tn)
        assert_root_hash(f, rfp)


@pytest.mark.slow
@pytest.mark.odd
@pytest.mark.skipif(not geant4Enabled, reason="Geant4 not set up")
@pytest.mark.skipif(not dd4hepEnabled, reason="DD4hep not set up")
def test_geant4(tmp_path, assert_root_hash):
    # This test literally only ensures that the Geant4 example can run without erroring out

    # just to make sure it can build the ODD
    with getOpenDataDetector():
        pass

    csv = tmp_path / "csv"
    csv.mkdir()

    root_files = [
        "particles_simulation.root",
        "hits.root",
    ]

    assert len(list(csv.iterdir())) == 0
    for rf in root_files:
        assert not (tmp_path / rf).exists()

    script = (
        Path(__file__).parent.parent.parent.parent
        / "Examples"
        / "Scripts"
        / "Python"
        / "geant4.py"
    )
    assert script.exists()
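    # Run the standalone example script in a subprocess, with
    # ACTS_LOG_FAILURE_THRESHOLD promoting any log message at WARNING or
    # above to a hard failure.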
    env = os.environ.copy()
    env["ACTS_LOG_FAILURE_THRESHOLD"] = "WARNING"
    try:
        subprocess.check_call(
            [sys.executable, str(script)],
            cwd=tmp_path,
            env=env,
            stderr=subprocess.STDOUT,
        )
    except subprocess.CalledProcessError as e:
        print(e.output.decode("utf-8"))
        raise

    assert_csv_output(csv, "particles_simulated")
    assert_csv_output(csv, "hits")
    for f in root_files:
        rfp = tmp_path / f
        assert rfp.exists()
        assert rfp.stat().st_size > 2**10 * 10

        assert_root_hash(f, rfp)


def test_seeding(tmp_path, trk_geo, field, assert_root_hash):
    from seeding import runSeeding

    field = acts.ConstantBField(acts.Vector3(0, 0, 2 * acts.UnitConstants.T))

    csv = tmp_path / "csv"
    csv.mkdir()

    seq = Sequencer(events=10, numThreads=1)

    root_files = [
        (
            "estimatedparams.root",
            "estimatedparams",
        ),
        (
            "performance_seeding.root",
            None,
        ),
        (
            "particles.root",
            "particles",
        ),
        (
            "particles_simulation.root",
            "particles",
        ),
    ]

    for fn, _ in root_files:
        fp = tmp_path / fn
        assert not fp.exists()

    assert len(list(csv.iterdir())) == 0

    runSeeding(trk_geo, field, outputDir=str(tmp_path), s=seq).run()

    for fn, tn in root_files:
        fp = tmp_path / fn
        assert fp.exists()
        assert fp.stat().st_size > 100

        if tn is not None:
            assert_has_entries(fp, tn)
            assert_root_hash(fn, fp)

    assert_csv_output(csv, "particles")
    assert_csv_output(csv, "particles_simulated")


@pytest.mark.slow
@pytest.mark.skipif(not hashingSeedingEnabled, reason="HashingSeeding not set up")
def test_hashing_seeding(tmp_path, trk_geo, field, assert_root_hash):
    from hashing_seeding import runHashingSeeding, Config

    field = acts.ConstantBField(acts.Vector3(0, 0, 2 * acts.UnitConstants.T))

    seq = Sequencer(events=10, numThreads=1)

    root_files = [
        (
            "estimatedparams.root",
            "estimatedparams",
        ),
        (
            "performance_seeding.root",
            None,
        ),
    ]

    for fn, _ in root_files:
        fp = tmp_path / fn
        assert not fp.exists(), f"{fp} exists"

    config = Config(
        mu=50,
    )

    _, _, digiConfig, geoSelectionConfigFile = config.getDetectorInfo()

    runHashingSeeding(
        10,
        trk_geo,
        field,
        outputDir=str(tmp_path),
        saveFiles=True,
        npileup=config.mu,
        seedingAlgorithm=config.seedingAlgorithm,
        maxSeedsPerSpM=config.maxSeedsPerSpM,
        digiConfig=digiConfig,
        geoSelectionConfigFile=geoSelectionConfigFile,
        config=config,
        s=seq,
    ).run()

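    # Drop the Sequencer reference before inspecting its outputs.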
    del seq

    for fn, tn in root_files:
        fp = tmp_path / fn
        assert fp.exists(), f"{fp} does not exist"
        assert fp.stat().st_size > 100, f"{fp} is too small: {fp.stat().st_size} bytes"

        if tn is not None:
            assert_has_entries(fp, tn)
            assert_root_hash(fn, fp)

    assert_csv_output(tmp_path, "particles_simulated")
    assert_csv_output(tmp_path, "buckets")
    assert_csv_output(tmp_path, "seed")


def test_seeding_orthogonal(tmp_path, trk_geo, field, assert_root_hash):
    from seeding import runSeeding, SeedingAlgorithm

    field = acts.ConstantBField(acts.Vector3(0, 0, 2 * acts.UnitConstants.T))

    csv = tmp_path / "csv"
    csv.mkdir()

    seq = Sequencer(events=10, numThreads=1)

    root_files = [
        (
            "estimatedparams.root",
            "estimatedparams",
        ),
        (
            "performance_seeding.root",
            None,
        ),
        (
            "particles.root",
            "particles",
        ),
        (
            "particles_simulation.root",
            "particles",
        ),
    ]

    for fn, _ in root_files:
        fp = tmp_path / fn
        assert not fp.exists()

    assert len(list(csv.iterdir())) == 0

    runSeeding(
        trk_geo,
        field,
        outputDir=str(tmp_path),
        s=seq,
        seedingAlgorithm=SeedingAlgorithm.Orthogonal,
    ).run()

    for fn, tn in root_files:
        fp = tmp_path / fn
        assert fp.exists()
        assert fp.stat().st_size > 100

        if tn is not None:
            assert_has_entries(fp, tn)
            assert_root_hash(fn, fp)

    assert_csv_output(csv, "particles")
    assert_csv_output(csv, "particles_simulated")


def test_itk_seeding(tmp_path, trk_geo, field, assert_root_hash):
    field = acts.ConstantBField(acts.Vector3(0, 0, 2 * acts.UnitConstants.T))

    csv = tmp_path / "csv"
    csv.mkdir()

    seq = Sequencer(events=10, numThreads=1)

    root_files = [
        (
            "estimatedparams.root",
            "estimatedparams",
        ),
        (
            "performance_seeding.root",
            None,
        ),
        (
            "particles.root",
            "particles",
        ),
        (
            "particles_simulation.root",
            "particles",
        ),
    ]

    for fn, _ in root_files:
        fp = tmp_path / fn
        assert not fp.exists()

    assert len(list(csv.iterdir())) == 0

    rnd = acts.examples.RandomNumbers(seed=42)

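    # Build the chain inline: particle gun -> Fatras simulation ->
    # digitization -> particle selection -> ITk-style pixel seeding.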
    from acts.examples.simulation import (
        addParticleGun,
        EtaConfig,
        MomentumConfig,
        ParticleConfig,
        addFatras,
        addDigitization,
        ParticleSelectorConfig,
        addDigiParticleSelection,
    )

    addParticleGun(
        seq,
        MomentumConfig(1.0 * u.GeV, 10.0 * u.GeV, True),
        EtaConfig(-4.0, 4.0, True),
        ParticleConfig(1, acts.PdgParticle.eMuon, True),
        outputDirCsv=tmp_path / "csv",
        outputDirRoot=str(tmp_path),
        rnd=rnd,
    )

    addFatras(
        seq,
        trk_geo,
        field,
        outputDirCsv=tmp_path / "csv",
        outputDirRoot=str(tmp_path),
        rnd=rnd,
    )

    srcdir = Path(__file__).resolve().parent.parent.parent.parent
    addDigitization(
        seq,
        trk_geo,
        field,
        digiConfigFile=srcdir
        / "Examples/Algorithms/Digitization/share/default-smearing-config-generic.json",
        rnd=rnd,
    )

    addDigiParticleSelection(
        seq,
        ParticleSelectorConfig(
            pt=(0.9 * u.GeV, None),
            eta=(-4, 4),
            measurements=(9, None),
            removeNeutral=True,
        ),
    )

    from acts.examples.reconstruction import (
        addSeeding,
    )
    from acts.examples.itk import itkSeedingAlgConfig, InputSpacePointsType

    addSeeding(
        seq,
        trk_geo,
        field,
        *itkSeedingAlgConfig(InputSpacePointsType.PixelSpacePoints),
        acts.logging.VERBOSE,
        geoSelectionConfigFile=srcdir
        / "Examples/Algorithms/TrackFinding/share/geoSelection-genericDetector.json",
        outputDirRoot=str(tmp_path),
    )

    seq.run()

    for fn, tn in root_files:
        fp = tmp_path / fn
        assert fp.exists()
        assert fp.stat().st_size > 100

        if tn is not None:
            assert_has_entries(fp, tn)
            assert_root_hash(fn, fp)

    assert_csv_output(csv, "particles")
    assert_csv_output(csv, "particles_simulated")


@pytest.mark.slow
def test_propagation(tmp_path, trk_geo, field, seq, assert_root_hash):
    from propagation import runPropagation

    root_files = [
        (
            "propagation_summary.root",
            "propagation_summary",
            10000,
        )
    ]

    for fn, _, _ in root_files:
        fp = tmp_path / fn
        assert not fp.exists()

    runPropagation(trk_geo, field, str(tmp_path), s=seq).run()

    for fn, tn, ee in root_files:
        fp = tmp_path / fn
        assert fp.exists()
        assert fp.stat().st_size > 2**10 * 50
        assert_entries(fp, tn, ee)
        assert_root_hash(fn, fp)


@pytest.mark.slow
@pytest.mark.odd
@pytest.mark.skipif(not geant4Enabled, reason="Geant4 not set up")
@pytest.mark.skipif(not dd4hepEnabled, reason="DD4hep not set up")
def test_material_recording(tmp_path, material_recording, assert_root_hash):
    root_files = [
        (
            "geant4_material_tracks.root",
            "material-tracks",
            200,
        )
    ]

    for fn, tn, ee in root_files:
        fp = material_recording / fn
        assert fp.exists()
        assert fp.stat().st_size > 2**10 * 50
        assert_entries(fp, tn, ee)
        assert_root_hash(fn, fp)


@pytest.mark.slow
@pytest.mark.odd
@pytest.mark.skipif(not hepmc3Enabled, reason="HepMC3 plugin not available")
@pytest.mark.skipif(not dd4hepEnabled, reason="DD4hep not set up")
@pytest.mark.skipif(not geant4Enabled, reason="Geant4 not set up")
def test_event_recording(tmp_path):
    script = (
        Path(__file__).parent.parent.parent.parent
        / "Examples"
        / "Scripts"
        / "Python"
        / "event_recording.py"
    )
    assert script.exists()

    env = os.environ.copy()
    env["NEVENTS"] = "1"
    env["ACTS_LOG_FAILURE_THRESHOLD"] = "WARNING"
    try:
        subprocess.check_call(
            [sys.executable, str(script)],
            cwd=tmp_path,
            env=env,
            stderr=subprocess.STDOUT,
        )
    except subprocess.CalledProcessError as e:
        print(e.output.decode("utf-8"))
        raise

    from acts.examples.hepmc3 import HepMC3AsciiReader

    out_path = tmp_path / "hepmc3"
    # out_path.mkdir()

    assert len([f for f in out_path.iterdir() if f.name.endswith("events.hepmc3")]) > 0
    assert all([f.stat().st_size > 100 for f in out_path.iterdir()])

    s = Sequencer(numThreads=1)

    s.addReader(
        HepMC3AsciiReader(
            level=acts.logging.INFO,
            inputDir=str(out_path),
            inputStem="events",
            outputEvents="hepmc-events",
        )
    )

    alg = AssertCollectionExistsAlg(
        "hepmc-events", name="check_alg", level=acts.logging.INFO
    )
    s.addAlgorithm(alg)

    s.run()

    assert alg.events_seen == 1


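# Exercise the Kalman truth-tracking fit at two extreme reverse-filtering
# momentum thresholds.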
@pytest.mark.parametrize("revFiltMomThresh", [0 * u.GeV, 1 * u.TeV])
def test_truth_tracking_kalman(
    tmp_path, assert_root_hash, revFiltMomThresh, detector_config
):
    root_files = [
        ("trackstates_kf.root", "trackstates", 19),
        ("tracksummary_kf.root", "tracksummary", 10),
        ("performance_kf.root", None, -1),
    ]

    for fn, _, _ in root_files:
        fp = tmp_path / fn
        assert not fp.exists()

    with detector_config.detector:
        from truth_tracking_kalman import runTruthTrackingKalman

        field = acts.ConstantBField(acts.Vector3(0, 0, 2 * u.T))

        seq = Sequencer(events=10, numThreads=1)

        runTruthTrackingKalman(
            trackingGeometry=detector_config.trackingGeometry,
            field=field,
            digiConfigFile=detector_config.digiConfigFile,
            outputDir=tmp_path,
            reverseFilteringMomThreshold=revFiltMomThresh,
            s=seq,
        )

        seq.run()

    for fn, tn, ee in root_files:
        fp = tmp_path / fn
        assert fp.exists()
        assert fp.stat().st_size > 1024
        if tn is not None:
            assert_has_entries(fp, tn)
            assert_root_hash(fn, fp)

    import ROOT

    ROOT.PyConfig.IgnoreCommandLineOptions = True
    ROOT.gROOT.SetBatch(True)
    rf = ROOT.TFile.Open(str(tmp_path / "tracksummary_kf.root"))
    keys = [k.GetName() for k in rf.GetListOfKeys()]
    assert "tracksummary" in keys
    for entry in rf.Get("tracksummary"):
        assert entry.hasFittedParams


def test_truth_tracking_gsf(tmp_path, assert_root_hash, detector_config):
    from truth_tracking_gsf import runTruthTrackingGsf

    field = acts.ConstantBField(acts.Vector3(0, 0, 2 * u.T))

    seq = Sequencer(
        events=10,
        numThreads=1,
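        # Mask a known floating-point underflow raised inside the GSF
        # implementation so it does not trip the sequencer's FPE monitoring.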
        fpeMasks=[
            (
                "Core/include/Acts/TrackFitting/detail/GsfUtils.hpp:197",
                acts.FpeType.FLTUND,
                1,
            ),
        ],
    )

    root_files = [
        ("trackstates_gsf.root", "trackstates"),
        ("tracksummary_gsf.root", "tracksummary"),
    ]

    for fn, _ in root_files:
        fp = tmp_path / fn
        assert not fp.exists()

    with detector_config.detector:
        runTruthTrackingGsf(
            trackingGeometry=detector_config.trackingGeometry,
            decorators=detector_config.decorators,
            field=field,
            digiConfigFile=detector_config.digiConfigFile,
            outputDir=tmp_path,
            s=seq,
        )

        # See https://github.com/acts-project/acts/issues/1300
        with failure_threshold(acts.logging.FATAL):
            seq.run()

    for fn, tn in root_files:
        fp = tmp_path / fn
        assert fp.exists()
        assert fp.stat().st_size > 1024
        if tn is not None:
            assert_root_hash(fn, fp)


def test_refitting(tmp_path, detector_config, assert_root_hash):
    from truth_tracking_gsf_refitting import runRefittingGsf

    field = acts.ConstantBField(acts.Vector3(0, 0, 2 * u.T))

    seq = Sequencer(
        events=10,
        numThreads=1,
    )

    with detector_config.detector:
        # Only check that it runs without errors right now.
        # Changes in fitter behaviour should be caught by other tests.
        runRefittingGsf(
            trackingGeometry=detector_config.trackingGeometry,
            field=field,
            digiConfigFile=detector_config.digiConfigFile,
            outputDir=tmp_path,
            s=seq,
        ).run()

    root_files = [
        ("trackstates_gsf_refit.root", "trackstates"),
        ("tracksummary_gsf_refit.root", "tracksummary"),
    ]

    for fn, tn in root_files:
        fp = tmp_path / fn
        assert fp.exists()
        assert fp.stat().st_size > 1024
        if tn is not None:
            assert_root_hash(fn, fp)


def test_particle_gun(tmp_path, assert_root_hash):
    from particle_gun import runParticleGun

    s = Sequencer(events=20, numThreads=-1)

    csv_dir = tmp_path / "csv"
    root_file = tmp_path / "particles.root"

    assert not csv_dir.exists()
    assert not root_file.exists()

    runParticleGun(str(tmp_path), s=s).run()

    assert csv_dir.exists()
    assert root_file.exists()

    assert len([f for f in csv_dir.iterdir() if f.name.endswith("particles.csv")]) > 0
    assert all([f.stat().st_size > 100 for f in csv_dir.iterdir()])

    assert root_file.stat().st_size > 200
    assert_entries(root_file, "particles", 20)
    assert_root_hash(root_file.name, root_file)


@pytest.mark.slow
@pytest.mark.odd
@pytest.mark.skipif(not dd4hepEnabled, reason="DD4hep not set up")
def test_material_mapping(material_recording, tmp_path, assert_root_hash):
    from material_mapping import runMaterialMapping
    from material_validation import runMaterialValidation

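    # Two stages: first map the recorded material onto the tracking geometry,
    # then validate the resulting JSON material map with a propagation run.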
    map_file = tmp_path / "material-map_tracks.root"
    assert not map_file.exists()

    odd_dir = getOpenDataDetectorDirectory()
    config = acts.MaterialMapJsonConverter.Config()
    mdecorator = acts.JsonMaterialDecorator(
        level=acts.logging.INFO,
        rConfig=config,
        jFileName=str(odd_dir / "config/odd-material-mapping-config.json"),
    )

    s = Sequencer(numThreads=1)

    with getOpenDataDetector(mdecorator) as detector:
        trackingGeometry = detector.trackingGeometry()
        decorators = detector.contextDecorators()

        runMaterialMapping(
            trackingGeometry,
            decorators,
            outputDir=str(tmp_path),
            inputDir=material_recording,
            mappingStep=1,
            s=s,
        )

        s.run()

    mat_file = tmp_path / "material-map.json"

    assert mat_file.exists()
    assert mat_file.stat().st_size > 10

    with mat_file.open() as fh:
        assert json.load(fh)

    assert map_file.exists()
    assert_entries(map_file, "material-tracks", 200)
    assert_root_hash(map_file.name, map_file)

    val_file = tmp_path / "propagation-material.root"
    assert not val_file.exists()

    # test the validation as well

    field = acts.NullBField()

    s = Sequencer(events=10, numThreads=1)

    with getOpenDataDetector(
        mdecorator=acts.IMaterialDecorator.fromFile(mat_file)
    ) as detector:
        trackingGeometry = detector.trackingGeometry()
        decorators = detector.contextDecorators()

        runMaterialValidation(
            10, 1000, trackingGeometry, decorators, field, outputDir=str(tmp_path), s=s
        )

        s.run()

    assert val_file.exists()
    assert_entries(val_file, "material-tracks", 10000)
    assert_root_hash(val_file.name, val_file)


@pytest.mark.slow
@pytest.mark.odd
@pytest.mark.skipif(not dd4hepEnabled, reason="DD4hep not set up")
def test_volume_material_mapping(material_recording, tmp_path, assert_root_hash):
    from material_mapping import runMaterialMapping
    from material_validation import runMaterialValidation

    map_file = tmp_path / "material-map-volume_tracks.root"
    assert not map_file.exists()

    geo_map = Path(__file__).parent / "geometry-volume-map.json"
    assert geo_map.exists()
    assert geo_map.stat().st_size > 10
    with geo_map.open() as fh:
        assert json.load(fh)

    s = Sequencer(numThreads=1)

    with getOpenDataDetector(
        mdecorator=acts.IMaterialDecorator.fromFile(geo_map)
    ) as detector:
        trackingGeometry = detector.trackingGeometry()
        decorators = detector.contextDecorators()

        runMaterialMapping(
            trackingGeometry,
            decorators,
            mapName="material-map-volume",
            outputDir=str(tmp_path),
            inputDir=material_recording,
            mappingStep=1,
            s=s,
        )

        s.run()

    mat_file = tmp_path / "material-map-volume.json"

    assert mat_file.exists()
    assert mat_file.stat().st_size > 10

    with mat_file.open() as fh:
        assert json.load(fh)

    assert map_file.exists()
    assert_entries(map_file, "material-tracks", 200)
    assert_root_hash(map_file.name, map_file)

    val_file = tmp_path / "propagation-volume-material.root"
    assert not val_file.exists()

    # test the validation as well

    field = acts.NullBField()

    s = Sequencer(events=10, numThreads=1)

    with getOpenDataDetector(
        mdecorator=acts.IMaterialDecorator.fromFile(mat_file)
    ) as detector:
        trackingGeometry = detector.trackingGeometry()
        decorators = detector.contextDecorators()

        runMaterialValidation(
            10,
            1000,
            trackingGeometry,
            decorators,
            field,
            outputDir=str(tmp_path),
            outputName="propagation-volume-material",
            s=s,
        )

        s.run()

    assert val_file.exists()

    assert_root_hash(val_file.name, val_file)


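# Three detector setups: GenericDetector and the ODD are static ("aligned"),
# while AlignedDetector with iovSize=1 produces a new alignment per event, so
# its per-event geometry writeout is expected to differ.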
@pytest.mark.parametrize(
    "detectorFactory,aligned,nobj",
    [
        (GenericDetector, True, 450),
        pytest.param(
            getOpenDataDetector,
            True,
            540,
            marks=[
                pytest.mark.skipif(not dd4hepEnabled, reason="DD4hep not set up"),
                pytest.mark.slow,
                pytest.mark.odd,
            ],
        ),
        (functools.partial(AlignedDetector, iovSize=1), False, 450),
    ],
)
@pytest.mark.slow
def test_geometry_example(detectorFactory, aligned, nobj, tmp_path):
    detector = detectorFactory()
    trackingGeometry = detector.trackingGeometry()
    decorators = detector.contextDecorators()

    from geometry import runGeometry

    json_dir = tmp_path / "json"
    csv_dir = tmp_path / "csv"
    obj_dir = tmp_path / "obj"

    for d in (json_dir, csv_dir, obj_dir):
        d.mkdir()

    events = 5

    kwargs = dict(
        trackingGeometry=trackingGeometry,
        decorators=decorators,
        events=events,
        outputDir=str(tmp_path),
    )

    runGeometry(outputJson=True, **kwargs)
    runGeometry(outputJson=False, **kwargs)

    assert len(list(obj_dir.iterdir())) == nobj
    assert all(f.stat().st_size > 200 for f in obj_dir.iterdir())

    assert len(list(csv_dir.iterdir())) == 3 * events
    assert all(f.stat().st_size > 200 for f in csv_dir.iterdir())

    detector_files = [csv_dir / f"event{i:>09}-detectors.csv" for i in range(events)]
    for detector_file in detector_files:
        assert detector_file.exists()
        assert detector_file.stat().st_size > 200

    contents = [f.read_text() for f in detector_files]
    ref = contents[0]
    for c in contents[1:]:
        if aligned:
            assert c == ref, "Detector writeout is expected to be identical"
        else:
            assert c != ref, "Detector writeout is expected to be different"

    if aligned:
        for f in [json_dir / f"event{i:>09}-detector.json" for i in range(events)]:
            assert f.exists()
            with f.open() as fh:
                data = json.load(fh)
                assert data
        material_file = tmp_path / "geometry-map.json"
        assert material_file.exists()
        assert material_file.stat().st_size > 200


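# Digitization example configs shipped with the repository.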
DIGI_SHARE_DIR = (
    Path(__file__).parent.parent.parent.parent
    / "Examples/Algorithms/Digitization/share"
)


@pytest.mark.parametrize(
    "digi_config_file",
    [
        DIGI_SHARE_DIR / "default-smearing-config-generic.json",
        DIGI_SHARE_DIR / "default-geometric-config-generic.json",
    ],
    ids=["smeared", "geometric"],
)
def test_digitization_example(trk_geo, tmp_path, assert_root_hash, digi_config_file):
    from digitization import runDigitization

    s = Sequencer(events=10, numThreads=-1)

    csv_dir = tmp_path / "csv"
    root_file = tmp_path / "measurements.root"

    assert not root_file.exists()
    assert not csv_dir.exists()

    field = acts.ConstantBField(acts.Vector3(0, 0, 2 * u.T))
    runDigitization(
        trk_geo, field, outputDir=tmp_path, digiConfigFile=digi_config_file, s=s
    )

    s.run()

    assert root_file.exists()
    assert csv_dir.exists()

    assert len(list(csv_dir.iterdir())) == 3 * s.config.events
    assert all(f.stat().st_size > 50 for f in csv_dir.iterdir())

    assert_root_hash(root_file.name, root_file)


@pytest.mark.parametrize(
    "digi_config_file",
    [
        DIGI_SHARE_DIR / "default-smearing-config-generic.json",
        DIGI_SHARE_DIR / "default-geometric-config-generic.json",
        pytest.param(
            (
                getOpenDataDetectorDirectory()
                / "config"
                / "odd-digi-smearing-config.json"
            ),
            marks=[
                pytest.mark.odd,
            ],
        ),
        pytest.param(
            (
                getOpenDataDetectorDirectory()
                / "config"
                / "odd-digi-geometric-config.json"
            ),
            marks=[
                pytest.mark.odd,
            ],
        ),
    ],
    ids=["smeared", "geometric", "odd-smeared", "odd-geometric"],
)
def test_digitization_example_input_parsing(digi_config_file):
    from acts.examples import readDigiConfigFromJson

    readDigiConfigFromJson(str(digi_config_file))


@pytest.mark.parametrize(
    "digi_config_file",
    [
        DIGI_SHARE_DIR / "default-smearing-config-generic.json",
        DIGI_SHARE_DIR / "default-geometric-config-generic.json",
    ],
    ids=["smeared", "geometric"],
)
def test_digitization_example_input(
    trk_geo, tmp_path, assert_root_hash, digi_config_file
):
    from particle_gun import runParticleGun
    from digitization import runDigitization

    ptcl_dir = tmp_path / "ptcl"
    ptcl_dir.mkdir()
    pgs = Sequencer(events=20, numThreads=-1)
    runParticleGun(str(ptcl_dir), s=pgs)

    pgs.run()

    s = Sequencer(numThreads=-1)

    csv_dir = tmp_path / "csv"
    root_file = tmp_path / "measurements.root"

    assert not root_file.exists()
    assert not csv_dir.exists()

    assert_root_hash(
        "particles.root",
        ptcl_dir / "particles.root",
    )

    field = acts.ConstantBField(acts.Vector3(0, 0, 2 * u.T))
    runDigitization(
        trk_geo,
        field,
        outputDir=tmp_path,
        digiConfigFile=digi_config_file,
        particlesInput=ptcl_dir / "particles.root",
        s=s,
        doMerge=True,
    )

    s.run()

    assert root_file.exists()
    assert csv_dir.exists()

    assert len(list(csv_dir.iterdir())) == 3 * pgs.config.events
    assert all(f.stat().st_size > 50 for f in csv_dir.iterdir())

    assert_root_hash(root_file.name, root_file)


def test_digitization_config_example(trk_geo, tmp_path):
    from digitization_config import runDigitizationConfig

    out_file = tmp_path / "output.json"
    assert not out_file.exists()

    input_file = (
        Path(__file__).parent
        / "../../../Examples/Algorithms/Digitization/share/default-smearing-config-generic.json"
    )
    assert input_file.exists(), input_file.resolve()

    runDigitizationConfig(trk_geo, input=input_file, output=out_file)

    assert out_file.exists()

    with out_file.open() as fh:
        data = json.load(fh)
    assert len(data.keys()) == 2
    assert data["acts-geometry-hierarchy-map"]["format-version"] == 0
    assert (
        data["acts-geometry-hierarchy-map"]["value-identifier"]
        == "digitization-configuration"
    )
    assert len(data["entries"]) == 27


@pytest.mark.parametrize(
    "truthSmeared,truthEstimated",
    [
        [False, False],
        [False, True],
        [True, False],
    ],
    ids=["full_seeding", "truth_estimated", "truth_smeared"],
)
@pytest.mark.slow
def test_ckf_tracks_example(
    tmp_path, assert_root_hash, truthSmeared, truthEstimated, detector_config
):
    csv = tmp_path / "csv"

    assert not csv.exists()

    field = acts.ConstantBField(acts.Vector3(0, 0, 2 * u.T))
    events = 100
    s = Sequencer(events=events, numThreads=-1)

    root_files = [
        (
            "performance_finding_ckf.root",
            None,
        ),
        (
            "trackstates_ckf.root",
            "trackstates",
        ),
        (
            "tracksummary_ckf.root",
            "tracksummary",
        ),
    ]

    if not truthSmeared:
        root_files += [
            (
                "performance_seeding.root",
                None,
            ),
        ]

    for rf, _ in root_files:
        assert not (tmp_path / rf).exists()

    from ckf_tracks import runCKFTracks

    with detector_config.detector:
        runCKFTracks(
            detector_config.trackingGeometry,
            detector_config.decorators,
            field=field,
            outputCsv=True,
            outputDir=tmp_path,
            geometrySelection=detector_config.geometrySelection,
            digiConfigFile=detector_config.digiConfigFile,
            truthSmearedSeeded=truthSmeared,
            truthEstimatedSeeded=truthEstimated,
            s=s,
        )

        s.run()

    assert csv.exists()
    for rf, tn in root_files:
        rp = tmp_path / rf
        assert rp.exists()
        if tn is not None:
            assert_root_hash(rf, rp)

    assert (
        len([f for f in csv.iterdir() if f.name.endswith("tracks_ckf.csv")]) == events
    )
    assert all([f.stat().st_size > 300 for f in csv.iterdir()])


@pytest.mark.skipif(not dd4hepEnabled, reason="DD4hep not set up")
@pytest.mark.odd
@pytest.mark.slow
def test_full_chain_odd_example(tmp_path):
    # This test literally only ensures that the full chain example can run without erroring out

    # just to make sure it can build the ODD
    with getOpenDataDetector():
        pass

    script = (
        Path(__file__).parent.parent.parent.parent
        / "Examples"
        / "Scripts"
        / "Python"
        / "full_chain_odd.py"
    )
    assert script.exists()
    env = os.environ.copy()
    env["ACTS_LOG_FAILURE_THRESHOLD"] = "ERROR"
    try:
        subprocess.check_call(
            [sys.executable, str(script), "-n1"],
            cwd=tmp_path,
            env=env,
            stderr=subprocess.STDOUT,
        )
    except subprocess.CalledProcessError as e:
        print(e.output.decode("utf-8"))
        raise


@pytest.mark.skipif(
    not dd4hepEnabled or not geant4Enabled, reason="DD4hep and/or Geant4 not set up"
)
@pytest.mark.slow
def test_full_chain_odd_example_pythia_geant4(tmp_path):
    # This test literally only ensures that the full chain example can run without erroring out

    # just to make sure it can build the ODD
    with getOpenDataDetector():
        pass

    script = (
        Path(__file__).parent.parent.parent.parent
        / "Examples"
        / "Scripts"
        / "Python"
        / "full_chain_odd.py"
    )
    assert script.exists()
    env = os.environ.copy()
    env["ACTS_LOG_FAILURE_THRESHOLD"] = "ERROR"
    try:
        stdout = subprocess.check_output(
            [
                sys.executable,
                str(script),
                "-n1",
                "--geant4",
                "--ttbar",
                "--ttbar-pu",
                "50",
            ],
            cwd=tmp_path,
            env=env,
            stderr=subprocess.STDOUT,
        )
        stdout = stdout.decode("utf-8")
    except subprocess.CalledProcessError as e:
        print(e.output.decode("utf-8"))
        raise

    # collect and compare known errors
    errors = []
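    # Match log lines of the form "HH:MM:SS <Algorithm> ERROR ..." and collect
    # the names of the emitting algorithms.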
    error_regex = re.compile(r"^\d\d:\d\d:\d\d\s+(\w+)\s+ERROR\s+", re.MULTILINE)
    for match in error_regex.finditer(stdout):
        (algo,) = match.groups()
        errors.append(algo)
    errors = collections.Counter(errors)
    assert dict(errors) == {}, stdout


@pytest.mark.skipif(not dd4hepEnabled, reason="DD4hep not set up")
@pytest.mark.skipif(not onnxEnabled, reason="ONNX plugin not enabled")
@pytest.mark.slow
def test_ML_Ambiguity_Solver(tmp_path, assert_root_hash):
    # This test literally only ensures that the full chain example can run without erroring out

    root_file = "performance_finding_ambiML.root"
    output_dir = "odd_output"
    assert not (tmp_path / root_file).exists()

    # just to make sure it can build the ODD
    with getOpenDataDetector():
        pass

    script = (
        Path(__file__).parent.parent.parent.parent
        / "Examples"
        / "Scripts"
        / "Python"
        / "full_chain_odd.py"
    )
    assert script.exists()
    env = os.environ.copy()
    env["ACTS_LOG_FAILURE_THRESHOLD"] = "ERROR"
    try:
        subprocess.check_call(
            [sys.executable, str(script), "-n1", "--ambi-solver", "ML"],
            cwd=tmp_path,
            env=env,
            stderr=subprocess.STDOUT,
        )
    except subprocess.CalledProcessError as e:
        print(e.output.decode("utf-8"))
        raise

    rfp = tmp_path / output_dir / root_file
    assert rfp.exists()

    assert_root_hash(root_file, rfp)


def test_bfield_writing(tmp_path, seq, assert_root_hash):
    from bfield_writing import runBFieldWriting

    root_files = [
        ("solenoid.root", "solenoid", 100),
        ("solenoid2.root", "solenoid", 100),
    ]

    for fn, _, _ in root_files:
        fp = tmp_path / fn
        assert not fp.exists()

    runBFieldWriting(outputDir=tmp_path, rewrites=1)

    for fn, tn, ee in root_files:
        fp = tmp_path / fn
        assert fp.exists()
        assert fp.stat().st_size > 2**10 * 2
        assert_entries(fp, tn, ee)
        assert_root_hash(fn, fp)


@pytest.mark.parametrize("backend", ["onnx", "torch"])
@pytest.mark.parametrize("hardware", ["cpu", "gpu"])
@pytest.mark.skipif(not exatrkxEnabled, reason="ExaTrkX environment not set up")
def test_exatrkx(tmp_path, trk_geo, field, assert_root_hash, backend, hardware):
    if backend == "onnx" and hardware == "cpu":
        pytest.skip("Combination of ONNX and CPU not yet supported")

    root_file = "performance_track_finding.root"
    assert not (tmp_path / root_file).exists()

    if backend == "onnx":
        url = "https://acts.web.cern.ch/ci/exatrkx/onnx_models_v01.tar"
    else:
        url = "https://acts.web.cern.ch/ci/exatrkx/torchscript_models_v01.tar"

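    # Fetch the pretrained models for the selected backend and unpack them
    # into the test's working directory.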
    tarfile_name = tmp_path / "models.tar"
    urllib.request.urlretrieve(url, tarfile_name)
    tarfile.open(tarfile_name).extractall(tmp_path)
    script = (
        Path(__file__).parent.parent.parent.parent
        / "Examples"
        / "Scripts"
        / "Python"
        / "exatrkx.py"
    )
    assert script.exists()
    env = os.environ.copy()
    env["ACTS_LOG_FAILURE_THRESHOLD"] = "WARNING"

    if hardware == "cpu":
        env["CUDA_VISIBLE_DEVICES"] = ""

    try:
        subprocess.check_call(
            [sys.executable, str(script), backend],
            cwd=tmp_path,
            env=env,
            stderr=subprocess.STDOUT,
        )
    except subprocess.CalledProcessError as e:
        print(e.output.decode("utf-8"))
        raise

    rfp = tmp_path / root_file
    assert rfp.exists()

    assert_root_hash(root_file, rfp)