Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-10-13 08:17:45

0001 from pathlib import Path
0002 import os
0003 import json
0004 import functools
0005 import tarfile
0006 import urllib.request
0007 import subprocess
0008 import sys
0009 import re
0010 import collections
0011 import shutil
0012 
0013 import pytest
0014 
0015 from helpers import (
0016     geant4Enabled,
0017     geomodelEnabled,
0018     dd4hepEnabled,
0019     hepmc3Enabled,
0020     pythia8Enabled,
0021     gnnEnabled,
0022     onnxEnabled,
0023     hashingSeedingEnabled,
0024     AssertCollectionExistsAlg,
0025     failure_threshold,
0026 )
0027 
0028 import acts
0029 from acts.examples import (
0030     Sequencer,
0031     GenericDetector,
0032 )
0033 from acts.examples.odd import getOpenDataDetector, getOpenDataDetectorDirectory
0034 
0035 
0036 u = acts.UnitConstants
0037 
0038 
0039 @pytest.fixture
0040 def field():
0041     return acts.ConstantBField(acts.Vector3(0, 0, 2 * u.T))
0042 
0043 
0044 @pytest.fixture
0045 def seq():
0046     return Sequencer(events=10, numThreads=1)
0047 
0048 
0049 def assert_csv_output(csv_path, stem):
0050     __tracebackhide__ = True
0051     # print(list(csv_path.iterdir()))
0052     assert len([f for f in csv_path.iterdir() if f.name.endswith(stem + ".csv")]) > 0
0053     assert all(
0054         [
0055             f.stat().st_size > 100
0056             for f in csv_path.iterdir()
0057             if f.name.endswith(stem + ".csv")
0058         ]
0059     )
0060 
0061 
0062 def assert_entries(root_file, tree_name, exp=None, non_zero=False):
0063     __tracebackhide__ = True
0064     import ROOT
0065 
0066     ROOT.PyConfig.IgnoreCommandLineOptions = True
0067     ROOT.gROOT.SetBatch(True)
0068 
0069     rf = ROOT.TFile.Open(str(root_file))
0070     keys = [k.GetName() for k in rf.GetListOfKeys()]
0071     assert tree_name in keys
0072     print("Entries:", rf.Get(tree_name).GetEntries())
0073     if non_zero:
0074         assert rf.Get(tree_name).GetEntries() > 0, f"{root_file}:{tree_name}"
0075     if exp is not None:
0076         assert rf.Get(tree_name).GetEntries() == exp, f"{root_file}:{tree_name}"
0077 
0078 
0079 def assert_has_entries(root_file, tree_name):
0080     __tracebackhide__ = True
0081     assert_entries(root_file, tree_name, non_zero=True)
0082 
0083 
0084 @pytest.mark.slow
0085 @pytest.mark.skipif(not pythia8Enabled, reason="Pythia8 not set up")
0086 def test_pythia8(tmp_path, seq, assert_root_hash):
0087     from pythia8 import runPythia8
0088 
0089     (tmp_path / "csv").mkdir()
0090 
0091     assert not (tmp_path / "particles.root").exists()
0092     assert len(list((tmp_path / "csv").iterdir())) == 0
0093 
0094     events = seq.config.events
0095 
0096     vtxGen = acts.examples.GaussianVertexGenerator(
0097         stddev=acts.Vector4(50 * u.um, 50 * u.um, 150 * u.mm, 0),
0098         mean=acts.Vector4(0, 0, 0, 0),
0099     )
0100 
0101     runPythia8(
0102         str(tmp_path), outputRoot=True, outputCsv=True, vtxGen=vtxGen, s=seq
0103     ).run()
0104 
0105     fp = tmp_path / "particles.root"
0106     assert fp.exists()
0107     assert fp.stat().st_size > 2**10 * 50
0108     assert_entries(fp, "particles", events)
0109     assert_root_hash(fp.name, fp)
0110 
0111     assert len(list((tmp_path / "csv").iterdir())) > 0
0112     assert_csv_output(tmp_path / "csv", "particles")
0113 
0114 
0115 def test_fatras(trk_geo, tmp_path, field, assert_root_hash):
0116     from fatras import runFatras
0117 
0118     csv = tmp_path / "csv"
0119     csv.mkdir()
0120 
0121     nevents = 10
0122 
0123     root_files = [
0124         (
0125             "particles_simulation.root",
0126             "particles",
0127         ),
0128         (
0129             "hits.root",
0130             "hits",
0131         ),
0132     ]
0133 
0134     assert len(list(csv.iterdir())) == 0
0135     for rf, _ in root_files:
0136         assert not (tmp_path / rf).exists()
0137 
0138     seq = Sequencer(events=nevents)
0139     runFatras(trk_geo, field, str(tmp_path), s=seq).run()
0140 
0141     assert_csv_output(csv, "particles_simulated")
0142     assert_csv_output(csv, "hits")
0143     for f, tn in root_files:
0144         rfp = tmp_path / f
0145         assert rfp.exists()
0146         assert rfp.stat().st_size > 2**10 * 10
0147 
0148         assert_has_entries(rfp, tn)
0149         assert_root_hash(f, rfp)
0150 
0151 
0152 @pytest.mark.slow
0153 @pytest.mark.odd
0154 @pytest.mark.skipif(not geant4Enabled, reason="Geant4 not set up")
0155 @pytest.mark.skipif(not dd4hepEnabled, reason="DD4hep not set up")
0156 def test_geant4(tmp_path, assert_root_hash):
0157     # This test literally only ensures that the geant 4 example can run without erroring out
0158 
0159     # just to make sure it can build the odd
0160     with getOpenDataDetector():
0161         pass
0162 
0163     csv = tmp_path / "csv"
0164     csv.mkdir()
0165 
0166     root_files = [
0167         "particles_simulation.root",
0168         "hits.root",
0169     ]
0170 
0171     assert len(list(csv.iterdir())) == 0
0172     for rf in root_files:
0173         assert not (tmp_path / rf).exists()
0174 
0175     script = (
0176         Path(__file__).parent.parent.parent.parent
0177         / "Examples"
0178         / "Scripts"
0179         / "Python"
0180         / "geant4.py"
0181     )
0182     assert script.exists()
0183     env = os.environ.copy()
0184     env["ACTS_LOG_FAILURE_THRESHOLD"] = "WARNING"
0185     try:
0186         subprocess.check_call(
0187             [sys.executable, str(script)],
0188             cwd=tmp_path,
0189             env=env,
0190             stderr=subprocess.STDOUT,
0191         )
0192     except subprocess.CalledProcessError as e:
0193         if e.output is not None:
0194             print(e.output.decode("utf-8"))
0195         if e.stderr is not None:
0196             print(e.stderr.decode("utf-8"))
0197         raise
0198 
0199     assert_csv_output(csv, "particles_simulated")
0200     assert_csv_output(csv, "hits")
0201     for f in root_files:
0202         rfp = tmp_path / f
0203         assert rfp.exists()
0204         assert rfp.stat().st_size > 2**10 * 10
0205 
0206         assert_root_hash(f, rfp)
0207 
0208 
0209 def test_seeding(tmp_path, trk_geo, field, assert_root_hash):
0210     from seeding import runSeeding
0211 
0212     field = acts.ConstantBField(acts.Vector3(0, 0, 2 * acts.UnitConstants.T))
0213 
0214     csv = tmp_path / "csv"
0215     csv.mkdir()
0216 
0217     seq = Sequencer(events=10, numThreads=1)
0218 
0219     root_files = [
0220         (
0221             "estimatedparams.root",
0222             "estimatedparams",
0223         ),
0224         (
0225             "performance_seeding.root",
0226             None,
0227         ),
0228         (
0229             "particles.root",
0230             "particles",
0231         ),
0232         (
0233             "particles_simulation.root",
0234             "particles",
0235         ),
0236     ]
0237 
0238     for fn, _ in root_files:
0239         fp = tmp_path / fn
0240         assert not fp.exists()
0241 
0242     assert len(list(csv.iterdir())) == 0
0243 
0244     runSeeding(trk_geo, field, outputDir=str(tmp_path), s=seq).run()
0245 
0246     for fn, tn in root_files:
0247         fp = tmp_path / fn
0248         assert fp.exists()
0249         assert fp.stat().st_size > 100
0250 
0251         if tn is not None:
0252             assert_has_entries(fp, tn)
0253             assert_root_hash(fn, fp)
0254 
0255     assert_csv_output(csv, "particles")
0256     assert_csv_output(csv, "particles_simulated")
0257 
0258 
0259 @pytest.mark.slow
0260 @pytest.mark.skipif(not hashingSeedingEnabled, reason="HashingSeeding not set up")
0261 def test_hashing_seeding(tmp_path, trk_geo, field, assert_root_hash):
0262     from hashing_seeding import runHashingSeeding, Config
0263 
0264     field = acts.ConstantBField(acts.Vector3(0, 0, 2 * acts.UnitConstants.T))
0265 
0266     seq = Sequencer(events=10, numThreads=1)
0267 
0268     rnd = acts.examples.RandomNumbers(seed=4242)
0269 
0270     root_files = [
0271         (
0272             "estimatedparams.root",
0273             "estimatedparams",
0274         ),
0275         (
0276             "performance_seeding.root",
0277             None,
0278         ),
0279     ]
0280 
0281     for fn, _ in root_files:
0282         fp = tmp_path / fn
0283         assert not fp.exists(), f"{fp} exists"
0284 
0285     config = Config(
0286         mu=50,
0287     )
0288 
0289     _, _, digiConfig, geoSelectionConfigFile = config.getDetectorInfo()
0290 
0291     runHashingSeeding(
0292         10,
0293         trk_geo,
0294         field,
0295         outputDir=str(tmp_path),
0296         saveFiles=True,
0297         npileup=config.mu,
0298         seedingAlgorithm=config.seedingAlgorithm,
0299         maxSeedsPerSpM=config.maxSeedsPerSpM,
0300         digiConfig=digiConfig,
0301         geoSelectionConfigFile=geoSelectionConfigFile,
0302         config=config,
0303         s=seq,
0304         rnd=rnd,
0305     ).run()
0306 
0307     del seq
0308 
0309     for fn, tn in root_files:
0310         fp = tmp_path / fn
0311         assert fp.exists(), f"{fp} does not exist"
0312         assert fp.stat().st_size > 100, f"{fp} is too small: {fp.stat().st_size} bytes"
0313 
0314         if tn is not None:
0315             assert_has_entries(fp, tn)
0316             assert_root_hash(fn, fp)
0317 
0318     assert_csv_output(tmp_path, "particles_simulated")
0319     assert_csv_output(tmp_path, "buckets")
0320     assert_csv_output(tmp_path, "seed")
0321 
0322 
0323 def test_seeding_orthogonal(tmp_path, trk_geo, field, assert_root_hash):
0324     from seeding import runSeeding, SeedingAlgorithm
0325 
0326     field = acts.ConstantBField(acts.Vector3(0, 0, 2 * acts.UnitConstants.T))
0327 
0328     csv = tmp_path / "csv"
0329     csv.mkdir()
0330 
0331     seq = Sequencer(events=10, numThreads=1)
0332 
0333     root_files = [
0334         (
0335             "estimatedparams.root",
0336             "estimatedparams",
0337         ),
0338         (
0339             "performance_seeding.root",
0340             None,
0341         ),
0342         (
0343             "particles.root",
0344             "particles",
0345         ),
0346         (
0347             "particles_simulation.root",
0348             "particles",
0349         ),
0350     ]
0351 
0352     for fn, _ in root_files:
0353         fp = tmp_path / fn
0354         assert not fp.exists()
0355 
0356     assert len(list(csv.iterdir())) == 0
0357 
0358     runSeeding(
0359         trk_geo,
0360         field,
0361         outputDir=str(tmp_path),
0362         s=seq,
0363         seedingAlgorithm=SeedingAlgorithm.OrthogonalTriplet,
0364     ).run()
0365 
0366     for fn, tn in root_files:
0367         fp = tmp_path / fn
0368         assert fp.exists()
0369         assert fp.stat().st_size > 100
0370 
0371         if tn is not None:
0372             assert_has_entries(fp, tn)
0373             assert_root_hash(fn, fp)
0374 
0375     assert_csv_output(csv, "particles")
0376     assert_csv_output(csv, "particles_simulated")
0377 
0378 
0379 def test_itk_seeding(tmp_path, trk_geo, field, assert_root_hash):
0380     field = acts.ConstantBField(acts.Vector3(0, 0, 2 * acts.UnitConstants.T))
0381 
0382     csv = tmp_path / "csv"
0383     csv.mkdir()
0384 
0385     seq = Sequencer(events=10, numThreads=1)
0386 
0387     root_files = [
0388         (
0389             "estimatedparams.root",
0390             "estimatedparams",
0391         ),
0392         (
0393             "performance_seeding.root",
0394             None,
0395         ),
0396         (
0397             "particles.root",
0398             "particles",
0399         ),
0400         (
0401             "particles_simulation.root",
0402             "particles",
0403         ),
0404     ]
0405 
0406     for fn, _ in root_files:
0407         fp = tmp_path / fn
0408         assert not fp.exists()
0409 
0410     assert len(list(csv.iterdir())) == 0
0411 
0412     rnd = acts.examples.RandomNumbers(seed=42)
0413 
0414     from acts.examples.simulation import (
0415         addParticleGun,
0416         EtaConfig,
0417         MomentumConfig,
0418         ParticleConfig,
0419         addFatras,
0420         addDigitization,
0421         ParticleSelectorConfig,
0422         addDigiParticleSelection,
0423     )
0424 
0425     addParticleGun(
0426         seq,
0427         MomentumConfig(1.0 * u.GeV, 10.0 * u.GeV, True),
0428         EtaConfig(-4.0, 4.0, True),
0429         ParticleConfig(1, acts.PdgParticle.eMuon, True),
0430         outputDirCsv=tmp_path / "csv",
0431         outputDirRoot=str(tmp_path),
0432         rnd=rnd,
0433     )
0434 
0435     addFatras(
0436         seq,
0437         trk_geo,
0438         field,
0439         outputDirCsv=tmp_path / "csv",
0440         outputDirRoot=str(tmp_path),
0441         rnd=rnd,
0442     )
0443 
0444     srcdir = Path(__file__).resolve().parent.parent.parent.parent
0445     addDigitization(
0446         seq,
0447         trk_geo,
0448         field,
0449         digiConfigFile=srcdir / "Examples/Configs/generic-digi-smearing-config.json",
0450         rnd=rnd,
0451     )
0452 
0453     addDigiParticleSelection(
0454         seq,
0455         ParticleSelectorConfig(
0456             pt=(0.9 * u.GeV, None),
0457             eta=(-4, 4),
0458             measurements=(9, None),
0459             removeNeutral=True,
0460         ),
0461     )
0462 
0463     from acts.examples.reconstruction import (
0464         addSeeding,
0465     )
0466     from acts.examples.itk import itkSeedingAlgConfig, InputSpacePointsType
0467 
0468     addSeeding(
0469         seq,
0470         trk_geo,
0471         field,
0472         *itkSeedingAlgConfig(InputSpacePointsType.PixelSpacePoints),
0473         acts.logging.VERBOSE,
0474         geoSelectionConfigFile=srcdir / "Examples/Configs/generic-seeding-config.json",
0475         outputDirRoot=str(tmp_path),
0476     )
0477 
0478     seq.run()
0479 
0480     for fn, tn in root_files:
0481         fp = tmp_path / fn
0482         assert fp.exists()
0483         assert fp.stat().st_size > 100
0484 
0485         if tn is not None:
0486             assert_has_entries(fp, tn)
0487             assert_root_hash(fn, fp)
0488 
0489     assert_csv_output(csv, "particles")
0490     assert_csv_output(csv, "particles_simulated")
0491 
0492 
0493 @pytest.mark.slow
0494 def test_propagation(tmp_path, trk_geo, field, seq, assert_root_hash):
0495     from propagation import runPropagation
0496 
0497     root_files = [
0498         (
0499             "propagation_summary.root",
0500             "propagation_summary",
0501             10000,
0502         )
0503     ]
0504 
0505     for fn, _, _ in root_files:
0506         fp = tmp_path / fn
0507         assert not fp.exists()
0508 
0509     runPropagation(trk_geo, field, str(tmp_path), s=seq).run()
0510 
0511     for fn, tn, ee in root_files:
0512         fp = tmp_path / fn
0513         assert fp.exists()
0514         assert fp.stat().st_size > 2**10 * 50
0515         assert_entries(fp, tn, ee)
0516         assert_root_hash(fn, fp)
0517 
0518 
0519 @pytest.mark.slow
0520 @pytest.mark.odd
0521 @pytest.mark.skipif(not geant4Enabled, reason="Geant4 not set up")
0522 @pytest.mark.skipif(not dd4hepEnabled, reason="DD4hep not set up")
0523 def test_material_recording(tmp_path, material_recording, assert_root_hash):
0524     root_files = [
0525         (
0526             "geant4_material_tracks.root",
0527             "material-tracks",
0528             200,
0529         )
0530     ]
0531 
0532     for fn, tn, ee in root_files:
0533         fp = material_recording / fn
0534         assert fp.exists()
0535         assert fp.stat().st_size > 2**10 * 50
0536         assert_entries(fp, tn, ee)
0537         assert_root_hash(fn, fp)
0538 
0539 
0540 @pytest.mark.parametrize("revFiltMomThresh", [0 * u.GeV, 1 * u.TeV])
0541 def test_truth_tracking_kalman(
0542     tmp_path, assert_root_hash, revFiltMomThresh, detector_config
0543 ):
0544     root_files = [
0545         ("trackstates_kf.root", "trackstates", 19),
0546         ("tracksummary_kf.root", "tracksummary", 10),
0547         ("performance_kf.root", None, -1),
0548     ]
0549 
0550     for fn, _, _ in root_files:
0551         fp = tmp_path / fn
0552         assert not fp.exists()
0553 
0554     with detector_config.detector:
0555         from truth_tracking_kalman import runTruthTrackingKalman
0556 
0557         field = acts.ConstantBField(acts.Vector3(0, 0, 2 * u.T))
0558 
0559         seq = Sequencer(events=10, numThreads=1)
0560 
0561         runTruthTrackingKalman(
0562             trackingGeometry=detector_config.trackingGeometry,
0563             field=field,
0564             digiConfigFile=detector_config.digiConfigFile,
0565             outputDir=tmp_path,
0566             reverseFilteringMomThreshold=revFiltMomThresh,
0567             s=seq,
0568         )
0569 
0570         seq.run()
0571 
0572     for fn, tn, ee in root_files:
0573         fp = tmp_path / fn
0574         assert fp.exists()
0575         assert fp.stat().st_size > 1024
0576         if tn is not None:
0577             assert_has_entries(fp, tn)
0578             assert_root_hash(fn, fp)
0579 
0580     import ROOT
0581 
0582     ROOT.PyConfig.IgnoreCommandLineOptions = True
0583     ROOT.gROOT.SetBatch(True)
0584     rf = ROOT.TFile.Open(str(tmp_path / "tracksummary_kf.root"))
0585     keys = [k.GetName() for k in rf.GetListOfKeys()]
0586     assert "tracksummary" in keys
0587     for entry in rf.Get("tracksummary"):
0588         assert entry.hasFittedParams
0589 
0590 
0591 def test_truth_tracking_gsf(tmp_path, assert_root_hash, detector_config):
0592     from truth_tracking_gsf import runTruthTrackingGsf
0593 
0594     field = acts.ConstantBField(acts.Vector3(0, 0, 2 * u.T))
0595 
0596     seq = Sequencer(
0597         events=10,
0598         numThreads=1,
0599         fpeMasks=[
0600             (
0601                 "Core/include/Acts/TrackFitting/detail/GsfUtils.hpp:197",
0602                 acts.FpeType.FLTUND,
0603                 1,
0604             ),
0605         ],
0606     )
0607 
0608     root_files = [
0609         ("trackstates_gsf.root", "trackstates"),
0610         ("tracksummary_gsf.root", "tracksummary"),
0611     ]
0612 
0613     for fn, _ in root_files:
0614         fp = tmp_path / fn
0615         assert not fp.exists()
0616 
0617     with detector_config.detector:
0618         runTruthTrackingGsf(
0619             trackingGeometry=detector_config.trackingGeometry,
0620             decorators=detector_config.decorators,
0621             field=field,
0622             digiConfigFile=detector_config.digiConfigFile,
0623             outputDir=tmp_path,
0624             s=seq,
0625         )
0626 
0627         # See https://github.com/acts-project/acts/issues/1300
0628         with failure_threshold(acts.logging.FATAL):
0629             seq.run()
0630 
0631     for fn, tn in root_files:
0632         fp = tmp_path / fn
0633         assert fp.exists()
0634         assert fp.stat().st_size > 1024
0635         if tn is not None:
0636             assert_root_hash(fn, fp)
0637 
0638 
0639 def test_refitting(tmp_path, detector_config, assert_root_hash):
0640     from truth_tracking_gsf_refitting import runRefittingGsf
0641 
0642     field = acts.ConstantBField(acts.Vector3(0, 0, 2 * u.T))
0643 
0644     seq = Sequencer(
0645         events=10,
0646         numThreads=1,
0647     )
0648 
0649     with detector_config.detector:
0650         # Only check if it runs without errors right known
0651         # Changes in fitter behaviour should be caught by other tests
0652         runRefittingGsf(
0653             trackingGeometry=detector_config.trackingGeometry,
0654             field=field,
0655             digiConfigFile=detector_config.digiConfigFile,
0656             outputDir=tmp_path,
0657             s=seq,
0658         ).run()
0659 
0660     root_files = [
0661         ("trackstates_gsf_refit.root", "trackstates"),
0662         ("tracksummary_gsf_refit.root", "tracksummary"),
0663     ]
0664 
0665     for fn, tn in root_files:
0666         fp = tmp_path / fn
0667         assert fp.exists()
0668         assert fp.stat().st_size > 1024
0669         if tn is not None:
0670             assert_root_hash(fn, fp)
0671 
0672 
0673 def test_particle_gun(tmp_path, assert_root_hash):
0674     from particle_gun import runParticleGun
0675 
0676     s = Sequencer(events=20, numThreads=-1)
0677 
0678     csv_dir = tmp_path / "csv"
0679     root_file = tmp_path / "particles.root"
0680 
0681     assert not csv_dir.exists()
0682     assert not root_file.exists()
0683 
0684     runParticleGun(str(tmp_path), s=s).run()
0685 
0686     assert csv_dir.exists()
0687     assert root_file.exists()
0688 
0689     assert len([f for f in csv_dir.iterdir() if f.name.endswith("particles.csv")]) > 0
0690     assert all([f.stat().st_size > 100 for f in csv_dir.iterdir()])
0691 
0692     assert root_file.stat().st_size > 200
0693     assert_entries(root_file, "particles", 20)
0694     assert_root_hash(root_file.name, root_file)
0695 
0696 
0697 @pytest.mark.slow
0698 @pytest.mark.odd
0699 @pytest.mark.skipif(not dd4hepEnabled, reason="DD4hep not set up")
0700 def test_material_mapping(material_recording, tmp_path, assert_root_hash):
0701     from material_mapping import runMaterialMapping
0702     from material_validation import runMaterialValidation
0703 
0704     map_file = tmp_path / "material-map_tracks.root"
0705     assert not map_file.exists()
0706 
0707     odd_dir = getOpenDataDetectorDirectory()
0708     config = acts.MaterialMapJsonConverter.Config()
0709     materialDecorator = acts.JsonMaterialDecorator(
0710         level=acts.logging.INFO,
0711         rConfig=config,
0712         jFileName=str(odd_dir / "config/odd-material-mapping-config.json"),
0713     )
0714 
0715     s = Sequencer(numThreads=1)
0716 
0717     with getOpenDataDetector(materialDecorator) as detector:
0718         trackingGeometry = detector.trackingGeometry()
0719         decorators = detector.contextDecorators()
0720 
0721         runMaterialMapping(
0722             trackingGeometry,
0723             decorators,
0724             outputDir=str(tmp_path),
0725             inputDir=material_recording,
0726             mappingStep=1,
0727             s=s,
0728         )
0729 
0730         s.run()
0731 
0732     mat_file = tmp_path / "material-map.json"
0733 
0734     assert mat_file.exists()
0735     assert mat_file.stat().st_size > 10
0736 
0737     with mat_file.open() as fh:
0738         assert json.load(fh)
0739 
0740     assert map_file.exists()
0741     assert_entries(map_file, "material-tracks", 200)
0742     assert_root_hash(map_file.name, map_file)
0743 
0744     val_file = tmp_path / "propagation-material.root"
0745     assert not val_file.exists()
0746 
0747     # test the validation as well
0748 
0749     field = acts.NullBField()
0750 
0751     s = Sequencer(events=10, numThreads=1)
0752 
0753     with getOpenDataDetector(
0754         materialDecorator=acts.IMaterialDecorator.fromFile(mat_file)
0755     ) as detector:
0756         trackingGeometry = detector.trackingGeometry()
0757         decorators = detector.contextDecorators()
0758 
0759         runMaterialValidation(
0760             10, 1000, trackingGeometry, decorators, field, outputDir=str(tmp_path), s=s
0761         )
0762 
0763         s.run()
0764 
0765     assert val_file.exists()
0766     assert_entries(val_file, "material-tracks", 10000)
0767     assert_root_hash(val_file.name, val_file)
0768 
0769 
0770 @pytest.mark.slow
0771 @pytest.mark.odd
0772 @pytest.mark.skipif(not dd4hepEnabled, reason="DD4hep not set up")
0773 def test_volume_material_mapping(material_recording, tmp_path, assert_root_hash):
0774     from material_mapping import runMaterialMapping
0775     from material_validation import runMaterialValidation
0776 
0777     map_file = tmp_path / "material-map-volume_tracks.root"
0778     assert not map_file.exists()
0779 
0780     geo_map = Path(__file__).parent / "geometry-volume-map.json"
0781     assert geo_map.exists()
0782     assert geo_map.stat().st_size > 10
0783     with geo_map.open() as fh:
0784         assert json.load(fh)
0785 
0786     s = Sequencer(numThreads=1)
0787 
0788     with getOpenDataDetector(
0789         materialDecorator=acts.IMaterialDecorator.fromFile(geo_map)
0790     ) as detector:
0791         trackingGeometry = detector.trackingGeometry()
0792         decorators = detector.contextDecorators()
0793 
0794         runMaterialMapping(
0795             trackingGeometry,
0796             decorators,
0797             mapName="material-map-volume",
0798             outputDir=str(tmp_path),
0799             inputDir=material_recording,
0800             mappingStep=1,
0801             s=s,
0802         )
0803 
0804         s.run()
0805 
0806     mat_file = tmp_path / "material-map-volume.json"
0807 
0808     assert mat_file.exists()
0809     assert mat_file.stat().st_size > 10
0810 
0811     with mat_file.open() as fh:
0812         assert json.load(fh)
0813 
0814     assert map_file.exists()
0815     assert_entries(map_file, "material-tracks", 200)
0816     assert_root_hash(map_file.name, map_file)
0817 
0818     val_file = tmp_path / "propagation-volume-material.root"
0819     assert not val_file.exists()
0820 
0821     # test the validation as well
0822 
0823     field = acts.NullBField()
0824 
0825     s = Sequencer(events=10, numThreads=1)
0826 
0827     with getOpenDataDetector(
0828         materialDecorator=acts.IMaterialDecorator.fromFile(mat_file)
0829     ) as detector:
0830         trackingGeometry = detector.trackingGeometry()
0831         decorators = detector.contextDecorators()
0832 
0833         runMaterialValidation(
0834             10,
0835             1000,
0836             trackingGeometry,
0837             decorators,
0838             field,
0839             outputDir=str(tmp_path),
0840             outputName="propagation-volume-material",
0841             s=s,
0842         )
0843 
0844         s.run()
0845 
0846     assert val_file.exists()
0847 
0848     assert_root_hash(val_file.name, val_file)
0849 
0850 
0851 ACTS_DIR = Path(__file__).parent.parent.parent.parent
0852 CONFIG_DIR = ACTS_DIR / "Examples/Configs"
0853 DIGI_SHARE_DIR = (
0854     Path(__file__).parent.parent.parent.parent
0855     / "Examples/Algorithms/Digitization/share"
0856 )
0857 
0858 
0859 @pytest.mark.parametrize(
0860     "digi_config_file",
0861     [
0862         CONFIG_DIR / "generic-digi-smearing-config.json",
0863         CONFIG_DIR / "generic-digi-geometric-config.json",
0864     ],
0865     ids=["smeared", "geometric"],
0866 )
0867 def test_digitization_example(trk_geo, tmp_path, assert_root_hash, digi_config_file):
0868     from digitization import runDigitization
0869 
0870     s = Sequencer(events=10, numThreads=-1)
0871 
0872     csv_dir = tmp_path / "csv"
0873     root_file = tmp_path / "measurements.root"
0874 
0875     assert not root_file.exists()
0876     assert not csv_dir.exists()
0877 
0878     field = acts.ConstantBField(acts.Vector3(0, 0, 2 * u.T))
0879     runDigitization(
0880         trk_geo, field, outputDir=tmp_path, digiConfigFile=digi_config_file, s=s
0881     )
0882 
0883     s.run()
0884 
0885     assert root_file.exists()
0886     assert csv_dir.exists()
0887 
0888     assert len(list(csv_dir.iterdir())) == 3 * s.config.events
0889     assert all(f.stat().st_size > 50 for f in csv_dir.iterdir())
0890 
0891     assert_root_hash(root_file.name, root_file)
0892 
0893 
0894 @pytest.mark.parametrize(
0895     "digi_config_file",
0896     [
0897         CONFIG_DIR / "generic-digi-smearing-config.json",
0898         CONFIG_DIR / "generic-digi-geometric-config.json",
0899         pytest.param(
0900             (ACTS_DIR / "Examples/Configs" / "odd-digi-smearing-config.json"),
0901             marks=[
0902                 pytest.mark.odd,
0903             ],
0904         ),
0905         pytest.param(
0906             (ACTS_DIR / "Examples/Configs" / "odd-digi-geometric-config.json"),
0907             marks=[
0908                 pytest.mark.odd,
0909             ],
0910         ),
0911     ],
0912     ids=["smeared", "geometric", "odd-smeared", "odd-geometric"],
0913 )
0914 def test_digitization_example_input_parsing(digi_config_file):
0915     from acts.examples import readDigiConfigFromJson
0916 
0917     readDigiConfigFromJson(str(digi_config_file))
0918 
0919 
0920 @pytest.mark.parametrize(
0921     "digi_config_file",
0922     [
0923         CONFIG_DIR / "generic-digi-smearing-config.json",
0924         CONFIG_DIR / "generic-digi-geometric-config.json",
0925     ],
0926     ids=["smeared", "geometric"],
0927 )
0928 def test_digitization_example_input(
0929     trk_geo, tmp_path, assert_root_hash, digi_config_file
0930 ):
0931     from particle_gun import runParticleGun
0932     from digitization import runDigitization
0933 
0934     ptcl_dir = tmp_path / "ptcl"
0935     ptcl_dir.mkdir()
0936     pgs = Sequencer(events=20, numThreads=-1)
0937     runParticleGun(str(ptcl_dir), s=pgs)
0938 
0939     pgs.run()
0940 
0941     s = Sequencer(numThreads=-1)
0942 
0943     csv_dir = tmp_path / "csv"
0944     root_file = tmp_path / "measurements.root"
0945 
0946     assert not root_file.exists()
0947     assert not csv_dir.exists()
0948 
0949     assert_root_hash(
0950         "particles.root",
0951         ptcl_dir / "particles.root",
0952     )
0953 
0954     field = acts.ConstantBField(acts.Vector3(0, 0, 2 * u.T))
0955     runDigitization(
0956         trk_geo,
0957         field,
0958         outputDir=tmp_path,
0959         digiConfigFile=digi_config_file,
0960         particlesInput=ptcl_dir / "particles.root",
0961         s=s,
0962         doMerge=True,
0963     )
0964 
0965     s.run()
0966 
0967     assert root_file.exists()
0968     assert csv_dir.exists()
0969 
0970     assert len(list(csv_dir.iterdir())) == 3 * pgs.config.events
0971     assert all(f.stat().st_size > 50 for f in csv_dir.iterdir())
0972 
0973     assert_root_hash(root_file.name, root_file)
0974 
0975 
0976 def test_digitization_config_example(trk_geo, tmp_path):
0977     from digitization_config import runDigitizationConfig
0978 
0979     out_file = tmp_path / "output.json"
0980     assert not out_file.exists()
0981 
0982     input = (
0983         Path(__file__).parent
0984         / "../../../Examples/Configs/generic-digi-smearing-config.json"
0985     )
0986     assert input.exists(), input.resolve()
0987 
0988     runDigitizationConfig(trk_geo, input=input, output=out_file)
0989 
0990     assert out_file.exists()
0991 
0992     with out_file.open() as fh:
0993         data = json.load(fh)
0994     assert len(data.keys()) == 2
0995     assert data["acts-geometry-hierarchy-map"]["format-version"] == 0
0996     assert (
0997         data["acts-geometry-hierarchy-map"]["value-identifier"]
0998         == "digitization-configuration"
0999     )
1000     assert len(data["entries"]) == 27
1001 
1002 
1003 @pytest.mark.parametrize(
1004     "truthSmeared,truthEstimated",
1005     [
1006         [False, False],
1007         [False, True],
1008         [True, False],
1009     ],
1010     ids=["full_seeding", "truth_estimated", "truth_smeared"],
1011 )
1012 @pytest.mark.slow
1013 def test_ckf_tracks_example(
1014     tmp_path, assert_root_hash, truthSmeared, truthEstimated, detector_config
1015 ):
1016     csv = tmp_path / "csv"
1017 
1018     assert not csv.exists()
1019 
1020     field = acts.ConstantBField(acts.Vector3(0, 0, 2 * u.T))
1021     events = 100
1022     s = Sequencer(events=events, numThreads=-1)
1023 
1024     root_files = [
1025         (
1026             "performance_finding_ckf.root",
1027             None,
1028         ),
1029         (
1030             "trackstates_ckf.root",
1031             "trackstates",
1032         ),
1033         (
1034             "tracksummary_ckf.root",
1035             "tracksummary",
1036         ),
1037     ]
1038 
1039     if not truthSmeared:
1040         root_files += [
1041             (
1042                 "performance_seeding.root",
1043                 None,
1044             ),
1045         ]
1046 
1047     for rf, _ in root_files:
1048         assert not (tmp_path / rf).exists()
1049 
1050     from ckf_tracks import runCKFTracks
1051 
1052     with detector_config.detector:
1053         runCKFTracks(
1054             detector_config.trackingGeometry,
1055             detector_config.decorators,
1056             field=field,
1057             outputCsv=True,
1058             outputDir=tmp_path,
1059             geometrySelection=detector_config.geometrySelection,
1060             digiConfigFile=detector_config.digiConfigFile,
1061             truthSmearedSeeded=truthSmeared,
1062             truthEstimatedSeeded=truthEstimated,
1063             s=s,
1064         )
1065 
1066         s.run()
1067 
1068     assert csv.exists()
1069     for rf, tn in root_files:
1070         rp = tmp_path / rf
1071         assert rp.exists()
1072         if tn is not None:
1073             assert_root_hash(rf, rp)
1074 
1075     assert (
1076         len([f for f in csv.iterdir() if f.name.endswith("tracks_ckf.csv")]) == events
1077     )
1078     assert all([f.stat().st_size > 300 for f in csv.iterdir()])
1079 
1080 
1081 @pytest.mark.skipif(not dd4hepEnabled, reason="DD4hep not set up")
1082 @pytest.mark.odd
1083 @pytest.mark.slow
1084 def test_full_chain_odd_example(tmp_path):
1085     # This test literally only ensures that the full chain example can run without erroring out
1086 
1087     # just to make sure it can build the odd
1088     with getOpenDataDetector():
1089         pass
1090 
1091     script = (
1092         Path(__file__).parent.parent.parent.parent
1093         / "Examples"
1094         / "Scripts"
1095         / "Python"
1096         / "full_chain_odd.py"
1097     )
1098     assert script.exists()
1099     env = os.environ.copy()
1100     env["ACTS_LOG_FAILURE_THRESHOLD"] = "ERROR"
1101     try:
1102         subprocess.check_call(
1103             [sys.executable, str(script), "-n1"],
1104             cwd=tmp_path,
1105             env=env,
1106             stderr=subprocess.STDOUT,
1107         )
1108     except subprocess.CalledProcessError as e:
1109         print(e.output.decode("utf-8"))
1110         raise
1111 
1112 
1113 @pytest.mark.skipif(
1114     not dd4hepEnabled or not geant4Enabled, reason="DD4hep and/or Geant4 not set up"
1115 )
1116 @pytest.mark.slow
1117 def test_full_chain_odd_example_pythia_geant4(tmp_path):
1118     # This test literally only ensures that the full chain example can run without erroring out
1119 
1120     # just to make sure it can build the odd
1121     with getOpenDataDetector():
1122         pass
1123 
1124     script = (
1125         Path(__file__).parent.parent.parent.parent
1126         / "Examples"
1127         / "Scripts"
1128         / "Python"
1129         / "full_chain_odd.py"
1130     )
1131     assert script.exists()
1132     env = os.environ.copy()
1133     env["ACTS_LOG_FAILURE_THRESHOLD"] = "ERROR"
1134     try:
1135         stdout = subprocess.check_output(
1136             [
1137                 sys.executable,
1138                 str(script),
1139                 "-n1",
1140                 "--geant4",
1141                 "--ttbar",
1142                 "--ttbar-pu",
1143                 "50",
1144             ],
1145             cwd=tmp_path,
1146             env=env,
1147             stderr=subprocess.STDOUT,
1148         )
1149         stdout = stdout.decode("utf-8")
1150     except subprocess.CalledProcessError as e:
1151         print(e.output.decode("utf-8"))
1152         raise
1153 
1154     # collect and compare known errors
1155     errors = []
1156     error_regex = re.compile(r"^\d\d:\d\d:\d\d\s+(\w+)\s+ERROR\s+", re.MULTILINE)
1157     for match in error_regex.finditer(stdout):
1158         (algo,) = match.groups()
1159         errors.append(algo)
1160     errors = collections.Counter(errors)
1161     assert dict(errors) == {}, stdout
1162 
1163 
1164 @pytest.mark.skipif(not dd4hepEnabled, reason="DD4hep not set up")
1165 @pytest.mark.skipif(not onnxEnabled, reason="ONNX plugin not enabled")
1166 @pytest.mark.slow
1167 def test_ML_Ambiguity_Solver(tmp_path, assert_root_hash):
1168     # This test literally only ensures that the full chain example can run without erroring out
1169 
1170     root_file = "performance_finding_ambiML.root"
1171     output_dir = "odd_output"
1172     assert not (tmp_path / root_file).exists()
1173 
1174     # just to make sure it can build the odd
1175     with getOpenDataDetector():
1176         pass
1177 
1178     script = (
1179         Path(__file__).parent.parent.parent.parent
1180         / "Examples"
1181         / "Scripts"
1182         / "Python"
1183         / "full_chain_odd.py"
1184     )
1185     assert script.exists()
1186     env = os.environ.copy()
1187     env["ACTS_LOG_FAILURE_THRESHOLD"] = "ERROR"
1188     try:
1189         subprocess.check_call(
1190             [sys.executable, str(script), "-n1", "--ambi-solver", "ML"],
1191             cwd=tmp_path,
1192             env=env,
1193             stderr=subprocess.STDOUT,
1194         )
1195     except subprocess.CalledProcessError as e:
1196         print(e.output.decode("utf-8"))
1197         raise
1198 
1199     rfp = tmp_path / output_dir / root_file
1200     assert rfp.exists()
1201 
1202     assert_root_hash(root_file, rfp)
1203 
1204 
1205 def test_bfield_writing(tmp_path, seq, assert_root_hash):
1206     from bfield_writing import runBFieldWriting
1207 
1208     root_files = [
1209         ("solenoid.root", "solenoid", 100),
1210         ("solenoid2.root", "solenoid", 100),
1211     ]
1212 
1213     for fn, _, _ in root_files:
1214         fp = tmp_path / fn
1215         assert not fp.exists()
1216 
1217     runBFieldWriting(outputDir=tmp_path, rewrites=1)
1218 
1219     for fn, tn, ee in root_files:
1220         fp = tmp_path / fn
1221         assert fp.exists()
1222         assert fp.stat().st_size > 2**10 * 2
1223         assert_entries(fp, tn, ee)
1224         assert_root_hash(fn, fp)
1225 
1226 
1227 @pytest.mark.parametrize("backend", ["onnx", "torch"])
1228 @pytest.mark.parametrize("hardware", ["cpu", "gpu"])
1229 @pytest.mark.skipif(not gnnEnabled, reason="Gnn environment not set up")
1230 def test_gnn(tmp_path, trk_geo, field, assert_root_hash, backend, hardware):
1231     if backend == "onnx" and hardware == "cpu":
1232         pytest.skip("Combination of ONNX and CPU not yet supported")
1233 
1234     if backend == "torch":
1235         pytest.skip(
1236             "Disabled torch support until replacement for torch-scatter is found"
1237         )
1238 
1239     root_file = "performance_track_finding.root"
1240     assert not (tmp_path / root_file).exists()
1241 
1242     # Extract both models, since we currently don't have a working implementation
1243     # of metric learning with ONNX and we need to use torch here
1244     onnx_url = "https://acts.web.cern.ch/ci/exatrkx/onnx_models_v01.tar"
1245     torch_url = "https://acts.web.cern.ch/ci/exatrkx/torchscript_models_v01.tar"
1246 
1247     for url in [onnx_url, torch_url]:
1248         tarfile_name = tmp_path / "models.tar"
1249         urllib.request.urlretrieve(url, tarfile_name)
1250         tarfile.open(tarfile_name).extractall(tmp_path)
1251 
1252     shutil.copyfile(
1253         tmp_path / "torchscript_models/embed.pt", tmp_path / "onnx_models/embed.pt"
1254     )
1255 
1256     script = (
1257         Path(__file__).parent.parent.parent.parent
1258         / "Examples"
1259         / "Scripts"
1260         / "Python"
1261         / "gnn.py"
1262     )
1263     assert script.exists()
1264     env = os.environ.copy()
1265     env["ACTS_LOG_FAILURE_THRESHOLD"] = "WARNING"
1266 
1267     if hardware == "cpu":
1268         env["CUDA_VISIBLE_DEVICES"] = ""
1269 
1270     try:
1271         subprocess.check_call(
1272             [sys.executable, str(script), backend],
1273             cwd=tmp_path,
1274             env=env,
1275             stderr=subprocess.STDOUT,
1276         )
1277     except subprocess.CalledProcessError as e:
1278         print(e.output.decode("utf-8"))
1279         raise
1280 
1281     rfp = tmp_path / root_file
1282     assert rfp.exists()
1283 
1284     assert_root_hash(root_file, rfp)
1285 
1286 
1287 @pytest.mark.odd
1288 def test_strip_spacepoints(detector_config, field, tmp_path, assert_root_hash):
1289     if detector_config.name == "generic":
1290         pytest.skip("No strip spacepoint formation for the generic detector currently")
1291 
1292     from strip_spacepoints import createStripSpacepoints
1293 
1294     s = Sequencer(events=20, numThreads=-1)
1295 
1296     config_path = Path(__file__).parent.parent.parent.parent / "Examples" / "Configs"
1297 
1298     geo_selection = config_path / "odd-strip-spacepoint-selection.json"
1299     digi_config_file = config_path / "odd-digi-smearing-config.json"
1300 
1301     with detector_config.detector:
1302         createStripSpacepoints(
1303             trackingGeometry=detector_config.trackingGeometry,
1304             field=field,
1305             digiConfigFile=digi_config_file,
1306             geoSelection=geo_selection,
1307             outputDir=tmp_path,
1308             s=s,
1309         ).run()
1310 
1311     root_file = "strip_spacepoints.root"
1312     rfp = tmp_path / root_file
1313 
1314     assert_root_hash(root_file, rfp)
1315 
1316 
1317 @pytest.mark.skipif(not geant4Enabled, reason="Geant4 not set up")
1318 @pytest.mark.skipif(not geomodelEnabled, reason="Geomodel not set up")
1319 def test_geomodel_G4(tmp_path):
1320     script = (
1321         Path(__file__).parent.parent.parent.parent
1322         / "Examples"
1323         / "Scripts"
1324         / "Python"
1325         / "geomodel_G4.py"
1326     )
1327     assert script.exists()
1328     # Prepare arguments for the script
1329     mockup_det = "Muon"
1330     out_dir = tmp_path / "geomodel_g4_out"
1331     out_dir.mkdir()
1332     args = [
1333         "python3",
1334         str(script),
1335         "--mockupDetector",
1336         str(mockup_det),
1337         "--outDir",
1338         str(out_dir),
1339     ]
1340     subprocess.check_call(args)
1341 
1342     assert (out_dir / "obj").exists()