File indexing completed on 2025-07-14 08:11:37
0001 from pathlib import Path
0002 import os
0003 import json
0004 import functools
0005 import tarfile
0006 import urllib.request
0007 import subprocess
0008 import sys
0009 import re
0010 import collections
0011 import shutil
0012
0013 import pytest
0014
0015 from helpers import (
0016 geant4Enabled,
0017 dd4hepEnabled,
0018 hepmc3Enabled,
0019 pythia8Enabled,
0020 exatrkxEnabled,
0021 onnxEnabled,
0022 hashingSeedingEnabled,
0023 AssertCollectionExistsAlg,
0024 failure_threshold,
0025 )
0026
0027 import acts
0028 from acts.examples import (
0029 Sequencer,
0030 GenericDetector,
0031 )
0032 from acts.examples.odd import getOpenDataDetector, getOpenDataDetectorDirectory
0033
0034
# Short alias for the ACTS unit constants (used below as u.T, u.GeV, u.mm, ...).
u = acts.UnitConstants
0036
0037
@pytest.fixture
def field():
    """Provide a constant B-field whose only non-zero component is the third (2 * u.T)."""
    b_vector = acts.Vector3(0, 0, 2 * u.T)
    return acts.ConstantBField(b_vector)
0041
0042
@pytest.fixture
def seq():
    """Provide a single-threaded ``Sequencer`` configured for 10 events."""
    sequencer = Sequencer(events=10, numThreads=1)
    return sequencer
0046
0047
def assert_csv_output(csv_path, stem):
    """Assert that ``csv_path`` contains non-trivial CSV files for ``stem``.

    At least one directory entry must have a name ending in ``<stem>.csv``,
    and every such entry must be larger than 100 bytes.
    """
    __tracebackhide__ = True

    suffix = stem + ".csv"
    matching = [entry for entry in csv_path.iterdir() if entry.name.endswith(suffix)]

    assert len(matching) > 0
    assert all(entry.stat().st_size > 100 for entry in matching)
0059
0060
def assert_entries(root_file, tree_name, exp=None, non_zero=False):
    """Assert that ``tree_name`` exists in ``root_file`` and check its entry count.

    Args:
        root_file: Path of the ROOT file to open (converted with ``str``).
        tree_name: Key that must be present among the file's top-level keys.
        exp: If not ``None``, the exact number of entries the tree must have.
        non_zero: If true, the tree must have more than zero entries.

    Raises:
        AssertionError: If the key is missing or an entry-count check fails.
    """
    __tracebackhide__ = True
    import ROOT

    ROOT.PyConfig.IgnoreCommandLineOptions = True
    ROOT.gROOT.SetBatch(True)

    rf = ROOT.TFile.Open(str(root_file))
    try:
        keys = [k.GetName() for k in rf.GetListOfKeys()]
        assert tree_name in keys
        # Read the count once; only the plain number is needed after closing.
        entries = rf.Get(tree_name).GetEntries()
    finally:
        # Close the file even when an assertion above fails.
        rf.Close()

    print("Entries:", entries)
    if non_zero:
        assert entries > 0, f"{root_file}:{tree_name}"
    if exp is not None:
        assert entries == exp, f"{root_file}:{tree_name}"
0076
0077
def assert_has_entries(root_file, tree_name):
    """Assert that ``tree_name`` in ``root_file`` holds at least one entry."""
    __tracebackhide__ = True
    assert_entries(root_file, tree_name, exp=None, non_zero=True)
0081
0082
@pytest.mark.slow
@pytest.mark.skipif(not pythia8Enabled, reason="Pythia8 not set up")
def test_pythia8(tmp_path, seq, assert_root_hash):
    """Run the Pythia8 example and validate its ROOT and CSV particle output."""
    from pythia8 import runPythia8

    csv_dir = tmp_path / "csv"
    csv_dir.mkdir()

    # Nothing may exist before the sequencer has run.
    assert not (tmp_path / "particles.root").exists()
    assert len(list(csv_dir.iterdir())) == 0

    n_events = seq.config.events

    vertex_generator = acts.examples.GaussianVertexGenerator(
        stddev=acts.Vector4(50 * u.um, 50 * u.um, 150 * u.mm, 0),
        mean=acts.Vector4(0, 0, 0, 0),
    )

    sequencer = runPythia8(
        str(tmp_path),
        outputRoot=True,
        outputCsv=True,
        vtxGen=vertex_generator,
        s=seq,
    )
    sequencer.run()

    particles_file = tmp_path / "particles.root"
    assert particles_file.exists()
    assert particles_file.stat().st_size > 2**10 * 50
    assert_entries(particles_file, "particles", n_events)
    assert_root_hash(particles_file.name, particles_file)

    assert len(list(csv_dir.iterdir())) > 0
    assert_csv_output(csv_dir, "particles")
0112
0113
def test_fatras(trk_geo, tmp_path, field, assert_root_hash):
    """Run the Fatras example and validate its CSV and ROOT output."""
    from fatras import runFatras

    csv_dir = tmp_path / "csv"
    csv_dir.mkdir()

    n_events = 10

    # (file name, tree name) pairs expected in the output directory
    expected_outputs = [
        (
            "particles_simulation.root",
            "particles",
        ),
        (
            "hits.root",
            "hits",
        ),
    ]

    # The output directory must start out clean.
    assert len(list(csv_dir.iterdir())) == 0
    for file_name, _ in expected_outputs:
        assert not (tmp_path / file_name).exists()

    sequencer = Sequencer(events=n_events)
    runFatras(trk_geo, field, str(tmp_path), s=sequencer).run()

    assert_csv_output(csv_dir, "particles_simulated")
    assert_csv_output(csv_dir, "hits")
    for file_name, tree_name in expected_outputs:
        output_path = tmp_path / file_name
        assert output_path.exists()
        assert output_path.stat().st_size > 2**10 * 10

        assert_has_entries(output_path, tree_name)
        assert_root_hash(file_name, output_path)
