File indexing completed on 2026-01-09 09:26:45
0001 import sys, inspect
0002 from pathlib import Path
0003 from typing import Optional, Protocol, Union, List, Dict, Tuple
0004 import os
0005 import re
0006 import acts
0007
0008 from .ActsExamplesPythonBindings import *
0009
0010 from acts._adapter import _patch_config
0011
_patch_config(ActsExamplesPythonBindings)

# Two parallel lookup tables, filled in lock-step: entry i of
# `_concrete_propagators` is the attribute "<stepper>ConcretePropagator" of the
# examples bindings, matching the attribute "<stepper>Propagator" of `acts`
# stored at entry i of `_propagators`.
_propagators = []
_concrete_propagators = []
for stepper in ("Eigen", "Atlas", "StraightLine", "Sympy"):
    _propagators.append(getattr(acts, stepper + "Propagator"))
    _concrete_propagators.append(
        getattr(ActsExamplesPythonBindings, stepper + "ConcretePropagator")
    )
0024
0025
def ConcretePropagator(propagator):
    """Wrap ``propagator`` in the matching concrete-propagator binding.

    The module-level tables `_propagators` and `_concrete_propagators` are
    scanned in order; the first entry whose propagator type ``propagator`` is
    an instance of selects the wrapper, which is called with ``propagator``.

    Raises:
        TypeError: if ``propagator`` is not an instance of any known type.
    """
    candidates = (
        wrapper
        for known, wrapper in zip(_propagators, _concrete_propagators)
        if isinstance(propagator, known)
    )
    wrapper = next(candidates, None)
    if wrapper is None:
        raise TypeError(f"Unknown propagator {type(propagator).__name__}")
    return wrapper(propagator)
0032
0033
def NamedTypeArgs(**namedTypeArgs):
    """Decorator to move args of a named type (e.g. `namedtuple` or `Enum`) to kwargs based on type, so user doesn't need to specify the key name.
    Also allows the keyword argument to be converted from a built-in type (eg. `tuple` or `int`).

    Args:
        **namedTypeArgs: maps a keyword-argument name of the decorated function
            to the class that identifies it.

    The wrapped function raises ``TypeError`` when

    - an ordinary positional argument appears after a positional argument
      that was converted to a keyword argument, or
    - a converted positional argument targets a keyword that is already set.
    """
    from collections.abc import Iterable
    from functools import wraps

    # Reverse lookup: class -> keyword name it should be passed as.
    namedTypeClasses = {c: a for a, c in namedTypeArgs.items()}

    def NamedTypeArgsDecorator(func):
        @wraps(func)
        def NamedTypeArgsWrapper(*args, **kwargs):
            # Step 1: convert keyword values given as built-in types into the
            # registered class. Iterate over a snapshot because values are
            # replaced in place.
            for k, v in list(kwargs.items()):
                cls = namedTypeArgs.get(k)
                if cls is None or v is None:
                    continue
                # Only values whose type lives in the same module as `int`
                # (i.e. built-in types) are candidates for conversion.
                if type(v).__module__ != int.__module__:
                    continue
                # An iterable whose elements are all already `cls` instances
                # (e.g. a list of them) is passed through untouched.
                if issubclass(type(v), Iterable) and all(type(e) is cls for e in v):
                    continue
                # Iterable classes (e.g. namedtuples) take the value unpacked.
                kwargs[k] = cls(*v) if issubclass(cls, Iterable) else cls(v)

            # Step 2: pull positional arguments of a registered type out of
            # the positional list and pass them by keyword instead.
            newargs = []
            converted = False
            for i, a in enumerate(args):
                k = namedTypeClasses.get(type(a))
                if k is None:
                    # Once any positional argument has been converted, the
                    # remaining plain positionals would silently shift into
                    # earlier slots, so reject them.
                    if converted:
                        types = [type(a).__name__ for a in args]
                        raise TypeError(
                            f"{func.__name__}() positional argument {i} of type {type(a)} follows named-type arguments, which were converted to keyword arguments. All argument types: {types}"
                        )
                    newargs.append(a)
                elif k in kwargs:
                    raise TypeError(f"{func.__name__}() keyword argument repeated: {k}")
                else:
                    kwargs[k] = a
                    converted = True
            return func(*newargs, **kwargs)

        return NamedTypeArgsWrapper

    return NamedTypeArgsDecorator
