# File indexing completed on 2026-01-09 09:26:49
0001 import pytest
0002 from pathlib import Path
0003
0004 from helpers import (
0005 geant4Enabled,
0006 AssertCollectionExistsAlg,
0007 )
0008
0009 import acts
0010 from acts import UnitConstants as u
0011 from acts.examples import (
0012 CsvParticleWriter,
0013 CsvParticleReader,
0014 CsvMeasurementWriter,
0015 CsvMeasurementReader,
0016 CsvSimHitWriter,
0017 CsvSimHitReader,
0018 Sequencer,
0019 )
0020 from acts.examples.root import (
0021 RootParticleWriter,
0022 RootParticleReader,
0023 RootMaterialTrackReader,
0024 RootTrackSummaryReader,
0025 )
0026
0027 from acts.examples.odd import getOpenDataDetector, getOpenDataDetectorDirectory
0028
0029
@pytest.mark.root
def test_root_particle_reader(tmp_path, conf_const, ptcl_gun):
    """Write generated particles to ROOT, read them back, and verify the
    output collection is present in every event."""
    write_seq = Sequencer(numThreads=1, events=10, logLevel=acts.logging.WARNING)
    _, h3conv = ptcl_gun(write_seq)

    root_file = tmp_path / "particles.root"
    write_seq.addWriter(
        conf_const(
            RootParticleWriter,
            acts.logging.WARNING,
            inputParticles=h3conv.config.outputParticles,
            filePath=str(root_file),
        )
    )
    write_seq.run()

    # Read the file back with a fresh sequencer; the reader determines the
    # event count from the file contents.
    read_seq = Sequencer(numThreads=1, logLevel=acts.logging.WARNING)
    read_seq.addReader(
        conf_const(
            RootParticleReader,
            acts.logging.WARNING,
            outputParticles="particles_generated",
            filePath=str(root_file),
        )
    )

    checker = AssertCollectionExistsAlg(
        "particles_generated", "check_alg", acts.logging.WARNING
    )
    read_seq.addAlgorithm(checker)
    read_seq.run()

    assert checker.events_seen == 10
0069
0070
@pytest.mark.csv
def test_csv_particle_reader(tmp_path, conf_const, ptcl_gun):
    """Round-trip particles through the CSV writer/reader pair and verify
    the read-back collection exists for all events."""
    write_seq = Sequencer(numThreads=1, events=10, logLevel=acts.logging.WARNING)
    _, h3conv = ptcl_gun(write_seq)

    csv_dir = tmp_path / "csv"
    csv_dir.mkdir()

    write_seq.addWriter(
        conf_const(
            CsvParticleWriter,
            acts.logging.WARNING,
            inputParticles=h3conv.config.outputParticles,
            outputStem="particle",
            outputDir=str(csv_dir),
        )
    )
    write_seq.run()

    # Second pass: read the CSV files back in.
    read_seq = Sequencer(numThreads=1, logLevel=acts.logging.WARNING)
    read_seq.addReader(
        conf_const(
            CsvParticleReader,
            acts.logging.WARNING,
            inputDir=str(csv_dir),
            inputStem="particle",
            outputParticles="input_particles",
        )
    )

    checker = AssertCollectionExistsAlg(
        "input_particles", "check_alg", acts.logging.WARNING
    )
    read_seq.addAlgorithm(checker)
    read_seq.run()

    assert checker.events_seen == 10
0114
0115
@pytest.mark.parametrize(
    "reader",
    [RootParticleReader, RootTrackSummaryReader],
)
@pytest.mark.root
def test_root_reader_interface(reader, conf_const, tmp_path):
    """Check that each ROOT reader exposes a Config type with a filePath
    option and can be constructed with the common keyword set."""
    assert hasattr(reader, "Config")

    cfg_cls = reader.Config
    assert hasattr(cfg_cls, "filePath")

    kwargs = {"level": acts.logging.INFO, "filePath": str(tmp_path / "file.root")}
    assert conf_const(reader, **kwargs)
0131
0132
@pytest.mark.slow
@pytest.mark.root
@pytest.mark.odd
@pytest.mark.skipif(not geant4Enabled, reason="Geant4 not set up")
def test_root_material_track_reader(material_recording):
    """Read a Geant4 material-track recording from ROOT and verify the
    material-tracks collection shows up in both recorded events."""
    tracks_file = material_recording / "geant4_material_tracks.root"
    assert tracks_file.exists()

    seq = Sequencer(numThreads=1)
    seq.addReader(
        RootMaterialTrackReader(
            level=acts.logging.INFO,
            fileList=[str(tracks_file)],
            outputMaterialTracks="material-tracks",
        )
    )

    checker = AssertCollectionExistsAlg(
        "material-tracks", "check_alg", acts.logging.WARNING
    )
    seq.addAlgorithm(checker)
    seq.run()

    assert checker.events_seen == 2