0149
0150
@pytest.mark.slow
@pytest.mark.odd
@pytest.mark.skipif(not geant4Enabled, reason="Geant4 not set up")
@pytest.mark.skipif(not dd4hepEnabled, reason="DD4hep not set up")
def test_geant4(tmp_path, assert_root_hash):
    """Run the ``geant4.py`` example script in a subprocess and check its output."""
    # Enter and leave the ODD context once in this process before the script
    # runs separately (presumably to verify the detector can be built).
    with getOpenDataDetector():
        pass

    csv_dir = tmp_path / "csv"
    csv_dir.mkdir()

    expected_root_files = [
        "particles_simulation.root",
        "hits.root",
    ]

    # The output directory must start out clean.
    assert len(list(csv_dir.iterdir())) == 0
    for file_name in expected_root_files:
        assert not (tmp_path / file_name).exists()

    repo_root = Path(__file__).parent.parent.parent.parent
    script = repo_root / "Examples" / "Scripts" / "Python" / "geant4.py"
    assert script.exists()

    env = os.environ.copy()
    env["ACTS_LOG_FAILURE_THRESHOLD"] = "WARNING"
    command = [sys.executable, str(script)]
    try:
        subprocess.check_call(
            command,
            cwd=tmp_path,
            env=env,
            stderr=subprocess.STDOUT,
        )
    except subprocess.CalledProcessError as e:
        # Dump whatever output the exception carries before re-raising.
        if e.output is not None:
            print(e.output.decode("utf-8"))
        if e.stderr is not None:
            print(e.stderr.decode("utf-8"))
        raise

    assert_csv_output(csv_dir, "particles_simulated")
    assert_csv_output(csv_dir, "hits")
    for file_name in expected_root_files:
        output_path = tmp_path / file_name
        assert output_path.exists()
        assert output_path.stat().st_size > 2**10 * 10

        assert_root_hash(file_name, output_path)
0206
0207
def test_seeding(tmp_path, trk_geo, field, assert_root_hash):
    """Run the seeding example with its default algorithm and validate the output."""
    from seeding import runSeeding

    # A locally built field is used instead of the ``field`` fixture value.
    bfield = acts.ConstantBField(acts.Vector3(0, 0, 2 * acts.UnitConstants.T))

    csv_dir = tmp_path / "csv"
    csv_dir.mkdir()

    sequencer = Sequencer(events=10, numThreads=1)

    # (file name, tree name); a tree name of None skips the entry/hash checks
    expected_outputs = [
        (
            "estimatedparams.root",
            "estimatedparams",
        ),
        (
            "performance_seeding.root",
            None,
        ),
        (
            "particles.root",
            "particles",
        ),
        (
            "particles_simulation.root",
            "particles",
        ),
    ]

    for file_name, _ in expected_outputs:
        assert not (tmp_path / file_name).exists()

    assert len(list(csv_dir.iterdir())) == 0

    runSeeding(trk_geo, bfield, outputDir=str(tmp_path), s=sequencer).run()

    for file_name, tree_name in expected_outputs:
        output_path = tmp_path / file_name
        assert output_path.exists()
        assert output_path.stat().st_size > 100

        if tree_name is not None:
            assert_has_entries(output_path, tree_name)
            assert_root_hash(file_name, output_path)

    assert_csv_output(csv_dir, "particles")
    assert_csv_output(csv_dir, "particles_simulated")
0256
0257
@pytest.mark.slow
@pytest.mark.skipif(not hashingSeedingEnabled, reason="HashingSeeding not set up")
def test_hashing_seeding(tmp_path, trk_geo, field, assert_root_hash):
    """Run the hashing-seeding example and validate its ROOT and CSV output."""
    from hashing_seeding import runHashingSeeding, Config

    # A locally built field is used instead of the ``field`` fixture value.
    bfield = acts.ConstantBField(acts.Vector3(0, 0, 2 * acts.UnitConstants.T))

    seq = Sequencer(events=10, numThreads=1)

    # (file name, tree name); a tree name of None skips the entry/hash checks
    expected_outputs = [
        (
            "estimatedparams.root",
            "estimatedparams",
        ),
        (
            "performance_seeding.root",
            None,
        ),
    ]

    for file_name, _ in expected_outputs:
        output_path = tmp_path / file_name
        assert not output_path.exists(), f"{output_path} exists"

    config = Config(
        mu=50,
    )

    _, _, digi_config, geo_selection_file = config.getDetectorInfo()

    runHashingSeeding(
        10,
        trk_geo,
        bfield,
        outputDir=str(tmp_path),
        saveFiles=True,
        npileup=config.mu,
        seedingAlgorithm=config.seedingAlgorithm,
        maxSeedsPerSpM=config.maxSeedsPerSpM,
        digiConfig=digi_config,
        geoSelectionConfigFile=geo_selection_file,
        config=config,
        s=seq,
    ).run()

    # Drop the sequencer reference before the output files are inspected.
    del seq

    for file_name, tree_name in expected_outputs:
        output_path = tmp_path / file_name
        assert output_path.exists(), f"{output_path} does not exist"
        assert (
            output_path.stat().st_size > 100
        ), f"{output_path} is too small: {output_path.stat().st_size} bytes"

        if tree_name is not None:
            assert_has_entries(output_path, tree_name)
            assert_root_hash(file_name, output_path)

    # The CSV files are checked directly in tmp_path, not in a csv/ subfolder.
    assert_csv_output(tmp_path, "particles_simulated")
    assert_csv_output(tmp_path, "buckets")
    assert_csv_output(tmp_path, "seed")
