File indexing completed on 2025-01-30 09:14:56
0001 from pathlib import Path
0002 import os
0003 import json
0004 import functools
0005 import tarfile
0006 import urllib.request
0007 import subprocess
0008 import sys
0009 import re
0010 import collections
0011
0012 import pytest
0013
0014 from helpers import (
0015 geant4Enabled,
0016 dd4hepEnabled,
0017 hepmc3Enabled,
0018 pythia8Enabled,
0019 exatrkxEnabled,
0020 onnxEnabled,
0021 hashingSeedingEnabled,
0022 AssertCollectionExistsAlg,
0023 failure_threshold,
0024 )
0025
0026 import acts
0027 from acts.examples import (
0028 Sequencer,
0029 GenericDetector,
0030 AlignedDetector,
0031 )
0032 from acts.examples.odd import getOpenDataDetector, getOpenDataDetectorDirectory
0033
0034
0035 u = acts.UnitConstants
0036
0037
0038 @pytest.fixture
0039 def field():
0040 return acts.ConstantBField(acts.Vector3(0, 0, 2 * u.T))
0041
0042
0043 @pytest.fixture
0044 def seq():
0045 return Sequencer(events=10, numThreads=1)
0046
0047
0048 def assert_csv_output(csv_path, stem):
0049 __tracebackhide__ = True
0050
0051 assert len([f for f in csv_path.iterdir() if f.name.endswith(stem + ".csv")]) > 0
0052 assert all(
0053 [
0054 f.stat().st_size > 100
0055 for f in csv_path.iterdir()
0056 if f.name.endswith(stem + ".csv")
0057 ]
0058 )
0059
0060
0061 def assert_entries(root_file, tree_name, exp=None, non_zero=False):
0062 __tracebackhide__ = True
0063 import ROOT
0064
0065 ROOT.PyConfig.IgnoreCommandLineOptions = True
0066 ROOT.gROOT.SetBatch(True)
0067
0068 rf = ROOT.TFile.Open(str(root_file))
0069 keys = [k.GetName() for k in rf.GetListOfKeys()]
0070 assert tree_name in keys
0071 print("Entries:", rf.Get(tree_name).GetEntries())
0072 if non_zero:
0073 assert rf.Get(tree_name).GetEntries() > 0, f"{root_file}:{tree_name}"
0074 if exp is not None:
0075 assert rf.Get(tree_name).GetEntries() == exp, f"{root_file}:{tree_name}"
0076
0077
0078 def assert_has_entries(root_file, tree_name):
0079 __tracebackhide__ = True
0080 assert_entries(root_file, tree_name, non_zero=True)
0081
0082
0083 @pytest.mark.slow
0084 @pytest.mark.skipif(not pythia8Enabled, reason="Pythia8 not set up")
0085 def test_pythia8(tmp_path, seq, assert_root_hash):
0086 from pythia8 import runPythia8
0087
0088 (tmp_path / "csv").mkdir()
0089
0090 assert not (tmp_path / "particles.root").exists()
0091 assert len(list((tmp_path / "csv").iterdir())) == 0
0092
0093 events = seq.config.events
0094
0095 runPythia8(str(tmp_path), outputRoot=True, outputCsv=True, s=seq).run()
0096
0097 fp = tmp_path / "particles.root"
0098 assert fp.exists()
0099 assert fp.stat().st_size > 2**10 * 50
0100 assert_entries(fp, "particles", events)
0101 assert_root_hash(fp.name, fp)
0102
0103 assert len(list((tmp_path / "csv").iterdir())) > 0
0104 assert_csv_output(tmp_path / "csv", "particles")
0105
0106
0107 def test_fatras(trk_geo, tmp_path, field, assert_root_hash):
0108 from fatras import runFatras
0109
0110 csv = tmp_path / "csv"
0111 csv.mkdir()
0112
0113 nevents = 10
0114
0115 root_files = [
0116 (
0117 "particles_simulation.root",
0118 "particles",
0119 ),
0120 (
0121 "hits.root",
0122 "hits",
0123 ),
0124 ]
0125
0126 assert len(list(csv.iterdir())) == 0
0127 for rf, _ in root_files:
0128 assert not (tmp_path / rf).exists()
0129
0130 seq = Sequencer(events=nevents)
0131 runFatras(trk_geo, field, str(tmp_path), s=seq).run()
0132
0133 assert_csv_output(csv, "particles_simulated")
0134 assert_csv_output(csv, "hits")
0135 for f, tn in root_files:
0136 rfp = tmp_path / f
0137 assert rfp.exists()
0138 assert rfp.stat().st_size > 2**10 * 10
0139
0140 assert_has_entries(rfp, tn)
0141 assert_root_hash(f, rfp)
0142
0143
0144 @pytest.mark.slow
0145 @pytest.mark.odd
0146 @pytest.mark.skipif(not geant4Enabled, reason="Geant4 not set up")
0147 @pytest.mark.skipif(not dd4hepEnabled, reason="DD4hep not set up")
0148 def test_geant4(tmp_path, assert_root_hash):
0149
0150
0151
0152 with getOpenDataDetector():
0153 pass
0154
0155 csv = tmp_path / "csv"
0156 csv.mkdir()
0157
0158 root_files = [
0159 "particles_simulation.root",
0160 "hits.root",
0161 ]
0162
0163 assert len(list(csv.iterdir())) == 0
0164 for rf in root_files:
0165 assert not (tmp_path / rf).exists()
0166
0167 script = (
0168 Path(__file__).parent.parent.parent.parent
0169 / "Examples"
0170 / "Scripts"
0171 / "Python"
0172 / "geant4.py"
0173 )
0174 assert script.exists()
0175 env = os.environ.copy()
0176 env["ACTS_LOG_FAILURE_THRESHOLD"] = "WARNING"
0177 try:
0178 subprocess.check_call(
0179 [sys.executable, str(script)],
0180 cwd=tmp_path,
0181 env=env,
0182 stderr=subprocess.STDOUT,
0183 )
0184 except subprocess.CalledProcessError as e:
0185 print(e.output.decode("utf-8"))
0186 raise
0187
0188 assert_csv_output(csv, "particles_simulated")
0189 assert_csv_output(csv, "hits")
0190 for f in root_files:
0191 rfp = tmp_path / f
0192 assert rfp.exists()
0193 assert rfp.stat().st_size > 2**10 * 10
0194
0195 assert_root_hash(f, rfp)
0196
0197
0198 def test_seeding(tmp_path, trk_geo, field, assert_root_hash):
0199 from seeding import runSeeding
0200
0201 field = acts.ConstantBField(acts.Vector3(0, 0, 2 * acts.UnitConstants.T))
0202
0203 csv = tmp_path / "csv"
0204 csv.mkdir()
0205
0206 seq = Sequencer(events=10, numThreads=1)
0207
0208 root_files = [
0209 (
0210 "estimatedparams.root",
0211 "estimatedparams",
0212 ),
0213 (
0214 "performance_seeding.root",
0215 None,
0216 ),
0217 (
0218 "particles.root",
0219 "particles",
0220 ),
0221 (
0222 "particles_simulation.root",
0223 "particles",
0224 ),
0225 ]
0226
0227 for fn, _ in root_files:
0228 fp = tmp_path / fn
0229 assert not fp.exists()
0230
0231 assert len(list(csv.iterdir())) == 0
0232
0233 runSeeding(trk_geo, field, outputDir=str(tmp_path), s=seq).run()
0234
0235 for fn, tn in root_files:
0236 fp = tmp_path / fn
0237 assert fp.exists()
0238 assert fp.stat().st_size > 100
0239
0240 if tn is not None:
0241 assert_has_entries(fp, tn)
0242 assert_root_hash(fn, fp)
0243
0244 assert_csv_output(csv, "particles")
0245 assert_csv_output(csv, "particles_simulated")
0246
0247
0248 @pytest.mark.slow
0249 @pytest.mark.skipif(not hashingSeedingEnabled, reason="HashingSeeding not set up")
0250 def test_hashing_seeding(tmp_path, trk_geo, field, assert_root_hash):
0251 from hashing_seeding import runHashingSeeding, Config
0252
0253 field = acts.ConstantBField(acts.Vector3(0, 0, 2 * acts.UnitConstants.T))
0254
0255 seq = Sequencer(events=10, numThreads=1)
0256
0257 root_files = [
0258 (
0259 "estimatedparams.root",
0260 "estimatedparams",
0261 ),
0262 (
0263 "performance_seeding.root",
0264 None,
0265 ),
0266 ]
0267
0268 for fn, _ in root_files:
0269 fp = tmp_path / fn
0270 assert not fp.exists(), f"{fp} exists"
0271
0272 config = Config(
0273 mu=50,
0274 )
0275
0276 _, _, digiConfig, geoSelectionConfigFile = config.getDetectorInfo()
0277
0278 runHashingSeeding(
0279 10,
0280 trk_geo,
0281 field,
0282 outputDir=str(tmp_path),
0283 saveFiles=True,
0284 npileup=config.mu,
0285 seedingAlgorithm=config.seedingAlgorithm,
0286 maxSeedsPerSpM=config.maxSeedsPerSpM,
0287 digiConfig=digiConfig,
0288 geoSelectionConfigFile=geoSelectionConfigFile,
0289 config=config,
0290 s=seq,
0291 ).run()
0292
0293 del seq
0294
0295 for fn, tn in root_files:
0296 fp = tmp_path / fn
0297 assert fp.exists(), f"{fp} does not exist"
0298 assert fp.stat().st_size > 100, f"{fp} is too small: {fp.stat().st_size} bytes"
0299
0300 if tn is not None:
0301 assert_has_entries(fp, tn)
0302 assert_root_hash(fn, fp)
0303
0304 assert_csv_output(tmp_path, "particles_simulated")
0305 assert_csv_output(tmp_path, "buckets")
0306 assert_csv_output(tmp_path, "seed")
0307
0308
0309 def test_seeding_orthogonal(tmp_path, trk_geo, field, assert_root_hash):
0310 from seeding import runSeeding, SeedingAlgorithm
0311
0312 field = acts.ConstantBField(acts.Vector3(0, 0, 2 * acts.UnitConstants.T))
0313
0314 csv = tmp_path / "csv"
0315 csv.mkdir()
0316
0317 seq = Sequencer(events=10, numThreads=1)
0318
0319 root_files = [
0320 (
0321 "estimatedparams.root",
0322 "estimatedparams",
0323 ),
0324 (
0325 "performance_seeding.root",
0326 None,
0327 ),
0328 (
0329 "particles.root",
0330 "particles",
0331 ),
0332 (
0333 "particles_simulation.root",
0334 "particles",
0335 ),
0336 ]
0337
0338 for fn, _ in root_files:
0339 fp = tmp_path / fn
0340 assert not fp.exists()
0341
0342 assert len(list(csv.iterdir())) == 0
0343
0344 runSeeding(
0345 trk_geo,
0346 field,
0347 outputDir=str(tmp_path),
0348 s=seq,
0349 seedingAlgorithm=SeedingAlgorithm.Orthogonal,
0350 ).run()
0351
0352 for fn, tn in root_files:
0353 fp = tmp_path / fn
0354 assert fp.exists()
0355 assert fp.stat().st_size > 100
0356
0357 if tn is not None:
0358 assert_has_entries(fp, tn)
0359 assert_root_hash(fn, fp)
0360
0361 assert_csv_output(csv, "particles")
0362 assert_csv_output(csv, "particles_simulated")
0363
0364
0365 def test_itk_seeding(tmp_path, trk_geo, field, assert_root_hash):
0366 field = acts.ConstantBField(acts.Vector3(0, 0, 2 * acts.UnitConstants.T))
0367
0368 csv = tmp_path / "csv"
0369 csv.mkdir()
0370
0371 seq = Sequencer(events=10, numThreads=1)
0372
0373 root_files = [
0374 (
0375 "estimatedparams.root",
0376 "estimatedparams",
0377 ),
0378 (
0379 "performance_seeding.root",
0380 None,
0381 ),
0382 (
0383 "particles.root",
0384 "particles",
0385 ),
0386 (
0387 "particles_simulation.root",
0388 "particles",
0389 ),
0390 ]
0391
0392 for fn, _ in root_files:
0393 fp = tmp_path / fn
0394 assert not fp.exists()
0395
0396 assert len(list(csv.iterdir())) == 0
0397
0398 rnd = acts.examples.RandomNumbers(seed=42)
0399
0400 from acts.examples.simulation import (
0401 addParticleGun,
0402 EtaConfig,
0403 MomentumConfig,
0404 ParticleConfig,
0405 addFatras,
0406 addDigitization,
0407 ParticleSelectorConfig,
0408 addDigiParticleSelection,
0409 )
0410
0411 addParticleGun(
0412 seq,
0413 MomentumConfig(1.0 * u.GeV, 10.0 * u.GeV, True),
0414 EtaConfig(-4.0, 4.0, True),
0415 ParticleConfig(1, acts.PdgParticle.eMuon, True),
0416 outputDirCsv=tmp_path / "csv",
0417 outputDirRoot=str(tmp_path),
0418 rnd=rnd,
0419 )
0420
0421 addFatras(
0422 seq,
0423 trk_geo,
0424 field,
0425 outputDirCsv=tmp_path / "csv",
0426 outputDirRoot=str(tmp_path),
0427 rnd=rnd,
0428 )
0429
0430 srcdir = Path(__file__).resolve().parent.parent.parent.parent
0431 addDigitization(
0432 seq,
0433 trk_geo,
0434 field,
0435 digiConfigFile=srcdir
0436 / "Examples/Algorithms/Digitization/share/default-smearing-config-generic.json",
0437 rnd=rnd,
0438 )
0439
0440 addDigiParticleSelection(
0441 seq,
0442 ParticleSelectorConfig(
0443 pt=(0.9 * u.GeV, None),
0444 eta=(-4, 4),
0445 measurements=(9, None),
0446 removeNeutral=True,
0447 ),
0448 )
0449
0450 from acts.examples.reconstruction import (
0451 addSeeding,
0452 )
0453 from acts.examples.itk import itkSeedingAlgConfig, InputSpacePointsType
0454
0455 addSeeding(
0456 seq,
0457 trk_geo,
0458 field,
0459 *itkSeedingAlgConfig(InputSpacePointsType.PixelSpacePoints),
0460 acts.logging.VERBOSE,
0461 geoSelectionConfigFile=srcdir
0462 / "Examples/Algorithms/TrackFinding/share/geoSelection-genericDetector.json",
0463 outputDirRoot=str(tmp_path),
0464 )
0465
0466 seq.run()
0467
0468 for fn, tn in root_files:
0469 fp = tmp_path / fn
0470 assert fp.exists()
0471 assert fp.stat().st_size > 100
0472
0473 if tn is not None:
0474 assert_has_entries(fp, tn)
0475 assert_root_hash(fn, fp)
0476
0477 assert_csv_output(csv, "particles")
0478 assert_csv_output(csv, "particles_simulated")
0479
0480
0481 @pytest.mark.slow
0482 def test_propagation(tmp_path, trk_geo, field, seq, assert_root_hash):
0483 from propagation import runPropagation
0484
0485 root_files = [
0486 (
0487 "propagation_summary.root",
0488 "propagation_summary",
0489 10000,
0490 )
0491 ]
0492
0493 for fn, _, _ in root_files:
0494 fp = tmp_path / fn
0495 assert not fp.exists()
0496
0497 runPropagation(trk_geo, field, str(tmp_path), s=seq).run()
0498
0499 for fn, tn, ee in root_files:
0500 fp = tmp_path / fn
0501 assert fp.exists()
0502 assert fp.stat().st_size > 2**10 * 50
0503 assert_entries(fp, tn, ee)
0504 assert_root_hash(fn, fp)
0505
0506
0507 @pytest.mark.slow
0508 @pytest.mark.odd
0509 @pytest.mark.skipif(not geant4Enabled, reason="Geant4 not set up")
0510 @pytest.mark.skipif(not dd4hepEnabled, reason="DD4hep not set up")
0511 def test_material_recording(tmp_path, material_recording, assert_root_hash):
0512 root_files = [
0513 (
0514 "geant4_material_tracks.root",
0515 "material-tracks",
0516 200,
0517 )
0518 ]
0519
0520 for fn, tn, ee in root_files:
0521 fp = material_recording / fn
0522 assert fp.exists()
0523 assert fp.stat().st_size > 2**10 * 50
0524 assert_entries(fp, tn, ee)
0525 assert_root_hash(fn, fp)
0526
0527
0528 @pytest.mark.slow
0529 @pytest.mark.odd
0530 @pytest.mark.skipif(not hepmc3Enabled, reason="HepMC3 plugin not available")
0531 @pytest.mark.skipif(not dd4hepEnabled, reason="DD4hep not set up")
0532 @pytest.mark.skipif(not geant4Enabled, reason="Geant4 not set up")
0533 def test_event_recording(tmp_path):
0534 script = (
0535 Path(__file__).parent.parent.parent.parent
0536 / "Examples"
0537 / "Scripts"
0538 / "Python"
0539 / "event_recording.py"
0540 )
0541 assert script.exists()
0542
0543 env = os.environ.copy()
0544 env["NEVENTS"] = "1"
0545 env["ACTS_LOG_FAILURE_THRESHOLD"] = "WARNING"
0546 try:
0547 subprocess.check_call(
0548 [sys.executable, str(script)],
0549 cwd=tmp_path,
0550 env=env,
0551 stderr=subprocess.STDOUT,
0552 )
0553 except subprocess.CalledProcessError as e:
0554 print(e.output.decode("utf-8"))
0555 raise
0556
0557 from acts.examples.hepmc3 import HepMC3AsciiReader
0558
0559 out_path = tmp_path / "hepmc3"
0560
0561
0562 assert len([f for f in out_path.iterdir() if f.name.endswith("events.hepmc3")]) > 0
0563 assert all([f.stat().st_size > 100 for f in out_path.iterdir()])
0564
0565 s = Sequencer(numThreads=1)
0566
0567 s.addReader(
0568 HepMC3AsciiReader(
0569 level=acts.logging.INFO,
0570 inputDir=str(out_path),
0571 inputStem="events",
0572 outputEvents="hepmc-events",
0573 )
0574 )
0575
0576 alg = AssertCollectionExistsAlg(
0577 "hepmc-events", name="check_alg", level=acts.logging.INFO
0578 )
0579 s.addAlgorithm(alg)
0580
0581 s.run()
0582
0583 assert alg.events_seen == 1
0584
0585
0586 @pytest.mark.parametrize("revFiltMomThresh", [0 * u.GeV, 1 * u.TeV])
0587 def test_truth_tracking_kalman(
0588 tmp_path, assert_root_hash, revFiltMomThresh, detector_config
0589 ):
0590 root_files = [
0591 ("trackstates_kf.root", "trackstates", 19),
0592 ("tracksummary_kf.root", "tracksummary", 10),
0593 ("performance_kf.root", None, -1),
0594 ]
0595
0596 for fn, _, _ in root_files:
0597 fp = tmp_path / fn
0598 assert not fp.exists()
0599
0600 with detector_config.detector:
0601 from truth_tracking_kalman import runTruthTrackingKalman
0602
0603 field = acts.ConstantBField(acts.Vector3(0, 0, 2 * u.T))
0604
0605 seq = Sequencer(events=10, numThreads=1)
0606
0607 runTruthTrackingKalman(
0608 trackingGeometry=detector_config.trackingGeometry,
0609 field=field,
0610 digiConfigFile=detector_config.digiConfigFile,
0611 outputDir=tmp_path,
0612 reverseFilteringMomThreshold=revFiltMomThresh,
0613 s=seq,
0614 )
0615
0616 seq.run()
0617
0618 for fn, tn, ee in root_files:
0619 fp = tmp_path / fn
0620 assert fp.exists()
0621 assert fp.stat().st_size > 1024
0622 if tn is not None:
0623 assert_has_entries(fp, tn)
0624 assert_root_hash(fn, fp)
0625
0626 import ROOT
0627
0628 ROOT.PyConfig.IgnoreCommandLineOptions = True
0629 ROOT.gROOT.SetBatch(True)
0630 rf = ROOT.TFile.Open(str(tmp_path / "tracksummary_kf.root"))
0631 keys = [k.GetName() for k in rf.GetListOfKeys()]
0632 assert "tracksummary" in keys
0633 for entry in rf.Get("tracksummary"):
0634 assert entry.hasFittedParams
0635
0636
0637 def test_truth_tracking_gsf(tmp_path, assert_root_hash, detector_config):
0638 from truth_tracking_gsf import runTruthTrackingGsf
0639
0640 field = acts.ConstantBField(acts.Vector3(0, 0, 2 * u.T))
0641
0642 seq = Sequencer(
0643 events=10,
0644 numThreads=1,
0645 fpeMasks=[
0646 (
0647 "Core/include/Acts/TrackFitting/detail/GsfUtils.hpp:197",
0648 acts.FpeType.FLTUND,
0649 1,
0650 ),
0651 ],
0652 )
0653
0654 root_files = [
0655 ("trackstates_gsf.root", "trackstates"),
0656 ("tracksummary_gsf.root", "tracksummary"),
0657 ]
0658
0659 for fn, _ in root_files:
0660 fp = tmp_path / fn
0661 assert not fp.exists()
0662
0663 with detector_config.detector:
0664 runTruthTrackingGsf(
0665 trackingGeometry=detector_config.trackingGeometry,
0666 decorators=detector_config.decorators,
0667 field=field,
0668 digiConfigFile=detector_config.digiConfigFile,
0669 outputDir=tmp_path,
0670 s=seq,
0671 )
0672
0673
0674 with failure_threshold(acts.logging.FATAL):
0675 seq.run()
0676
0677 for fn, tn in root_files:
0678 fp = tmp_path / fn
0679 assert fp.exists()
0680 assert fp.stat().st_size > 1024
0681 if tn is not None:
0682 assert_root_hash(fn, fp)
0683
0684
0685 def test_refitting(tmp_path, detector_config, assert_root_hash):
0686 from truth_tracking_gsf_refitting import runRefittingGsf
0687
0688 field = acts.ConstantBField(acts.Vector3(0, 0, 2 * u.T))
0689
0690 seq = Sequencer(
0691 events=10,
0692 numThreads=1,
0693 )
0694
0695 with detector_config.detector:
0696
0697
0698 runRefittingGsf(
0699 trackingGeometry=detector_config.trackingGeometry,
0700 field=field,
0701 digiConfigFile=detector_config.digiConfigFile,
0702 outputDir=tmp_path,
0703 s=seq,
0704 ).run()
0705
0706 root_files = [
0707 ("trackstates_gsf_refit.root", "trackstates"),
0708 ("tracksummary_gsf_refit.root", "tracksummary"),
0709 ]
0710
0711 for fn, tn in root_files:
0712 fp = tmp_path / fn
0713 assert fp.exists()
0714 assert fp.stat().st_size > 1024
0715 if tn is not None:
0716 assert_root_hash(fn, fp)
0717
0718
0719 def test_particle_gun(tmp_path, assert_root_hash):
0720 from particle_gun import runParticleGun
0721
0722 s = Sequencer(events=20, numThreads=-1)
0723
0724 csv_dir = tmp_path / "csv"
0725 root_file = tmp_path / "particles.root"
0726
0727 assert not csv_dir.exists()
0728 assert not root_file.exists()
0729
0730 runParticleGun(str(tmp_path), s=s).run()
0731
0732 assert csv_dir.exists()
0733 assert root_file.exists()
0734
0735 assert len([f for f in csv_dir.iterdir() if f.name.endswith("particles.csv")]) > 0
0736 assert all([f.stat().st_size > 100 for f in csv_dir.iterdir()])
0737
0738 assert root_file.stat().st_size > 200
0739 assert_entries(root_file, "particles", 20)
0740 assert_root_hash(root_file.name, root_file)
0741
0742
0743 @pytest.mark.slow
0744 @pytest.mark.odd
0745 @pytest.mark.skipif(not dd4hepEnabled, reason="DD4hep not set up")
0746 def test_material_mapping(material_recording, tmp_path, assert_root_hash):
0747 from material_mapping import runMaterialMapping
0748 from material_validation import runMaterialValidation
0749
0750 map_file = tmp_path / "material-map_tracks.root"
0751 assert not map_file.exists()
0752
0753 odd_dir = getOpenDataDetectorDirectory()
0754 config = acts.MaterialMapJsonConverter.Config()
0755 mdecorator = acts.JsonMaterialDecorator(
0756 level=acts.logging.INFO,
0757 rConfig=config,
0758 jFileName=str(odd_dir / "config/odd-material-mapping-config.json"),
0759 )
0760
0761 s = Sequencer(numThreads=1)
0762
0763 with getOpenDataDetector(mdecorator) as detector:
0764 trackingGeometry = detector.trackingGeometry()
0765 decorators = detector.contextDecorators()
0766
0767 runMaterialMapping(
0768 trackingGeometry,
0769 decorators,
0770 outputDir=str(tmp_path),
0771 inputDir=material_recording,
0772 mappingStep=1,
0773 s=s,
0774 )
0775
0776 s.run()
0777
0778 mat_file = tmp_path / "material-map.json"
0779
0780 assert mat_file.exists()
0781 assert mat_file.stat().st_size > 10
0782
0783 with mat_file.open() as fh:
0784 assert json.load(fh)
0785
0786 assert map_file.exists()
0787 assert_entries(map_file, "material-tracks", 200)
0788 assert_root_hash(map_file.name, map_file)
0789
0790 val_file = tmp_path / "propagation-material.root"
0791 assert not val_file.exists()
0792
0793
0794
0795 field = acts.NullBField()
0796
0797 s = Sequencer(events=10, numThreads=1)
0798
0799 with getOpenDataDetector(
0800 mdecorator=acts.IMaterialDecorator.fromFile(mat_file)
0801 ) as detector:
0802 trackingGeometry = detector.trackingGeometry()
0803 decorators = detector.contextDecorators()
0804
0805 runMaterialValidation(
0806 10, 1000, trackingGeometry, decorators, field, outputDir=str(tmp_path), s=s
0807 )
0808
0809 s.run()
0810
0811 assert val_file.exists()
0812 assert_entries(val_file, "material-tracks", 10000)
0813 assert_root_hash(val_file.name, val_file)
0814
0815
0816 @pytest.mark.slow
0817 @pytest.mark.odd
0818 @pytest.mark.skipif(not dd4hepEnabled, reason="DD4hep not set up")
0819 def test_volume_material_mapping(material_recording, tmp_path, assert_root_hash):
0820 from material_mapping import runMaterialMapping
0821 from material_validation import runMaterialValidation
0822
0823 map_file = tmp_path / "material-map-volume_tracks.root"
0824 assert not map_file.exists()
0825
0826 geo_map = Path(__file__).parent / "geometry-volume-map.json"
0827 assert geo_map.exists()
0828 assert geo_map.stat().st_size > 10
0829 with geo_map.open() as fh:
0830 assert json.load(fh)
0831
0832 s = Sequencer(numThreads=1)
0833
0834 with getOpenDataDetector(
0835 mdecorator=acts.IMaterialDecorator.fromFile(geo_map)
0836 ) as detector:
0837 trackingGeometry = detector.trackingGeometry()
0838 decorators = detector.contextDecorators()
0839
0840 runMaterialMapping(
0841 trackingGeometry,
0842 decorators,
0843 mapName="material-map-volume",
0844 outputDir=str(tmp_path),
0845 inputDir=material_recording,
0846 mappingStep=1,
0847 s=s,
0848 )
0849
0850 s.run()
0851
0852 mat_file = tmp_path / "material-map-volume.json"
0853
0854 assert mat_file.exists()
0855 assert mat_file.stat().st_size > 10
0856
0857 with mat_file.open() as fh:
0858 assert json.load(fh)
0859
0860 assert map_file.exists()
0861 assert_entries(map_file, "material-tracks", 200)
0862 assert_root_hash(map_file.name, map_file)
0863
0864 val_file = tmp_path / "propagation-volume-material.root"
0865 assert not val_file.exists()
0866
0867
0868
0869 field = acts.NullBField()
0870
0871 s = Sequencer(events=10, numThreads=1)
0872
0873 with getOpenDataDetector(
0874 mdecorator=acts.IMaterialDecorator.fromFile(mat_file)
0875 ) as detector:
0876 trackingGeometry = detector.trackingGeometry()
0877 decorators = detector.contextDecorators()
0878
0879 runMaterialValidation(
0880 10,
0881 1000,
0882 trackingGeometry,
0883 decorators,
0884 field,
0885 outputDir=str(tmp_path),
0886 outputName="propagation-volume-material",
0887 s=s,
0888 )
0889
0890 s.run()
0891
0892 assert val_file.exists()
0893
0894 assert_root_hash(val_file.name, val_file)
0895
0896
0897 @pytest.mark.parametrize(
0898 "detectorFactory,aligned,nobj",
0899 [
0900 (GenericDetector, True, 450),
0901 pytest.param(
0902 getOpenDataDetector,
0903 True,
0904 540,
0905 marks=[
0906 pytest.mark.skipif(not dd4hepEnabled, reason="DD4hep not set up"),
0907 pytest.mark.slow,
0908 pytest.mark.odd,
0909 ],
0910 ),
0911 (functools.partial(AlignedDetector, iovSize=1), False, 450),
0912 ],
0913 )
0914 @pytest.mark.slow
0915 def test_geometry_example(detectorFactory, aligned, nobj, tmp_path):
0916 detector = detectorFactory()
0917 trackingGeometry = detector.trackingGeometry()
0918 decorators = detector.contextDecorators()
0919
0920 from geometry import runGeometry
0921
0922 json_dir = tmp_path / "json"
0923 csv_dir = tmp_path / "csv"
0924 obj_dir = tmp_path / "obj"
0925
0926 for d in (json_dir, csv_dir, obj_dir):
0927 d.mkdir()
0928
0929 events = 5
0930
0931 kwargs = dict(
0932 trackingGeometry=trackingGeometry,
0933 decorators=decorators,
0934 events=events,
0935 outputDir=str(tmp_path),
0936 )
0937
0938 runGeometry(outputJson=True, **kwargs)
0939 runGeometry(outputJson=False, **kwargs)
0940
0941 assert len(list(obj_dir.iterdir())) == nobj
0942 assert all(f.stat().st_size > 200 for f in obj_dir.iterdir())
0943
0944 assert len(list(csv_dir.iterdir())) == 3 * events
0945 assert all(f.stat().st_size > 200 for f in csv_dir.iterdir())
0946
0947 detector_files = [csv_dir / f"event{i:>09}-detectors.csv" for i in range(events)]
0948 for detector_file in detector_files:
0949 assert detector_file.exists()
0950 assert detector_file.stat().st_size > 200
0951
0952 contents = [f.read_text() for f in detector_files]
0953 ref = contents[0]
0954 for c in contents[1:]:
0955 if aligned:
0956 assert c == ref, "Detector writeout is expected to be identical"
0957 else:
0958 assert c != ref, "Detector writeout is expected to be different"
0959
0960 if aligned:
0961 for f in [json_dir / f"event{i:>09}-detector.json" for i in range(events)]:
0962 assert detector_file.exists()
0963 with f.open() as fh:
0964 data = json.load(fh)
0965 assert data
0966 material_file = tmp_path / "geometry-map.json"
0967 assert material_file.exists()
0968 assert material_file.stat().st_size > 200
0969
0970
0971 DIGI_SHARE_DIR = (
0972 Path(__file__).parent.parent.parent.parent
0973 / "Examples/Algorithms/Digitization/share"
0974 )
0975
0976
0977 @pytest.mark.parametrize(
0978 "digi_config_file",
0979 [
0980 DIGI_SHARE_DIR / "default-smearing-config-generic.json",
0981 DIGI_SHARE_DIR / "default-geometric-config-generic.json",
0982 ],
0983 ids=["smeared", "geometric"],
0984 )
0985 def test_digitization_example(trk_geo, tmp_path, assert_root_hash, digi_config_file):
0986 from digitization import runDigitization
0987
0988 s = Sequencer(events=10, numThreads=-1)
0989
0990 csv_dir = tmp_path / "csv"
0991 root_file = tmp_path / "measurements.root"
0992
0993 assert not root_file.exists()
0994 assert not csv_dir.exists()
0995
0996 field = acts.ConstantBField(acts.Vector3(0, 0, 2 * u.T))
0997 runDigitization(
0998 trk_geo, field, outputDir=tmp_path, digiConfigFile=digi_config_file, s=s
0999 )
1000
1001 s.run()
1002
1003 assert root_file.exists()
1004 assert csv_dir.exists()
1005
1006 assert len(list(csv_dir.iterdir())) == 3 * s.config.events
1007 assert all(f.stat().st_size > 50 for f in csv_dir.iterdir())
1008
1009 assert_root_hash(root_file.name, root_file)
1010
1011
1012 @pytest.mark.parametrize(
1013 "digi_config_file",
1014 [
1015 DIGI_SHARE_DIR / "default-smearing-config-generic.json",
1016 DIGI_SHARE_DIR / "default-geometric-config-generic.json",
1017 pytest.param(
1018 (
1019 getOpenDataDetectorDirectory()
1020 / "config"
1021 / "odd-digi-smearing-config.json"
1022 ),
1023 marks=[
1024 pytest.mark.odd,
1025 ],
1026 ),
1027 pytest.param(
1028 (
1029 getOpenDataDetectorDirectory()
1030 / "config"
1031 / "odd-digi-geometric-config.json"
1032 ),
1033 marks=[
1034 pytest.mark.odd,
1035 ],
1036 ),
1037 ],
1038 ids=["smeared", "geometric", "odd-smeared", "odd-geometric"],
1039 )
1040 def test_digitization_example_input_parsing(digi_config_file):
1041 from acts.examples import readDigiConfigFromJson
1042
1043 acts.examples.readDigiConfigFromJson(str(digi_config_file))
1044
1045
1046 @pytest.mark.parametrize(
1047 "digi_config_file",
1048 [
1049 DIGI_SHARE_DIR / "default-smearing-config-generic.json",
1050 DIGI_SHARE_DIR / "default-geometric-config-generic.json",
1051 ],
1052 ids=["smeared", "geometric"],
1053 )
1054 def test_digitization_example_input(
1055 trk_geo, tmp_path, assert_root_hash, digi_config_file
1056 ):
1057 from particle_gun import runParticleGun
1058 from digitization import runDigitization
1059
1060 ptcl_dir = tmp_path / "ptcl"
1061 ptcl_dir.mkdir()
1062 pgs = Sequencer(events=20, numThreads=-1)
1063 runParticleGun(str(ptcl_dir), s=pgs)
1064
1065 pgs.run()
1066
1067 s = Sequencer(numThreads=-1)
1068
1069 csv_dir = tmp_path / "csv"
1070 root_file = tmp_path / "measurements.root"
1071
1072 assert not root_file.exists()
1073 assert not csv_dir.exists()
1074
1075 assert_root_hash(
1076 "particles.root",
1077 ptcl_dir / "particles.root",
1078 )
1079
1080 field = acts.ConstantBField(acts.Vector3(0, 0, 2 * u.T))
1081 runDigitization(
1082 trk_geo,
1083 field,
1084 outputDir=tmp_path,
1085 digiConfigFile=digi_config_file,
1086 particlesInput=ptcl_dir / "particles.root",
1087 s=s,
1088 doMerge=True,
1089 )
1090
1091 s.run()
1092
1093 assert root_file.exists()
1094 assert csv_dir.exists()
1095
1096 assert len(list(csv_dir.iterdir())) == 3 * pgs.config.events
1097 assert all(f.stat().st_size > 50 for f in csv_dir.iterdir())
1098
1099 assert_root_hash(root_file.name, root_file)
1100
1101
1102 def test_digitization_config_example(trk_geo, tmp_path):
1103 from digitization_config import runDigitizationConfig
1104
1105 out_file = tmp_path / "output.json"
1106 assert not out_file.exists()
1107
1108 input = (
1109 Path(__file__).parent
1110 / "../../../Examples/Algorithms/Digitization/share/default-smearing-config-generic.json"
1111 )
1112 assert input.exists(), input.resolve()
1113
1114 runDigitizationConfig(trk_geo, input=input, output=out_file)
1115
1116 assert out_file.exists()
1117
1118 with out_file.open() as fh:
1119 data = json.load(fh)
1120 assert len(data.keys()) == 2
1121 assert data["acts-geometry-hierarchy-map"]["format-version"] == 0
1122 assert (
1123 data["acts-geometry-hierarchy-map"]["value-identifier"]
1124 == "digitization-configuration"
1125 )
1126 assert len(data["entries"]) == 27
1127
1128
1129 @pytest.mark.parametrize(
1130 "truthSmeared,truthEstimated",
1131 [
1132 [False, False],
1133 [False, True],
1134 [True, False],
1135 ],
1136 ids=["full_seeding", "truth_estimated", "truth_smeared"],
1137 )
1138 @pytest.mark.slow
1139 def test_ckf_tracks_example(
1140 tmp_path, assert_root_hash, truthSmeared, truthEstimated, detector_config
1141 ):
1142 csv = tmp_path / "csv"
1143
1144 assert not csv.exists()
1145
1146 field = acts.ConstantBField(acts.Vector3(0, 0, 2 * u.T))
1147 events = 100
1148 s = Sequencer(events=events, numThreads=-1)
1149
1150 root_files = [
1151 (
1152 "performance_finding_ckf.root",
1153 None,
1154 ),
1155 (
1156 "trackstates_ckf.root",
1157 "trackstates",
1158 ),
1159 (
1160 "tracksummary_ckf.root",
1161 "tracksummary",
1162 ),
1163 ]
1164
1165 if not truthSmeared:
1166 root_files += [
1167 (
1168 "performance_seeding.root",
1169 None,
1170 ),
1171 ]
1172
1173 for rf, _ in root_files:
1174 assert not (tmp_path / rf).exists()
1175
1176 from ckf_tracks import runCKFTracks
1177
1178 with detector_config.detector:
1179 runCKFTracks(
1180 detector_config.trackingGeometry,
1181 detector_config.decorators,
1182 field=field,
1183 outputCsv=True,
1184 outputDir=tmp_path,
1185 geometrySelection=detector_config.geometrySelection,
1186 digiConfigFile=detector_config.digiConfigFile,
1187 truthSmearedSeeded=truthSmeared,
1188 truthEstimatedSeeded=truthEstimated,
1189 s=s,
1190 )
1191
1192 s.run()
1193
1194 assert csv.exists()
1195 for rf, tn in root_files:
1196 rp = tmp_path / rf
1197 assert rp.exists()
1198 if tn is not None:
1199 assert_root_hash(rf, rp)
1200
1201 assert (
1202 len([f for f in csv.iterdir() if f.name.endswith("tracks_ckf.csv")]) == events
1203 )
1204 assert all([f.stat().st_size > 300 for f in csv.iterdir()])
1205
1206
1207 @pytest.mark.skipif(not dd4hepEnabled, reason="DD4hep not set up")
1208 @pytest.mark.odd
1209 @pytest.mark.slow
1210 def test_full_chain_odd_example(tmp_path):
1211
1212
1213
1214 with getOpenDataDetector():
1215 pass
1216
1217 script = (
1218 Path(__file__).parent.parent.parent.parent
1219 / "Examples"
1220 / "Scripts"
1221 / "Python"
1222 / "full_chain_odd.py"
1223 )
1224 assert script.exists()
1225 env = os.environ.copy()
1226 env["ACTS_LOG_FAILURE_THRESHOLD"] = "ERROR"
1227 try:
1228 subprocess.check_call(
1229 [sys.executable, str(script), "-n1"],
1230 cwd=tmp_path,
1231 env=env,
1232 stderr=subprocess.STDOUT,
1233 )
1234 except subprocess.CalledProcessError as e:
1235 print(e.output.decode("utf-8"))
1236 raise
1237
1238
1239 @pytest.mark.skipif(
1240 not dd4hepEnabled or not geant4Enabled, reason="DD4hep and/or Geant4 not set up"
1241 )
1242 @pytest.mark.slow
1243 def test_full_chain_odd_example_pythia_geant4(tmp_path):
1244
1245
1246
1247 with getOpenDataDetector():
1248 pass
1249
1250 script = (
1251 Path(__file__).parent.parent.parent.parent
1252 / "Examples"
1253 / "Scripts"
1254 / "Python"
1255 / "full_chain_odd.py"
1256 )
1257 assert script.exists()
1258 env = os.environ.copy()
1259 env["ACTS_LOG_FAILURE_THRESHOLD"] = "ERROR"
1260 try:
1261 stdout = subprocess.check_output(
1262 [
1263 sys.executable,
1264 str(script),
1265 "-n1",
1266 "--geant4",
1267 "--ttbar",
1268 "--ttbar-pu",
1269 "50",
1270 ],
1271 cwd=tmp_path,
1272 env=env,
1273 stderr=subprocess.STDOUT,
1274 )
1275 stdout = stdout.decode("utf-8")
1276 except subprocess.CalledProcessError as e:
1277 print(e.output.decode("utf-8"))
1278 raise
1279
1280
1281 errors = []
1282 error_regex = re.compile(r"^\d\d:\d\d:\d\d\s+(\w+)\s+ERROR\s+", re.MULTILINE)
1283 for match in error_regex.finditer(stdout):
1284 (algo,) = match.groups()
1285 errors.append(algo)
1286 errors = collections.Counter(errors)
1287 assert dict(errors) == {}, stdout
1288
1289
1290 @pytest.mark.skipif(not dd4hepEnabled, reason="DD4hep not set up")
1291 @pytest.mark.skipif(not onnxEnabled, reason="ONNX plugin not enabled")
1292 @pytest.mark.slow
1293 def test_ML_Ambiguity_Solver(tmp_path, assert_root_hash):
1294
1295
1296 root_file = "performance_finding_ambiML.root"
1297 output_dir = "odd_output"
1298 assert not (tmp_path / root_file).exists()
1299
1300
1301 with getOpenDataDetector():
1302 pass
1303
1304 script = (
1305 Path(__file__).parent.parent.parent.parent
1306 / "Examples"
1307 / "Scripts"
1308 / "Python"
1309 / "full_chain_odd.py"
1310 )
1311 assert script.exists()
1312 env = os.environ.copy()
1313 env["ACTS_LOG_FAILURE_THRESHOLD"] = "ERROR"
1314 try:
1315 subprocess.check_call(
1316 [sys.executable, str(script), "-n1", "--ambi-solver", "ML"],
1317 cwd=tmp_path,
1318 env=env,
1319 stderr=subprocess.STDOUT,
1320 )
1321 except subprocess.CalledProcessError as e:
1322 print(e.output.decode("utf-8"))
1323 raise
1324
1325 rfp = tmp_path / output_dir / root_file
1326 assert rfp.exists()
1327
1328 assert_root_hash(root_file, rfp)
1329
1330
1331 def test_bfield_writing(tmp_path, seq, assert_root_hash):
1332 from bfield_writing import runBFieldWriting
1333
1334 root_files = [
1335 ("solenoid.root", "solenoid", 100),
1336 ("solenoid2.root", "solenoid", 100),
1337 ]
1338
1339 for fn, _, _ in root_files:
1340 fp = tmp_path / fn
1341 assert not fp.exists()
1342
1343 runBFieldWriting(outputDir=tmp_path, rewrites=1)
1344
1345 for fn, tn, ee in root_files:
1346 fp = tmp_path / fn
1347 assert fp.exists()
1348 assert fp.stat().st_size > 2**10 * 2
1349 assert_entries(fp, tn, ee)
1350 assert_root_hash(fn, fp)
1351
1352
1353 @pytest.mark.parametrize("backend", ["onnx", "torch"])
1354 @pytest.mark.parametrize("hardware", ["cpu", "gpu"])
1355 @pytest.mark.skipif(not exatrkxEnabled, reason="ExaTrkX environment not set up")
1356 def test_exatrkx(tmp_path, trk_geo, field, assert_root_hash, backend, hardware):
1357 if backend == "onnx" and hardware == "cpu":
1358 pytest.skip("Combination of ONNX and CPU not yet supported")
1359
1360 root_file = "performance_track_finding.root"
1361 assert not (tmp_path / root_file).exists()
1362
1363 if backend == "onnx":
1364 url = "https://acts.web.cern.ch/ci/exatrkx/onnx_models_v01.tar"
1365 else:
1366 url = "https://acts.web.cern.ch/ci/exatrkx/torchscript_models_v01.tar"
1367
1368 tarfile_name = tmp_path / "models.tar"
1369 urllib.request.urlretrieve(url, tarfile_name)
1370 tarfile.open(tarfile_name).extractall(tmp_path)
1371 script = (
1372 Path(__file__).parent.parent.parent.parent
1373 / "Examples"
1374 / "Scripts"
1375 / "Python"
1376 / "exatrkx.py"
1377 )
1378 assert script.exists()
1379 env = os.environ.copy()
1380 env["ACTS_LOG_FAILURE_THRESHOLD"] = "WARNING"
1381
1382 if hardware == "cpu":
1383 env["CUDA_VISIBLE_DEVICES"] = ""
1384
1385 try:
1386 subprocess.check_call(
1387 [sys.executable, str(script), backend],
1388 cwd=tmp_path,
1389 env=env,
1390 stderr=subprocess.STDOUT,
1391 )
1392 except subprocess.CalledProcessError as e:
1393 print(e.output.decode("utf-8"))
1394 raise
1395
1396 rfp = tmp_path / root_file
1397 assert rfp.exists()
1398
1399 assert_root_hash(root_file, rfp)