from pathlib import Path
import os
import json
import functools
import tarfile
import urllib.request
import subprocess
import sys
import re
import collections
import shutil

import pytest

from helpers import (
    geant4Enabled,
    dd4hepEnabled,
    hepmc3Enabled,
    pythia8Enabled,
    exatrkxEnabled,
    onnxEnabled,
    hashingSeedingEnabled,
    AssertCollectionExistsAlg,
    failure_threshold,
)

import acts
from acts.examples import (
    Sequencer,
    GenericDetector,
)
from acts.examples.odd import getOpenDataDetector, getOpenDataDetectorDirectory


u = acts.UnitConstants


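# Provides a constant 2 T magnetic field along the beam (z) axis.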
@pytest.fixture
def field():
    return acts.ConstantBField(acts.Vector3(0, 0, 2 * u.T))


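# Provides a fresh single-threaded, 10-event Sequencer for each test.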
@pytest.fixture
def seq():
    return Sequencer(events=10, numThreads=1)


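# Asserts that at least one "*<stem>.csv" file exists in csv_path and that
# every matching file is non-trivially sized (> 100 bytes).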
def assert_csv_output(csv_path, stem):
    __tracebackhide__ = True
    # print(list(csv_path.iterdir()))
    assert len([f for f in csv_path.iterdir() if f.name.endswith(stem + ".csv")]) > 0
    assert all(
        [
            f.stat().st_size > 100
            for f in csv_path.iterdir()
            if f.name.endswith(stem + ".csv")
        ]
    )


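# Opens a ROOT file and checks that the named tree exists; optionally also
# checks for an exact entry count (exp) or simply a non-zero one (non_zero).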
def assert_entries(root_file, tree_name, exp=None, non_zero=False):
    __tracebackhide__ = True
    import ROOT

    ROOT.PyConfig.IgnoreCommandLineOptions = True
    ROOT.gROOT.SetBatch(True)

    rf = ROOT.TFile.Open(str(root_file))
    keys = [k.GetName() for k in rf.GetListOfKeys()]
    assert tree_name in keys
    print("Entries:", rf.Get(tree_name).GetEntries())
    if non_zero:
        assert rf.Get(tree_name).GetEntries() > 0, f"{root_file}:{tree_name}"
    if exp is not None:
        assert rf.Get(tree_name).GetEntries() == exp, f"{root_file}:{tree_name}"


def assert_has_entries(root_file, tree_name):
    __tracebackhide__ = True
    assert_entries(root_file, tree_name, non_zero=True)


@pytest.mark.slow
@pytest.mark.skipif(not pythia8Enabled, reason="Pythia8 not set up")
def test_pythia8(tmp_path, seq, assert_root_hash):
    from pythia8 import runPythia8

    (tmp_path / "csv").mkdir()

    assert not (tmp_path / "particles.root").exists()
    assert len(list((tmp_path / "csv").iterdir())) == 0

    events = seq.config.events

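    # Smear the event vertex with a Gaussian beamspot:
    # 50 um transverse, 150 mm along z, no time spread.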
    vtxGen = acts.examples.GaussianVertexGenerator(
        stddev=acts.Vector4(50 * u.um, 50 * u.um, 150 * u.mm, 0),
        mean=acts.Vector4(0, 0, 0, 0),
    )

    runPythia8(
        str(tmp_path), outputRoot=True, outputCsv=True, vtxGen=vtxGen, s=seq
    ).run()

    fp = tmp_path / "particles.root"
    assert fp.exists()
    assert fp.stat().st_size > 2**10 * 50
    assert_entries(fp, "particles", events)
    assert_root_hash(fp.name, fp)

    assert len(list((tmp_path / "csv").iterdir())) > 0
    assert_csv_output(tmp_path / "csv", "particles")


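# Fast-simulation (Fatras) smoke test: runs the simulation and checks that the
# CSV and ROOT outputs exist, are non-empty, and match the reference hashes.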
def test_fatras(trk_geo, tmp_path, field, assert_root_hash):
    from fatras import runFatras

    csv = tmp_path / "csv"
    csv.mkdir()

    nevents = 10

    root_files = [
        (
            "particles_simulation.root",
            "particles",
        ),
        (
            "hits.root",
            "hits",
        ),
    ]

    assert len(list(csv.iterdir())) == 0
    for rf, _ in root_files:
        assert not (tmp_path / rf).exists()

    seq = Sequencer(events=nevents)
    runFatras(trk_geo, field, str(tmp_path), s=seq).run()

    assert_csv_output(csv, "particles_simulated")
    assert_csv_output(csv, "hits")
    for f, tn in root_files:
        rfp = tmp_path / f
        assert rfp.exists()
        assert rfp.stat().st_size > 2**10 * 10

        assert_has_entries(rfp, tn)
        assert_root_hash(f, rfp)


@pytest.mark.slow
@pytest.mark.odd
@pytest.mark.skipif(not geant4Enabled, reason="Geant4 not set up")
@pytest.mark.skipif(not dd4hepEnabled, reason="DD4hep not set up")
def test_geant4(tmp_path, assert_root_hash):
    # This test only ensures that the Geant4 example can run without erroring out

    # just to make sure it can build the ODD
    with getOpenDataDetector():
        pass

    csv = tmp_path / "csv"
    csv.mkdir()

    root_files = [
        "particles_simulation.root",
        "hits.root",
    ]

    assert len(list(csv.iterdir())) == 0
    for rf in root_files:
        assert not (tmp_path / rf).exists()

    script = (
        Path(__file__).parent.parent.parent.parent
        / "Examples"
        / "Scripts"
        / "Python"
        / "geant4.py"
    )
    assert script.exists()
    env = os.environ.copy()
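    # Fail the run if anything at WARNING level or above is logged.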
    env["ACTS_LOG_FAILURE_THRESHOLD"] = "WARNING"
    try:
        subprocess.check_call(
            [sys.executable, str(script)],
            cwd=tmp_path,
            env=env,
            stderr=subprocess.STDOUT,
        )
    except subprocess.CalledProcessError as e:
        if e.output is not None:
            print(e.output.decode("utf-8"))
        if e.stderr is not None:
            print(e.stderr.decode("utf-8"))
        raise

    assert_csv_output(csv, "particles_simulated")
    assert_csv_output(csv, "hits")
    for f in root_files:
        rfp = tmp_path / f
        assert rfp.exists()
        assert rfp.stat().st_size > 2**10 * 10

        assert_root_hash(f, rfp)


def test_seeding(tmp_path, trk_geo, field, assert_root_hash):
    from seeding import runSeeding

    field = acts.ConstantBField(acts.Vector3(0, 0, 2 * acts.UnitConstants.T))

    csv = tmp_path / "csv"
    csv.mkdir()

    seq = Sequencer(events=10, numThreads=1)

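    # (file name, tree name) pairs; a tree name of None means only the file's
    # existence and size are checked, not its entries or hash.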
    root_files = [
        (
            "estimatedparams.root",
            "estimatedparams",
        ),
        (
            "performance_seeding.root",
            None,
        ),
        (
            "particles.root",
            "particles",
        ),
        (
            "particles_simulation.root",
            "particles",
        ),
    ]

    for fn, _ in root_files:
        fp = tmp_path / fn
        assert not fp.exists()

    assert len(list(csv.iterdir())) == 0

    runSeeding(trk_geo, field, outputDir=str(tmp_path), s=seq).run()

    for fn, tn in root_files:
        fp = tmp_path / fn
        assert fp.exists()
        assert fp.stat().st_size > 100

        if tn is not None:
            assert_has_entries(fp, tn)
            assert_root_hash(fn, fp)

    assert_csv_output(csv, "particles")
    assert_csv_output(csv, "particles_simulated")


@pytest.mark.slow
@pytest.mark.skipif(not hashingSeedingEnabled, reason="HashingSeeding not set up")
def test_hashing_seeding(tmp_path, trk_geo, field, assert_root_hash):
    from hashing_seeding import runHashingSeeding, Config

    field = acts.ConstantBField(acts.Vector3(0, 0, 2 * acts.UnitConstants.T))

    seq = Sequencer(events=10, numThreads=1)

    root_files = [
        (
            "estimatedparams.root",
            "estimatedparams",
        ),
        (
            "performance_seeding.root",
            None,
        ),
    ]

    for fn, _ in root_files:
        fp = tmp_path / fn
        assert not fp.exists(), f"{fp} exists"

    config = Config(
        mu=50,
    )

    _, _, digiConfig, geoSelectionConfigFile = config.getDetectorInfo()

    runHashingSeeding(
        10,
        trk_geo,
        field,
        outputDir=str(tmp_path),
        saveFiles=True,
        npileup=config.mu,
        seedingAlgorithm=config.seedingAlgorithm,
        maxSeedsPerSpM=config.maxSeedsPerSpM,
        digiConfig=digiConfig,
        geoSelectionConfigFile=geoSelectionConfigFile,
        config=config,
        s=seq,
    ).run()

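    # Drop the sequencer explicitly, presumably so that the output files are
    # closed before they are inspected below.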
    del seq

    for fn, tn in root_files:
        fp = tmp_path / fn
        assert fp.exists(), f"{fp} does not exist"
        assert fp.stat().st_size > 100, f"{fp} is too small: {fp.stat().st_size} bytes"

        if tn is not None:
            assert_has_entries(fp, tn)
            assert_root_hash(fn, fp)

    assert_csv_output(tmp_path, "particles_simulated")
    assert_csv_output(tmp_path, "buckets")
    assert_csv_output(tmp_path, "seed")


def test_seeding_orthogonal(tmp_path, trk_geo, field, assert_root_hash):
    from seeding import runSeeding, SeedingAlgorithm

    field = acts.ConstantBField(acts.Vector3(0, 0, 2 * acts.UnitConstants.T))

    csv = tmp_path / "csv"
    csv.mkdir()

    seq = Sequencer(events=10, numThreads=1)

    root_files = [
        (
            "estimatedparams.root",
            "estimatedparams",
        ),
        (
            "performance_seeding.root",
            None,
        ),
        (
            "particles.root",
            "particles",
        ),
        (
            "particles_simulation.root",
            "particles",
        ),
    ]

    for fn, _ in root_files:
        fp = tmp_path / fn
        assert not fp.exists()

    assert len(list(csv.iterdir())) == 0

    runSeeding(
        trk_geo,
        field,
        outputDir=str(tmp_path),
        s=seq,
        seedingAlgorithm=SeedingAlgorithm.Orthogonal,
    ).run()

    for fn, tn in root_files:
        fp = tmp_path / fn
        assert fp.exists()
        assert fp.stat().st_size > 100

        if tn is not None:
            assert_has_entries(fp, tn)
            assert_root_hash(fn, fp)

    assert_csv_output(csv, "particles")
    assert_csv_output(csv, "particles_simulated")


def test_itk_seeding(tmp_path, trk_geo, field, assert_root_hash):
    field = acts.ConstantBField(acts.Vector3(0, 0, 2 * acts.UnitConstants.T))

    csv = tmp_path / "csv"
    csv.mkdir()

    seq = Sequencer(events=10, numThreads=1)

    root_files = [
        (
            "estimatedparams.root",
            "estimatedparams",
        ),
        (
            "performance_seeding.root",
            None,
        ),
        (
            "particles.root",
            "particles",
        ),
        (
            "particles_simulation.root",
            "particles",
        ),
    ]

    for fn, _ in root_files:
        fp = tmp_path / fn
        assert not fp.exists()

    assert len(list(csv.iterdir())) == 0

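    # Build the chain inline: particle gun -> Fatras simulation -> digitization
    # -> particle selection -> ITk-style pixel seeding.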
    rnd = acts.examples.RandomNumbers(seed=42)

    from acts.examples.simulation import (
        addParticleGun,
        EtaConfig,
        MomentumConfig,
        ParticleConfig,
        addFatras,
        addDigitization,
        ParticleSelectorConfig,
        addDigiParticleSelection,
    )

    addParticleGun(
        seq,
        MomentumConfig(1.0 * u.GeV, 10.0 * u.GeV, True),
        EtaConfig(-4.0, 4.0, True),
        ParticleConfig(1, acts.PdgParticle.eMuon, True),
        outputDirCsv=tmp_path / "csv",
        outputDirRoot=str(tmp_path),
        rnd=rnd,
    )

    addFatras(
        seq,
        trk_geo,
        field,
        outputDirCsv=tmp_path / "csv",
        outputDirRoot=str(tmp_path),
        rnd=rnd,
    )

    srcdir = Path(__file__).resolve().parent.parent.parent.parent
    addDigitization(
        seq,
        trk_geo,
        field,
        digiConfigFile=srcdir / "Examples/Configs/generic-digi-smearing-config.json",
        rnd=rnd,
    )

    addDigiParticleSelection(
        seq,
        ParticleSelectorConfig(
            pt=(0.9 * u.GeV, None),
            eta=(-4, 4),
            measurements=(9, None),
            removeNeutral=True,
        ),
    )

    from acts.examples.reconstruction import (
        addSeeding,
    )
    from acts.examples.itk import itkSeedingAlgConfig, InputSpacePointsType

    addSeeding(
        seq,
        trk_geo,
        field,
        *itkSeedingAlgConfig(InputSpacePointsType.PixelSpacePoints),
        acts.logging.VERBOSE,
        geoSelectionConfigFile=srcdir / "Examples/Configs/generic-seeding-config.json",
        outputDirRoot=str(tmp_path),
    )

    seq.run()

    for fn, tn in root_files:
        fp = tmp_path / fn
        assert fp.exists()
        assert fp.stat().st_size > 100

        if tn is not None:
            assert_has_entries(fp, tn)
            assert_root_hash(fn, fp)

    assert_csv_output(csv, "particles")
    assert_csv_output(csv, "particles_simulated")


@pytest.mark.slow
def test_propagation(tmp_path, trk_geo, field, seq, assert_root_hash):
    from propagation import runPropagation

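    # 10 events with, presumably, 1000 test propagations per event give the
    # expected 10000 summary entries.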
    root_files = [
        (
            "propagation_summary.root",
            "propagation_summary",
            10000,
        )
    ]

    for fn, _, _ in root_files:
        fp = tmp_path / fn
        assert not fp.exists()

    runPropagation(trk_geo, field, str(tmp_path), s=seq).run()

    for fn, tn, ee in root_files:
        fp = tmp_path / fn
        assert fp.exists()
        assert fp.stat().st_size > 2**10 * 50
        assert_entries(fp, tn, ee)
        assert_root_hash(fn, fp)


@pytest.mark.slow
@pytest.mark.odd
@pytest.mark.skipif(not geant4Enabled, reason="Geant4 not set up")
@pytest.mark.skipif(not dd4hepEnabled, reason="DD4hep not set up")
def test_material_recording(tmp_path, material_recording, assert_root_hash):
    root_files = [
        (
            "geant4_material_tracks.root",
            "material-tracks",
            200,
        )
    ]

    for fn, tn, ee in root_files:
        fp = material_recording / fn
        assert fp.exists()
        assert fp.stat().st_size > 2**10 * 50
        assert_entries(fp, tn, ee)
        assert_root_hash(fn, fp)


@pytest.mark.parametrize("revFiltMomThresh", [0 * u.GeV, 1 * u.TeV])
def test_truth_tracking_kalman(
    tmp_path, assert_root_hash, revFiltMomThresh, detector_config
):
    root_files = [
        ("trackstates_kf.root", "trackstates", 19),
        ("tracksummary_kf.root", "tracksummary", 10),
        ("performance_kf.root", None, -1),
    ]

    for fn, _, _ in root_files:
        fp = tmp_path / fn
        assert not fp.exists()

    with detector_config.detector:
        from truth_tracking_kalman import runTruthTrackingKalman

        field = acts.ConstantBField(acts.Vector3(0, 0, 2 * u.T))

        seq = Sequencer(events=10, numThreads=1)

        runTruthTrackingKalman(
            trackingGeometry=detector_config.trackingGeometry,
            field=field,
            digiConfigFile=detector_config.digiConfigFile,
            outputDir=tmp_path,
            reverseFilteringMomThreshold=revFiltMomThresh,
            s=seq,
        )

        seq.run()

    for fn, tn, ee in root_files:
        fp = tmp_path / fn
        assert fp.exists()
        assert fp.stat().st_size > 1024
        if tn is not None:
            assert_has_entries(fp, tn)
            assert_root_hash(fn, fp)

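    # Re-open the track summary and check that every track has fitted parameters.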
    import ROOT

    ROOT.PyConfig.IgnoreCommandLineOptions = True
    ROOT.gROOT.SetBatch(True)
    rf = ROOT.TFile.Open(str(tmp_path / "tracksummary_kf.root"))
    keys = [k.GetName() for k in rf.GetListOfKeys()]
    assert "tracksummary" in keys
    for entry in rf.Get("tracksummary"):
        assert entry.hasFittedParams


def test_truth_tracking_gsf(tmp_path, assert_root_hash, detector_config):
    from truth_tracking_gsf import runTruthTrackingGsf

    field = acts.ConstantBField(acts.Vector3(0, 0, 2 * u.T))

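    # Mask the known floating-point underflow (FLTUND) raised in
    # GsfUtils.hpp:197 so that it does not fail the run.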
    seq = Sequencer(
        events=10,
        numThreads=1,
        fpeMasks=[
            (
                "Core/include/Acts/TrackFitting/detail/GsfUtils.hpp:197",
                acts.FpeType.FLTUND,
                1,
            ),
        ],
    )

    root_files = [
        ("trackstates_gsf.root", "trackstates"),
        ("tracksummary_gsf.root", "tracksummary"),
    ]

    for fn, _ in root_files:
        fp = tmp_path / fn
        assert not fp.exists()

    with detector_config.detector:
        runTruthTrackingGsf(
            trackingGeometry=detector_config.trackingGeometry,
            decorators=detector_config.decorators,
            field=field,
            digiConfigFile=detector_config.digiConfigFile,
            outputDir=tmp_path,
            s=seq,
        )

        # See https://github.com/acts-project/acts/issues/1300
        with failure_threshold(acts.logging.FATAL):
            seq.run()

    for fn, tn in root_files:
        fp = tmp_path / fn
        assert fp.exists()
        assert fp.stat().st_size > 1024
        if tn is not None:
            assert_root_hash(fn, fp)


def test_refitting(tmp_path, detector_config, assert_root_hash):
    from truth_tracking_gsf_refitting import runRefittingGsf

    field = acts.ConstantBField(acts.Vector3(0, 0, 2 * u.T))

    seq = Sequencer(
        events=10,
        numThreads=1,
    )

    with detector_config.detector:
        # For now, only check that it runs without errors;
        # changes in fitter behaviour should be caught by other tests
        runRefittingGsf(
            trackingGeometry=detector_config.trackingGeometry,
            field=field,
            digiConfigFile=detector_config.digiConfigFile,
            outputDir=tmp_path,
            s=seq,
        ).run()

    root_files = [
        ("trackstates_gsf_refit.root", "trackstates"),
        ("tracksummary_gsf_refit.root", "tracksummary"),
    ]

    for fn, tn in root_files:
        fp = tmp_path / fn
        assert fp.exists()
        assert fp.stat().st_size > 1024
        if tn is not None:
            assert_root_hash(fn, fp)


def test_particle_gun(tmp_path, assert_root_hash):
    from particle_gun import runParticleGun

    s = Sequencer(events=20, numThreads=-1)

    csv_dir = tmp_path / "csv"
    root_file = tmp_path / "particles.root"

    assert not csv_dir.exists()
    assert not root_file.exists()

    runParticleGun(str(tmp_path), s=s).run()

    assert csv_dir.exists()
    assert root_file.exists()

    assert len([f for f in csv_dir.iterdir() if f.name.endswith("particles.csv")]) > 0
    assert all([f.stat().st_size > 100 for f in csv_dir.iterdir()])

    assert root_file.stat().st_size > 200
    assert_entries(root_file, "particles", 20)
    assert_root_hash(root_file.name, root_file)


@pytest.mark.slow
@pytest.mark.odd
@pytest.mark.skipif(not dd4hepEnabled, reason="DD4hep not set up")
def test_material_mapping(material_recording, tmp_path, assert_root_hash):
    from material_mapping import runMaterialMapping
    from material_validation import runMaterialValidation

    map_file = tmp_path / "material-map_tracks.root"
    assert not map_file.exists()

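    # Decorate the ODD geometry with the material-mapping configuration read
    # from JSON before running the mapping itself.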
    odd_dir = getOpenDataDetectorDirectory()
    config = acts.MaterialMapJsonConverter.Config()
    materialDecorator = acts.JsonMaterialDecorator(
        level=acts.logging.INFO,
        rConfig=config,
        jFileName=str(odd_dir / "config/odd-material-mapping-config.json"),
    )

    s = Sequencer(numThreads=1)

    with getOpenDataDetector(materialDecorator) as detector:
        trackingGeometry = detector.trackingGeometry()
        decorators = detector.contextDecorators()

        runMaterialMapping(
            trackingGeometry,
            decorators,
            outputDir=str(tmp_path),
            inputDir=material_recording,
            mappingStep=1,
            s=s,
        )

        s.run()

    mat_file = tmp_path / "material-map.json"

    assert mat_file.exists()
    assert mat_file.stat().st_size > 10

    with mat_file.open() as fh:
        assert json.load(fh)

    assert map_file.exists()
    assert_entries(map_file, "material-tracks", 200)
    assert_root_hash(map_file.name, map_file)

    val_file = tmp_path / "propagation-material.root"
    assert not val_file.exists()

    # test the validation as well

    field = acts.NullBField()

    s = Sequencer(events=10, numThreads=1)

    with getOpenDataDetector(
        materialDecorator=acts.IMaterialDecorator.fromFile(mat_file)
    ) as detector:
        trackingGeometry = detector.trackingGeometry()
        decorators = detector.contextDecorators()

        runMaterialValidation(
            10, 1000, trackingGeometry, decorators, field, outputDir=str(tmp_path), s=s
        )

        s.run()

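    # 10 events with, presumably, 1000 tracks each give the expected
    # 10000 material-track entries.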
    assert val_file.exists()
    assert_entries(val_file, "material-tracks", 10000)
    assert_root_hash(val_file.name, val_file)


@pytest.mark.slow
@pytest.mark.odd
@pytest.mark.skipif(not dd4hepEnabled, reason="DD4hep not set up")
def test_volume_material_mapping(material_recording, tmp_path, assert_root_hash):
    from material_mapping import runMaterialMapping
    from material_validation import runMaterialValidation

    map_file = tmp_path / "material-map-volume_tracks.root"
    assert not map_file.exists()

    geo_map = Path(__file__).parent / "geometry-volume-map.json"
    assert geo_map.exists()
    assert geo_map.stat().st_size > 10
    with geo_map.open() as fh:
        assert json.load(fh)

    s = Sequencer(numThreads=1)

    with getOpenDataDetector(
        materialDecorator=acts.IMaterialDecorator.fromFile(geo_map)
    ) as detector:
        trackingGeometry = detector.trackingGeometry()
        decorators = detector.contextDecorators()

        runMaterialMapping(
            trackingGeometry,
            decorators,
            mapName="material-map-volume",
            outputDir=str(tmp_path),
            inputDir=material_recording,
            mappingStep=1,
            s=s,
        )

        s.run()

    mat_file = tmp_path / "material-map-volume.json"

    assert mat_file.exists()
    assert mat_file.stat().st_size > 10

    with mat_file.open() as fh:
        assert json.load(fh)

    assert map_file.exists()
    assert_entries(map_file, "material-tracks", 200)
    assert_root_hash(map_file.name, map_file)

    val_file = tmp_path / "propagation-volume-material.root"
    assert not val_file.exists()

    # test the validation as well

    field = acts.NullBField()

    s = Sequencer(events=10, numThreads=1)

    with getOpenDataDetector(
        materialDecorator=acts.IMaterialDecorator.fromFile(mat_file)
    ) as detector:
        trackingGeometry = detector.trackingGeometry()
        decorators = detector.contextDecorators()

        runMaterialValidation(
            10,
            1000,
            trackingGeometry,
            decorators,
            field,
            outputDir=str(tmp_path),
            outputName="propagation-volume-material",
            s=s,
        )

        s.run()

    assert val_file.exists()

    assert_root_hash(val_file.name, val_file)


ACTS_DIR = Path(__file__).parent.parent.parent.parent
CONFIG_DIR = ACTS_DIR / "Examples/Configs"
DIGI_SHARE_DIR = ACTS_DIR / "Examples/Algorithms/Digitization/share"


@pytest.mark.parametrize(
    "digi_config_file",
    [
        CONFIG_DIR / "generic-digi-smearing-config.json",
        CONFIG_DIR / "generic-digi-geometric-config.json",
    ],
    ids=["smeared", "geometric"],
)
def test_digitization_example(trk_geo, tmp_path, assert_root_hash, digi_config_file):
    from digitization import runDigitization

    s = Sequencer(events=10, numThreads=-1)

    csv_dir = tmp_path / "csv"
    root_file = tmp_path / "measurements.root"

    assert not root_file.exists()
    assert not csv_dir.exists()

    field = acts.ConstantBField(acts.Vector3(0, 0, 2 * u.T))
    runDigitization(
        trk_geo, field, outputDir=tmp_path, digiConfigFile=digi_config_file, s=s
    )

    s.run()

    assert root_file.exists()
    assert csv_dir.exists()

    assert len(list(csv_dir.iterdir())) == 3 * s.config.events
    assert all(f.stat().st_size > 50 for f in csv_dir.iterdir())

    assert_root_hash(root_file.name, root_file)


@pytest.mark.parametrize(
    "digi_config_file",
    [
        CONFIG_DIR / "generic-digi-smearing-config.json",
        CONFIG_DIR / "generic-digi-geometric-config.json",
        pytest.param(
            CONFIG_DIR / "odd-digi-smearing-config.json",
            marks=[
                pytest.mark.odd,
            ],
        ),
        pytest.param(
            CONFIG_DIR / "odd-digi-geometric-config.json",
            marks=[
                pytest.mark.odd,
            ],
        ),
    ],
    ids=["smeared", "geometric", "odd-smeared", "odd-geometric"],
)
def test_digitization_example_input_parsing(digi_config_file):
    from acts.examples import readDigiConfigFromJson

    readDigiConfigFromJson(str(digi_config_file))


@pytest.mark.parametrize(
    "digi_config_file",
    [
        CONFIG_DIR / "generic-digi-smearing-config.json",
        CONFIG_DIR / "generic-digi-geometric-config.json",
    ],
    ids=["smeared", "geometric"],
)
def test_digitization_example_input(
    trk_geo, tmp_path, assert_root_hash, digi_config_file
):
    from particle_gun import runParticleGun
    from digitization import runDigitization

    ptcl_dir = tmp_path / "ptcl"
    ptcl_dir.mkdir()
    pgs = Sequencer(events=20, numThreads=-1)
    runParticleGun(str(ptcl_dir), s=pgs)

    pgs.run()

    s = Sequencer(numThreads=-1)

    csv_dir = tmp_path / "csv"
    root_file = tmp_path / "measurements.root"

    assert not root_file.exists()
    assert not csv_dir.exists()

    assert_root_hash(
        "particles.root",
        ptcl_dir / "particles.root",
    )

    field = acts.ConstantBField(acts.Vector3(0, 0, 2 * u.T))
    runDigitization(
        trk_geo,
        field,
        outputDir=tmp_path,
        digiConfigFile=digi_config_file,
        particlesInput=ptcl_dir / "particles.root",
        s=s,
        doMerge=True,
    )

    s.run()

    assert root_file.exists()
    assert csv_dir.exists()

    assert len(list(csv_dir.iterdir())) == 3 * pgs.config.events
    assert all(f.stat().st_size > 50 for f in csv_dir.iterdir())

    assert_root_hash(root_file.name, root_file)


def test_digitization_config_example(trk_geo, tmp_path):
    from digitization_config import runDigitizationConfig

    out_file = tmp_path / "output.json"
    assert not out_file.exists()

    input_file = (
        Path(__file__).parent
        / "../../../Examples/Configs/generic-digi-smearing-config.json"
    )
    assert input_file.exists(), input_file.resolve()

    runDigitizationConfig(trk_geo, input=input_file, output=out_file)

    assert out_file.exists()

    with out_file.open() as fh:
        data = json.load(fh)
    assert len(data.keys()) == 2
    assert data["acts-geometry-hierarchy-map"]["format-version"] == 0
    assert (
        data["acts-geometry-hierarchy-map"]["value-identifier"]
        == "digitization-configuration"
    )
    assert len(data["entries"]) == 27


@pytest.mark.parametrize(
    "truthSmeared,truthEstimated",
    [
        [False, False],
        [False, True],
        [True, False],
    ],
    ids=["full_seeding", "truth_estimated", "truth_smeared"],
)
@pytest.mark.slow
def test_ckf_tracks_example(
    tmp_path, assert_root_hash, truthSmeared, truthEstimated, detector_config
):
    csv = tmp_path / "csv"

    assert not csv.exists()

    field = acts.ConstantBField(acts.Vector3(0, 0, 2 * u.T))
    events = 100
    s = Sequencer(events=events, numThreads=-1)

    root_files = [
        (
            "performance_finding_ckf.root",
            None,
        ),
        (
            "trackstates_ckf.root",
            "trackstates",
        ),
        (
            "tracksummary_ckf.root",
            "tracksummary",
        ),
    ]

    if not truthSmeared:
        root_files += [
            (
                "performance_seeding.root",
                None,
            ),
        ]

    for rf, _ in root_files:
        assert not (tmp_path / rf).exists()

    from ckf_tracks import runCKFTracks

    with detector_config.detector:
        runCKFTracks(
            detector_config.trackingGeometry,
            detector_config.decorators,
            field=field,
            outputCsv=True,
            outputDir=tmp_path,
            geometrySelection=detector_config.geometrySelection,
            digiConfigFile=detector_config.digiConfigFile,
            truthSmearedSeeded=truthSmeared,
            truthEstimatedSeeded=truthEstimated,
            s=s,
        )

        s.run()

    assert csv.exists()
    for rf, tn in root_files:
        rp = tmp_path / rf
        assert rp.exists()
        if tn is not None:
            assert_root_hash(rf, rp)

    assert (
        len([f for f in csv.iterdir() if f.name.endswith("tracks_ckf.csv")]) == events
    )
    assert all([f.stat().st_size > 300 for f in csv.iterdir()])


@pytest.mark.skipif(not dd4hepEnabled, reason="DD4hep not set up")
@pytest.mark.odd
@pytest.mark.slow
def test_full_chain_odd_example(tmp_path):
    # This test only ensures that the full chain example can run without erroring out

    # just to make sure it can build the ODD
    with getOpenDataDetector():
        pass

    script = (
        Path(__file__).parent.parent.parent.parent
        / "Examples"
        / "Scripts"
        / "Python"
        / "full_chain_odd.py"
    )
    assert script.exists()
    env = os.environ.copy()
    env["ACTS_LOG_FAILURE_THRESHOLD"] = "ERROR"
    try:
        subprocess.check_call(
            [sys.executable, str(script), "-n1"],
            cwd=tmp_path,
            env=env,
            stderr=subprocess.STDOUT,
        )
    except subprocess.CalledProcessError as e:
        print(e.output.decode("utf-8"))
        raise


@pytest.mark.skipif(
    not dd4hepEnabled or not geant4Enabled, reason="DD4hep and/or Geant4 not set up"
)
@pytest.mark.slow
def test_full_chain_odd_example_pythia_geant4(tmp_path):
    # This test only ensures that the full chain example can run without erroring out

    # just to make sure it can build the ODD
    with getOpenDataDetector():
        pass

    script = (
        Path(__file__).parent.parent.parent.parent
        / "Examples"
        / "Scripts"
        / "Python"
        / "full_chain_odd.py"
    )
    assert script.exists()
    env = os.environ.copy()
    env["ACTS_LOG_FAILURE_THRESHOLD"] = "ERROR"
    try:
        stdout = subprocess.check_output(
            [
                sys.executable,
                str(script),
                "-n1",
                "--geant4",
                "--ttbar",
                "--ttbar-pu",
                "50",
            ],
            cwd=tmp_path,
            env=env,
            stderr=subprocess.STDOUT,
        )
        stdout = stdout.decode("utf-8")
    except subprocess.CalledProcessError as e:
        print(e.output.decode("utf-8"))
        raise

    # collect and compare known errors
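    # Log lines look like "HH:MM:SS <algorithm>  ERROR  ..."; capture the name
    # of the emitting algorithm for each ERROR line.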
    errors = []
    error_regex = re.compile(r"^\d\d:\d\d:\d\d\s+(\w+)\s+ERROR\s+", re.MULTILINE)
    for match in error_regex.finditer(stdout):
        (algo,) = match.groups()
        errors.append(algo)
    errors = collections.Counter(errors)
    assert dict(errors) == {}, stdout


@pytest.mark.skipif(not dd4hepEnabled, reason="DD4hep not set up")
@pytest.mark.skipif(not onnxEnabled, reason="ONNX plugin not enabled")
@pytest.mark.slow
def test_ML_Ambiguity_Solver(tmp_path, assert_root_hash):
    # This test only ensures that the full chain example can run without erroring out

    root_file = "performance_finding_ambiML.root"
    output_dir = "odd_output"
    assert not (tmp_path / root_file).exists()

    # just to make sure it can build the ODD
    with getOpenDataDetector():
        pass

    script = (
        Path(__file__).parent.parent.parent.parent
        / "Examples"
        / "Scripts"
        / "Python"
        / "full_chain_odd.py"
    )
    assert script.exists()
    env = os.environ.copy()
    env["ACTS_LOG_FAILURE_THRESHOLD"] = "ERROR"
    try:
        subprocess.check_call(
            [sys.executable, str(script), "-n1", "--ambi-solver", "ML"],
            cwd=tmp_path,
            env=env,
            stderr=subprocess.STDOUT,
        )
    except subprocess.CalledProcessError as e:
        print(e.output.decode("utf-8"))
        raise

    rfp = tmp_path / output_dir / root_file
    assert rfp.exists()

    assert_root_hash(root_file, rfp)


def test_bfield_writing(tmp_path, seq, assert_root_hash):
    from bfield_writing import runBFieldWriting

    root_files = [
        ("solenoid.root", "solenoid", 100),
        ("solenoid2.root", "solenoid", 100),
    ]

    for fn, _, _ in root_files:
        fp = tmp_path / fn
        assert not fp.exists()

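    # rewrites=1 presumably writes the field grid a second time
    # (solenoid2.root), exercising the read-back/re-write path.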
    runBFieldWriting(outputDir=tmp_path, rewrites=1)

    for fn, tn, ee in root_files:
        fp = tmp_path / fn
        assert fp.exists()
        assert fp.stat().st_size > 2**10 * 2
        assert_entries(fp, tn, ee)
        assert_root_hash(fn, fp)


@pytest.mark.parametrize("backend", ["onnx", "torch"])
@pytest.mark.parametrize("hardware", ["cpu", "gpu"])
@pytest.mark.skipif(not exatrkxEnabled, reason="ExaTrkX environment not set up")
def test_exatrkx(tmp_path, trk_geo, field, assert_root_hash, backend, hardware):
    if backend == "onnx" and hardware == "cpu":
        pytest.skip("Combination of ONNX and CPU not yet supported")

    if backend == "torch":
        pytest.skip(
            "Disabled torch support until replacement for torch-scatter is found"
        )

    root_file = "performance_track_finding.root"
    assert not (tmp_path / root_file).exists()

    # Extract both models, since we currently don't have a working implementation
    # of metric learning with ONNX and we need to use torch here
    onnx_url = "https://acts.web.cern.ch/ci/exatrkx/onnx_models_v01.tar"
    torch_url = "https://acts.web.cern.ch/ci/exatrkx/torchscript_models_v01.tar"

    for url in [onnx_url, torch_url]:
        tarfile_name = tmp_path / "models.tar"
        urllib.request.urlretrieve(url, tarfile_name)
        tarfile.open(tarfile_name).extractall(tmp_path)

    shutil.copyfile(
        tmp_path / "torchscript_models/embed.pt", tmp_path / "onnx_models/embed.pt"
    )

    script = (
        Path(__file__).parent.parent.parent.parent
        / "Examples"
        / "Scripts"
        / "Python"
        / "exatrkx.py"
    )
    assert script.exists()
    env = os.environ.copy()
    env["ACTS_LOG_FAILURE_THRESHOLD"] = "WARNING"

    if hardware == "cpu":
        env["CUDA_VISIBLE_DEVICES"] = ""

    try:
        subprocess.check_call(
            [sys.executable, str(script), backend],
            cwd=tmp_path,
            env=env,
            stderr=subprocess.STDOUT,
        )
    except subprocess.CalledProcessError as e:
        print(e.output.decode("utf-8"))
        raise

    rfp = tmp_path / root_file
    assert rfp.exists()

    assert_root_hash(root_file, rfp)


@pytest.mark.odd
def test_strip_spacepoints(detector_config, field, tmp_path, assert_root_hash):
    if detector_config.name == "generic":
        pytest.skip("No strip spacepoint formation for the generic detector currently")

    from strip_spacepoints import createStripSpacepoints

    s = Sequencer(events=20, numThreads=-1)

    config_path = Path(__file__).parent.parent.parent.parent / "Examples" / "Configs"

    geo_selection = config_path / "odd-strip-spacepoint-selection.json"
    digi_config_file = config_path / "odd-digi-smearing-config.json"

    with detector_config.detector:
        createStripSpacepoints(
            trackingGeometry=detector_config.trackingGeometry,
            field=field,
            digiConfigFile=digi_config_file,
            geoSelection=geo_selection,
            outputDir=tmp_path,
            s=s,
        ).run()

    root_file = "strip_spacepoints.root"
    rfp = tmp_path / root_file

    assert rfp.exists()
    assert_root_hash(root_file, rfp)