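"""Integration tests for the ACTS Python examples.

These tests exercise the example scripts end to end (simulation, seeding,
track fitting and finding, material mapping, and full chains) and check the
produced CSV/ROOT outputs against size, entry-count, and hash expectations.
"""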
from pathlib import Path
import os
import json
import functools
import tarfile
import urllib.request
import subprocess
import sys
import re
import collections
import shutil

import pytest

from helpers import (
    geant4Enabled,
    geomodelEnabled,
    dd4hepEnabled,
    hepmc3Enabled,
    pythia8Enabled,
    gnnEnabled,
    onnxEnabled,
    hashingSeedingEnabled,
    AssertCollectionExistsAlg,
    failure_threshold,
)

import acts
from acts.examples import (
    Sequencer,
    GenericDetector,
)
from acts.examples.odd import getOpenDataDetector, getOpenDataDetectorDirectory


u = acts.UnitConstants


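# Common fixtures: a constant 2 T longitudinal field and a small
# single-threaded sequencer, shared by several tests below.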
@pytest.fixture
def field():
    return acts.ConstantBField(acts.Vector3(0, 0, 2 * u.T))


@pytest.fixture
def seq():
    return Sequencer(events=10, numThreads=1)


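# Assert that at least one "*<stem>.csv" file exists in `csv_path` and that
# every matching file has non-trivial size.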
def assert_csv_output(csv_path, stem):
    __tracebackhide__ = True

    files = [f for f in csv_path.iterdir() if f.name.endswith(stem + ".csv")]
    assert len(files) > 0
    assert all(f.stat().st_size > 100 for f in files)


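# Assert that `root_file` contains a tree `tree_name`, optionally with exactly
# `exp` entries or (with `non_zero`) at least one entry. Requires PyROOT.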
def assert_entries(root_file, tree_name, exp=None, non_zero=False):
    __tracebackhide__ = True
    import ROOT

    ROOT.PyConfig.IgnoreCommandLineOptions = True
    ROOT.gROOT.SetBatch(True)

    rf = ROOT.TFile.Open(str(root_file))
    keys = [k.GetName() for k in rf.GetListOfKeys()]
    assert tree_name in keys
    entries = rf.Get(tree_name).GetEntries()
    print("Entries:", entries)
    if non_zero:
        assert entries > 0, f"{root_file}:{tree_name}"
    if exp is not None:
        assert entries == exp, f"{root_file}:{tree_name}"


def assert_has_entries(root_file, tree_name):
    __tracebackhide__ = True
    assert_entries(root_file, tree_name, non_zero=True)


@pytest.mark.slow
@pytest.mark.skipif(not pythia8Enabled, reason="Pythia8 not set up")
def test_pythia8(tmp_path, seq, assert_root_hash):
    from pythia8 import runPythia8

    (tmp_path / "csv").mkdir()

    assert not (tmp_path / "particles.root").exists()
    assert len(list((tmp_path / "csv").iterdir())) == 0

    events = seq.config.events

    vtxGen = acts.examples.GaussianVertexGenerator(
        stddev=acts.Vector4(50 * u.um, 50 * u.um, 150 * u.mm, 0),
        mean=acts.Vector4(0, 0, 0, 0),
    )

    runPythia8(
        str(tmp_path), outputRoot=True, outputCsv=True, vtxGen=vtxGen, s=seq
    ).run()

    fp = tmp_path / "particles.root"
    assert fp.exists()
    assert fp.stat().st_size > 2**10 * 50
    assert_entries(fp, "particles", events)
    assert_root_hash(fp.name, fp)

    assert len(list((tmp_path / "csv").iterdir())) > 0
    assert_csv_output(tmp_path / "csv", "particles")


def test_fatras(trk_geo, tmp_path, field, assert_root_hash):
    from fatras import runFatras

    csv = tmp_path / "csv"
    csv.mkdir()

    nevents = 10

    root_files = [
        (
            "particles_simulation.root",
            "particles",
        ),
        (
            "hits.root",
            "hits",
        ),
    ]

    assert len(list(csv.iterdir())) == 0
    for rf, _ in root_files:
        assert not (tmp_path / rf).exists()

    seq = Sequencer(events=nevents)
    runFatras(trk_geo, field, str(tmp_path), s=seq).run()

    assert_csv_output(csv, "particles_simulated")
    assert_csv_output(csv, "hits")
    for f, tn in root_files:
        rfp = tmp_path / f
        assert rfp.exists()
        assert rfp.stat().st_size > 2**10 * 10

        assert_has_entries(rfp, tn)
        assert_root_hash(f, rfp)


@pytest.mark.slow
@pytest.mark.odd
@pytest.mark.skipif(not geant4Enabled, reason="Geant4 not set up")
@pytest.mark.skipif(not dd4hepEnabled, reason="DD4hep not set up")
def test_geant4(tmp_path, assert_root_hash):
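    # Construct the ODD once up front so any geometry problems surface here
    # rather than inside the subprocess (presumed intent of this warm-up block).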
    with getOpenDataDetector():
        pass

    csv = tmp_path / "csv"
    csv.mkdir()

    root_files = [
        "particles_simulation.root",
        "hits.root",
    ]

    assert len(list(csv.iterdir())) == 0
    for rf in root_files:
        assert not (tmp_path / rf).exists()

    script = (
        Path(__file__).parent.parent.parent.parent
        / "Examples"
        / "Scripts"
        / "Python"
        / "geant4.py"
    )
    assert script.exists()
    env = os.environ.copy()
    env["ACTS_LOG_FAILURE_THRESHOLD"] = "WARNING"
    try:
        subprocess.check_call(
            [sys.executable, str(script)],
            cwd=tmp_path,
            env=env,
            stderr=subprocess.STDOUT,
        )
    except subprocess.CalledProcessError as e:
        if e.output is not None:
            print(e.output.decode("utf-8"))
        if e.stderr is not None:
            print(e.stderr.decode("utf-8"))
        raise

    assert_csv_output(csv, "particles_simulated")
    assert_csv_output(csv, "hits")
    for f in root_files:
        rfp = tmp_path / f
        assert rfp.exists()
        assert rfp.stat().st_size > 2**10 * 10

        assert_root_hash(f, rfp)


def test_seeding(tmp_path, trk_geo, field, assert_root_hash):
    from seeding import runSeeding

    csv = tmp_path / "csv"
    csv.mkdir()

    seq = Sequencer(events=10, numThreads=1)

    root_files = [
        (
            "estimatedparams.root",
            "estimatedparams",
        ),
        (
            "performance_seeding.root",
            None,
        ),
        (
            "particles.root",
            "particles",
        ),
        (
            "particles_simulation.root",
            "particles",
        ),
    ]

    for fn, _ in root_files:
        fp = tmp_path / fn
        assert not fp.exists()

    assert len(list(csv.iterdir())) == 0

    runSeeding(trk_geo, field, outputDir=str(tmp_path), s=seq).run()

    for fn, tn in root_files:
        fp = tmp_path / fn
        assert fp.exists()
        assert fp.stat().st_size > 100

        if tn is not None:
            assert_has_entries(fp, tn)
            assert_root_hash(fn, fp)

    assert_csv_output(csv, "particles")
    assert_csv_output(csv, "particles_simulated")


@pytest.mark.slow
@pytest.mark.skipif(not hashingSeedingEnabled, reason="HashingSeeding not set up")
def test_hashing_seeding(tmp_path, trk_geo, field, assert_root_hash):
    from hashing_seeding import runHashingSeeding, Config

    seq = Sequencer(events=10, numThreads=1)

    root_files = [
        (
            "estimatedparams.root",
            "estimatedparams",
        ),
        (
            "performance_seeding.root",
            None,
        ),
    ]

    for fn, _ in root_files:
        fp = tmp_path / fn
        assert not fp.exists(), f"{fp} exists"

    config = Config(mu=50)

    _, _, digiConfig, geoSelectionConfigFile = config.getDetectorInfo()

    runHashingSeeding(
        10,
        trk_geo,
        field,
        outputDir=str(tmp_path),
        saveFiles=True,
        npileup=config.mu,
        seedingAlgorithm=config.seedingAlgorithm,
        maxSeedsPerSpM=config.maxSeedsPerSpM,
        digiConfig=digiConfig,
        geoSelectionConfigFile=geoSelectionConfigFile,
        config=config,
        s=seq,
    ).run()

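    # Drop the sequencer so output files are closed before they are inspected
    # (presumed purpose of the explicit del).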
    del seq

    for fn, tn in root_files:
        fp = tmp_path / fn
        assert fp.exists(), f"{fp} does not exist"
        assert fp.stat().st_size > 100, f"{fp} is too small: {fp.stat().st_size} bytes"

        if tn is not None:
            assert_has_entries(fp, tn)
            assert_root_hash(fn, fp)

    assert_csv_output(tmp_path, "particles_simulated")
    assert_csv_output(tmp_path, "buckets")
    assert_csv_output(tmp_path, "seed")


def test_seeding_orthogonal(tmp_path, trk_geo, field, assert_root_hash):
    from seeding import runSeeding, SeedingAlgorithm

    csv = tmp_path / "csv"
    csv.mkdir()

    seq = Sequencer(events=10, numThreads=1)

    root_files = [
        (
            "estimatedparams.root",
            "estimatedparams",
        ),
        (
            "performance_seeding.root",
            None,
        ),
        (
            "particles.root",
            "particles",
        ),
        (
            "particles_simulation.root",
            "particles",
        ),
    ]

    for fn, _ in root_files:
        fp = tmp_path / fn
        assert not fp.exists()

    assert len(list(csv.iterdir())) == 0

    runSeeding(
        trk_geo,
        field,
        outputDir=str(tmp_path),
        s=seq,
        seedingAlgorithm=SeedingAlgorithm.Orthogonal,
    ).run()

    for fn, tn in root_files:
        fp = tmp_path / fn
        assert fp.exists()
        assert fp.stat().st_size > 100

        if tn is not None:
            assert_has_entries(fp, tn)
            assert_root_hash(fn, fp)

    assert_csv_output(csv, "particles")
    assert_csv_output(csv, "particles_simulated")


def test_itk_seeding(tmp_path, trk_geo, field, assert_root_hash):
    csv = tmp_path / "csv"
    csv.mkdir()

    seq = Sequencer(events=10, numThreads=1)

    root_files = [
        (
            "estimatedparams.root",
            "estimatedparams",
        ),
        (
            "performance_seeding.root",
            None,
        ),
        (
            "particles.root",
            "particles",
        ),
        (
            "particles_simulation.root",
            "particles",
        ),
    ]

    for fn, _ in root_files:
        fp = tmp_path / fn
        assert not fp.exists()

    assert len(list(csv.iterdir())) == 0

    rnd = acts.examples.RandomNumbers(seed=42)

    from acts.examples.simulation import (
        addParticleGun,
        EtaConfig,
        MomentumConfig,
        ParticleConfig,
        addFatras,
        addDigitization,
        ParticleSelectorConfig,
        addDigiParticleSelection,
    )

    addParticleGun(
        seq,
        MomentumConfig(1.0 * u.GeV, 10.0 * u.GeV, True),
        EtaConfig(-4.0, 4.0, True),
        ParticleConfig(1, acts.PdgParticle.eMuon, True),
        outputDirCsv=csv,
        outputDirRoot=str(tmp_path),
        rnd=rnd,
    )

    addFatras(
        seq,
        trk_geo,
        field,
        outputDirCsv=csv,
        outputDirRoot=str(tmp_path),
        rnd=rnd,
    )

    srcdir = Path(__file__).resolve().parent.parent.parent.parent
    addDigitization(
        seq,
        trk_geo,
        field,
        digiConfigFile=srcdir / "Examples/Configs/generic-digi-smearing-config.json",
        rnd=rnd,
    )

    addDigiParticleSelection(
        seq,
        ParticleSelectorConfig(
            pt=(0.9 * u.GeV, None),
            eta=(-4, 4),
            measurements=(9, None),
            removeNeutral=True,
        ),
    )

    from acts.examples.reconstruction import addSeeding
    from acts.examples.itk import itkSeedingAlgConfig, InputSpacePointsType

    addSeeding(
        seq,
        trk_geo,
        field,
        *itkSeedingAlgConfig(InputSpacePointsType.PixelSpacePoints),
        acts.logging.VERBOSE,
        geoSelectionConfigFile=srcdir / "Examples/Configs/generic-seeding-config.json",
        outputDirRoot=str(tmp_path),
    )

    seq.run()

    for fn, tn in root_files:
        fp = tmp_path / fn
        assert fp.exists()
        assert fp.stat().st_size > 100

        if tn is not None:
            assert_has_entries(fp, tn)
            assert_root_hash(fn, fp)

    assert_csv_output(csv, "particles")
    assert_csv_output(csv, "particles_simulated")


@pytest.mark.slow
def test_propagation(tmp_path, trk_geo, field, seq, assert_root_hash):
    from propagation import runPropagation

    root_files = [
        (
            "propagation_summary.root",
            "propagation_summary",
            10000,
        )
    ]

    for fn, _, _ in root_files:
        fp = tmp_path / fn
        assert not fp.exists()

    runPropagation(trk_geo, field, str(tmp_path), s=seq).run()

    for fn, tn, ee in root_files:
        fp = tmp_path / fn
        assert fp.exists()
        assert fp.stat().st_size > 2**10 * 50
        assert_entries(fp, tn, ee)
        assert_root_hash(fn, fp)


@pytest.mark.slow
@pytest.mark.odd
@pytest.mark.skipif(not geant4Enabled, reason="Geant4 not set up")
@pytest.mark.skipif(not dd4hepEnabled, reason="DD4hep not set up")
def test_material_recording(tmp_path, material_recording, assert_root_hash):
    root_files = [
        (
            "geant4_material_tracks.root",
            "material-tracks",
            200,
        )
    ]

    for fn, tn, ee in root_files:
        fp = material_recording / fn
        assert fp.exists()
        assert fp.stat().st_size > 2**10 * 50
        assert_entries(fp, tn, ee)
        assert_root_hash(fn, fp)


@pytest.mark.parametrize("revFiltMomThresh", [0 * u.GeV, 1 * u.TeV])
def test_truth_tracking_kalman(
    tmp_path, assert_root_hash, revFiltMomThresh, detector_config
):
    root_files = [
        ("trackstates_kf.root", "trackstates", 19),
        ("tracksummary_kf.root", "tracksummary", 10),
        ("performance_kf.root", None, -1),
    ]

    for fn, _, _ in root_files:
        fp = tmp_path / fn
        assert not fp.exists()

    with detector_config.detector:
        from truth_tracking_kalman import runTruthTrackingKalman

        field = acts.ConstantBField(acts.Vector3(0, 0, 2 * u.T))

        seq = Sequencer(events=10, numThreads=1)

        runTruthTrackingKalman(
            trackingGeometry=detector_config.trackingGeometry,
            field=field,
            digiConfigFile=detector_config.digiConfigFile,
            outputDir=tmp_path,
            reverseFilteringMomThreshold=revFiltMomThresh,
            s=seq,
        )

        seq.run()

    for fn, tn, ee in root_files:
        fp = tmp_path / fn
        assert fp.exists()
        assert fp.stat().st_size > 1024
        if tn is not None:
            assert_has_entries(fp, tn)
            assert_root_hash(fn, fp)

    import ROOT

    ROOT.PyConfig.IgnoreCommandLineOptions = True
    ROOT.gROOT.SetBatch(True)
    rf = ROOT.TFile.Open(str(tmp_path / "tracksummary_kf.root"))
    keys = [k.GetName() for k in rf.GetListOfKeys()]
    assert "tracksummary" in keys
    for entry in rf.Get("tracksummary"):
        assert entry.hasFittedParams


def test_truth_tracking_gsf(tmp_path, assert_root_hash, detector_config):
    from truth_tracking_gsf import runTruthTrackingGsf

    field = acts.ConstantBField(acts.Vector3(0, 0, 2 * u.T))

    seq = Sequencer(
        events=10,
        numThreads=1,
        fpeMasks=[
            (
                "Core/include/Acts/TrackFitting/detail/GsfUtils.hpp:197",
                acts.FpeType.FLTUND,
                1,
            ),
        ],
    )

    root_files = [
        ("trackstates_gsf.root", "trackstates"),
        ("tracksummary_gsf.root", "tracksummary"),
    ]

    for fn, _ in root_files:
        fp = tmp_path / fn
        assert not fp.exists()

    with detector_config.detector:
        runTruthTrackingGsf(
            trackingGeometry=detector_config.trackingGeometry,
            decorators=detector_config.decorators,
            field=field,
            digiConfigFile=detector_config.digiConfigFile,
            outputDir=tmp_path,
            s=seq,
        )

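        # Run with a relaxed failure threshold: the GSF is known to emit
        # log messages above the default threshold that should not fail this
        # test (presumed reason for allowing everything up to FATAL here).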
        with failure_threshold(acts.logging.FATAL):
            seq.run()

    for fn, tn in root_files:
        fp = tmp_path / fn
        assert fp.exists()
        assert fp.stat().st_size > 1024
        if tn is not None:
            assert_root_hash(fn, fp)


def test_refitting(tmp_path, detector_config, assert_root_hash):
    from truth_tracking_gsf_refitting import runRefittingGsf

    field = acts.ConstantBField(acts.Vector3(0, 0, 2 * u.T))

    seq = Sequencer(
        events=10,
        numThreads=1,
    )

    with detector_config.detector:
        runRefittingGsf(
            trackingGeometry=detector_config.trackingGeometry,
            field=field,
            digiConfigFile=detector_config.digiConfigFile,
            outputDir=tmp_path,
            s=seq,
        ).run()

    root_files = [
        ("trackstates_gsf_refit.root", "trackstates"),
        ("tracksummary_gsf_refit.root", "tracksummary"),
    ]

    for fn, tn in root_files:
        fp = tmp_path / fn
        assert fp.exists()
        assert fp.stat().st_size > 1024
        if tn is not None:
            assert_root_hash(fn, fp)


def test_particle_gun(tmp_path, assert_root_hash):
    from particle_gun import runParticleGun

    s = Sequencer(events=20, numThreads=-1)

    csv_dir = tmp_path / "csv"
    root_file = tmp_path / "particles.root"

    assert not csv_dir.exists()
    assert not root_file.exists()

    runParticleGun(str(tmp_path), s=s).run()

    assert csv_dir.exists()
    assert root_file.exists()

    assert len([f for f in csv_dir.iterdir() if f.name.endswith("particles.csv")]) > 0
    assert all(f.stat().st_size > 100 for f in csv_dir.iterdir())

    assert root_file.stat().st_size > 200
    assert_entries(root_file, "particles", 20)
    assert_root_hash(root_file.name, root_file)


@pytest.mark.slow
@pytest.mark.odd
@pytest.mark.skipif(not dd4hepEnabled, reason="DD4hep not set up")
def test_material_mapping(material_recording, tmp_path, assert_root_hash):
    from material_mapping import runMaterialMapping
    from material_validation import runMaterialValidation

    map_file = tmp_path / "material-map_tracks.root"
    assert not map_file.exists()

    odd_dir = getOpenDataDetectorDirectory()
    config = acts.MaterialMapJsonConverter.Config()
    materialDecorator = acts.JsonMaterialDecorator(
        level=acts.logging.INFO,
        rConfig=config,
        jFileName=str(odd_dir / "config/odd-material-mapping-config.json"),
    )

    s = Sequencer(numThreads=1)

    with getOpenDataDetector(materialDecorator) as detector:
        trackingGeometry = detector.trackingGeometry()
        decorators = detector.contextDecorators()

        runMaterialMapping(
            trackingGeometry,
            decorators,
            outputDir=str(tmp_path),
            inputDir=material_recording,
            mappingStep=1,
            s=s,
        )

        s.run()

    mat_file = tmp_path / "material-map.json"

    assert mat_file.exists()
    assert mat_file.stat().st_size > 10

    with mat_file.open() as fh:
        assert json.load(fh)

    assert map_file.exists()
    assert_entries(map_file, "material-tracks", 200)
    assert_root_hash(map_file.name, map_file)

    val_file = tmp_path / "propagation-material.root"
    assert not val_file.exists()

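    # A null field suffices for the validation pass: it only propagates tracks
    # to sample the mapped material (assumption based on the NullBField choice).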
    field = acts.NullBField()

    s = Sequencer(events=10, numThreads=1)

    with getOpenDataDetector(
        materialDecorator=acts.IMaterialDecorator.fromFile(mat_file)
    ) as detector:
        trackingGeometry = detector.trackingGeometry()
        decorators = detector.contextDecorators()

        runMaterialValidation(
            10, 1000, trackingGeometry, decorators, field, outputDir=str(tmp_path), s=s
        )

        s.run()

    assert val_file.exists()
    assert_entries(val_file, "material-tracks", 10000)
    assert_root_hash(val_file.name, val_file)


@pytest.mark.slow
@pytest.mark.odd
@pytest.mark.skipif(not dd4hepEnabled, reason="DD4hep not set up")
def test_volume_material_mapping(material_recording, tmp_path, assert_root_hash):
    from material_mapping import runMaterialMapping
    from material_validation import runMaterialValidation

    map_file = tmp_path / "material-map-volume_tracks.root"
    assert not map_file.exists()

    geo_map = Path(__file__).parent / "geometry-volume-map.json"
    assert geo_map.exists()
    assert geo_map.stat().st_size > 10
    with geo_map.open() as fh:
        assert json.load(fh)

    s = Sequencer(numThreads=1)

    with getOpenDataDetector(
        materialDecorator=acts.IMaterialDecorator.fromFile(geo_map)
    ) as detector:
        trackingGeometry = detector.trackingGeometry()
        decorators = detector.contextDecorators()

        runMaterialMapping(
            trackingGeometry,
            decorators,
            mapName="material-map-volume",
            outputDir=str(tmp_path),
            inputDir=material_recording,
            mappingStep=1,
            s=s,
        )

        s.run()

    mat_file = tmp_path / "material-map-volume.json"

    assert mat_file.exists()
    assert mat_file.stat().st_size > 10

    with mat_file.open() as fh:
        assert json.load(fh)

    assert map_file.exists()
    assert_entries(map_file, "material-tracks", 200)
    assert_root_hash(map_file.name, map_file)

    val_file = tmp_path / "propagation-volume-material.root"
    assert not val_file.exists()

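    # As above, a null field is enough for the material validation propagation.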
    field = acts.NullBField()

    s = Sequencer(events=10, numThreads=1)

    with getOpenDataDetector(
        materialDecorator=acts.IMaterialDecorator.fromFile(mat_file)
    ) as detector:
        trackingGeometry = detector.trackingGeometry()
        decorators = detector.contextDecorators()

        runMaterialValidation(
            10,
            1000,
            trackingGeometry,
            decorators,
            field,
            outputDir=str(tmp_path),
            outputName="propagation-volume-material",
            s=s,
        )

        s.run()

    assert val_file.exists()
    assert_root_hash(val_file.name, val_file)


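# Repository paths shared by the digitization tests below.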
ACTS_DIR = Path(__file__).parent.parent.parent.parent
CONFIG_DIR = ACTS_DIR / "Examples/Configs"
DIGI_SHARE_DIR = ACTS_DIR / "Examples/Algorithms/Digitization/share"


@pytest.mark.parametrize(
    "digi_config_file",
    [
        CONFIG_DIR / "generic-digi-smearing-config.json",
        CONFIG_DIR / "generic-digi-geometric-config.json",
    ],
    ids=["smeared", "geometric"],
)
def test_digitization_example(trk_geo, tmp_path, assert_root_hash, digi_config_file):
    from digitization import runDigitization

    s = Sequencer(events=10, numThreads=-1)

    csv_dir = tmp_path / "csv"
    root_file = tmp_path / "measurements.root"

    assert not root_file.exists()
    assert not csv_dir.exists()

    field = acts.ConstantBField(acts.Vector3(0, 0, 2 * u.T))
    runDigitization(
        trk_geo, field, outputDir=tmp_path, digiConfigFile=digi_config_file, s=s
    )

    s.run()

    assert root_file.exists()
    assert csv_dir.exists()

    assert len(list(csv_dir.iterdir())) == 3 * s.config.events
    assert all(f.stat().st_size > 50 for f in csv_dir.iterdir())

    assert_root_hash(root_file.name, root_file)


@pytest.mark.parametrize(
    "digi_config_file",
    [
        CONFIG_DIR / "generic-digi-smearing-config.json",
        CONFIG_DIR / "generic-digi-geometric-config.json",
        pytest.param(
            CONFIG_DIR / "odd-digi-smearing-config.json",
            marks=[
                pytest.mark.odd,
            ],
        ),
        pytest.param(
            CONFIG_DIR / "odd-digi-geometric-config.json",
            marks=[
                pytest.mark.odd,
            ],
        ),
    ],
    ids=["smeared", "geometric", "odd-smeared", "odd-geometric"],
)
def test_digitization_example_input_parsing(digi_config_file):
    from acts.examples import readDigiConfigFromJson

    readDigiConfigFromJson(str(digi_config_file))


@pytest.mark.parametrize(
    "digi_config_file",
    [
        CONFIG_DIR / "generic-digi-smearing-config.json",
        CONFIG_DIR / "generic-digi-geometric-config.json",
    ],
    ids=["smeared", "geometric"],
)
def test_digitization_example_input(
    trk_geo, tmp_path, assert_root_hash, digi_config_file
):
    from particle_gun import runParticleGun
    from digitization import runDigitization

    ptcl_dir = tmp_path / "ptcl"
    ptcl_dir.mkdir()
    pgs = Sequencer(events=20, numThreads=-1)
    runParticleGun(str(ptcl_dir), s=pgs)

    pgs.run()

    s = Sequencer(numThreads=-1)

    csv_dir = tmp_path / "csv"
    root_file = tmp_path / "measurements.root"

    assert not root_file.exists()
    assert not csv_dir.exists()

    assert_root_hash("particles.root", ptcl_dir / "particles.root")

    field = acts.ConstantBField(acts.Vector3(0, 0, 2 * u.T))
    runDigitization(
        trk_geo,
        field,
        outputDir=tmp_path,
        digiConfigFile=digi_config_file,
        particlesInput=ptcl_dir / "particles.root",
        s=s,
        doMerge=True,
    )

    s.run()

    assert root_file.exists()
    assert csv_dir.exists()

    assert len(list(csv_dir.iterdir())) == 3 * pgs.config.events
    assert all(f.stat().st_size > 50 for f in csv_dir.iterdir())

    assert_root_hash(root_file.name, root_file)


def test_digitization_config_example(trk_geo, tmp_path):
    from digitization_config import runDigitizationConfig

    out_file = tmp_path / "output.json"
    assert not out_file.exists()

    config_file = CONFIG_DIR / "generic-digi-smearing-config.json"
    assert config_file.exists(), config_file.resolve()

    runDigitizationConfig(trk_geo, input=config_file, output=out_file)

    assert out_file.exists()

    with out_file.open() as fh:
        data = json.load(fh)
        assert len(data.keys()) == 2
        assert data["acts-geometry-hierarchy-map"]["format-version"] == 0
        assert (
            data["acts-geometry-hierarchy-map"]["value-identifier"]
            == "digitization-configuration"
        )
        assert len(data["entries"]) == 27


@pytest.mark.parametrize(
    "truthSmeared,truthEstimated",
    [
        [False, False],
        [False, True],
        [True, False],
    ],
    ids=["full_seeding", "truth_estimated", "truth_smeared"],
)
@pytest.mark.slow
def test_ckf_tracks_example(
    tmp_path, assert_root_hash, truthSmeared, truthEstimated, detector_config
):
    csv = tmp_path / "csv"

    assert not csv.exists()

    field = acts.ConstantBField(acts.Vector3(0, 0, 2 * u.T))
    events = 100
    s = Sequencer(events=events, numThreads=-1)

    root_files = [
        (
            "performance_finding_ckf.root",
            None,
        ),
        (
            "trackstates_ckf.root",
            "trackstates",
        ),
        (
            "tracksummary_ckf.root",
            "tracksummary",
        ),
    ]

    if not truthSmeared:
        root_files += [
            (
                "performance_seeding.root",
                None,
            ),
        ]

    for rf, _ in root_files:
        assert not (tmp_path / rf).exists()

    from ckf_tracks import runCKFTracks

    with detector_config.detector:
        runCKFTracks(
            detector_config.trackingGeometry,
            detector_config.decorators,
            field=field,
            outputCsv=True,
            outputDir=tmp_path,
            geometrySelection=detector_config.geometrySelection,
            digiConfigFile=detector_config.digiConfigFile,
            truthSmearedSeeded=truthSmeared,
            truthEstimatedSeeded=truthEstimated,
            s=s,
        )

        s.run()

    assert csv.exists()
    for rf, tn in root_files:
        rp = tmp_path / rf
        assert rp.exists()
        if tn is not None:
            assert_root_hash(rf, rp)

    assert (
        len([f for f in csv.iterdir() if f.name.endswith("tracks_ckf.csv")]) == events
    )
    assert all(f.stat().st_size > 300 for f in csv.iterdir())


@pytest.mark.skipif(not dd4hepEnabled, reason="DD4hep not set up")
@pytest.mark.odd
@pytest.mark.slow
def test_full_chain_odd_example(tmp_path):
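    # As in test_geant4, construct the ODD once before launching the full chain
    # in a subprocess.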
    with getOpenDataDetector():
        pass

    script = (
        Path(__file__).parent.parent.parent.parent
        / "Examples"
        / "Scripts"
        / "Python"
        / "full_chain_odd.py"
    )
    assert script.exists()
    env = os.environ.copy()
    env["ACTS_LOG_FAILURE_THRESHOLD"] = "ERROR"
    try:
        subprocess.check_call(
            [sys.executable, str(script), "-n1"],
            cwd=tmp_path,
            env=env,
            stderr=subprocess.STDOUT,
        )
    except subprocess.CalledProcessError as e:
        if e.output is not None:
            print(e.output.decode("utf-8"))
        raise


@pytest.mark.skipif(
    not dd4hepEnabled or not geant4Enabled, reason="DD4hep and/or Geant4 not set up"
)
@pytest.mark.slow
def test_full_chain_odd_example_pythia_geant4(tmp_path):
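    # As above, verify the ODD is buildable before spawning the subprocess.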
    with getOpenDataDetector():
        pass

    script = (
        Path(__file__).parent.parent.parent.parent
        / "Examples"
        / "Scripts"
        / "Python"
        / "full_chain_odd.py"
    )
    assert script.exists()
    env = os.environ.copy()
    env["ACTS_LOG_FAILURE_THRESHOLD"] = "ERROR"
    try:
        stdout = subprocess.check_output(
            [
                sys.executable,
                str(script),
                "-n1",
                "--geant4",
                "--ttbar",
                "--ttbar-pu",
                "50",
            ],
            cwd=tmp_path,
            env=env,
            stderr=subprocess.STDOUT,
        )
        stdout = stdout.decode("utf-8")
    except subprocess.CalledProcessError as e:
        print(e.output.decode("utf-8"))
        raise

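    # Scan the captured stdout for ERROR lines; the test fails if any algorithm
    # reported an error.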
    errors = []
    error_regex = re.compile(r"^\d\d:\d\d:\d\d\s+(\w+)\s+ERROR\s+", re.MULTILINE)
    for match in error_regex.finditer(stdout):
        (algo,) = match.groups()
        errors.append(algo)
    errors = collections.Counter(errors)
    assert dict(errors) == {}, stdout


@pytest.mark.skipif(not dd4hepEnabled, reason="DD4hep not set up")
@pytest.mark.skipif(not onnxEnabled, reason="ONNX plugin not enabled")
@pytest.mark.slow
def test_ML_Ambiguity_Solver(tmp_path, assert_root_hash):
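    # Run the full ODD chain with the ML-based ambiguity solver and check the
    # resulting performance file.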
    root_file = "performance_finding_ambiML.root"
    output_dir = "odd_output"
    assert not (tmp_path / root_file).exists()

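    # Again, make sure the ODD can be built before the subprocess starts.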
    with getOpenDataDetector():
        pass

    script = (
        Path(__file__).parent.parent.parent.parent
        / "Examples"
        / "Scripts"
        / "Python"
        / "full_chain_odd.py"
    )
    assert script.exists()
    env = os.environ.copy()
    env["ACTS_LOG_FAILURE_THRESHOLD"] = "ERROR"
    try:
        subprocess.check_call(
            [sys.executable, str(script), "-n1", "--ambi-solver", "ML"],
            cwd=tmp_path,
            env=env,
            stderr=subprocess.STDOUT,
        )
    except subprocess.CalledProcessError as e:
        if e.output is not None:
            print(e.output.decode("utf-8"))
        raise

    rfp = tmp_path / output_dir / root_file
    assert rfp.exists()

    assert_root_hash(root_file, rfp)


def test_bfield_writing(tmp_path, seq, assert_root_hash):
    from bfield_writing import runBFieldWriting

    root_files = [
        ("solenoid.root", "solenoid", 100),
        ("solenoid2.root", "solenoid", 100),
    ]

    for fn, _, _ in root_files:
        fp = tmp_path / fn
        assert not fp.exists()

    runBFieldWriting(outputDir=tmp_path, rewrites=1)

    for fn, tn, ee in root_files:
        fp = tmp_path / fn
        assert fp.exists()
        assert fp.stat().st_size > 2**10 * 2
        assert_entries(fp, tn, ee)
        assert_root_hash(fn, fp)


@pytest.mark.parametrize("backend", ["onnx", "torch"])
@pytest.mark.parametrize("hardware", ["cpu", "gpu"])
@pytest.mark.skipif(not gnnEnabled, reason="Gnn environment not set up")
def test_gnn(tmp_path, trk_geo, field, assert_root_hash, backend, hardware):
    if backend == "onnx" and hardware == "cpu":
        pytest.skip("Combination of ONNX and CPU not yet supported")

    if backend == "torch":
        pytest.skip(
            "Disabled torch support until replacement for torch-scatter is found"
        )

    root_file = "performance_track_finding.root"
    assert not (tmp_path / root_file).exists()

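    # Both model bundles are downloaded because the ONNX setup reuses the
    # torchscript embedding network (see the copyfile call below).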
    onnx_url = "https://acts.web.cern.ch/ci/exatrkx/onnx_models_v01.tar"
    torch_url = "https://acts.web.cern.ch/ci/exatrkx/torchscript_models_v01.tar"

    for url in [onnx_url, torch_url]:
        tarfile_name = tmp_path / "models.tar"
        urllib.request.urlretrieve(url, tarfile_name)
        with tarfile.open(tarfile_name) as tar:
            tar.extractall(tmp_path)

    shutil.copyfile(
        tmp_path / "torchscript_models/embed.pt", tmp_path / "onnx_models/embed.pt"
    )

    script = (
        Path(__file__).parent.parent.parent.parent
        / "Examples"
        / "Scripts"
        / "Python"
        / "gnn.py"
    )
    assert script.exists()
    env = os.environ.copy()
    env["ACTS_LOG_FAILURE_THRESHOLD"] = "WARNING"

    if hardware == "cpu":
        env["CUDA_VISIBLE_DEVICES"] = ""

    try:
        subprocess.check_call(
            [sys.executable, str(script), backend],
            cwd=tmp_path,
            env=env,
            stderr=subprocess.STDOUT,
        )
    except subprocess.CalledProcessError as e:
        if e.output is not None:
            print(e.output.decode("utf-8"))
        raise

    rfp = tmp_path / root_file
    assert rfp.exists()

    assert_root_hash(root_file, rfp)


@pytest.mark.odd
def test_strip_spacepoints(detector_config, field, tmp_path, assert_root_hash):
    if detector_config.name == "generic":
        pytest.skip("No strip spacepoint formation for the generic detector currently")

    from strip_spacepoints import createStripSpacepoints

    s = Sequencer(events=20, numThreads=-1)

    geo_selection = CONFIG_DIR / "odd-strip-spacepoint-selection.json"
    digi_config_file = CONFIG_DIR / "odd-digi-smearing-config.json"

    with detector_config.detector:
        createStripSpacepoints(
            trackingGeometry=detector_config.trackingGeometry,
            field=field,
            digiConfigFile=digi_config_file,
            geoSelection=geo_selection,
            outputDir=tmp_path,
            s=s,
        ).run()

    root_file = "strip_spacepoints.root"
    rfp = tmp_path / root_file

    assert_root_hash(root_file, rfp)


@pytest.mark.skipif(not geant4Enabled, reason="Geant4 not set up")
@pytest.mark.skipif(not geomodelEnabled, reason="Geomodel not set up")
def test_geomodel_G4(tmp_path):
    script = (
        Path(__file__).parent.parent.parent.parent
        / "Examples"
        / "Scripts"
        / "Python"
        / "geomodel_G4.py"
    )
    assert script.exists()

    mockup_det = "Muon"
    out_dir = tmp_path / "geomodel_g4_out"
    out_dir.mkdir()
    args = [
        sys.executable,
        str(script),
        "--mockupDetector",
        mockup_det,
        "--outDir",
        str(out_dir),
    ]
    subprocess.check_call(args)

    assert (out_dir / "obj").exists()