0159
0160
@pytest.mark.csv
def test_csv_meas_reader(tmp_path, fatras, trk_geo, conf_const):
    """Write measurements and sim hits to CSV, read them back, and verify
    all derived collections exist for every event."""
    write_seq = Sequencer(numThreads=1, events=10)
    evGen, simAlg, digiAlg = fatras(write_seq)

    csv_dir = tmp_path / "csv"
    csv_dir.mkdir()

    write_seq.addWriter(
        CsvMeasurementWriter(
            level=acts.logging.INFO,
            inputMeasurements=digiAlg.config.outputMeasurements,
            inputClusters=digiAlg.config.outputClusters,
            inputMeasurementSimHitsMap=digiAlg.config.outputMeasurementSimHitsMap,
            outputDir=str(csv_dir),
        )
    )
    write_seq.addWriter(
        CsvSimHitWriter(
            level=acts.logging.INFO,
            inputSimHits=simAlg.config.outputSimHits,
            outputDir=str(csv_dir),
            outputStem="hits",
        )
    )
    write_seq.run()

    # Second pass: sim hits must be read first so the measurement reader
    # can resolve its hit references.
    read_seq = Sequencer(numThreads=1)
    read_seq.addReader(
        CsvSimHitReader(
            level=acts.logging.INFO,
            outputSimHits=simAlg.config.outputSimHits,
            inputDir=str(csv_dir),
            inputStem="hits",
        )
    )
    read_seq.addReader(
        conf_const(
            CsvMeasurementReader,
            level=acts.logging.WARNING,
            outputMeasurements="measurements",
            outputMeasurementSimHitsMap="simhitsmap",
            outputMeasurementParticlesMap="meas_ptcl_map",
            inputSimHits=simAlg.config.outputSimHits,
            inputDir=str(csv_dir),
        )
    )

    checkers = [
        AssertCollectionExistsAlg(key, f"check_alg_{key}", acts.logging.WARNING)
        for key in ("measurements", "simhitsmap", "meas_ptcl_map")
    ]
    for checker in checkers:
        read_seq.addAlgorithm(checker)

    read_seq.run()

    for checker in checkers:
        assert checker.events_seen == 10
0226
0227
@pytest.mark.csv
def test_csv_simhits_reader(tmp_path, fatras, conf_const):
    """Round-trip simulated hits through the CSV sim-hit writer/reader and
    verify the output collection exists in all events."""
    write_seq = Sequencer(numThreads=1, events=10)
    evGen, simAlg, digiAlg = fatras(write_seq)

    csv_dir = tmp_path / "csv"
    csv_dir.mkdir()

    write_seq.addWriter(
        CsvSimHitWriter(
            level=acts.logging.INFO,
            inputSimHits=simAlg.config.outputSimHits,
            outputDir=str(csv_dir),
            outputStem="hits",
        )
    )
    write_seq.run()

    read_seq = Sequencer(numThreads=1)
    read_seq.addReader(
        conf_const(
            CsvSimHitReader,
            level=acts.logging.INFO,
            inputDir=str(csv_dir),
            inputStem="hits",
            outputSimHits="simhits",
        )
    )

    checker = AssertCollectionExistsAlg("simhits", "check_alg", acts.logging.WARNING)
    read_seq.addAlgorithm(checker)
    read_seq.run()

    assert checker.events_seen == 10
0265
0266
@pytest.mark.root
def test_buffered_reader(tmp_path, conf_const, ptcl_gun):
    """Wrap a RootParticleReader in a BufferedReader and verify that more
    events than were written can be processed (the buffer is replayed)."""
    events_in_buffer = 5
    events_to_process = 10

    # Write a small input file with fewer events than we will process.
    write_seq = Sequencer(
        numThreads=1, events=events_in_buffer, logLevel=acts.logging.WARNING
    )
    _, h3conv = ptcl_gun(write_seq)

    root_file = tmp_path / "particles.root"
    write_seq.addWriter(
        conf_const(
            RootParticleWriter,
            acts.logging.WARNING,
            inputParticles=h3conv.config.outputParticles,
            filePath=str(root_file),
        )
    )
    write_seq.run()

    # Process twice as many events through the buffered wrapper.
    read_seq = Sequencer(
        events=events_to_process, numThreads=1, logLevel=acts.logging.WARNING
    )

    upstream = RootParticleReader(
        level=acts.logging.WARNING,
        outputParticles="particles_input",
        filePath=str(root_file),
    )
    read_seq.addReader(
        acts.examples.BufferedReader(
            level=acts.logging.WARNING,
            upstreamReader=upstream,
            bufferSize=events_in_buffer,
        )
    )

    checker = AssertCollectionExistsAlg(
        "particles_input", "check_alg", acts.logging.WARNING
    )
    read_seq.addAlgorithm(checker)
    read_seq.run()

    assert checker.events_seen == events_to_process