0317
0318
def test_seeding_orthogonal(tmp_path, trk_geo, field, assert_root_hash):
    """Run the seeding example with the orthogonal algorithm and validate the output."""
    from seeding import runSeeding, SeedingAlgorithm

    # A locally built field is used instead of the ``field`` fixture value.
    bfield = acts.ConstantBField(acts.Vector3(0, 0, 2 * acts.UnitConstants.T))

    csv_dir = tmp_path / "csv"
    csv_dir.mkdir()

    sequencer = Sequencer(events=10, numThreads=1)

    # (file name, tree name); a tree name of None skips the entry/hash checks
    expected_outputs = [
        (
            "estimatedparams.root",
            "estimatedparams",
        ),
        (
            "performance_seeding.root",
            None,
        ),
        (
            "particles.root",
            "particles",
        ),
        (
            "particles_simulation.root",
            "particles",
        ),
    ]

    for file_name, _ in expected_outputs:
        assert not (tmp_path / file_name).exists()

    assert len(list(csv_dir.iterdir())) == 0

    configured = runSeeding(
        trk_geo,
        bfield,
        outputDir=str(tmp_path),
        s=sequencer,
        seedingAlgorithm=SeedingAlgorithm.Orthogonal,
    )
    configured.run()

    for file_name, tree_name in expected_outputs:
        output_path = tmp_path / file_name
        assert output_path.exists()
        assert output_path.stat().st_size > 100

        if tree_name is not None:
            assert_has_entries(output_path, tree_name)
            assert_root_hash(file_name, output_path)

    assert_csv_output(csv_dir, "particles")
    assert_csv_output(csv_dir, "particles_simulated")
0373
0374
def test_itk_seeding(tmp_path, trk_geo, field, assert_root_hash):
    """Build a particle-gun/Fatras/digitization/seeding chain with the ITk seeding config.

    The chain is assembled step by step on one sequencer, run once, and the
    resulting ROOT and CSV files are validated.
    """
    # A locally built field is used instead of the ``field`` fixture value.
    bfield = acts.ConstantBField(acts.Vector3(0, 0, 2 * acts.UnitConstants.T))

    csv_dir = tmp_path / "csv"
    csv_dir.mkdir()

    sequencer = Sequencer(events=10, numThreads=1)

    # (file name, tree name); a tree name of None skips the entry/hash checks
    expected_outputs = [
        (
            "estimatedparams.root",
            "estimatedparams",
        ),
        (
            "performance_seeding.root",
            None,
        ),
        (
            "particles.root",
            "particles",
        ),
        (
            "particles_simulation.root",
            "particles",
        ),
    ]

    for file_name, _ in expected_outputs:
        assert not (tmp_path / file_name).exists()

    assert len(list(csv_dir.iterdir())) == 0

    rng = acts.examples.RandomNumbers(seed=42)

    from acts.examples.simulation import (
        addParticleGun,
        EtaConfig,
        MomentumConfig,
        ParticleConfig,
        addFatras,
        addDigitization,
        ParticleSelectorConfig,
        addDigiParticleSelection,
    )

    # Event generation
    addParticleGun(
        sequencer,
        MomentumConfig(1.0 * u.GeV, 10.0 * u.GeV, True),
        EtaConfig(-4.0, 4.0, True),
        ParticleConfig(1, acts.PdgParticle.eMuon, True),
        outputDirCsv=tmp_path / "csv",
        outputDirRoot=str(tmp_path),
        rnd=rng,
    )

    # Simulation
    addFatras(
        sequencer,
        trk_geo,
        bfield,
        outputDirCsv=tmp_path / "csv",
        outputDirRoot=str(tmp_path),
        rnd=rng,
    )

    # Digitization, configured from the repository's Examples/Configs folder
    source_dir = Path(__file__).resolve().parent.parent.parent.parent
    digi_config = source_dir / "Examples/Configs/generic-digi-smearing-config.json"
    addDigitization(
        sequencer,
        trk_geo,
        bfield,
        digiConfigFile=digi_config,
        rnd=rng,
    )

    selector_config = ParticleSelectorConfig(
        pt=(0.9 * u.GeV, None),
        eta=(-4, 4),
        measurements=(9, None),
        removeNeutral=True,
    )
    addDigiParticleSelection(
        sequencer,
        selector_config,
    )

    from acts.examples.reconstruction import (
        addSeeding,
    )
    from acts.examples.itk import itkSeedingAlgConfig, InputSpacePointsType

    # Seeding; the ITk configuration is unpacked into positional arguments.
    geo_selection = source_dir / "Examples/Configs/generic-seeding-config.json"
    addSeeding(
        sequencer,
        trk_geo,
        bfield,
        *itkSeedingAlgConfig(InputSpacePointsType.PixelSpacePoints),
        acts.logging.VERBOSE,
        geoSelectionConfigFile=geo_selection,
        outputDirRoot=str(tmp_path),
    )

    sequencer.run()

    for file_name, tree_name in expected_outputs:
        output_path = tmp_path / file_name
        assert output_path.exists()
        assert output_path.stat().st_size > 100

        if tree_name is not None:
            assert_has_entries(output_path, tree_name)
            assert_root_hash(file_name, output_path)

    assert_csv_output(csv_dir, "particles")
    assert_csv_output(csv_dir, "particles_simulated")
0487
0488
@pytest.mark.slow
def test_propagation(tmp_path, trk_geo, field, seq, assert_root_hash):
    """Run the propagation example and check the summary tree's entry count."""
    from propagation import runPropagation

    # (file name, tree name, expected number of entries)
    expected_outputs = [
        (
            "propagation_summary.root",
            "propagation_summary",
            10000,
        )
    ]

    for file_name, _, _ in expected_outputs:
        assert not (tmp_path / file_name).exists()

    configured = runPropagation(trk_geo, field, str(tmp_path), s=seq)
    configured.run()

    for file_name, tree_name, expected_entries in expected_outputs:
        output_path = tmp_path / file_name
        assert output_path.exists()
        assert output_path.stat().st_size > 2**10 * 50
        assert_entries(output_path, tree_name, expected_entries)
        assert_root_hash(file_name, output_path)