0082
0083
def defaultKWArgs(**kwargs) -> dict:
    """Return ``kwargs`` without the entries that carry no value.

    An entry is dropped when its value is ``None`` or an iterable whose items
    are all ``None`` (eg. ``[None, None]``; an empty iterable also qualifies).
    Passing the result on with ``**`` therefore lets the called function fall
    back to its own defaults for those parameters.
    """
    from collections.abc import Iterable

    def _is_unset(value):
        if value is None:
            return True
        if not isinstance(value, Iterable):
            return False
        return all([item is None for item in value])

    kept = {}
    for key, value in kwargs.items():
        if _is_unset(value):
            continue
        kept[key] = value
    return kept
0096
0097
def dump_func_args(func, *args, **kwargs):
    """Print one line describing a call of ``func`` with the given arguments.

    The line has the form ``<module>.<qualname> ( <args> )``. Arguments are
    bound to parameter names through ``inspect.signature`` where possible; if
    the signature is unavailable, the raw positional and keyword arguments are
    printed instead. Nothing is printed for an argument-less call of anything
    whose qualified name contains a ``Config`` component.
    """

    def valstr(v, d=frozenset()):
        """Render ``v``; ``d`` holds the type names already being expanded."""
        from collections.abc import Callable

        # Anything with an informative repr is printed as-is. Only objects
        # with the default "<pkg.Type object at 0x...>" repr are expanded.
        if not re.match(r"^<[\w.]+ object at 0x[\da-f]+>$", repr(v)):
            return repr(v)

        name = type(v).__module__ + "." + type(v).__qualname__
        a = []
        # Limit nesting, never expand a type inside itself, and do not expand
        # Sequencer objects.
        if len(d) < 10 and name not in d and type(v).__name__ != "Sequencer":
            try:
                a = [
                    k + " = " + valstr(getattr(v, k), d | {name})
                    for k in dir(v)
                    if not (k.startswith("__") or isinstance(getattr(v, k), Callable))
                ]
            except Exception:
                # Best effort: an attribute that cannot be read makes the
                # object print without members. KeyboardInterrupt/SystemExit
                # are deliberately not swallowed.
                a = []
        if a:
            return name + "{ " + ", ".join(a) + " }"
        return name + "{}"

    def keyvalstr(kv):
        return "{0} = {1}".format(kv[0], valstr(kv[1]))

    try:
        func_kwargs = inspect.signature(func).bind(*args, **kwargs).arguments
        func_args = func_kwargs.pop("args", [])
        # Raises AttributeError if a parameter that merely happens to be
        # called "args" is not a tuple/list, triggering the fallback below.
        func_args.count(None)
        func_kwargs.update(func_kwargs.pop("kwargs", {}))
    except (ValueError, AttributeError):
        func_kwargs = kwargs
        func_args = args
    func_args_str = ", ".join(
        list(map(valstr, func_args)) + list(map(keyvalstr, func_kwargs.items()))
    )
    if not (
        func_args_str == ""
        and any(a == "Config" for a in func.__qualname__.split("."))
    ):
        print(f"{func.__module__}.{func.__qualname__} ( {func_args_str} )")
0143
0144
def dump_args(func):
    """
    Decorator that reports every call before making it.

    Each invocation of the returned wrapper first passes the function and its
    arguments to `dump_func_args` and then forwards the call unchanged.
    https://stackoverflow.com/questions/6200270/decorator-that-prints-function-call-details-parameters-names-and-effective-valu
    """
    from functools import wraps

    @wraps(func)
    def dump_args_wrapper(*args, **kwargs):
        dump_func_args(func, *args, **kwargs)
        return func(*args, **kwargs)

    # Re-attach non-dunder attributes of `func` that are absent from the
    # wrapper or present with a different type, so the wrapper keeps exposing
    # what the original exposed.
    public_names = [attr for attr in dir(func) if not attr.startswith("__")]
    for attr in public_names:
        original = getattr(func, attr)
        current = getattr(dump_args_wrapper, attr, None)
        if type(original) is not type(current):
            setattr(dump_args_wrapper, attr, original)

    return dump_args_wrapper
