File indexing completed on 2026-03-28 07:46:22
0001 from pathlib import Path
0002
0003 import acts
0004 import acts.examples
0005
0006
0007 class PyAlgorithm(acts.examples.IAlgorithm):
0008 def __init__(self, name: str, output_file: Path, skip_even: bool):
0009 acts.examples.IAlgorithm.__init__(self, name, acts.logging.INFO)
0010 self.name = name
0011 self.output_file = output_file
0012 self.skip_even = skip_even
0013
0014 def execute(self, context):
0015
0016 with self.output_file.open("a", encoding="utf-8") as out:
0017 out.write(f"{context.eventNumber}\n")
0018 print(f"{self.name} - writing event {context.eventNumber}")
0019
0020 if context.eventNumber % 2 == 0:
0021 print(f"{self.name} - skipping event {context.eventNumber}")
0022 return acts.examples.ProcessCode.SKIP
0023
0024 return acts.examples.ProcessCode.SUCCESS
0025
0026
0027 def _read_sequence(path: Path) -> list[int]:
0028 with path.open(encoding="utf-8") as f:
0029 return [int(line.strip()) for line in f if line.strip()]
0030
0031
0032 def test_event_skip(tmp_path):
0033 output_a = tmp_path / "algorithm_a.txt"
0034 output_b = tmp_path / "algorithm_b.txt"
0035
0036 seq = acts.examples.Sequencer(events=20, numThreads=1)
0037 seq.addAlgorithm(PyAlgorithm("AlgorithmA", output_a, skip_even=True))
0038 seq.addAlgorithm(PyAlgorithm("AlgorithmB", output_b, skip_even=False))
0039 seq.run()
0040
0041 sequence_a = _read_sequence(output_a)
0042 sequence_b = _read_sequence(output_b)
0043
0044 print(sequence_a)
0045 print(sequence_b)
0046
0047 assert sequence_a == list(range(20))
0048 assert sequence_b == list(range(1, 20, 2))