0513
0514
@pytest.mark.slow
@pytest.mark.odd
@pytest.mark.skipif(not geant4Enabled, reason="Geant4 not set up")
@pytest.mark.skipif(not dd4hepEnabled, reason="DD4hep not set up")
def test_material_recording(tmp_path, material_recording, assert_root_hash):
    """Validate the material-track file found in the ``material_recording`` fixture path."""
    # (file name, tree name, expected number of entries)
    expected_outputs = [
        (
            "geant4_material_tracks.root",
            "material-tracks",
            200,
        )
    ]

    for file_name, tree_name, expected_entries in expected_outputs:
        recorded = material_recording / file_name
        assert recorded.exists()
        assert recorded.stat().st_size > 2**10 * 50
        assert_entries(recorded, tree_name, expected_entries)
        assert_root_hash(file_name, recorded)
0534
0535
@pytest.mark.parametrize("revFiltMomThresh", [0 * u.GeV, 1 * u.TeV])
def test_truth_tracking_kalman(
    tmp_path, assert_root_hash, revFiltMomThresh, detector_config
):
    """Run Kalman truth tracking and validate its ROOT output.

    Parametrized over ``revFiltMomThresh``, which is passed on as
    ``reverseFilteringMomThreshold``. After the file-level checks, every
    entry of the ``tracksummary`` tree must report ``hasFittedParams``.
    """
    # (file name, tree name, number); only the first two fields are used here
    root_files = [
        ("trackstates_kf.root", "trackstates", 19),
        ("tracksummary_kf.root", "tracksummary", 10),
        ("performance_kf.root", None, -1),
    ]

    for fn, _, _ in root_files:
        fp = tmp_path / fn
        assert not fp.exists()

    with detector_config.detector:
        from truth_tracking_kalman import runTruthTrackingKalman

        field = acts.ConstantBField(acts.Vector3(0, 0, 2 * u.T))

        seq = Sequencer(events=10, numThreads=1)

        runTruthTrackingKalman(
            trackingGeometry=detector_config.trackingGeometry,
            field=field,
            digiConfigFile=detector_config.digiConfigFile,
            outputDir=tmp_path,
            reverseFilteringMomThreshold=revFiltMomThresh,
            s=seq,
        )

        seq.run()

    for fn, tn, _ in root_files:
        fp = tmp_path / fn
        assert fp.exists()
        assert fp.stat().st_size > 1024
        if tn is not None:
            assert_has_entries(fp, tn)
            assert_root_hash(fn, fp)

    import ROOT

    ROOT.PyConfig.IgnoreCommandLineOptions = True
    ROOT.gROOT.SetBatch(True)
    rf = ROOT.TFile.Open(str(tmp_path / "tracksummary_kf.root"))
    try:
        keys = [k.GetName() for k in rf.GetListOfKeys()]
        assert "tracksummary" in keys
        for entry in rf.Get("tracksummary"):
            assert entry.hasFittedParams
    finally:
        # Close the file even when one of the assertions above fails.
        rf.Close()
0585
0586
def test_truth_tracking_gsf(tmp_path, assert_root_hash, detector_config):
    """Run GSF truth tracking and check that its ROOT output files are written."""
    from truth_tracking_gsf import runTruthTrackingGsf

    bfield = acts.ConstantBField(acts.Vector3(0, 0, 2 * u.T))

    # One FLTUND floating-point exception is masked at the listed source location.
    fpe_masks = [
        (
            "Core/include/Acts/TrackFitting/detail/GsfUtils.hpp:197",
            acts.FpeType.FLTUND,
            1,
        ),
    ]
    sequencer = Sequencer(
        events=10,
        numThreads=1,
        fpeMasks=fpe_masks,
    )

    # (file name, tree name)
    expected_outputs = [
        ("trackstates_gsf.root", "trackstates"),
        ("tracksummary_gsf.root", "tracksummary"),
    ]

    for file_name, _ in expected_outputs:
        assert not (tmp_path / file_name).exists()

    with detector_config.detector:
        runTruthTrackingGsf(
            trackingGeometry=detector_config.trackingGeometry,
            decorators=detector_config.decorators,
            field=bfield,
            digiConfigFile=detector_config.digiConfigFile,
            outputDir=tmp_path,
            s=sequencer,
        )

        # The run happens with the log failure threshold set to FATAL.
        with failure_threshold(acts.logging.FATAL):
            sequencer.run()

    for file_name, tree_name in expected_outputs:
        output_path = tmp_path / file_name
        assert output_path.exists()
        assert output_path.stat().st_size > 1024
        if tree_name is not None:
            assert_root_hash(file_name, output_path)
0633
0634
def test_refitting(tmp_path, detector_config, assert_root_hash):
    """Run the GSF refitting example and check that its ROOT output files are written."""
    from truth_tracking_gsf_refitting import runRefittingGsf

    bfield = acts.ConstantBField(acts.Vector3(0, 0, 2 * u.T))

    sequencer = Sequencer(
        events=10,
        numThreads=1,
    )

    with detector_config.detector:
        configured = runRefittingGsf(
            trackingGeometry=detector_config.trackingGeometry,
            field=bfield,
            digiConfigFile=detector_config.digiConfigFile,
            outputDir=tmp_path,
            s=sequencer,
        )
        configured.run()

    # (file name, tree name)
    expected_outputs = [
        ("trackstates_gsf_refit.root", "trackstates"),
        ("tracksummary_gsf_refit.root", "tracksummary"),
    ]

    for file_name, tree_name in expected_outputs:
        output_path = tmp_path / file_name
        assert output_path.exists()
        assert output_path.stat().st_size > 1024
        if tree_name is not None:
            assert_root_hash(file_name, output_path)