0167
0168
def dump_args_calls(myLocal=None, mods=None, quiet=False):
    """
    Wrap all Python bindings calls to acts and its submodules in dump_args.
    Specify myLocal=locals() to include imported symbols too.

    Args:
        myLocal: optional namespace whose symbols should be rebound to the
            wrapped versions. May be a mapping (e.g. ``locals()``), in which
            case an entry is rebound only if it currently refers to the very
            object being wrapped, or any object with matching attributes.
        mods: a module/class or list of them to process. Defaults to ``acts``
            and every loaded submodule whose name starts with ``acts.``.
        quiet: suppress the summary line.

    Returns:
        The number of callables that were wrapped, including those found
        recursively inside wrapped objects.
    """
    from collections.abc import Callable, MutableMapping

    def _allmods(mod, base, found):
        """Collect ``mod`` and, recursively, its loaded submodules under ``base``."""
        import types

        def _order(item):
            # Public names first, then underscore-prefixed ones, and the raw
            # "ActsPythonBindings" module last; alphabetical within a group.
            if item[0] == "ActsPythonBindings":
                return (2, item[0])
            if item[0].startswith("_"):
                return (1, item[0])
            return (0, item[0])

        mods = [mod]
        found.add(mod)
        for name, obj in sorted(vars(mod).items(), key=_order):
            if (
                not name.startswith("__")
                and type(obj) is types.ModuleType
                and obj.__name__.startswith(base)
                and f"{mod.__name__}.{name}" in sys.modules
                and obj not in found
            ):
                mods += _allmods(obj, base, found)
        return mods

    if mods is None:
        mods = _allmods(acts, "acts.", set())
    elif not isinstance(mods, list):
        mods = [mods]

    donemods = []
    alldone = 0
    for mod in mods:
        done = 0
        for name in dir(mod):
            obj = getattr(mod, name, None)
            wrappable = (
                not name.startswith("__")
                and isinstance(obj, Callable)
                and hasattr(obj, "__module__")
                and obj.__module__.startswith("acts.ActsPythonBindings")
                and obj.__qualname__ != "_Sequencer.Config"
                and not hasattr(obj, "__wrapped__")  # already wrapped
            )
            if not wrappable:
                continue

            # Wrap the callables contained in the object (e.g. the methods of
            # a class) before wrapping the object itself.
            done += dump_args_calls(myLocal, [obj], True)
            wrapped = dump_args(obj)
            setattr(mod, name, wrapped)
            if myLocal:
                if isinstance(myLocal, MutableMapping):
                    # Namespace dict such as locals(): hasattr/setattr would
                    # address the dict object itself rather than its entries.
                    # Only rebind a symbol that is this exact object, so an
                    # unrelated variable of the same name is left alone.
                    if myLocal.get(name) is obj:
                        myLocal[name] = wrapped
                elif hasattr(myLocal, name):
                    setattr(myLocal, name, wrapped)
            done += 1
        if done:
            alldone += done
            donemods.append(f"{mod.__name__}:{done}")
    if not quiet and donemods:
        print("dump_args for module functions:", ", ".join(donemods))
    return alldone
0233
0234
class CustomLogLevel(Protocol):
    """Structural type of the callable returned by `defaultLogging`.

    Such a callable takes an optional ``minLevel`` and ``maxLevel`` and
    returns an ``acts.logging.Level``.
    """

    def __call__(
        self,
        minLevel: acts.logging.Level = acts.logging.VERBOSE,
        maxLevel: acts.logging.Level = acts.logging.FATAL,
    ) -> acts.logging.Level: ...
0241
0242
def defaultLogging(
    s=None,
    logLevel: Optional[acts.logging.Level] = None,
) -> CustomLogLevel:
    """
    Build the default log-level selector for the python examples interface.

    The returned function `customLogLevel(minLevel, maxLevel)` works as follows:
    - the starting level is `logLevel` when it is given, otherwise the
      sequencer's `s.config.logLevel` (read at the time of the call)
    - that level is then clamped into the range `[minLevel, maxLevel]`

    Examples:
    - `customLogLevel(minLevel=acts.logging.INFO)` yields INFO or higher, which
      is handy to quieten a component that logs a lot below INFO while another
      component is the one of interest
    - `customLogLevel(maxLevel=acts.logging.INFO)` yields INFO or lower, which
      is handy to get extra detail out of a component whose interesting
      messages sit below the default level
    - put differently, `minLevel` caps how much is logged at most and `maxLevel`
      guarantees how much is logged at least
    """

    def customLogLevel(
        minLevel: acts.logging.Level = acts.logging.VERBOSE,
        maxLevel: acts.logging.Level = acts.logging.FATAL,
    ) -> acts.logging.Level:
        if logLevel is not None:
            base = logLevel
        else:
            base = s.config.logLevel
        # Clamp on the numeric values, then convert back to a Level.
        upper = maxLevel.value
        clamped = min(upper, max(minLevel.value, base.value))
        return acts.logging.Level(clamped)

    return customLogLevel
