import os
import inspect
from pathlib import Path
import shutil
import math
import sys
import tempfile

import pytest

from helpers import (
    dd4hepEnabled,
    hepmc3Enabled,
    geant4Enabled,
    AssertCollectionExistsAlg,
)

import acts
from acts import UnitConstants as u
from acts.examples import (
    ObjPropagationStepsWriter,
    TrackFinderNTupleWriter,
    RootPropagationStepsWriter,
    RootParticleWriter,
    RootTrackParameterWriter,
    RootMaterialTrackWriter,
    RootMaterialWriter,
    RootSimHitWriter,
    RootTrackStatesWriter,
    RootTrackSummaryWriter,
    VertexNTupleWriter,
    RootMeasurementWriter,
    CsvParticleWriter,
    CsvSimHitWriter,
    CsvTrackWriter,
    CsvTrackingGeometryWriter,
    CsvMeasurementWriter,
    JsonMaterialWriter,
    JsonFormat,
    Sequencer,
    GenericDetector,
)
from acts.examples.odd import getOpenDataDetectorDirectory


@pytest.mark.obj
def test_obj_propagation_step_writer(tmp_path, trk_geo, conf_const, basic_prop_seq):
    with pytest.raises(TypeError):
        ObjPropagationStepsWriter()

    obj = tmp_path / "obj"
    obj.mkdir()

    s, alg = basic_prop_seq(trk_geo)
    w = conf_const(
        ObjPropagationStepsWriter,
        acts.logging.INFO,
        collection=alg.config.outputSummaryCollection,
        outputDir=str(obj),
    )

    s.addWriter(w)

    s.run()

    assert len([f for f in obj.iterdir() if f.is_file()]) == s.config.events
    for f in obj.iterdir():
        assert f.stat().st_size > 1024


@pytest.mark.csv
def test_csv_particle_writer(tmp_path, conf_const, ptcl_gun):
    s = Sequencer(numThreads=1, events=10)
    _, h3conv = ptcl_gun(s)

    out = tmp_path / "csv"

    out.mkdir()

    s.addWriter(
        conf_const(
            CsvParticleWriter,
            acts.logging.INFO,
            inputParticles=h3conv.config.outputParticles,
            outputStem="particle",
            outputDir=str(out),
        )
    )

    s.run()

    assert len([f for f in out.iterdir() if f.is_file()]) == s.config.events
    assert all(f.stat().st_size > 200 for f in out.iterdir())


@pytest.mark.root
def test_root_prop_step_writer(
    tmp_path, trk_geo, conf_const, basic_prop_seq, assert_root_hash
):
    with pytest.raises(TypeError):
        RootPropagationStepsWriter()

    file = tmp_path / "prop_steps.root"
    assert not file.exists()

    s, alg = basic_prop_seq(trk_geo)
    w = conf_const(
        RootPropagationStepsWriter,
        acts.logging.INFO,
        collection=alg.config.outputSummaryCollection,
        filePath=str(file),
    )

    s.addWriter(w)

    s.run()

    assert file.exists()
    assert file.stat().st_size > 2**10 * 50
    assert_root_hash(file.name, file)


@pytest.mark.root
def test_root_particle_writer(tmp_path, conf_const, ptcl_gun, assert_root_hash):
    s = Sequencer(numThreads=1, events=10)
    _, h3conv = ptcl_gun(s)

    file = tmp_path / "particles.root"

    assert not file.exists()

    s.addWriter(
        conf_const(
            RootParticleWriter,
            acts.logging.INFO,
            inputParticles=h3conv.config.outputParticles,
            filePath=str(file),
        )
    )

    s.run()

    assert file.exists()
    assert file.stat().st_size > 1024 * 10
    assert_root_hash(file.name, file)


@pytest.mark.root
def test_root_meas_writer(tmp_path, fatras, trk_geo, assert_root_hash):
    s = Sequencer(numThreads=1, events=10)
    evGen, simAlg, digiAlg = fatras(s)

    out = tmp_path / "meas.root"

    assert not out.exists()

    config = RootMeasurementWriter.Config(
        inputMeasurements=digiAlg.config.outputMeasurements,
        inputClusters=digiAlg.config.outputClusters,
        inputSimHits=simAlg.config.outputSimHits,
        inputMeasurementSimHitsMap=digiAlg.config.outputMeasurementSimHitsMap,
        filePath=str(out),
        surfaceByIdentifier=trk_geo.geoIdSurfaceMap(),
    )
    s.addWriter(RootMeasurementWriter(level=acts.logging.INFO, config=config))
    s.run()

    assert out.exists()
    assert out.stat().st_size > 40000
    assert_root_hash(out.name, out)


@pytest.mark.root
def test_root_simhits_writer(tmp_path, fatras, conf_const, assert_root_hash):
    s = Sequencer(numThreads=1, events=10)
    evGen, simAlg, digiAlg = fatras(s)

    out = tmp_path / "meas.root"

    assert not out.exists()

    s.addWriter(
        conf_const(
            RootSimHitWriter,
            level=acts.logging.INFO,
            inputSimHits=simAlg.config.outputSimHits,
            filePath=str(out),
        )
    )

    s.run()
    assert out.exists()
    assert out.stat().st_size > 2e4
    assert_root_hash(out.name, out)


@pytest.mark.root
def test_root_tracksummary_writer(tmp_path, fatras, conf_const):
    detector = GenericDetector()
    trackingGeometry = detector.trackingGeometry()
    field = acts.ConstantBField(acts.Vector3(0, 0, 2 * u.T))
    s = Sequencer(numThreads=1, events=10)

    from truth_tracking_kalman import runTruthTrackingKalman

    runTruthTrackingKalman(
        trackingGeometry,
        field,
        digiConfigFile=Path(
            str(
                Path(__file__).parent.parent.parent.parent
                / "Examples/Configs/generic-digi-smearing-config.json"
            )
        ),
        outputDir=tmp_path,
        s=s,
    )

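    # runTruthTrackingKalman already attaches its own summary writer
    # (tracksummary_kf.root); add a second one that runs without truth inputs.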
    s.addWriter(
        conf_const(
            RootTrackSummaryWriter,
            level=acts.logging.INFO,
            inputTracks="tracks",
            filePath=str(tmp_path / "track_summary_kf_no_truth.root"),
        )
    )

    s.run()
    assert (tmp_path / "tracksummary_kf.root").exists()
    assert (tmp_path / "track_summary_kf_no_truth.root").exists()


@pytest.mark.csv
def test_csv_meas_writer(tmp_path, fatras, trk_geo, conf_const):
    s = Sequencer(numThreads=1, events=10)
    evGen, simAlg, digiAlg = fatras(s)

    out = tmp_path / "csv"
    out.mkdir()

    s.addWriter(
        conf_const(
            CsvMeasurementWriter,
            level=acts.logging.INFO,
            inputMeasurements=digiAlg.config.outputMeasurements,
            inputClusters=digiAlg.config.outputClusters,
            inputMeasurementSimHitsMap=digiAlg.config.outputMeasurementSimHitsMap,
            outputDir=str(out),
        )
    )
    s.run()

    assert len([f for f in out.iterdir() if f.is_file()]) == s.config.events * 3
    assert all(f.stat().st_size > 10 for f in out.iterdir())


@pytest.mark.csv
def test_csv_simhits_writer(tmp_path, fatras, conf_const):
    s = Sequencer(numThreads=1, events=10)
    evGen, simAlg, digiAlg = fatras(s)

    out = tmp_path / "csv"
    out.mkdir()

    s.addWriter(
        conf_const(
            CsvSimHitWriter,
            level=acts.logging.INFO,
            inputSimHits=simAlg.config.outputSimHits,
            outputDir=str(out),
            outputStem="hits",
        )
    )

    s.run()
    assert len([f for f in out.iterdir() if f.is_file()]) == s.config.events
    assert all(f.stat().st_size > 200 for f in out.iterdir())


@pytest.mark.parametrize(
    "writer",
    [
        RootPropagationStepsWriter,
        RootParticleWriter,
        TrackFinderNTupleWriter,
        RootTrackParameterWriter,
        RootMaterialTrackWriter,
        RootMeasurementWriter,
        RootMaterialWriter,
        RootSimHitWriter,
        RootTrackStatesWriter,
        RootTrackSummaryWriter,
        VertexNTupleWriter,
    ],
)
@pytest.mark.root
def test_root_writer_interface(writer, conf_const, tmp_path, trk_geo):
    assert hasattr(writer, "Config")

    config = writer.Config

    assert hasattr(config, "filePath")
    assert hasattr(config, "fileMode")

    f = tmp_path / "target.root"
    assert not f.exists()

    kw = {"level": acts.logging.INFO, "filePath": str(f)}

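    # Fill every input* config member with a dummy collection name; the
    # surfaceByIdentifier option additionally needs a real geometry-ID -> surface map.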
    for k, _ in inspect.getmembers(config):
        if k.startswith("input"):
            kw[k] = "collection"
        if k == "surfaceByIdentifier":
            kw[k] = trk_geo.geoIdSurfaceMap()

    assert conf_const(writer, **kw)

    assert f.exists()


@pytest.mark.parametrize(
    "writer",
    [
        CsvParticleWriter,
        CsvMeasurementWriter,
        CsvSimHitWriter,
        CsvTrackWriter,
        CsvTrackingGeometryWriter,
    ],
)
@pytest.mark.csv
def test_csv_writer_interface(writer, conf_const, tmp_path, trk_geo):
    assert hasattr(writer, "Config")

    config = writer.Config

    assert hasattr(config, "outputDir")

    kw = {"level": acts.logging.INFO, "outputDir": str(tmp_path)}

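    # Fill the remaining required options: dummy input collections, the
    # tracking geometry, and an output file stem.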
    for k, _ in inspect.getmembers(config):
        if k.startswith("input"):
            kw[k] = "collection"
        if k == "trackingGeometry":
            kw[k] = trk_geo
        if k == "outputStem":
            kw[k] = "stem"

    assert conf_const(writer, **kw)


@pytest.mark.root
@pytest.mark.odd
@pytest.mark.skipif(not dd4hepEnabled, reason="DD4hep not set up")
def test_root_material_writer(tmp_path, assert_root_hash):
    from acts.examples.odd import getOpenDataDetector

    with getOpenDataDetector() as detector:
        trackingGeometry = detector.trackingGeometry()

        out = tmp_path / "material.root"

        assert not out.exists()

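        # Constructing the writer already creates a small, near-empty ROOT file;
        # write() then fills it with the material maps of the tracking geometry.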
        rmw = RootMaterialWriter(level=acts.logging.WARNING, filePath=str(out))
        assert out.exists()
        assert 0 < out.stat().st_size < 500
        rmw.write(trackingGeometry)

        assert out.stat().st_size > 1000
        assert_root_hash(out.name, out)


@pytest.mark.json
@pytest.mark.odd
@pytest.mark.parametrize("fmt", [JsonFormat.Json, JsonFormat.Cbor])
@pytest.mark.skipif(not dd4hepEnabled, reason="DD4hep not set up")
def test_json_material_writer(tmp_path, fmt):
    from acts.examples.dd4hep import DD4hepDetector

    detector = DD4hepDetector(
        xmlFileNames=[str(getOpenDataDetectorDirectory() / "xml/OpenDataDetector.xml")]
    )
    trackingGeometry = detector.trackingGeometry()

    out = (tmp_path / "material").with_suffix("." + fmt.name.lower())

    assert not out.exists()

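    # The writer receives the file stem without a suffix and only creates the
    # suffixed output file once write() is called.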
    jmw = JsonMaterialWriter(
        level=acts.logging.WARNING, fileName=str(out.with_suffix("")), writeFormat=fmt
    )
    assert not out.exists()
    jmw.write(trackingGeometry)

    assert out.stat().st_size > 1000


@pytest.mark.csv
def test_csv_multitrajectory_writer(tmp_path):
    detector = GenericDetector()
    trackingGeometry = detector.trackingGeometry()
    field = acts.ConstantBField(acts.Vector3(0, 0, 2 * u.T))

    from truth_tracking_kalman import runTruthTrackingKalman

    s = Sequencer(numThreads=1, events=10)
    runTruthTrackingKalman(
        trackingGeometry,
        field,
        digiConfigFile=Path(
            str(
                Path(__file__).parent.parent.parent.parent
                / "Examples/Configs/generic-digi-smearing-config.json"
            )
        ),
        outputDir=tmp_path,
        s=s,
    )

    csv_dir = tmp_path / "csv"
    csv_dir.mkdir()
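    # Write the fitted tracks to CSV, one file per processed event.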
    s.addWriter(
        CsvTrackWriter(
            level=acts.logging.INFO,
            inputTracks="tracks",
            inputMeasurementParticlesMap="measurement_particles_map",
            outputDir=str(csv_dir),
        )
    )
    s.run()
    assert len([f for f in csv_dir.iterdir() if f.is_file()]) == 10
    assert all(f.stat().st_size > 20 for f in csv_dir.iterdir())