0667
0668
def test_particle_gun(tmp_path, assert_root_hash):
    """Run the particle-gun example and validate its CSV and ROOT output."""
    from particle_gun import runParticleGun

    sequencer = Sequencer(events=20, numThreads=-1)

    csv_dir = tmp_path / "csv"
    particles_file = tmp_path / "particles.root"

    # Neither output may exist before the run.
    assert not csv_dir.exists()
    assert not particles_file.exists()

    runParticleGun(str(tmp_path), s=sequencer).run()

    assert csv_dir.exists()
    assert particles_file.exists()

    particle_csvs = [
        entry for entry in csv_dir.iterdir() if entry.name.endswith("particles.csv")
    ]
    assert len(particle_csvs) > 0
    # The size check applies to every entry in the CSV directory.
    assert all(entry.stat().st_size > 100 for entry in csv_dir.iterdir())

    assert particles_file.stat().st_size > 200
    assert_entries(particles_file, "particles", 20)
    assert_root_hash(particles_file.name, particles_file)
0691
0692
@pytest.mark.slow
@pytest.mark.odd
@pytest.mark.skipif(not dd4hepEnabled, reason="DD4hep not set up")
def test_material_mapping(material_recording, tmp_path, assert_root_hash):
    """Run material mapping on the ODD, then validate the produced material map.

    The first step maps the recorded material tracks and writes a JSON
    material map; the second reloads that map and runs the material
    validation.
    """
    from material_mapping import runMaterialMapping
    from material_validation import runMaterialValidation

    map_file = tmp_path / "material-map_tracks.root"
    assert not map_file.exists()

    odd_dir = getOpenDataDetectorDirectory()
    converter_config = acts.MaterialMapJsonConverter.Config()
    mapping_decorator = acts.JsonMaterialDecorator(
        level=acts.logging.INFO,
        rConfig=converter_config,
        jFileName=str(odd_dir / "config/odd-material-mapping-config.json"),
    )

    mapping_seq = Sequencer(numThreads=1)

    with getOpenDataDetector(mapping_decorator) as detector:
        tracking_geometry = detector.trackingGeometry()
        context_decorators = detector.contextDecorators()

        runMaterialMapping(
            tracking_geometry,
            context_decorators,
            outputDir=str(tmp_path),
            inputDir=material_recording,
            mappingStep=1,
            s=mapping_seq,
        )

        mapping_seq.run()

    mat_file = tmp_path / "material-map.json"

    assert mat_file.exists()
    assert mat_file.stat().st_size > 10

    # The map must parse as JSON and be non-empty.
    with mat_file.open() as fh:
        assert json.load(fh)

    assert map_file.exists()
    assert_entries(map_file, "material-tracks", 200)
    assert_root_hash(map_file.name, map_file)

    val_file = tmp_path / "propagation-material.root"
    assert not val_file.exists()

    # Validation step: reload the freshly written map, using a null field.
    null_field = acts.NullBField()

    validation_seq = Sequencer(events=10, numThreads=1)

    with getOpenDataDetector(
        materialDecorator=acts.IMaterialDecorator.fromFile(mat_file)
    ) as detector:
        tracking_geometry = detector.trackingGeometry()
        context_decorators = detector.contextDecorators()

        runMaterialValidation(
            10,
            1000,
            tracking_geometry,
            context_decorators,
            null_field,
            outputDir=str(tmp_path),
            s=validation_seq,
        )

        validation_seq.run()

    assert val_file.exists()
    assert_entries(val_file, "material-tracks", 10000)
    assert_root_hash(val_file.name, val_file)
0764
0765
@pytest.mark.slow
@pytest.mark.odd
@pytest.mark.skipif(not dd4hepEnabled, reason="DD4hep not set up")
def test_volume_material_mapping(material_recording, tmp_path, assert_root_hash):
    """Run volume material mapping on the ODD, then validate the produced map.

    The geometry is decorated from ``geometry-volume-map.json`` next to this
    file for the mapping step; the written material map is then reloaded for
    the validation step.
    """
    from material_mapping import runMaterialMapping
    from material_validation import runMaterialValidation

    map_file = tmp_path / "material-map-volume_tracks.root"
    assert not map_file.exists()

    # The input geometry map must exist and parse as non-empty JSON.
    geo_map = Path(__file__).parent / "geometry-volume-map.json"
    assert geo_map.exists()
    assert geo_map.stat().st_size > 10
    with geo_map.open() as fh:
        assert json.load(fh)

    mapping_seq = Sequencer(numThreads=1)

    with getOpenDataDetector(
        materialDecorator=acts.IMaterialDecorator.fromFile(geo_map)
    ) as detector:
        tracking_geometry = detector.trackingGeometry()
        context_decorators = detector.contextDecorators()

        runMaterialMapping(
            tracking_geometry,
            context_decorators,
            mapName="material-map-volume",
            outputDir=str(tmp_path),
            inputDir=material_recording,
            mappingStep=1,
            s=mapping_seq,
        )

        mapping_seq.run()

    mat_file = tmp_path / "material-map-volume.json"

    assert mat_file.exists()
    assert mat_file.stat().st_size > 10

    with mat_file.open() as fh:
        assert json.load(fh)

    assert map_file.exists()
    assert_entries(map_file, "material-tracks", 200)
    assert_root_hash(map_file.name, map_file)

    val_file = tmp_path / "propagation-volume-material.root"
    assert not val_file.exists()

    # Validation step: reload the freshly written map, using a null field.
    null_field = acts.NullBField()

    validation_seq = Sequencer(events=10, numThreads=1)

    with getOpenDataDetector(
        materialDecorator=acts.IMaterialDecorator.fromFile(mat_file)
    ) as detector:
        tracking_geometry = detector.trackingGeometry()
        context_decorators = detector.contextDecorators()

        runMaterialValidation(
            10,
            1000,
            tracking_geometry,
            context_decorators,
            null_field,
            outputDir=str(tmp_path),
            outputName="propagation-volume-material",
            s=validation_seq,
        )

        validation_seq.run()

    assert val_file.exists()

    assert_root_hash(val_file.name, val_file)