0272
0273
class Sequencer(ActsExamplesPythonBindings._Sequencer):
    """Python front-end of the bound ``_Sequencer``.

    On top of the bound class it

    - accepts configuration fields directly as keyword arguments,
    - accepts FPE masks as ``(location, type, count)`` tuples,
    - adds the FPE masks declared through ``// MARK: fpeMask...`` comments in
      the C++ sources found under `srcdir`.
    """

    # Masks discovered by scanning the source tree. ``None`` until
    # `_getAutoFpeMasks` has completed successfully once.
    _autoFpeMasks: Optional[List["FpeMask"]] = None

    def __init__(self, *args, **kwargs):
        """Build the configuration and initialise the bound sequencer.

        Args:
            *args: a single ``Sequencer.Config`` instance may be given and is
                then used as the starting configuration.
            **kwargs: ``config`` supplies the configuration object; every other
                key must name a field of the configuration and is assigned to
                it (``Path`` values are converted to ``str``). ``fpeMasks`` may
                be a list of ``FpeMask`` objects or of
                ``(location, type, count)`` tuples, where ``type`` is an enum
                value or its name.

        Raises:
            ValueError: if a keyword does not name a configuration field.
        """
        if "fpeMasks" in kwargs:
            m = kwargs["fpeMasks"]
            if isinstance(m, list) and len(m) > 0 and isinstance(m[0], tuple):
                converted = []
                for loc, fpe, count in m:
                    file, lines = self.FpeMask.parse_loc(loc)
                    # The name -> enum table is an attribute of FpeMask, not a
                    # module-level name.
                    fpeType = (
                        self.FpeMask._fpe_types_to_enum[fpe]
                        if isinstance(fpe, str)
                        else fpe
                    )
                    converted.append(self.FpeMask(file, lines, fpeType, count))
                kwargs["fpeMasks"] = converted

        # Concatenation builds a new list, so the cached automatic masks are
        # never modified through the configuration.
        kwargs["fpeMasks"] = kwargs.get("fpeMasks", []) + self._getAutoFpeMasks()

        cfg = self.Config()
        if len(args) == 1 and isinstance(args[0], self.Config):
            cfg = args[0]
        if "config" in kwargs:
            cfg = kwargs.pop("config")

        for k, v in kwargs.items():
            if not hasattr(cfg, k):
                raise ValueError(f"Sequencer.Config does not have field {k}")
            if isinstance(v, Path):
                v = str(v)

            setattr(cfg, k, v)

        # Only report the call when the bound class has been wrapped by
        # dump_args (which sets __wrapped__).
        if hasattr(ActsExamplesPythonBindings._Sequencer, "__wrapped__"):
            dump_func_args(Sequencer, cfg)
        super().__init__(cfg)

        # `self.config` is read from the instance, so this has to happen after
        # the base class has been initialised.
        if self.config.logLevel >= acts.logging.DEBUG:
            self._printFpeSummary(kwargs["fpeMasks"])

    class FpeMask(ActsExamplesPythonBindings._Sequencer._FpeMask):
        """FPE mask with helpers to convert from and to plain Python data."""

        @classmethod
        def fromFile(cls, file: Union[str, Path]) -> List["FpeMask"]:
            """Load masks from ``file``; only ``.yml``/``.yaml`` is supported.

            Raises:
                ImportError: if the file is YAML but PyYAML is not installed.
                ValueError: if the file suffix is not a supported format.
            """
            if isinstance(file, str):
                file = Path(file)

            if file.suffix in (".yml", ".yaml"):
                try:
                    return cls.fromYaml(file)
                except ImportError:
                    print("FPE mask input file is YAML, but PyYAML is not installed")
                    raise

            # Previously fell through and returned None, which only failed
            # later with an unrelated error.
            raise ValueError(
                f"Unsupported FPE mask file format '{file.suffix}' for {file}"
            )

        @classmethod
        def fromYaml(cls, file: Union[str, Path]) -> List["FpeMask"]:
            """Load masks from a YAML file laid out as expected by `fromDict`."""
            import yaml

            # Accept plain strings as the annotation promises.
            with Path(file).open() as fh:
                d = yaml.safe_load(fh)

            return cls.fromDict(d)

        # FPE type name -> enum value, used to resolve names read from text.
        _fpe_types_to_enum = {
            v.name: v for v in ActsExamplesPythonBindings.FpeType.values
        }

        @staticmethod
        def toDict(
            masks: List["FpeMask"],
        ) -> Dict[str, Dict[str, int]]:
            """Convert masks to ``{location: {type name: count}}``.

            The location is ``"<file>:<line>"`` when the line pair spans one
            line (``end == start + 1``) and ``"<file>:(<start>, <end>]"``
            otherwise.
            """
            out = {}
            for mask in masks:
                start, end = mask.lines
                if start == end - 1:
                    loc_str = f"{mask.file}:{start}"
                else:
                    loc_str = f"{mask.file}:({start}, {end}]"
                out.setdefault(loc_str, {})[mask.type.name] = mask.count

            return out

        @staticmethod
        def parse_loc(loc: str) -> Tuple[str, Tuple[int, int]]:
            """Split ``"<file>:<lines>"`` into the file and a line pair.

            ``<lines>`` may be ``(a, b]`` giving ``(a, b)``, ``a-b`` giving
            ``(a, b + 1)``, or a single number ``n`` giving ``(n, n + 1)``.

            Raises:
                ValueError: if ``loc`` has no ``:`` or the line part matches
                    none of the accepted forms.
            """
            file, lines = loc.split(":", 1)
            spec = lines.strip()

            if m := re.match(r"^\((\d+) ?, ?(\d+)\]$", spec):
                start, end = map(int, m.groups())
            elif m := re.match(r"^(\d+) ?- ?(\d+)$", spec):
                start, end = map(int, m.groups())
                end += 1
            else:
                start = int(lines)
                end = start + 1

            return file, (start, end)

        @classmethod
        def fromDict(cls, d: Dict[str, Dict[str, int]]) -> List["FpeMask"]:
            """Build masks from ``{location: {type name: count}}``."""
            out = []
            for loc, types in d.items():
                file, lines = cls.parse_loc(loc)

                for fpe, count in types.items():
                    out.append(cls(file, lines, cls._fpe_types_to_enum[fpe], count))
            return out

    @classmethod
    def srcdir(cls) -> Path:
        """Return the directory four levels above ``cls._sourceLocation``."""
        return Path(cls._sourceLocation).parent.parent.parent.parent

    @classmethod
    def _getAutoFpeMasks(cls) -> List[FpeMask]:
        """Collect the FPE masks declared in comments of the C++ sources.

        Every ``.hpp``/``.cpp``/``.ipp`` file below `srcdir` is scanned for

        - ``// MARK: fpeMask(<type>, <count>, #<n>)``: masks the line carrying
          the comment; several ``fpeMask(...)`` entries may share one comment;
        - ``// MARK: fpeMaskBegin(<type>, <count>, #<n>)`` up to the next
          ``// MARK: fpeMaskEnd(<type>)`` of the same type: masks that range.

        The result is cached on the class after the first successful scan.

        Raises:
            ValueError: if a begin marker has no matching end marker.
        """
        if cls._autoFpeMasks is not None:
            return cls._autoFpeMasks

        srcdir = cls.srcdir()
        typeByName = cls.FpeMask._fpe_types_to_enum

        # Collected locally and published only once the scan has finished, so
        # a failure midway cannot leave a partial list in the cache.
        found = []

        for root, _, files in os.walk(srcdir):
            root = Path(root)
            for fileName in files:
                if not fileName.endswith((".hpp", ".cpp", ".ipp")):
                    continue
                path = root / fileName

                with path.open("r") as fh:
                    lines = fh.readlines()

                for i, line in enumerate(lines):
                    if m := re.match(r".*\/\/ ?MARK: ?(fpeMask\(.*)$", line):
                        exp = m.group(1)
                        for fpeType, count, _ in re.findall(
                            r"fpeMask\( ?(\w+), ?(\d+) ?, ?#(\d+) ?\)", exp
                        ):
                            found.append(
                                cls.FpeMask(
                                    str(path.relative_to(srcdir)),
                                    # enumerate is 0-based, the mask uses
                                    # 1-based line numbers.
                                    (i + 1, i + 2),
                                    typeByName[fpeType],
                                    int(count),
                                )
                            )

                    if m := re.match(
                        r".*\/\/ ?MARK: ?fpeMaskBegin\( ?(\w+), ?(\d+) ?, ?#?(\d+) ?\)",
                        line,
                    ):
                        fpeType, count, _ = m.groups()
                        count = int(count)
                        rel = path.relative_to(srcdir)

                        start = i + 1
                        end = None

                        # Look for the end marker of the same FPE type,
                        # starting at the line of the begin marker.
                        for j, line2 in enumerate(lines[i:]):
                            if m2 := re.match(
                                r".*\/\/ ?MARK: ?fpeMaskEnd\( ?(\w+) ?\)$", line2
                            ):
                                if m2.group(1) == fpeType:
                                    end = i + j + 1
                                    break

                        if end is None:
                            raise ValueError(
                                f"Found fpeMaskBegin but no fpeMaskEnd for {rel}:{start}"
                            )
                        found.append(
                            cls.FpeMask(
                                str(rel),
                                (start, end + 1),
                                typeByName[fpeType],
                                count,
                            )
                        )

        cls._autoFpeMasks = found
        return cls._autoFpeMasks

    @classmethod
    def _printFpeSummary(cls, masks: List[FpeMask]):
        """Print the FPE masks and check that their files exist.

        Nothing happens if ``masks`` is empty or the environment variable
        ``ACTS_SEQUENCER_DISABLE_FPEMON`` is set. The summary is rendered with
        ``rich`` when it is importable and stdout is a terminal, and as plain
        text otherwise.

        Raises:
            RuntimeError: if the file of any mask does not exist under
                `srcdir`.
        """
        if len(masks) == 0 or "ACTS_SEQUENCER_DISABLE_FPEMON" in os.environ:
            return

        # Try to make a nice summary with rich, or fall back to plain text.
        try:
            import rich

            have_rich = True
        except ImportError:
            have_rich = False

        error = False
        srcdir = cls.srcdir()

        if not have_rich or not sys.stdout.isatty():
            print("FPE masks:")
            for mask in masks:
                s = f"{mask.file}:{mask.lines[0]}: {mask.type.name}: {mask.count}"

                full_path = srcdir / mask.file
                if not full_path.exists():
                    print(f"- {s}\n [File at {full_path} does not exist!]")
                    error = True
                else:
                    print(f"- {s}")

        else:
            import rich
            import rich.rule
            import rich.panel
            from rich.markdown import Markdown as md
            import rich.syntax
            import rich.table
            import rich.text

            rich.print(rich.rule.Rule("FPE masks"))

            # One panel per mask showing the masked source lines.
            for i, mask in enumerate(masks):
                if i > 0:
                    rich.print(rich.rule.Rule())
                full_path = srcdir / mask.file
                if not full_path.exists():
                    rich.print(
                        rich.panel.Panel(
                            md(f"File at **{full_path}** does not exist"),
                            title=f"{mask}",
                            style="red",
                        )
                    )
                    error = True
                    continue

                # Show two extra lines of context on either side.
                start, end = mask.lines
                start = max(0, start - 2)
                end += 2
                rich.print(
                    rich.panel.Panel(
                        rich.syntax.Syntax.from_path(
                            full_path,
                            line_numbers=True,
                            line_range=(start, end),
                            highlight_lines=list(range(*mask.lines)),
                        ),
                        title=f"{mask}",
                        subtitle=f"{full_path}",
                    )
                )

            rich.print(rich.rule.Rule())

            # Overview table of all masks; missing files are shown in red.
            table = rich.table.Table(title="FPE Summary", expand=True)
            table.add_column("File")
            table.add_column("Lines")
            table.add_column("FPE type")
            table.add_column("Mask limit")

            for mask in masks:
                start, end = mask.lines
                if start + 1 == end:
                    line_str = str(start)
                else:
                    line_str = f"({start}-{end}]"

                full_path = srcdir / mask.file

                table.add_row(
                    str(mask.file),
                    line_str,
                    mask.type.name,
                    str(mask.count),
                    style="red" if not full_path.exists() else None,
                )

            rich.print(table)

        if error:
            raise RuntimeError("Sequencer FPE masking configuration has errors")
0558 raise RuntimeError("Sequencer FPE masking configuration has errors")