Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-09-13 08:13:54

0001 from pathlib import Path
0002 import os
0003 import json
0004 import functools
0005 import tarfile
0006 import urllib.request
0007 import subprocess
0008 import sys
0009 import re
0010 import collections
0011 import shutil
0012 
0013 import pytest
0014 
0015 from helpers import (
0016     geant4Enabled,
0017     geomodelEnabled,
0018     dd4hepEnabled,
0019     hepmc3Enabled,
0020     pythia8Enabled,
0021     gnnEnabled,
0022     onnxEnabled,
0023     hashingSeedingEnabled,
0024     AssertCollectionExistsAlg,
0025     failure_threshold,
0026 )
0027 
0028 import acts
0029 from acts.examples import (
0030     Sequencer,
0031     GenericDetector,
0032 )
0033 from acts.examples.odd import getOpenDataDetector, getOpenDataDetectorDirectory
0034 
0035 
0036 u = acts.UnitConstants
0037 
0038 
0039 @pytest.fixture
0040 def field():
0041     return acts.ConstantBField(acts.Vector3(0, 0, 2 * u.T))
0042 
0043 
0044 @pytest.fixture
0045 def seq():
0046     return Sequencer(events=10, numThreads=1)
0047 
0048 
0049 def assert_csv_output(csv_path, stem):
0050     __tracebackhide__ = True
0051     # print(list(csv_path.iterdir()))
0052     assert len([f for f in csv_path.iterdir() if f.name.endswith(stem + ".csv")]) > 0
0053     assert all(
0054         [
0055             f.stat().st_size > 100
0056             for f in csv_path.iterdir()
0057             if f.name.endswith(stem + ".csv")
0058         ]
0059     )
0060 
0061 
0062 def assert_entries(root_file, tree_name, exp=None, non_zero=False):
0063     __tracebackhide__ = True
0064     import ROOT
0065 
0066     ROOT.PyConfig.IgnoreCommandLineOptions = True
0067     ROOT.gROOT.SetBatch(True)
0068 
0069     rf = ROOT.TFile.Open(str(root_file))
0070     keys = [k.GetName() for k in rf.GetListOfKeys()]
0071     assert tree_name in keys
0072     print("Entries:", rf.Get(tree_name).GetEntries())
0073     if non_zero:
0074         assert rf.Get(tree_name).GetEntries() > 0, f"{root_file}:{tree_name}"
0075     if exp is not None:
0076         assert rf.Get(tree_name).GetEntries() == exp, f"{root_file}:{tree_name}"
0077 
0078 
0079 def assert_has_entries(root_file, tree_name):
0080     __tracebackhide__ = True
0081     assert_entries(root_file, tree_name, non_zero=True)
0082 
0083 
0084 @pytest.mark.slow
0085 @pytest.mark.skipif(not pythia8Enabled, reason="Pythia8 not set up")
0086 def test_pythia8(tmp_path, seq, assert_root_hash):
0087     from pythia8 import runPythia8
0088 
0089     (tmp_path / "csv").mkdir()
0090 
0091     assert not (tmp_path / "particles.root").exists()
0092     assert len(list((tmp_path / "csv").iterdir())) == 0
0093 
0094     events = seq.config.events
0095 
0096     vtxGen = acts.examples.GaussianVertexGenerator(
0097         stddev=acts.Vector4(50 * u.um, 50 * u.um, 150 * u.mm, 0),
0098         mean=acts.Vector4(0, 0, 0, 0),
0099     )
0100 
0101     runPythia8(
0102         str(tmp_path), outputRoot=True, outputCsv=True, vtxGen=vtxGen, s=seq
0103     ).run()
0104 
0105     fp = tmp_path / "particles.root"
0106     assert fp.exists()
0107     assert fp.stat().st_size > 2**10 * 50
0108     assert_entries(fp, "particles", events)
0109     assert_root_hash(fp.name, fp)
0110 
0111     assert len(list((tmp_path / "csv").iterdir())) > 0
0112     assert_csv_output(tmp_path / "csv", "particles")
0113 
0114 
0115 def test_fatras(trk_geo, tmp_path, field, assert_root_hash):
0116     from fatras import runFatras
0117 
0118     csv = tmp_path / "csv"
0119     csv.mkdir()
0120 
0121     nevents = 10
0122 
0123     root_files = [
0124         (
0125             "particles_simulation.root",
0126             "particles",
0127         ),
0128         (
0129             "hits.root",
0130             "hits",
0131         ),
0132     ]
0133 
0134     assert len(list(csv.iterdir())) == 0
0135     for rf, _ in root_files:
0136         assert not (tmp_path / rf).exists()
0137 
0138     seq = Sequencer(events=nevents)
0139     runFatras(trk_geo, field, str(tmp_path), s=seq).run()
0140 
0141     assert_csv_output(csv, "particles_simulated")
0142     assert_csv_output(csv, "hits")
0143     for f, tn in root_files:
0144         rfp = tmp_path / f
0145         assert rfp.exists()
0146         assert rfp.stat().st_size > 2**10 * 10
0147 
0148         assert_has_entries(rfp, tn)
0149         assert_root_hash(f, rfp)
0150 
0151 
0152 @pytest.mark.slow
0153 @pytest.mark.odd
0154 @pytest.mark.skipif(not geant4Enabled, reason="Geant4 not set up")
0155 @pytest.mark.skipif(not dd4hepEnabled, reason="DD4hep not set up")
0156 def test_geant4(tmp_path, assert_root_hash):
0157     # This test literally only ensures that the geant 4 example can run without erroring out
0158 
0159     # just to make sure it can build the odd
0160     with getOpenDataDetector():
0161         pass
0162 
0163     csv = tmp_path / "csv"
0164     csv.mkdir()
0165 
0166     root_files = [
0167         "particles_simulation.root",
0168         "hits.root",
0169     ]
0170 
0171     assert len(list(csv.iterdir())) == 0
0172     for rf in root_files:
0173         assert not (tmp_path / rf).exists()
0174 
0175     script = (
0176         Path(__file__).parent.parent.parent.parent
0177         / "Examples"
0178         / "Scripts"
0179         / "Python"
0180         / "geant4.py"
0181     )
0182     assert script.exists()
0183     env = os.environ.copy()
0184     env["ACTS_LOG_FAILURE_THRESHOLD"] = "WARNING"
0185     try:
0186         subprocess.check_call(
0187             [sys.executable, str(script)],
0188             cwd=tmp_path,
0189             env=env,
0190             stderr=subprocess.STDOUT,
0191         )
0192     except subprocess.CalledProcessError as e:
0193         if e.output is not None:
0194             print(e.output.decode("utf-8"))
0195         if e.stderr is not None:
0196             print(e.stderr.decode("utf-8"))
0197         raise
0198 
0199     assert_csv_output(csv, "particles_simulated")
0200     assert_csv_output(csv, "hits")
0201     for f in root_files:
0202         rfp = tmp_path / f
0203         assert rfp.exists()
0204         assert rfp.stat().st_size > 2**10 * 10
0205 
0206         assert_root_hash(f, rfp)
0207 
0208 
0209 def test_seeding(tmp_path, trk_geo, field, assert_root_hash):
0210     from seeding import runSeeding
0211 
0212     field = acts.ConstantBField(acts.Vector3(0, 0, 2 * acts.UnitConstants.T))
0213 
0214     csv = tmp_path / "csv"
0215     csv.mkdir()
0216 
0217     seq = Sequencer(events=10, numThreads=1)
0218 
0219     root_files = [
0220         (
0221             "estimatedparams.root",
0222             "estimatedparams",
0223         ),
0224         (
0225             "performance_seeding.root",
0226             None,
0227         ),
0228         (
0229             "particles.root",
0230             "particles",
0231         ),
0232         (
0233             "particles_simulation.root",
0234             "particles",
0235         ),
0236     ]
0237 
0238     for fn, _ in root_files:
0239         fp = tmp_path / fn
0240         assert not fp.exists()
0241 
0242     assert len(list(csv.iterdir())) == 0
0243 
0244     runSeeding(trk_geo, field, outputDir=str(tmp_path), s=seq).run()
0245 
0246     for fn, tn in root_files:
0247         fp = tmp_path / fn
0248         assert fp.exists()
0249         assert fp.stat().st_size > 100
0250 
0251         if tn is not None:
0252             assert_has_entries(fp, tn)
0253             assert_root_hash(fn, fp)
0254 
0255     assert_csv_output(csv, "particles")
0256     assert_csv_output(csv, "particles_simulated")
0257 
0258 
0259 @pytest.mark.slow
0260 @pytest.mark.skipif(not hashingSeedingEnabled, reason="HashingSeeding not set up")
0261 def test_hashing_seeding(tmp_path, trk_geo, field, assert_root_hash):
0262     from hashing_seeding import runHashingSeeding, Config
0263 
0264     field = acts.ConstantBField(acts.Vector3(0, 0, 2 * acts.UnitConstants.T))
0265 
0266     seq = Sequencer(events=10, numThreads=1)
0267 
0268     root_files = [
0269         (
0270             "estimatedparams.root",
0271             "estimatedparams",
0272         ),
0273         (
0274             "performance_seeding.root",
0275             None,
0276         ),
0277     ]
0278 
0279     for fn, _ in root_files:
0280         fp = tmp_path / fn
0281         assert not fp.exists(), f"{fp} exists"
0282 
0283     config = Config(
0284         mu=50,
0285     )
0286 
0287     _, _, digiConfig, geoSelectionConfigFile = config.getDetectorInfo()
0288 
0289     runHashingSeeding(
0290         10,
0291         trk_geo,
0292         field,
0293         outputDir=str(tmp_path),
0294         saveFiles=True,
0295         npileup=config.mu,
0296         seedingAlgorithm=config.seedingAlgorithm,
0297         maxSeedsPerSpM=config.maxSeedsPerSpM,
0298         digiConfig=digiConfig,
0299         geoSelectionConfigFile=geoSelectionConfigFile,
0300         config=config,
0301         s=seq,
0302     ).run()
0303 
0304     del seq
0305 
0306     for fn, tn in root_files:
0307         fp = tmp_path / fn
0308         assert fp.exists(), f"{fp} does not exist"
0309         assert fp.stat().st_size > 100, f"{fp} is too small: {fp.stat().st_size} bytes"
0310 
0311         if tn is not None:
0312             assert_has_entries(fp, tn)
0313             assert_root_hash(fn, fp)
0314 
0315     assert_csv_output(tmp_path, "particles_simulated")
0316     assert_csv_output(tmp_path, "buckets")
0317     assert_csv_output(tmp_path, "seed")
0318 
0319 
0320 def test_seeding_orthogonal(tmp_path, trk_geo, field, assert_root_hash):
0321     from seeding import runSeeding, SeedingAlgorithm
0322 
0323     field = acts.ConstantBField(acts.Vector3(0, 0, 2 * acts.UnitConstants.T))
0324 
0325     csv = tmp_path / "csv"
0326     csv.mkdir()
0327 
0328     seq = Sequencer(events=10, numThreads=1)
0329 
0330     root_files = [
0331         (
0332             "estimatedparams.root",
0333             "estimatedparams",
0334         ),
0335         (
0336             "performance_seeding.root",
0337             None,
0338         ),
0339         (
0340             "particles.root",
0341             "particles",
0342         ),
0343         (
0344             "particles_simulation.root",
0345             "particles",
0346         ),
0347     ]
0348 
0349     for fn, _ in root_files:
0350         fp = tmp_path / fn
0351         assert not fp.exists()
0352 
0353     assert len(list(csv.iterdir())) == 0
0354 
0355     runSeeding(
0356         trk_geo,
0357         field,
0358         outputDir=str(tmp_path),
0359         s=seq,
0360         seedingAlgorithm=SeedingAlgorithm.Orthogonal,
0361     ).run()
0362 
0363     for fn, tn in root_files:
0364         fp = tmp_path / fn
0365         assert fp.exists()
0366         assert fp.stat().st_size > 100
0367 
0368         if tn is not None:
0369             assert_has_entries(fp, tn)
0370             assert_root_hash(fn, fp)
0371 
0372     assert_csv_output(csv, "particles")
0373     assert_csv_output(csv, "particles_simulated")
0374 
0375 
0376 def test_itk_seeding(tmp_path, trk_geo, field, assert_root_hash):
0377     field = acts.ConstantBField(acts.Vector3(0, 0, 2 * acts.UnitConstants.T))
0378 
0379     csv = tmp_path / "csv"
0380     csv.mkdir()
0381 
0382     seq = Sequencer(events=10, numThreads=1)
0383 
0384     root_files = [
0385         (
0386             "estimatedparams.root",
0387             "estimatedparams",
0388         ),
0389         (
0390             "performance_seeding.root",
0391             None,
0392         ),
0393         (
0394             "particles.root",
0395             "particles",
0396         ),
0397         (
0398             "particles_simulation.root",
0399             "particles",
0400         ),
0401     ]
0402 
0403     for fn, _ in root_files:
0404         fp = tmp_path / fn
0405         assert not fp.exists()
0406 
0407     assert len(list(csv.iterdir())) == 0
0408 
0409     rnd = acts.examples.RandomNumbers(seed=42)
0410 
0411     from acts.examples.simulation import (
0412         addParticleGun,
0413         EtaConfig,
0414         MomentumConfig,
0415         ParticleConfig,
0416         addFatras,
0417         addDigitization,
0418         ParticleSelectorConfig,
0419         addDigiParticleSelection,
0420     )
0421 
0422     addParticleGun(
0423         seq,
0424         MomentumConfig(1.0 * u.GeV, 10.0 * u.GeV, True),
0425         EtaConfig(-4.0, 4.0, True),
0426         ParticleConfig(1, acts.PdgParticle.eMuon, True),
0427         outputDirCsv=tmp_path / "csv",
0428         outputDirRoot=str(tmp_path),
0429         rnd=rnd,
0430     )
0431 
0432     addFatras(
0433         seq,
0434         trk_geo,
0435         field,
0436         outputDirCsv=tmp_path / "csv",
0437         outputDirRoot=str(tmp_path),
0438         rnd=rnd,
0439     )
0440 
0441     srcdir = Path(__file__).resolve().parent.parent.parent.parent
0442     addDigitization(
0443         seq,
0444         trk_geo,
0445         field,
0446         digiConfigFile=srcdir / "Examples/Configs/generic-digi-smearing-config.json",
0447         rnd=rnd,
0448     )
0449 
0450     addDigiParticleSelection(
0451         seq,
0452         ParticleSelectorConfig(
0453             pt=(0.9 * u.GeV, None),
0454             eta=(-4, 4),
0455             measurements=(9, None),
0456             removeNeutral=True,
0457         ),
0458     )
0459 
0460     from acts.examples.reconstruction import (
0461         addSeeding,
0462     )
0463     from acts.examples.itk import itkSeedingAlgConfig, InputSpacePointsType
0464 
0465     addSeeding(
0466         seq,
0467         trk_geo,
0468         field,
0469         *itkSeedingAlgConfig(InputSpacePointsType.PixelSpacePoints),
0470         acts.logging.VERBOSE,
0471         geoSelectionConfigFile=srcdir / "Examples/Configs/generic-seeding-config.json",
0472         outputDirRoot=str(tmp_path),
0473     )
0474 
0475     seq.run()
0476 
0477     for fn, tn in root_files:
0478         fp = tmp_path / fn
0479         assert fp.exists()
0480         assert fp.stat().st_size > 100
0481 
0482         if tn is not None:
0483             assert_has_entries(fp, tn)
0484             assert_root_hash(fn, fp)
0485 
0486     assert_csv_output(csv, "particles")
0487     assert_csv_output(csv, "particles_simulated")
0488 
0489 
0490 @pytest.mark.slow
0491 def test_propagation(tmp_path, trk_geo, field, seq, assert_root_hash):
0492     from propagation import runPropagation
0493 
0494     root_files = [
0495         (
0496             "propagation_summary.root",
0497             "propagation_summary",
0498             10000,
0499         )
0500     ]
0501 
0502     for fn, _, _ in root_files:
0503         fp = tmp_path / fn
0504         assert not fp.exists()
0505 
0506     runPropagation(trk_geo, field, str(tmp_path), s=seq).run()
0507 
0508     for fn, tn, ee in root_files:
0509         fp = tmp_path / fn
0510         assert fp.exists()
0511         assert fp.stat().st_size > 2**10 * 50
0512         assert_entries(fp, tn, ee)
0513         assert_root_hash(fn, fp)
0514 
0515 
0516 @pytest.mark.slow
0517 @pytest.mark.odd
0518 @pytest.mark.skipif(not geant4Enabled, reason="Geant4 not set up")
0519 @pytest.mark.skipif(not dd4hepEnabled, reason="DD4hep not set up")
0520 def test_material_recording(tmp_path, material_recording, assert_root_hash):
0521     root_files = [
0522         (
0523             "geant4_material_tracks.root",
0524             "material-tracks",
0525             200,
0526         )
0527     ]
0528 
0529     for fn, tn, ee in root_files:
0530         fp = material_recording / fn
0531         assert fp.exists()
0532         assert fp.stat().st_size > 2**10 * 50
0533         assert_entries(fp, tn, ee)
0534         assert_root_hash(fn, fp)
0535 
0536 
0537 @pytest.mark.parametrize("revFiltMomThresh", [0 * u.GeV, 1 * u.TeV])
0538 def test_truth_tracking_kalman(
0539     tmp_path, assert_root_hash, revFiltMomThresh, detector_config
0540 ):
0541     root_files = [
0542         ("trackstates_kf.root", "trackstates", 19),
0543         ("tracksummary_kf.root", "tracksummary", 10),
0544         ("performance_kf.root", None, -1),
0545     ]
0546 
0547     for fn, _, _ in root_files:
0548         fp = tmp_path / fn
0549         assert not fp.exists()
0550 
0551     with detector_config.detector:
0552         from truth_tracking_kalman import runTruthTrackingKalman
0553 
0554         field = acts.ConstantBField(acts.Vector3(0, 0, 2 * u.T))
0555 
0556         seq = Sequencer(events=10, numThreads=1)
0557 
0558         runTruthTrackingKalman(
0559             trackingGeometry=detector_config.trackingGeometry,
0560             field=field,
0561             digiConfigFile=detector_config.digiConfigFile,
0562             outputDir=tmp_path,
0563             reverseFilteringMomThreshold=revFiltMomThresh,
0564             s=seq,
0565         )
0566 
0567         seq.run()
0568 
0569     for fn, tn, ee in root_files:
0570         fp = tmp_path / fn
0571         assert fp.exists()
0572         assert fp.stat().st_size > 1024
0573         if tn is not None:
0574             assert_has_entries(fp, tn)
0575             assert_root_hash(fn, fp)
0576 
0577     import ROOT
0578 
0579     ROOT.PyConfig.IgnoreCommandLineOptions = True
0580     ROOT.gROOT.SetBatch(True)
0581     rf = ROOT.TFile.Open(str(tmp_path / "tracksummary_kf.root"))
0582     keys = [k.GetName() for k in rf.GetListOfKeys()]
0583     assert "tracksummary" in keys
0584     for entry in rf.Get("tracksummary"):
0585         assert entry.hasFittedParams
0586 
0587 
0588 def test_truth_tracking_gsf(tmp_path, assert_root_hash, detector_config):
0589     from truth_tracking_gsf import runTruthTrackingGsf
0590 
0591     field = acts.ConstantBField(acts.Vector3(0, 0, 2 * u.T))
0592 
0593     seq = Sequencer(
0594         events=10,
0595         numThreads=1,
0596         fpeMasks=[
0597             (
0598                 "Core/include/Acts/TrackFitting/detail/GsfUtils.hpp:197",
0599                 acts.FpeType.FLTUND,
0600                 1,
0601             ),
0602         ],
0603     )
0604 
0605     root_files = [
0606         ("trackstates_gsf.root", "trackstates"),
0607         ("tracksummary_gsf.root", "tracksummary"),
0608     ]
0609 
0610     for fn, _ in root_files:
0611         fp = tmp_path / fn
0612         assert not fp.exists()
0613 
0614     with detector_config.detector:
0615         runTruthTrackingGsf(
0616             trackingGeometry=detector_config.trackingGeometry,
0617             decorators=detector_config.decorators,
0618             field=field,
0619             digiConfigFile=detector_config.digiConfigFile,
0620             outputDir=tmp_path,
0621             s=seq,
0622         )
0623 
0624         # See https://github.com/acts-project/acts/issues/1300
0625         with failure_threshold(acts.logging.FATAL):
0626             seq.run()
0627 
0628     for fn, tn in root_files:
0629         fp = tmp_path / fn
0630         assert fp.exists()
0631         assert fp.stat().st_size > 1024
0632         if tn is not None:
0633             assert_root_hash(fn, fp)
0634 
0635 
0636 def test_refitting(tmp_path, detector_config, assert_root_hash):
0637     from truth_tracking_gsf_refitting import runRefittingGsf
0638 
0639     field = acts.ConstantBField(acts.Vector3(0, 0, 2 * u.T))
0640 
0641     seq = Sequencer(
0642         events=10,
0643         numThreads=1,
0644     )
0645 
0646     with detector_config.detector:
0647         # Only check if it runs without errors right known
0648         # Changes in fitter behaviour should be caught by other tests
0649         runRefittingGsf(
0650             trackingGeometry=detector_config.trackingGeometry,
0651             field=field,
0652             digiConfigFile=detector_config.digiConfigFile,
0653             outputDir=tmp_path,
0654             s=seq,
0655         ).run()
0656 
0657     root_files = [
0658         ("trackstates_gsf_refit.root", "trackstates"),
0659         ("tracksummary_gsf_refit.root", "tracksummary"),
0660     ]
0661 
0662     for fn, tn in root_files:
0663         fp = tmp_path / fn
0664         assert fp.exists()
0665         assert fp.stat().st_size > 1024
0666         if tn is not None:
0667             assert_root_hash(fn, fp)
0668 
0669 
0670 def test_particle_gun(tmp_path, assert_root_hash):
0671     from particle_gun import runParticleGun
0672 
0673     s = Sequencer(events=20, numThreads=-1)
0674 
0675     csv_dir = tmp_path / "csv"
0676     root_file = tmp_path / "particles.root"
0677 
0678     assert not csv_dir.exists()
0679     assert not root_file.exists()
0680 
0681     runParticleGun(str(tmp_path), s=s).run()
0682 
0683     assert csv_dir.exists()
0684     assert root_file.exists()
0685 
0686     assert len([f for f in csv_dir.iterdir() if f.name.endswith("particles.csv")]) > 0
0687     assert all([f.stat().st_size > 100 for f in csv_dir.iterdir()])
0688 
0689     assert root_file.stat().st_size > 200
0690     assert_entries(root_file, "particles", 20)
0691     assert_root_hash(root_file.name, root_file)
0692 
0693 
0694 @pytest.mark.slow
0695 @pytest.mark.odd
0696 @pytest.mark.skipif(not dd4hepEnabled, reason="DD4hep not set up")
0697 def test_material_mapping(material_recording, tmp_path, assert_root_hash):
0698     from material_mapping import runMaterialMapping
0699     from material_validation import runMaterialValidation
0700 
0701     map_file = tmp_path / "material-map_tracks.root"
0702     assert not map_file.exists()
0703 
0704     odd_dir = getOpenDataDetectorDirectory()
0705     config = acts.MaterialMapJsonConverter.Config()
0706     materialDecorator = acts.JsonMaterialDecorator(
0707         level=acts.logging.INFO,
0708         rConfig=config,
0709         jFileName=str(odd_dir / "config/odd-material-mapping-config.json"),
0710     )
0711 
0712     s = Sequencer(numThreads=1)
0713 
0714     with getOpenDataDetector(materialDecorator) as detector:
0715         trackingGeometry = detector.trackingGeometry()
0716         decorators = detector.contextDecorators()
0717 
0718         runMaterialMapping(
0719             trackingGeometry,
0720             decorators,
0721             outputDir=str(tmp_path),
0722             inputDir=material_recording,
0723             mappingStep=1,
0724             s=s,
0725         )
0726 
0727         s.run()
0728 
0729     mat_file = tmp_path / "material-map.json"
0730 
0731     assert mat_file.exists()
0732     assert mat_file.stat().st_size > 10
0733 
0734     with mat_file.open() as fh:
0735         assert json.load(fh)
0736 
0737     assert map_file.exists()
0738     assert_entries(map_file, "material-tracks", 200)
0739     assert_root_hash(map_file.name, map_file)
0740 
0741     val_file = tmp_path / "propagation-material.root"
0742     assert not val_file.exists()
0743 
0744     # test the validation as well
0745 
0746     field = acts.NullBField()
0747 
0748     s = Sequencer(events=10, numThreads=1)
0749 
0750     with getOpenDataDetector(
0751         materialDecorator=acts.IMaterialDecorator.fromFile(mat_file)
0752     ) as detector:
0753         trackingGeometry = detector.trackingGeometry()
0754         decorators = detector.contextDecorators()
0755 
0756         runMaterialValidation(
0757             10, 1000, trackingGeometry, decorators, field, outputDir=str(tmp_path), s=s
0758         )
0759 
0760         s.run()
0761 
0762     assert val_file.exists()
0763     assert_entries(val_file, "material-tracks", 10000)
0764     assert_root_hash(val_file.name, val_file)
0765 
0766 
0767 @pytest.mark.slow
0768 @pytest.mark.odd
0769 @pytest.mark.skipif(not dd4hepEnabled, reason="DD4hep not set up")
0770 def test_volume_material_mapping(material_recording, tmp_path, assert_root_hash):
0771     from material_mapping import runMaterialMapping
0772     from material_validation import runMaterialValidation
0773 
0774     map_file = tmp_path / "material-map-volume_tracks.root"
0775     assert not map_file.exists()
0776 
0777     geo_map = Path(__file__).parent / "geometry-volume-map.json"
0778     assert geo_map.exists()
0779     assert geo_map.stat().st_size > 10
0780     with geo_map.open() as fh:
0781         assert json.load(fh)
0782 
0783     s = Sequencer(numThreads=1)
0784 
0785     with getOpenDataDetector(
0786         materialDecorator=acts.IMaterialDecorator.fromFile(geo_map)
0787     ) as detector:
0788         trackingGeometry = detector.trackingGeometry()
0789         decorators = detector.contextDecorators()
0790 
0791         runMaterialMapping(
0792             trackingGeometry,
0793             decorators,
0794             mapName="material-map-volume",
0795             outputDir=str(tmp_path),
0796             inputDir=material_recording,
0797             mappingStep=1,
0798             s=s,
0799         )
0800 
0801         s.run()
0802 
0803     mat_file = tmp_path / "material-map-volume.json"
0804 
0805     assert mat_file.exists()
0806     assert mat_file.stat().st_size > 10
0807 
0808     with mat_file.open() as fh:
0809         assert json.load(fh)
0810 
0811     assert map_file.exists()
0812     assert_entries(map_file, "material-tracks", 200)
0813     assert_root_hash(map_file.name, map_file)
0814 
0815     val_file = tmp_path / "propagation-volume-material.root"
0816     assert not val_file.exists()
0817 
0818     # test the validation as well
0819 
0820     field = acts.NullBField()
0821 
0822     s = Sequencer(events=10, numThreads=1)
0823 
0824     with getOpenDataDetector(
0825         materialDecorator=acts.IMaterialDecorator.fromFile(mat_file)
0826     ) as detector:
0827         trackingGeometry = detector.trackingGeometry()
0828         decorators = detector.contextDecorators()
0829 
0830         runMaterialValidation(
0831             10,
0832             1000,
0833             trackingGeometry,
0834             decorators,
0835             field,
0836             outputDir=str(tmp_path),
0837             outputName="propagation-volume-material",
0838             s=s,
0839         )
0840 
0841         s.run()
0842 
0843     assert val_file.exists()
0844 
0845     assert_root_hash(val_file.name, val_file)
0846 
0847 
0848 ACTS_DIR = Path(__file__).parent.parent.parent.parent
0849 CONFIG_DIR = ACTS_DIR / "Examples/Configs"
0850 DIGI_SHARE_DIR = (
0851     Path(__file__).parent.parent.parent.parent
0852     / "Examples/Algorithms/Digitization/share"
0853 )
0854 
0855 
0856 @pytest.mark.parametrize(
0857     "digi_config_file",
0858     [
0859         CONFIG_DIR / "generic-digi-smearing-config.json",
0860         CONFIG_DIR / "generic-digi-geometric-config.json",
0861     ],
0862     ids=["smeared", "geometric"],
0863 )
0864 def test_digitization_example(trk_geo, tmp_path, assert_root_hash, digi_config_file):
0865     from digitization import runDigitization
0866 
0867     s = Sequencer(events=10, numThreads=-1)
0868 
0869     csv_dir = tmp_path / "csv"
0870     root_file = tmp_path / "measurements.root"
0871 
0872     assert not root_file.exists()
0873     assert not csv_dir.exists()
0874 
0875     field = acts.ConstantBField(acts.Vector3(0, 0, 2 * u.T))
0876     runDigitization(
0877         trk_geo, field, outputDir=tmp_path, digiConfigFile=digi_config_file, s=s
0878     )
0879 
0880     s.run()
0881 
0882     assert root_file.exists()
0883     assert csv_dir.exists()
0884 
0885     assert len(list(csv_dir.iterdir())) == 3 * s.config.events
0886     assert all(f.stat().st_size > 50 for f in csv_dir.iterdir())
0887 
0888     assert_root_hash(root_file.name, root_file)
0889 
0890 
0891 @pytest.mark.parametrize(
0892     "digi_config_file",
0893     [
0894         CONFIG_DIR / "generic-digi-smearing-config.json",
0895         CONFIG_DIR / "generic-digi-geometric-config.json",
0896         pytest.param(
0897             (ACTS_DIR / "Examples/Configs" / "odd-digi-smearing-config.json"),
0898             marks=[
0899                 pytest.mark.odd,
0900             ],
0901         ),
0902         pytest.param(
0903             (ACTS_DIR / "Examples/Configs" / "odd-digi-geometric-config.json"),
0904             marks=[
0905                 pytest.mark.odd,
0906             ],
0907         ),
0908     ],
0909     ids=["smeared", "geometric", "odd-smeared", "odd-geometric"],
0910 )
0911 def test_digitization_example_input_parsing(digi_config_file):
0912     from acts.examples import readDigiConfigFromJson
0913 
0914     readDigiConfigFromJson(str(digi_config_file))
0915 
0916 
0917 @pytest.mark.parametrize(
0918     "digi_config_file",
0919     [
0920         CONFIG_DIR / "generic-digi-smearing-config.json",
0921         CONFIG_DIR / "generic-digi-geometric-config.json",
0922     ],
0923     ids=["smeared", "geometric"],
0924 )
0925 def test_digitization_example_input(
0926     trk_geo, tmp_path, assert_root_hash, digi_config_file
0927 ):
0928     from particle_gun import runParticleGun
0929     from digitization import runDigitization
0930 
0931     ptcl_dir = tmp_path / "ptcl"
0932     ptcl_dir.mkdir()
0933     pgs = Sequencer(events=20, numThreads=-1)
0934     runParticleGun(str(ptcl_dir), s=pgs)
0935 
0936     pgs.run()
0937 
0938     s = Sequencer(numThreads=-1)
0939 
0940     csv_dir = tmp_path / "csv"
0941     root_file = tmp_path / "measurements.root"
0942 
0943     assert not root_file.exists()
0944     assert not csv_dir.exists()
0945 
0946     assert_root_hash(
0947         "particles.root",
0948         ptcl_dir / "particles.root",
0949     )
0950 
0951     field = acts.ConstantBField(acts.Vector3(0, 0, 2 * u.T))
0952     runDigitization(
0953         trk_geo,
0954         field,
0955         outputDir=tmp_path,
0956         digiConfigFile=digi_config_file,
0957         particlesInput=ptcl_dir / "particles.root",
0958         s=s,
0959         doMerge=True,
0960     )
0961 
0962     s.run()
0963 
0964     assert root_file.exists()
0965     assert csv_dir.exists()
0966 
0967     assert len(list(csv_dir.iterdir())) == 3 * pgs.config.events
0968     assert all(f.stat().st_size > 50 for f in csv_dir.iterdir())
0969 
0970     assert_root_hash(root_file.name, root_file)
0971 
0972 
0973 def test_digitization_config_example(trk_geo, tmp_path):
0974     from digitization_config import runDigitizationConfig
0975 
0976     out_file = tmp_path / "output.json"
0977     assert not out_file.exists()
0978 
0979     input = (
0980         Path(__file__).parent
0981         / "../../../Examples/Configs/generic-digi-smearing-config.json"
0982     )
0983     assert input.exists(), input.resolve()
0984 
0985     runDigitizationConfig(trk_geo, input=input, output=out_file)
0986 
0987     assert out_file.exists()
0988 
0989     with out_file.open() as fh:
0990         data = json.load(fh)
0991     assert len(data.keys()) == 2
0992     assert data["acts-geometry-hierarchy-map"]["format-version"] == 0
0993     assert (
0994         data["acts-geometry-hierarchy-map"]["value-identifier"]
0995         == "digitization-configuration"
0996     )
0997     assert len(data["entries"]) == 27
0998 
0999 
1000 @pytest.mark.parametrize(
1001     "truthSmeared,truthEstimated",
1002     [
1003         [False, False],
1004         [False, True],
1005         [True, False],
1006     ],
1007     ids=["full_seeding", "truth_estimated", "truth_smeared"],
1008 )
1009 @pytest.mark.slow
1010 def test_ckf_tracks_example(
1011     tmp_path, assert_root_hash, truthSmeared, truthEstimated, detector_config
1012 ):
1013     csv = tmp_path / "csv"
1014 
1015     assert not csv.exists()
1016 
1017     field = acts.ConstantBField(acts.Vector3(0, 0, 2 * u.T))
1018     events = 100
1019     s = Sequencer(events=events, numThreads=-1)
1020 
1021     root_files = [
1022         (
1023             "performance_finding_ckf.root",
1024             None,
1025         ),
1026         (
1027             "trackstates_ckf.root",
1028             "trackstates",
1029         ),
1030         (
1031             "tracksummary_ckf.root",
1032             "tracksummary",
1033         ),
1034     ]
1035 
1036     if not truthSmeared:
1037         root_files += [
1038             (
1039                 "performance_seeding.root",
1040                 None,
1041             ),
1042         ]
1043 
1044     for rf, _ in root_files:
1045         assert not (tmp_path / rf).exists()
1046 
1047     from ckf_tracks import runCKFTracks
1048 
1049     with detector_config.detector:
1050         runCKFTracks(
1051             detector_config.trackingGeometry,
1052             detector_config.decorators,
1053             field=field,
1054             outputCsv=True,
1055             outputDir=tmp_path,
1056             geometrySelection=detector_config.geometrySelection,
1057             digiConfigFile=detector_config.digiConfigFile,
1058             truthSmearedSeeded=truthSmeared,
1059             truthEstimatedSeeded=truthEstimated,
1060             s=s,
1061         )
1062 
1063         s.run()
1064 
1065     assert csv.exists()
1066     for rf, tn in root_files:
1067         rp = tmp_path / rf
1068         assert rp.exists()
1069         if tn is not None:
1070             assert_root_hash(rf, rp)
1071 
1072     assert (
1073         len([f for f in csv.iterdir() if f.name.endswith("tracks_ckf.csv")]) == events
1074     )
1075     assert all([f.stat().st_size > 300 for f in csv.iterdir()])
1076 
1077 
1078 @pytest.mark.skipif(not dd4hepEnabled, reason="DD4hep not set up")
1079 @pytest.mark.odd
1080 @pytest.mark.slow
1081 def test_full_chain_odd_example(tmp_path):
1082     # This test literally only ensures that the full chain example can run without erroring out
1083 
1084     # just to make sure it can build the odd
1085     with getOpenDataDetector():
1086         pass
1087 
1088     script = (
1089         Path(__file__).parent.parent.parent.parent
1090         / "Examples"
1091         / "Scripts"
1092         / "Python"
1093         / "full_chain_odd.py"
1094     )
1095     assert script.exists()
1096     env = os.environ.copy()
1097     env["ACTS_LOG_FAILURE_THRESHOLD"] = "ERROR"
1098     try:
1099         subprocess.check_call(
1100             [sys.executable, str(script), "-n1"],
1101             cwd=tmp_path,
1102             env=env,
1103             stderr=subprocess.STDOUT,
1104         )
1105     except subprocess.CalledProcessError as e:
1106         print(e.output.decode("utf-8"))
1107         raise
1108 
1109 
1110 @pytest.mark.skipif(
1111     not dd4hepEnabled or not geant4Enabled, reason="DD4hep and/or Geant4 not set up"
1112 )
1113 @pytest.mark.slow
1114 def test_full_chain_odd_example_pythia_geant4(tmp_path):
1115     # This test literally only ensures that the full chain example can run without erroring out
1116 
1117     # just to make sure it can build the odd
1118     with getOpenDataDetector():
1119         pass
1120 
1121     script = (
1122         Path(__file__).parent.parent.parent.parent
1123         / "Examples"
1124         / "Scripts"
1125         / "Python"
1126         / "full_chain_odd.py"
1127     )
1128     assert script.exists()
1129     env = os.environ.copy()
1130     env["ACTS_LOG_FAILURE_THRESHOLD"] = "ERROR"
1131     try:
1132         stdout = subprocess.check_output(
1133             [
1134                 sys.executable,
1135                 str(script),
1136                 "-n1",
1137                 "--geant4",
1138                 "--ttbar",
1139                 "--ttbar-pu",
1140                 "50",
1141             ],
1142             cwd=tmp_path,
1143             env=env,
1144             stderr=subprocess.STDOUT,
1145         )
1146         stdout = stdout.decode("utf-8")
1147     except subprocess.CalledProcessError as e:
1148         print(e.output.decode("utf-8"))
1149         raise
1150 
1151     # collect and compare known errors
1152     errors = []
1153     error_regex = re.compile(r"^\d\d:\d\d:\d\d\s+(\w+)\s+ERROR\s+", re.MULTILINE)
1154     for match in error_regex.finditer(stdout):
1155         (algo,) = match.groups()
1156         errors.append(algo)
1157     errors = collections.Counter(errors)
1158     assert dict(errors) == {}, stdout
1159 
1160 
1161 @pytest.mark.skipif(not dd4hepEnabled, reason="DD4hep not set up")
1162 @pytest.mark.skipif(not onnxEnabled, reason="ONNX plugin not enabled")
1163 @pytest.mark.slow
1164 def test_ML_Ambiguity_Solver(tmp_path, assert_root_hash):
1165     # This test literally only ensures that the full chain example can run without erroring out
1166 
1167     root_file = "performance_finding_ambiML.root"
1168     output_dir = "odd_output"
1169     assert not (tmp_path / root_file).exists()
1170 
1171     # just to make sure it can build the odd
1172     with getOpenDataDetector():
1173         pass
1174 
1175     script = (
1176         Path(__file__).parent.parent.parent.parent
1177         / "Examples"
1178         / "Scripts"
1179         / "Python"
1180         / "full_chain_odd.py"
1181     )
1182     assert script.exists()
1183     env = os.environ.copy()
1184     env["ACTS_LOG_FAILURE_THRESHOLD"] = "ERROR"
1185     try:
1186         subprocess.check_call(
1187             [sys.executable, str(script), "-n1", "--ambi-solver", "ML"],
1188             cwd=tmp_path,
1189             env=env,
1190             stderr=subprocess.STDOUT,
1191         )
1192     except subprocess.CalledProcessError as e:
1193         print(e.output.decode("utf-8"))
1194         raise
1195 
1196     rfp = tmp_path / output_dir / root_file
1197     assert rfp.exists()
1198 
1199     assert_root_hash(root_file, rfp)
1200 
1201 
1202 def test_bfield_writing(tmp_path, seq, assert_root_hash):
1203     from bfield_writing import runBFieldWriting
1204 
1205     root_files = [
1206         ("solenoid.root", "solenoid", 100),
1207         ("solenoid2.root", "solenoid", 100),
1208     ]
1209 
1210     for fn, _, _ in root_files:
1211         fp = tmp_path / fn
1212         assert not fp.exists()
1213 
1214     runBFieldWriting(outputDir=tmp_path, rewrites=1)
1215 
1216     for fn, tn, ee in root_files:
1217         fp = tmp_path / fn
1218         assert fp.exists()
1219         assert fp.stat().st_size > 2**10 * 2
1220         assert_entries(fp, tn, ee)
1221         assert_root_hash(fn, fp)
1222 
1223 
1224 @pytest.mark.parametrize("backend", ["onnx", "torch"])
1225 @pytest.mark.parametrize("hardware", ["cpu", "gpu"])
1226 @pytest.mark.skipif(not gnnEnabled, reason="Gnn environment not set up")
1227 def test_gnn(tmp_path, trk_geo, field, assert_root_hash, backend, hardware):
1228     if backend == "onnx" and hardware == "cpu":
1229         pytest.skip("Combination of ONNX and CPU not yet supported")
1230 
1231     if backend == "torch":
1232         pytest.skip(
1233             "Disabled torch support until replacement for torch-scatter is found"
1234         )
1235 
1236     root_file = "performance_track_finding.root"
1237     assert not (tmp_path / root_file).exists()
1238 
1239     # Extract both models, since we currently don't have a working implementation
1240     # of metric learning with ONNX and we need to use torch here
1241     onnx_url = "https://acts.web.cern.ch/ci/exatrkx/onnx_models_v01.tar"
1242     torch_url = "https://acts.web.cern.ch/ci/exatrkx/torchscript_models_v01.tar"
1243 
1244     for url in [onnx_url, torch_url]:
1245         tarfile_name = tmp_path / "models.tar"
1246         urllib.request.urlretrieve(url, tarfile_name)
1247         tarfile.open(tarfile_name).extractall(tmp_path)
1248 
1249     shutil.copyfile(
1250         tmp_path / "torchscript_models/embed.pt", tmp_path / "onnx_models/embed.pt"
1251     )
1252 
1253     script = (
1254         Path(__file__).parent.parent.parent.parent
1255         / "Examples"
1256         / "Scripts"
1257         / "Python"
1258         / "gnn.py"
1259     )
1260     assert script.exists()
1261     env = os.environ.copy()
1262     env["ACTS_LOG_FAILURE_THRESHOLD"] = "WARNING"
1263 
1264     if hardware == "cpu":
1265         env["CUDA_VISIBLE_DEVICES"] = ""
1266 
1267     try:
1268         subprocess.check_call(
1269             [sys.executable, str(script), backend],
1270             cwd=tmp_path,
1271             env=env,
1272             stderr=subprocess.STDOUT,
1273         )
1274     except subprocess.CalledProcessError as e:
1275         print(e.output.decode("utf-8"))
1276         raise
1277 
1278     rfp = tmp_path / root_file
1279     assert rfp.exists()
1280 
1281     assert_root_hash(root_file, rfp)
1282 
1283 
1284 @pytest.mark.odd
1285 def test_strip_spacepoints(detector_config, field, tmp_path, assert_root_hash):
1286     if detector_config.name == "generic":
1287         pytest.skip("No strip spacepoint formation for the generic detector currently")
1288 
1289     from strip_spacepoints import createStripSpacepoints
1290 
1291     s = Sequencer(events=20, numThreads=-1)
1292 
1293     config_path = Path(__file__).parent.parent.parent.parent / "Examples" / "Configs"
1294 
1295     geo_selection = config_path / "odd-strip-spacepoint-selection.json"
1296     digi_config_file = config_path / "odd-digi-smearing-config.json"
1297 
1298     with detector_config.detector:
1299         createStripSpacepoints(
1300             trackingGeometry=detector_config.trackingGeometry,
1301             field=field,
1302             digiConfigFile=digi_config_file,
1303             geoSelection=geo_selection,
1304             outputDir=tmp_path,
1305             s=s,
1306         ).run()
1307 
1308     root_file = "strip_spacepoints.root"
1309     rfp = tmp_path / root_file
1310 
1311     assert_root_hash(root_file, rfp)
1312 
1313 
1314 @pytest.mark.skipif(not geant4Enabled, reason="Geant4 not set up")
1315 @pytest.mark.skipif(not geomodelEnabled, reason="Geomodel not set up")
1316 def test_geomodel_G4(tmp_path):
1317     script = (
1318         Path(__file__).parent.parent.parent.parent
1319         / "Examples"
1320         / "Scripts"
1321         / "Python"
1322         / "geomodel_G4.py"
1323     )
1324     assert script.exists()
1325     # Prepare arguments for the script
1326     mockup_det = "Muon"
1327     out_dir = tmp_path / "geomodel_g4_out"
1328     out_dir.mkdir()
1329     args = [
1330         "python3",
1331         str(script),
1332         "--mockupDetector",
1333         str(mockup_det),
1334         "--outDir",
1335         str(out_dir),
1336     ]
1337     subprocess.check_call(args)
1338 
1339     assert (out_dir / "obj").exists()