0845
0846
# Directory four levels above this file, used as the base for the paths below.
ACTS_DIR = Path(__file__).parent.parent.parent.parent
# Folder holding the example JSON configuration files.
CONFIG_DIR = ACTS_DIR / "Examples/Configs"
# Shared folder of the digitization algorithm.
DIGI_SHARE_DIR = ACTS_DIR / "Examples/Algorithms/Digitization/share"
0853
0854
@pytest.mark.parametrize(
    "digi_config_file",
    [
        CONFIG_DIR / "generic-digi-smearing-config.json",
        CONFIG_DIR / "generic-digi-geometric-config.json",
    ],
    ids=["smeared", "geometric"],
)
def test_digitization_example(trk_geo, tmp_path, assert_root_hash, digi_config_file):
    """Run the digitization example for each digitization config and validate the output."""
    from digitization import runDigitization

    sequencer = Sequencer(events=10, numThreads=-1)

    csv_dir = tmp_path / "csv"
    measurements_file = tmp_path / "measurements.root"

    assert not measurements_file.exists()
    assert not csv_dir.exists()

    bfield = acts.ConstantBField(acts.Vector3(0, 0, 2 * u.T))
    runDigitization(
        trk_geo,
        bfield,
        outputDir=tmp_path,
        digiConfigFile=digi_config_file,
        s=sequencer,
    )

    sequencer.run()

    assert measurements_file.exists()
    assert csv_dir.exists()

    # Three CSV files are expected per processed event.
    csv_entries = list(csv_dir.iterdir())
    assert len(csv_entries) == 3 * sequencer.config.events
    assert all(entry.stat().st_size > 50 for entry in csv_dir.iterdir())

    assert_root_hash(measurements_file.name, measurements_file)
0888
0889
@pytest.mark.parametrize(
    "digi_config_file",
    [
        CONFIG_DIR / "generic-digi-smearing-config.json",
        CONFIG_DIR / "generic-digi-geometric-config.json",
        pytest.param(
            (ACTS_DIR / "Examples/Configs" / "odd-digi-smearing-config.json"),
            marks=[
                pytest.mark.odd,
            ],
        ),
        pytest.param(
            (ACTS_DIR / "Examples/Configs" / "odd-digi-geometric-config.json"),
            marks=[
                pytest.mark.odd,
            ],
        ),
    ],
    ids=["smeared", "geometric", "odd-smeared", "odd-geometric"],
)
def test_digitization_example_input_parsing(digi_config_file):
    """Check that each digitization config file can be read without raising."""
    from acts.examples import readDigiConfigFromJson

    config_path = str(digi_config_file)
    readDigiConfigFromJson(config_path)
0914
0915
@pytest.mark.parametrize(
    "digi_config_file",
    [
        CONFIG_DIR / "generic-digi-smearing-config.json",
        CONFIG_DIR / "generic-digi-geometric-config.json",
    ],
    ids=["smeared", "geometric"],
)
def test_digitization_example_input(
    trk_geo, tmp_path, assert_root_hash, digi_config_file
):
    """Run digitization on particles read back from a particle-gun ROOT file."""
    from particle_gun import runParticleGun
    from digitization import runDigitization

    # Step 1: produce the particle input with its own sequencer.
    ptcl_dir = tmp_path / "ptcl"
    ptcl_dir.mkdir()
    particle_gun_seq = Sequencer(events=20, numThreads=-1)
    runParticleGun(str(ptcl_dir), s=particle_gun_seq)

    particle_gun_seq.run()

    # Step 2: digitize, reading the particles written above.
    digi_seq = Sequencer(numThreads=-1)

    csv_dir = tmp_path / "csv"
    measurements_file = tmp_path / "measurements.root"
    particles_file = ptcl_dir / "particles.root"

    assert not measurements_file.exists()
    assert not csv_dir.exists()

    assert_root_hash(
        "particles.root",
        particles_file,
    )

    bfield = acts.ConstantBField(acts.Vector3(0, 0, 2 * u.T))
    runDigitization(
        trk_geo,
        bfield,
        outputDir=tmp_path,
        digiConfigFile=digi_config_file,
        particlesInput=particles_file,
        s=digi_seq,
        doMerge=True,
    )

    digi_seq.run()

    assert measurements_file.exists()
    assert csv_dir.exists()

    # Three CSV files are expected per event of the particle-gun run.
    csv_entries = list(csv_dir.iterdir())
    assert len(csv_entries) == 3 * particle_gun_seq.config.events
    assert all(entry.stat().st_size > 50 for entry in csv_dir.iterdir())

    assert_root_hash(measurements_file.name, measurements_file)
0970
0971
def test_digitization_config_example(trk_geo, tmp_path):
    """Run the digitization-config example and check the structure of the JSON it writes."""
    from digitization_config import runDigitizationConfig

    out_file = tmp_path / "output.json"
    assert not out_file.exists()

    input_file = (
        Path(__file__).parent
        / "../../../Examples/Configs/generic-digi-smearing-config.json"
    )
    assert input_file.exists(), input_file.resolve()

    runDigitizationConfig(trk_geo, input=input_file, output=out_file)

    assert out_file.exists()

    with out_file.open() as fh:
        data = json.load(fh)

    # Two top-level keys: the hierarchy-map header and the entries.
    assert len(data.keys()) == 2
    header = data["acts-geometry-hierarchy-map"]
    assert header["format-version"] == 0
    assert header["value-identifier"] == "digitization-configuration"
    assert len(data["entries"]) == 27
