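"""Integration tests for the ACTS Python examples.

Each test runs one of the example workflows and checks the produced
ROOT and CSV outputs for existence, size, entry counts, and stable
content hashes.
"""
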
from pathlib import Path
import os
import json
import functools
import tarfile
import urllib.request
import subprocess
import sys
import re
import collections
import shutil

import pytest

from helpers import (
    geant4Enabled,
    geomodelEnabled,
    dd4hepEnabled,
    hepmc3Enabled,
    pythia8Enabled,
    gnnEnabled,
    onnxEnabled,
    hashingSeedingEnabled,
    AssertCollectionExistsAlg,
    failure_threshold,
)

import acts
from acts.examples import (
    Sequencer,
    GenericDetector,
)
from acts.examples.odd import getOpenDataDetector, getOpenDataDetectorDirectory


u = acts.UnitConstants


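# Shared fixtures: a constant 2 T field along the beam axis and a
# ten-event, single-threaded sequencer.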
@pytest.fixture
def field():
    return acts.ConstantBField(acts.Vector3(0, 0, 2 * u.T))


@pytest.fixture
def seq():
    return Sequencer(events=10, numThreads=1)


def assert_csv_output(csv_path, stem):
    __tracebackhide__ = True

    # At least one non-trivial CSV file with the given stem must have been written
    assert len([f for f in csv_path.iterdir() if f.name.endswith(stem + ".csv")]) > 0
    assert all(
        [
            f.stat().st_size > 100
            for f in csv_path.iterdir()
            if f.name.endswith(stem + ".csv")
        ]
    )


def assert_entries(root_file, tree_name, exp=None, non_zero=False):
    """Check that `tree_name` exists in `root_file` and verify its entry count."""
    __tracebackhide__ = True
    import ROOT

    ROOT.PyConfig.IgnoreCommandLineOptions = True
    ROOT.gROOT.SetBatch(True)

    rf = ROOT.TFile.Open(str(root_file))
    keys = [k.GetName() for k in rf.GetListOfKeys()]
    assert tree_name in keys
    print("Entries:", rf.Get(tree_name).GetEntries())
    if non_zero:
        assert rf.Get(tree_name).GetEntries() > 0, f"{root_file}:{tree_name}"
    if exp is not None:
        assert rf.Get(tree_name).GetEntries() == exp, f"{root_file}:{tree_name}"


def assert_has_entries(root_file, tree_name):
    __tracebackhide__ = True
    assert_entries(root_file, tree_name, non_zero=True)


@pytest.mark.slow
@pytest.mark.skipif(not pythia8Enabled, reason="Pythia8 not set up")
def test_pythia8(tmp_path, seq, assert_root_hash):
    from pythia8 import runPythia8

    (tmp_path / "csv").mkdir()

    assert not (tmp_path / "particles.root").exists()
    assert len(list((tmp_path / "csv").iterdir())) == 0

    events = seq.config.events

    vtxGen = acts.examples.GaussianVertexGenerator(
        stddev=acts.Vector4(50 * u.um, 50 * u.um, 150 * u.mm, 0),
        mean=acts.Vector4(0, 0, 0, 0),
    )

    runPythia8(
        str(tmp_path), outputRoot=True, outputCsv=True, vtxGen=vtxGen, s=seq
    ).run()

    fp = tmp_path / "particles.root"
    assert fp.exists()
    assert fp.stat().st_size > 2**10 * 50
    assert_entries(fp, "particles", events)
    assert_root_hash(fp.name, fp)

    assert len(list((tmp_path / "csv").iterdir())) > 0
    assert_csv_output(tmp_path / "csv", "particles")


def test_fatras(trk_geo, tmp_path, field, assert_root_hash):
    from fatras import runFatras

    csv = tmp_path / "csv"
    csv.mkdir()

    nevents = 10

    root_files = [
        (
            "particles_simulation.root",
            "particles",
        ),
        (
            "hits.root",
            "hits",
        ),
    ]

    assert len(list(csv.iterdir())) == 0
    for rf, _ in root_files:
        assert not (tmp_path / rf).exists()

    seq = Sequencer(events=nevents)
    runFatras(trk_geo, field, str(tmp_path), s=seq).run()

    assert_csv_output(csv, "particles_simulated")
    assert_csv_output(csv, "hits")
    for f, tn in root_files:
        rfp = tmp_path / f
        assert rfp.exists()
        assert rfp.stat().st_size > 2**10 * 10

        assert_has_entries(rfp, tn)
        assert_root_hash(f, rfp)


@pytest.mark.slow
@pytest.mark.odd
@pytest.mark.skipif(not geant4Enabled, reason="Geant4 not set up")
@pytest.mark.skipif(not dd4hepEnabled, reason="DD4hep not set up")
def test_geant4(tmp_path, assert_root_hash):
    # Make sure the OpenDataDetector geometry can be built before launching the script
    with getOpenDataDetector():
        pass

    csv = tmp_path / "csv"
    csv.mkdir()

    root_files = [
        "particles_simulation.root",
        "hits.root",
    ]

    assert len(list(csv.iterdir())) == 0
    for rf in root_files:
        assert not (tmp_path / rf).exists()

    script = (
        Path(__file__).parent.parent.parent.parent
        / "Examples"
        / "Scripts"
        / "Python"
        / "geant4.py"
    )
    assert script.exists()
    env = os.environ.copy()
    env["ACTS_LOG_FAILURE_THRESHOLD"] = "WARNING"
    try:
        subprocess.check_call(
            [sys.executable, str(script)],
            cwd=tmp_path,
            env=env,
            stderr=subprocess.STDOUT,
        )
    except subprocess.CalledProcessError as e:
        if e.output is not None:
            print(e.output.decode("utf-8"))
        if e.stderr is not None:
            print(e.stderr.decode("utf-8"))
        raise

    assert_csv_output(csv, "particles_simulated")
    assert_csv_output(csv, "hits")
    for f in root_files:
        rfp = tmp_path / f
        assert rfp.exists()
        assert rfp.stat().st_size > 2**10 * 10

        assert_root_hash(f, rfp)


def test_seeding(tmp_path, trk_geo, field, assert_root_hash):
    from seeding import runSeeding

    csv = tmp_path / "csv"
    csv.mkdir()

    seq = Sequencer(events=10, numThreads=1)

    root_files = [
        (
            "estimatedparams.root",
            "estimatedparams",
        ),
        (
            "performance_seeding.root",
            None,
        ),
        (
            "particles.root",
            "particles",
        ),
        (
            "particles_simulation.root",
            "particles",
        ),
    ]

    for fn, _ in root_files:
        fp = tmp_path / fn
        assert not fp.exists()

    assert len(list(csv.iterdir())) == 0

    runSeeding(trk_geo, field, outputDir=str(tmp_path), s=seq).run()

    for fn, tn in root_files:
        fp = tmp_path / fn
        assert fp.exists()
        assert fp.stat().st_size > 100

        if tn is not None:
            assert_has_entries(fp, tn)
            assert_root_hash(fn, fp)

    assert_csv_output(csv, "particles")
    assert_csv_output(csv, "particles_simulated")


@pytest.mark.slow
@pytest.mark.skipif(not hashingSeedingEnabled, reason="HashingSeeding not set up")
def test_hashing_seeding(tmp_path, trk_geo, field, assert_root_hash):
    from hashing_seeding import runHashingSeeding, Config

    seq = Sequencer(events=10, numThreads=1)

    rnd = acts.examples.RandomNumbers(seed=4242)

    root_files = [
        (
            "estimatedparams.root",
            "estimatedparams",
        ),
        (
            "performance_seeding.root",
            None,
        ),
    ]

    for fn, _ in root_files:
        fp = tmp_path / fn
        assert not fp.exists(), f"{fp} exists"

    config = Config(
        mu=50,
    )

    _, _, digiConfig, geoSelectionConfigFile = config.getDetectorInfo()

    runHashingSeeding(
        10,
        trk_geo,
        field,
        outputDir=str(tmp_path),
        saveFiles=True,
        npileup=config.mu,
        seedingAlgorithm=config.seedingAlgorithm,
        maxSeedsPerSpM=config.maxSeedsPerSpM,
        digiConfig=digiConfig,
        geoSelectionConfigFile=geoSelectionConfigFile,
        config=config,
        s=seq,
        rnd=rnd,
    ).run()

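    # Drop the sequencer before inspecting the outputs so its writers finalize their files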
    del seq

    for fn, tn in root_files:
        fp = tmp_path / fn
        assert fp.exists(), f"{fp} does not exist"
        assert fp.stat().st_size > 100, f"{fp} is too small: {fp.stat().st_size} bytes"

        if tn is not None:
            assert_has_entries(fp, tn)
            assert_root_hash(fn, fp)

    assert_csv_output(tmp_path, "particles_simulated")
    assert_csv_output(tmp_path, "buckets")
    assert_csv_output(tmp_path, "seed")


def test_seeding_orthogonal(tmp_path, trk_geo, field, assert_root_hash):
    from seeding import runSeeding, SeedingAlgorithm

    csv = tmp_path / "csv"
    csv.mkdir()

    seq = Sequencer(events=10, numThreads=1)

    root_files = [
        (
            "estimatedparams.root",
            "estimatedparams",
        ),
        (
            "performance_seeding.root",
            None,
        ),
        (
            "particles.root",
            "particles",
        ),
        (
            "particles_simulation.root",
            "particles",
        ),
    ]

    for fn, _ in root_files:
        fp = tmp_path / fn
        assert not fp.exists()

    assert len(list(csv.iterdir())) == 0

    runSeeding(
        trk_geo,
        field,
        outputDir=str(tmp_path),
        s=seq,
        seedingAlgorithm=SeedingAlgorithm.OrthogonalTriplet,
    ).run()

    for fn, tn in root_files:
        fp = tmp_path / fn
        assert fp.exists()
        assert fp.stat().st_size > 100

        if tn is not None:
            assert_has_entries(fp, tn)
            assert_root_hash(fn, fp)

    assert_csv_output(csv, "particles")
    assert_csv_output(csv, "particles_simulated")


def test_itk_seeding(tmp_path, trk_geo, field, assert_root_hash):
    csv = tmp_path / "csv"
    csv.mkdir()

    seq = Sequencer(events=10, numThreads=1)

    root_files = [
        (
            "estimatedparams.root",
            "estimatedparams",
        ),
        (
            "performance_seeding.root",
            None,
        ),
        (
            "particles.root",
            "particles",
        ),
        (
            "particles_simulation.root",
            "particles",
        ),
    ]

    for fn, _ in root_files:
        fp = tmp_path / fn
        assert not fp.exists()

    assert len(list(csv.iterdir())) == 0

    rnd = acts.examples.RandomNumbers(seed=42)

    from acts.examples.simulation import (
        addParticleGun,
        EtaConfig,
        MomentumConfig,
        ParticleConfig,
        addFatras,
        addDigitization,
        ParticleSelectorConfig,
        addDigiParticleSelection,
    )

    addParticleGun(
        seq,
        MomentumConfig(1.0 * u.GeV, 10.0 * u.GeV, True),
        EtaConfig(-4.0, 4.0, True),
        ParticleConfig(1, acts.PdgParticle.eMuon, True),
        outputDirCsv=tmp_path / "csv",
        outputDirRoot=str(tmp_path),
        rnd=rnd,
    )

    addFatras(
        seq,
        trk_geo,
        field,
        outputDirCsv=tmp_path / "csv",
        outputDirRoot=str(tmp_path),
        rnd=rnd,
    )

    srcdir = Path(__file__).resolve().parent.parent.parent.parent
    addDigitization(
        seq,
        trk_geo,
        field,
        digiConfigFile=srcdir / "Examples/Configs/generic-digi-smearing-config.json",
        rnd=rnd,
    )

    addDigiParticleSelection(
        seq,
        ParticleSelectorConfig(
            pt=(0.9 * u.GeV, None),
            eta=(-4, 4),
            measurements=(9, None),
            removeNeutral=True,
        ),
    )

    from acts.examples.reconstruction import (
        addSeeding,
    )
    from acts.examples.itk import itkSeedingAlgConfig, InputSpacePointsType

    addSeeding(
        seq,
        trk_geo,
        field,
        *itkSeedingAlgConfig(InputSpacePointsType.PixelSpacePoints),
        acts.logging.VERBOSE,
        geoSelectionConfigFile=srcdir / "Examples/Configs/generic-seeding-config.json",
        outputDirRoot=str(tmp_path),
    )

    seq.run()

    for fn, tn in root_files:
        fp = tmp_path / fn
        assert fp.exists()
        assert fp.stat().st_size > 100

        if tn is not None:
            assert_has_entries(fp, tn)
            assert_root_hash(fn, fp)

    assert_csv_output(csv, "particles")
    assert_csv_output(csv, "particles_simulated")


@pytest.mark.slow
def test_propagation(tmp_path, trk_geo, field, seq, assert_root_hash):
    from propagation import runPropagation

    root_files = [
        (
            "propagation_summary.root",
            "propagation_summary",
            10000,
        )
    ]

    for fn, _, _ in root_files:
        fp = tmp_path / fn
        assert not fp.exists()

    runPropagation(trk_geo, field, str(tmp_path), s=seq).run()

    for fn, tn, ee in root_files:
        fp = tmp_path / fn
        assert fp.exists()
        assert fp.stat().st_size > 2**10 * 50
        assert_entries(fp, tn, ee)
        assert_root_hash(fn, fp)


@pytest.mark.slow
@pytest.mark.odd
@pytest.mark.skipif(not geant4Enabled, reason="Geant4 not set up")
@pytest.mark.skipif(not dd4hepEnabled, reason="DD4hep not set up")
def test_material_recording(tmp_path, material_recording, assert_root_hash):
    root_files = [
        (
            "geant4_material_tracks.root",
            "material-tracks",
            200,
        )
    ]

    for fn, tn, ee in root_files:
        fp = material_recording / fn
        assert fp.exists()
        assert fp.stat().st_size > 2**10 * 50
        assert_entries(fp, tn, ee)
        assert_root_hash(fn, fp)


@pytest.mark.parametrize("revFiltMomThresh", [0 * u.GeV, 1 * u.TeV])
def test_truth_tracking_kalman(
    tmp_path, assert_root_hash, revFiltMomThresh, detector_config
):
    root_files = [
        ("trackstates_kf.root", "trackstates", 19),
        ("tracksummary_kf.root", "tracksummary", 10),
        ("performance_kf.root", None, -1),
    ]

    for fn, _, _ in root_files:
        fp = tmp_path / fn
        assert not fp.exists()

    with detector_config.detector:
        from truth_tracking_kalman import runTruthTrackingKalman

        field = acts.ConstantBField(acts.Vector3(0, 0, 2 * u.T))

        seq = Sequencer(events=10, numThreads=1)

        runTruthTrackingKalman(
            trackingGeometry=detector_config.trackingGeometry,
            field=field,
            digiConfigFile=detector_config.digiConfigFile,
            outputDir=tmp_path,
            reverseFilteringMomThreshold=revFiltMomThresh,
            s=seq,
        )

        seq.run()

    for fn, tn, ee in root_files:
        fp = tmp_path / fn
        assert fp.exists()
        assert fp.stat().st_size > 1024
        if tn is not None:
            assert_has_entries(fp, tn)
            assert_root_hash(fn, fp)

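    # Cross-check: every entry in the track summary must carry fitted parameters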
    import ROOT

    ROOT.PyConfig.IgnoreCommandLineOptions = True
    ROOT.gROOT.SetBatch(True)
    rf = ROOT.TFile.Open(str(tmp_path / "tracksummary_kf.root"))
    keys = [k.GetName() for k in rf.GetListOfKeys()]
    assert "tracksummary" in keys
    for entry in rf.Get("tracksummary"):
        assert entry.hasFittedParams


def test_truth_tracking_gsf(tmp_path, assert_root_hash, detector_config):
    from truth_tracking_gsf import runTruthTrackingGsf

    field = acts.ConstantBField(acts.Vector3(0, 0, 2 * u.T))

    seq = Sequencer(
        events=10,
        numThreads=1,
        fpeMasks=[
            (
                "Core/include/Acts/TrackFitting/detail/GsfUtils.hpp:197",
                acts.FpeType.FLTUND,
                1,
            ),
        ],
    )

    root_files = [
        ("trackstates_gsf.root", "trackstates"),
        ("tracksummary_gsf.root", "tracksummary"),
    ]

    for fn, _ in root_files:
        fp = tmp_path / fn
        assert not fp.exists()

    with detector_config.detector:
        runTruthTrackingGsf(
            trackingGeometry=detector_config.trackingGeometry,
            decorators=detector_config.decorators,
            field=field,
            digiConfigFile=detector_config.digiConfigFile,
            outputDir=tmp_path,
            s=seq,
        )

        # Raise the log failure threshold so that ERROR messages do not abort the run
        with failure_threshold(acts.logging.FATAL):
            seq.run()

    for fn, tn in root_files:
        fp = tmp_path / fn
        assert fp.exists()
        assert fp.stat().st_size > 1024
        if tn is not None:
            assert_root_hash(fn, fp)


def test_refitting(tmp_path, detector_config, assert_root_hash):
    from truth_tracking_gsf_refitting import runRefittingGsf

    field = acts.ConstantBField(acts.Vector3(0, 0, 2 * u.T))

    seq = Sequencer(
        events=10,
        numThreads=1,
    )

    with detector_config.detector:
        runRefittingGsf(
            trackingGeometry=detector_config.trackingGeometry,
            field=field,
            digiConfigFile=detector_config.digiConfigFile,
            outputDir=tmp_path,
            s=seq,
        ).run()

    root_files = [
        ("trackstates_gsf_refit.root", "trackstates"),
        ("tracksummary_gsf_refit.root", "tracksummary"),
    ]

    for fn, tn in root_files:
        fp = tmp_path / fn
        assert fp.exists()
        assert fp.stat().st_size > 1024
        if tn is not None:
            assert_root_hash(fn, fp)


def test_particle_gun(tmp_path, assert_root_hash):
    from particle_gun import runParticleGun

    s = Sequencer(events=20, numThreads=-1)

    csv_dir = tmp_path / "csv"
    root_file = tmp_path / "particles.root"

    assert not csv_dir.exists()
    assert not root_file.exists()

    runParticleGun(str(tmp_path), s=s).run()

    assert csv_dir.exists()
    assert root_file.exists()

    assert len([f for f in csv_dir.iterdir() if f.name.endswith("particles.csv")]) > 0
    assert all([f.stat().st_size > 100 for f in csv_dir.iterdir()])

    assert root_file.stat().st_size > 200
    assert_entries(root_file, "particles", 20)
    assert_root_hash(root_file.name, root_file)


@pytest.mark.slow
@pytest.mark.odd
@pytest.mark.skipif(not dd4hepEnabled, reason="DD4hep not set up")
def test_material_mapping(material_recording, tmp_path, assert_root_hash):
    from material_mapping import runMaterialMapping
    from material_validation import runMaterialValidation

    map_file = tmp_path / "material-map_tracks.root"
    assert not map_file.exists()

    odd_dir = getOpenDataDetectorDirectory()
    config = acts.MaterialMapJsonConverter.Config()
    materialDecorator = acts.JsonMaterialDecorator(
        level=acts.logging.INFO,
        rConfig=config,
        jFileName=str(odd_dir / "config/odd-material-mapping-config.json"),
    )

    s = Sequencer(numThreads=1)

    with getOpenDataDetector(materialDecorator) as detector:
        trackingGeometry = detector.trackingGeometry()
        decorators = detector.contextDecorators()

        runMaterialMapping(
            trackingGeometry,
            decorators,
            outputDir=str(tmp_path),
            inputDir=material_recording,
            mappingStep=1,
            s=s,
        )

        s.run()

    mat_file = tmp_path / "material-map.json"

    assert mat_file.exists()
    assert mat_file.stat().st_size > 10

    with mat_file.open() as fh:
        assert json.load(fh)

    assert map_file.exists()
    assert_entries(map_file, "material-tracks", 200)
    assert_root_hash(map_file.name, map_file)

    val_file = tmp_path / "propagation-material.root"
    assert not val_file.exists()

    # Validate the mapped material by propagating through the decorated geometry
    field = acts.NullBField()

    s = Sequencer(events=10, numThreads=1)

    with getOpenDataDetector(
        materialDecorator=acts.IMaterialDecorator.fromFile(mat_file)
    ) as detector:
        trackingGeometry = detector.trackingGeometry()
        decorators = detector.contextDecorators()

        runMaterialValidation(
            10, 1000, trackingGeometry, decorators, field, outputDir=str(tmp_path), s=s
        )

        s.run()

    assert val_file.exists()
    assert_entries(val_file, "material-tracks", 10000)
    assert_root_hash(val_file.name, val_file)


@pytest.mark.slow
@pytest.mark.odd
@pytest.mark.skipif(not dd4hepEnabled, reason="DD4hep not set up")
def test_volume_material_mapping(material_recording, tmp_path, assert_root_hash):
    from material_mapping import runMaterialMapping
    from material_validation import runMaterialValidation

    map_file = tmp_path / "material-map-volume_tracks.root"
    assert not map_file.exists()

    geo_map = Path(__file__).parent / "geometry-volume-map.json"
    assert geo_map.exists()
    assert geo_map.stat().st_size > 10
    with geo_map.open() as fh:
        assert json.load(fh)

    s = Sequencer(numThreads=1)

    with getOpenDataDetector(
        materialDecorator=acts.IMaterialDecorator.fromFile(geo_map)
    ) as detector:
        trackingGeometry = detector.trackingGeometry()
        decorators = detector.contextDecorators()

        runMaterialMapping(
            trackingGeometry,
            decorators,
            mapName="material-map-volume",
            outputDir=str(tmp_path),
            inputDir=material_recording,
            mappingStep=1,
            s=s,
        )

        s.run()

    mat_file = tmp_path / "material-map-volume.json"

    assert mat_file.exists()
    assert mat_file.stat().st_size > 10

    with mat_file.open() as fh:
        assert json.load(fh)

    assert map_file.exists()
    assert_entries(map_file, "material-tracks", 200)
    assert_root_hash(map_file.name, map_file)

    val_file = tmp_path / "propagation-volume-material.root"
    assert not val_file.exists()

    # Validate the mapped volume material by propagating through the decorated geometry
    field = acts.NullBField()

    s = Sequencer(events=10, numThreads=1)

    with getOpenDataDetector(
        materialDecorator=acts.IMaterialDecorator.fromFile(mat_file)
    ) as detector:
        trackingGeometry = detector.trackingGeometry()
        decorators = detector.contextDecorators()

        runMaterialValidation(
            10,
            1000,
            trackingGeometry,
            decorators,
            field,
            outputDir=str(tmp_path),
            outputName="propagation-volume-material",
            s=s,
        )

        s.run()

    assert val_file.exists()

    assert_root_hash(val_file.name, val_file)


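# Common locations of the example configuration files used by the tests below.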
ACTS_DIR = Path(__file__).parent.parent.parent.parent
CONFIG_DIR = ACTS_DIR / "Examples/Configs"
DIGI_SHARE_DIR = ACTS_DIR / "Examples/Algorithms/Digitization/share"


@pytest.mark.parametrize(
    "digi_config_file",
    [
        CONFIG_DIR / "generic-digi-smearing-config.json",
        CONFIG_DIR / "generic-digi-geometric-config.json",
    ],
    ids=["smeared", "geometric"],
)
def test_digitization_example(trk_geo, tmp_path, assert_root_hash, digi_config_file):
    from digitization import runDigitization

    s = Sequencer(events=10, numThreads=-1)

    csv_dir = tmp_path / "csv"
    root_file = tmp_path / "measurements.root"

    assert not root_file.exists()
    assert not csv_dir.exists()

    field = acts.ConstantBField(acts.Vector3(0, 0, 2 * u.T))
    runDigitization(
        trk_geo, field, outputDir=tmp_path, digiConfigFile=digi_config_file, s=s
    )

    s.run()

    assert root_file.exists()
    assert csv_dir.exists()

    assert len(list(csv_dir.iterdir())) == 3 * s.config.events
    assert all(f.stat().st_size > 50 for f in csv_dir.iterdir())

    assert_root_hash(root_file.name, root_file)


@pytest.mark.parametrize(
    "digi_config_file",
    [
        CONFIG_DIR / "generic-digi-smearing-config.json",
        CONFIG_DIR / "generic-digi-geometric-config.json",
        pytest.param(
            CONFIG_DIR / "odd-digi-smearing-config.json",
            marks=[
                pytest.mark.odd,
            ],
        ),
        pytest.param(
            CONFIG_DIR / "odd-digi-geometric-config.json",
            marks=[
                pytest.mark.odd,
            ],
        ),
    ],
    ids=["smeared", "geometric", "odd-smeared", "odd-geometric"],
)
def test_digitization_example_input_parsing(digi_config_file):
    from acts.examples import readDigiConfigFromJson

    readDigiConfigFromJson(str(digi_config_file))


@pytest.mark.parametrize(
    "digi_config_file",
    [
        CONFIG_DIR / "generic-digi-smearing-config.json",
        CONFIG_DIR / "generic-digi-geometric-config.json",
    ],
    ids=["smeared", "geometric"],
)
def test_digitization_example_input(
    trk_geo, tmp_path, assert_root_hash, digi_config_file
):
    from particle_gun import runParticleGun
    from digitization import runDigitization

    ptcl_dir = tmp_path / "ptcl"
    ptcl_dir.mkdir()
    pgs = Sequencer(events=20, numThreads=-1)
    runParticleGun(str(ptcl_dir), s=pgs)

    pgs.run()

    s = Sequencer(numThreads=-1)

    csv_dir = tmp_path / "csv"
    root_file = tmp_path / "measurements.root"

    assert not root_file.exists()
    assert not csv_dir.exists()

    assert_root_hash(
        "particles.root",
        ptcl_dir / "particles.root",
    )

    field = acts.ConstantBField(acts.Vector3(0, 0, 2 * u.T))
    runDigitization(
        trk_geo,
        field,
        outputDir=tmp_path,
        digiConfigFile=digi_config_file,
        particlesInput=ptcl_dir / "particles.root",
        s=s,
        doMerge=True,
    )

    s.run()

    assert root_file.exists()
    assert csv_dir.exists()

    assert len(list(csv_dir.iterdir())) == 3 * pgs.config.events
    assert all(f.stat().st_size > 50 for f in csv_dir.iterdir())

    assert_root_hash(root_file.name, root_file)


def test_digitization_config_example(trk_geo, tmp_path):
    from digitization_config import runDigitizationConfig

    out_file = tmp_path / "output.json"
    assert not out_file.exists()

    input_file = CONFIG_DIR / "generic-digi-smearing-config.json"
    assert input_file.exists(), input_file.resolve()

    runDigitizationConfig(trk_geo, input=input_file, output=out_file)

    assert out_file.exists()

    with out_file.open() as fh:
        data = json.load(fh)
    assert len(data.keys()) == 2
    assert data["acts-geometry-hierarchy-map"]["format-version"] == 0
    assert (
        data["acts-geometry-hierarchy-map"]["value-identifier"]
        == "digitization-configuration"
    )
    assert len(data["entries"]) == 27


@pytest.mark.parametrize(
    "truthSmeared,truthEstimated",
    [
        [False, False],
        [False, True],
        [True, False],
    ],
    ids=["full_seeding", "truth_estimated", "truth_smeared"],
)
@pytest.mark.slow
def test_ckf_tracks_example(
    tmp_path, assert_root_hash, truthSmeared, truthEstimated, detector_config
):
    csv = tmp_path / "csv"

    assert not csv.exists()

    field = acts.ConstantBField(acts.Vector3(0, 0, 2 * u.T))
    events = 100
    s = Sequencer(events=events, numThreads=-1)

    root_files = [
        (
            "performance_finding_ckf.root",
            None,
        ),
        (
            "trackstates_ckf.root",
            "trackstates",
        ),
        (
            "tracksummary_ckf.root",
            "tracksummary",
        ),
    ]

    if not truthSmeared:
        root_files += [
            (
                "performance_seeding.root",
                None,
            ),
        ]

    for rf, _ in root_files:
        assert not (tmp_path / rf).exists()

    from ckf_tracks import runCKFTracks

    with detector_config.detector:
        runCKFTracks(
            detector_config.trackingGeometry,
            detector_config.decorators,
            field=field,
            outputCsv=True,
            outputDir=tmp_path,
            geometrySelection=detector_config.geometrySelection,
            digiConfigFile=detector_config.digiConfigFile,
            truthSmearedSeeded=truthSmeared,
            truthEstimatedSeeded=truthEstimated,
            s=s,
        )

        s.run()

    assert csv.exists()
    for rf, tn in root_files:
        rp = tmp_path / rf
        assert rp.exists()
        if tn is not None:
            assert_root_hash(rf, rp)

    assert (
        len([f for f in csv.iterdir() if f.name.endswith("tracks_ckf.csv")]) == events
    )
    assert all([f.stat().st_size > 300 for f in csv.iterdir()])


@pytest.mark.skipif(not dd4hepEnabled, reason="DD4hep not set up")
@pytest.mark.odd
@pytest.mark.slow
def test_full_chain_odd_example(tmp_path):
    # Make sure the OpenDataDetector geometry can be built before launching the script
    with getOpenDataDetector():
        pass

    script = (
        Path(__file__).parent.parent.parent.parent
        / "Examples"
        / "Scripts"
        / "Python"
        / "full_chain_odd.py"
    )
    assert script.exists()
    env = os.environ.copy()
    env["ACTS_LOG_FAILURE_THRESHOLD"] = "ERROR"
    try:
        subprocess.check_call(
            [sys.executable, str(script), "-n1"],
            cwd=tmp_path,
            env=env,
            stderr=subprocess.STDOUT,
        )
    except subprocess.CalledProcessError as e:
        # check_call does not capture output, so e.output may be None
        if e.output is not None:
            print(e.output.decode("utf-8"))
        raise


@pytest.mark.skipif(
    not dd4hepEnabled or not geant4Enabled, reason="DD4hep and/or Geant4 not set up"
)
@pytest.mark.slow
def test_full_chain_odd_example_pythia_geant4(tmp_path):
    # Make sure the OpenDataDetector geometry can be built before launching the script
    with getOpenDataDetector():
        pass

    script = (
        Path(__file__).parent.parent.parent.parent
        / "Examples"
        / "Scripts"
        / "Python"
        / "full_chain_odd.py"
    )
    assert script.exists()
    env = os.environ.copy()
    env["ACTS_LOG_FAILURE_THRESHOLD"] = "ERROR"
    try:
        stdout = subprocess.check_output(
            [
                sys.executable,
                str(script),
                "-n1",
                "--geant4",
                "--ttbar",
                "--ttbar-pu",
                "50",
            ],
            cwd=tmp_path,
            env=env,
            stderr=subprocess.STDOUT,
        )
        stdout = stdout.decode("utf-8")
    except subprocess.CalledProcessError as e:
        print(e.output.decode("utf-8"))
        raise

    # Fail if any algorithm logged an ERROR line
    errors = []
    error_regex = re.compile(r"^\d\d:\d\d:\d\d\s+(\w+)\s+ERROR\s+", re.MULTILINE)
    for match in error_regex.finditer(stdout):
        (algo,) = match.groups()
        errors.append(algo)
    errors = collections.Counter(errors)
    assert dict(errors) == {}, stdout


1163
1164 @pytest.mark.skipif(not dd4hepEnabled, reason="DD4hep not set up")
1165 @pytest.mark.skipif(not onnxEnabled, reason="ONNX plugin not enabled")
1166 @pytest.mark.slow
1167 def test_ML_Ambiguity_Solver(tmp_path, assert_root_hash):
1168
1169
1170 root_file = "performance_finding_ambiML.root"
1171 output_dir = "odd_output"
1172 assert not (tmp_path / root_file).exists()
1173
1174
1175 with getOpenDataDetector():
1176 pass
1177
1178 script = (
1179 Path(__file__).parent.parent.parent.parent
1180 / "Examples"
1181 / "Scripts"
1182 / "Python"
1183 / "full_chain_odd.py"
1184 )
1185 assert script.exists()
1186 env = os.environ.copy()
1187 env["ACTS_LOG_FAILURE_THRESHOLD"] = "ERROR"
1188 try:
1189 subprocess.check_call(
1190 [sys.executable, str(script), "-n1", "--ambi-solver", "ML"],
1191 cwd=tmp_path,
1192 env=env,
1193 stderr=subprocess.STDOUT,
1194 )
1195 except subprocess.CalledProcessError as e:
1196 print(e.output.decode("utf-8"))
1197 raise
1198
1199 rfp = tmp_path / output_dir / root_file
1200 assert rfp.exists()
1201
1202 assert_root_hash(root_file, rfp)
1203
1204
def test_bfield_writing(tmp_path, seq, assert_root_hash):
    from bfield_writing import runBFieldWriting

    root_files = [
        ("solenoid.root", "solenoid", 100),
        ("solenoid2.root", "solenoid", 100),
    ]

    for fn, _, _ in root_files:
        fp = tmp_path / fn
        assert not fp.exists()

    runBFieldWriting(outputDir=tmp_path, rewrites=1)

    for fn, tn, ee in root_files:
        fp = tmp_path / fn
        assert fp.exists()
        assert fp.stat().st_size > 2**10 * 2
        assert_entries(fp, tn, ee)
        assert_root_hash(fn, fp)


@pytest.mark.parametrize("backend", ["onnx", "torch"])
@pytest.mark.parametrize("hardware", ["cpu", "gpu"])
@pytest.mark.skipif(not gnnEnabled, reason="Gnn environment not set up")
def test_gnn(tmp_path, trk_geo, field, assert_root_hash, backend, hardware):
    if backend == "onnx" and hardware == "cpu":
        pytest.skip("Combination of ONNX and CPU not yet supported")

    if backend == "torch":
        pytest.skip(
            "Disabled torch support until replacement for torch-scatter is found"
        )

    root_file = "performance_track_finding.root"
    assert not (tmp_path / root_file).exists()

    # Fetch and unpack the model files used by the example script
    onnx_url = "https://acts.web.cern.ch/ci/exatrkx/onnx_models_v01.tar"
    torch_url = "https://acts.web.cern.ch/ci/exatrkx/torchscript_models_v01.tar"

    for url in [onnx_url, torch_url]:
        tarfile_name = tmp_path / "models.tar"
        urllib.request.urlretrieve(url, tarfile_name)
        with tarfile.open(tarfile_name) as tf:
            tf.extractall(tmp_path)

    shutil.copyfile(
        tmp_path / "torchscript_models/embed.pt", tmp_path / "onnx_models/embed.pt"
    )

    script = (
        Path(__file__).parent.parent.parent.parent
        / "Examples"
        / "Scripts"
        / "Python"
        / "gnn.py"
    )
    assert script.exists()
    env = os.environ.copy()
    env["ACTS_LOG_FAILURE_THRESHOLD"] = "WARNING"

    if hardware == "cpu":
        env["CUDA_VISIBLE_DEVICES"] = ""

    try:
        subprocess.check_call(
            [sys.executable, str(script), backend],
            cwd=tmp_path,
            env=env,
            stderr=subprocess.STDOUT,
        )
    except subprocess.CalledProcessError as e:
        # check_call does not capture output, so e.output may be None
        if e.output is not None:
            print(e.output.decode("utf-8"))
        raise

    rfp = tmp_path / root_file
    assert rfp.exists()

    assert_root_hash(root_file, rfp)


@pytest.mark.odd
def test_strip_spacepoints(detector_config, field, tmp_path, assert_root_hash):
    if detector_config.name == "generic":
        pytest.skip("No strip spacepoint formation for the generic detector currently")

    from strip_spacepoints import createStripSpacepoints

    s = Sequencer(events=20, numThreads=-1)

    geo_selection = CONFIG_DIR / "odd-strip-spacepoint-selection.json"
    digi_config_file = CONFIG_DIR / "odd-digi-smearing-config.json"

    with detector_config.detector:
        createStripSpacepoints(
            trackingGeometry=detector_config.trackingGeometry,
            field=field,
            digiConfigFile=digi_config_file,
            geoSelection=geo_selection,
            outputDir=tmp_path,
            s=s,
        ).run()

    root_file = "strip_spacepoints.root"
    rfp = tmp_path / root_file

    assert_root_hash(root_file, rfp)


1316
1317 @pytest.mark.skipif(not geant4Enabled, reason="Geant4 not set up")
1318 @pytest.mark.skipif(not geomodelEnabled, reason="Geomodel not set up")
1319 def test_geomodel_G4(tmp_path):
1320 script = (
1321 Path(__file__).parent.parent.parent.parent
1322 / "Examples"
1323 / "Scripts"
1324 / "Python"
1325 / "geomodel_G4.py"
1326 )
1327 assert script.exists()
1328
1329 mockup_det = "Muon"
1330 out_dir = tmp_path / "geomodel_g4_out"
1331 out_dir.mkdir()
1332 args = [
1333 "python3",
1334 str(script),
1335 "--mockupDetector",
1336 str(mockup_det),
1337 "--outDir",
1338 str(out_dir),
1339 ]
1340 subprocess.check_call(args)
1341
1342 assert (out_dir / "obj").exists()