0997
0998
@pytest.mark.parametrize(
    "truthSmeared,truthEstimated",
    [
        [False, False],
        [False, True],
        [True, False],
    ],
    ids=["full_seeding", "truth_estimated", "truth_smeared"],
)
@pytest.mark.slow
def test_ckf_tracks_example(
    tmp_path, assert_root_hash, truthSmeared, truthEstimated, detector_config
):
    """Run the CKF track-finding example for each seeding mode and validate the output."""
    csv_dir = tmp_path / "csv"

    assert not csv_dir.exists()

    bfield = acts.ConstantBField(acts.Vector3(0, 0, 2 * u.T))
    n_events = 100
    sequencer = Sequencer(events=n_events, numThreads=-1)

    # (file name, tree name); a tree name of None skips the hash check
    expected_outputs = [
        (
            "performance_finding_ckf.root",
            None,
        ),
        (
            "trackstates_ckf.root",
            "trackstates",
        ),
        (
            "tracksummary_ckf.root",
            "tracksummary",
        ),
    ]

    # The seeding performance file is only expected without truth-smeared seeds.
    if not truthSmeared:
        expected_outputs += [
            (
                "performance_seeding.root",
                None,
            ),
        ]

    for file_name, _ in expected_outputs:
        assert not (tmp_path / file_name).exists()

    from ckf_tracks import runCKFTracks

    with detector_config.detector:
        runCKFTracks(
            detector_config.trackingGeometry,
            detector_config.decorators,
            field=bfield,
            outputCsv=True,
            outputDir=tmp_path,
            geometrySelection=detector_config.geometrySelection,
            digiConfigFile=detector_config.digiConfigFile,
            truthSmearedSeeded=truthSmeared,
            truthEstimatedSeeded=truthEstimated,
            s=sequencer,
        )

        sequencer.run()

    assert csv_dir.exists()
    for file_name, tree_name in expected_outputs:
        output_path = tmp_path / file_name
        assert output_path.exists()
        if tree_name is not None:
            assert_root_hash(file_name, output_path)

    # One tracks CSV file is expected per event.
    track_csvs = [
        entry for entry in csv_dir.iterdir() if entry.name.endswith("tracks_ckf.csv")
    ]
    assert len(track_csvs) == n_events
    assert all(entry.stat().st_size > 300 for entry in csv_dir.iterdir())
1075
1076
@pytest.mark.skipif(not dd4hepEnabled, reason="DD4hep not set up")
@pytest.mark.odd
@pytest.mark.slow
def test_full_chain_odd_example(tmp_path):
    """Run ``full_chain_odd.py -n1`` in a subprocess and require a zero exit code.

    Raises:
        subprocess.CalledProcessError: If the script exits with a non-zero status.
    """
    # Enter and leave the ODD context once in this process before the script
    # runs separately (presumably to verify the detector can be built).
    with getOpenDataDetector():
        pass

    script = (
        Path(__file__).parent.parent.parent.parent
        / "Examples"
        / "Scripts"
        / "Python"
        / "full_chain_odd.py"
    )
    assert script.exists()
    env = os.environ.copy()
    env["ACTS_LOG_FAILURE_THRESHOLD"] = "ERROR"
    try:
        subprocess.check_call(
            [sys.executable, str(script), "-n1"],
            cwd=tmp_path,
            env=env,
            stderr=subprocess.STDOUT,
        )
    except subprocess.CalledProcessError as e:
        # check_call does not capture output, so e.output is None here;
        # decoding it unguarded would replace the real failure with an
        # AttributeError.
        if e.output is not None:
            print(e.output.decode("utf-8"))
        raise
1107
1108
@pytest.mark.skipif(
    not dd4hepEnabled or not geant4Enabled, reason="DD4hep and/or Geant4 not set up"
)
@pytest.mark.slow
def test_full_chain_odd_example_pythia_geant4(tmp_path):
    """Run ``full_chain_odd.py`` with Geant4 and ttbar options and require no ERROR log lines."""
    # Enter and leave the ODD context once in this process before the script
    # runs separately (presumably to verify the detector can be built).
    with getOpenDataDetector():
        pass

    repo_root = Path(__file__).parent.parent.parent.parent
    script = repo_root / "Examples" / "Scripts" / "Python" / "full_chain_odd.py"
    assert script.exists()

    env = os.environ.copy()
    env["ACTS_LOG_FAILURE_THRESHOLD"] = "ERROR"
    command = [
        sys.executable,
        str(script),
        "-n1",
        "--geant4",
        "--ttbar",
        "--ttbar-pu",
        "50",
    ]
    try:
        raw_output = subprocess.check_output(
            command,
            cwd=tmp_path,
            env=env,
            stderr=subprocess.STDOUT,
        )
        stdout = raw_output.decode("utf-8")
    except subprocess.CalledProcessError as e:
        print(e.output.decode("utf-8"))
        raise

    # Count ERROR-level log lines by the word that precedes the level; none are allowed.
    error_regex = re.compile(r"^\d\d:\d\d:\d\d\s+(\w+)\s+ERROR\s+", re.MULTILINE)
    errors = collections.Counter(
        match.group(1) for match in error_regex.finditer(stdout)
    )
    assert dict(errors) == {}, stdout
1158
1159
@pytest.mark.skipif(not dd4hepEnabled, reason="DD4hep not set up")
@pytest.mark.skipif(not onnxEnabled, reason="ONNX plugin not enabled")
@pytest.mark.slow
def test_ML_Ambiguity_Solver(tmp_path, assert_root_hash):
    """Run ``full_chain_odd.py`` with the ML ambiguity solver and hash its performance file.

    Raises:
        subprocess.CalledProcessError: If the script exits with a non-zero status.
    """
    root_file = "performance_finding_ambiML.root"
    output_dir = "odd_output"
    assert not (tmp_path / root_file).exists()

    # Enter and leave the ODD context once in this process before the script
    # runs separately (presumably to verify the detector can be built).
    with getOpenDataDetector():
        pass

    script = (
        Path(__file__).parent.parent.parent.parent
        / "Examples"
        / "Scripts"
        / "Python"
        / "full_chain_odd.py"
    )
    assert script.exists()
    env = os.environ.copy()
    env["ACTS_LOG_FAILURE_THRESHOLD"] = "ERROR"
    try:
        subprocess.check_call(
            [sys.executable, str(script), "-n1", "--ambi-solver", "ML"],
            cwd=tmp_path,
            env=env,
            stderr=subprocess.STDOUT,
        )
    except subprocess.CalledProcessError as e:
        # check_call does not capture output, so e.output is None here;
        # decoding it unguarded would replace the real failure with an
        # AttributeError.
        if e.output is not None:
            print(e.output.decode("utf-8"))
        raise

    # The script's result is looked up under the odd_output subdirectory.
    rfp = tmp_path / output_dir / root_file
    assert rfp.exists()

    assert_root_hash(root_file, rfp)
1199
1200
def test_bfield_writing(tmp_path, seq, assert_root_hash):
    """Run the B-field writing example and validate both solenoid files."""
    from bfield_writing import runBFieldWriting

    # (file name, tree name, expected number of entries)
    expected_outputs = [
        ("solenoid.root", "solenoid", 100),
        ("solenoid2.root", "solenoid", 100),
    ]

    for file_name, _, _ in expected_outputs:
        assert not (tmp_path / file_name).exists()

    runBFieldWriting(outputDir=tmp_path, rewrites=1)

    for file_name, tree_name, expected_entries in expected_outputs:
        output_path = tmp_path / file_name
        assert output_path.exists()
        assert output_path.stat().st_size > 2**10 * 2
        assert_entries(output_path, tree_name, expected_entries)
        assert_root_hash(file_name, output_path)
1221
1222
@pytest.mark.parametrize("backend", ["onnx", "torch"])
@pytest.mark.parametrize("hardware", ["cpu", "gpu"])
@pytest.mark.skipif(not exatrkxEnabled, reason="ExaTrkX environment not set up")
def test_exatrkx(tmp_path, trk_geo, field, assert_root_hash, backend, hardware):
    """Download the ExaTrkX models, run ``exatrkx.py`` in a subprocess and hash its output.

    The ONNX-on-CPU combination and the torch backend are skipped.

    Raises:
        subprocess.CalledProcessError: If the script exits with a non-zero status.
    """
    if backend == "onnx" and hardware == "cpu":
        pytest.skip("Combination of ONNX and CPU not yet supported")

    if backend == "torch":
        pytest.skip(
            "Disabled torch support until replacement for torch-scatter is found"
        )

    root_file = "performance_track_finding.root"
    assert not (tmp_path / root_file).exists()

    onnx_url = "https://acts.web.cern.ch/ci/exatrkx/onnx_models_v01.tar"
    torch_url = "https://acts.web.cern.ch/ci/exatrkx/torchscript_models_v01.tar"

    # Both archives are downloaded to the same file name in turn and unpacked
    # into tmp_path.
    for url in [onnx_url, torch_url]:
        tarfile_name = tmp_path / "models.tar"
        urllib.request.urlretrieve(url, tarfile_name)
        # Close the archive once it is extracted instead of leaking the handle.
        with tarfile.open(tarfile_name) as archive:
            archive.extractall(tmp_path)

    # embed.pt from the torchscript models is copied next to the ONNX models.
    shutil.copyfile(
        tmp_path / "torchscript_models/embed.pt", tmp_path / "onnx_models/embed.pt"
    )

    script = (
        Path(__file__).parent.parent.parent.parent
        / "Examples"
        / "Scripts"
        / "Python"
        / "exatrkx.py"
    )
    assert script.exists()
    env = os.environ.copy()
    env["ACTS_LOG_FAILURE_THRESHOLD"] = "WARNING"

    if hardware == "cpu":
        env["CUDA_VISIBLE_DEVICES"] = ""

    try:
        subprocess.check_call(
            [sys.executable, str(script), backend],
            cwd=tmp_path,
            env=env,
            stderr=subprocess.STDOUT,
        )
    except subprocess.CalledProcessError as e:
        # check_call does not capture output, so e.output is None here;
        # decoding it unguarded would replace the real failure with an
        # AttributeError.
        if e.output is not None:
            print(e.output.decode("utf-8"))
        raise

    rfp = tmp_path / root_file
    assert rfp.exists()

    assert_root_hash(root_file, rfp)
1281
1282
@pytest.mark.odd
def test_strip_spacepoints(detector_config, field, tmp_path, assert_root_hash):
    """Run strip space point creation with the ODD configs and hash the output file."""
    if detector_config.name == "generic":
        pytest.skip("No strip spacepoint formation for the generic detector currently")

    from strip_spacepoints import createStripSpacepoints

    sequencer = Sequencer(events=20, numThreads=-1)

    repo_root = Path(__file__).parent.parent.parent.parent
    config_dir = repo_root / "Examples" / "Configs"

    geo_selection = config_dir / "odd-strip-spacepoint-selection.json"
    digi_config_file = config_dir / "odd-digi-smearing-config.json"

    with detector_config.detector:
        configured = createStripSpacepoints(
            trackingGeometry=detector_config.trackingGeometry,
            field=field,
            digiConfigFile=digi_config_file,
            geoSelection=geo_selection,
            outputDir=tmp_path,
            s=sequencer,
        )
        configured.run()

    root_file = "strip_spacepoints.root"
    assert_root_hash(root_file, tmp_path / root_file)
1310 assert_root_hash(root_file, rfp)