Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-01-09 09:26:45

0001 import sys, inspect
0002 from pathlib import Path
0003 from typing import Optional, Protocol, Union, List, Dict, Tuple
0004 import os
0005 import re
0006 import acts
0007 
0008 from .ActsExamplesPythonBindings import *
0009 
0010 from acts._adapter import _patch_config
0011 
0012 _patch_config(ActsExamplesPythonBindings)
0013 
0014 _propagators = []
0015 _concrete_propagators = []
0016 for stepper in ("Eigen", "Atlas", "StraightLine", "Sympy"):
0017     _propagators.append(getattr(acts, f"{stepper}Propagator"))
0018     _concrete_propagators.append(
0019         getattr(
0020             ActsExamplesPythonBindings,
0021             f"{stepper}ConcretePropagator",
0022         )
0023     )
0024 
0025 
0026 def ConcretePropagator(propagator):
0027     for prop, prop_if in zip(_propagators, _concrete_propagators):
0028         if isinstance(propagator, prop):
0029             return prop_if(propagator)
0030 
0031     raise TypeError(f"Unknown propagator {type(propagator).__name__}")
0032 
0033 
0034 def NamedTypeArgs(**namedTypeArgs):
0035     """Decorator to move args of a named type (e.g. `namedtuple` or `Enum`) to kwargs based on type, so user doesn't need to specify the key name.
0036     Also allows the keyword argument to be converted from a built-in type (eg. `tuple` or `int`).
0037     """
0038 
0039     namedTypeClasses = {c: a for a, c in namedTypeArgs.items()}
0040 
0041     def NamedTypeArgsDecorator(func):
0042         from functools import wraps
0043 
0044         @wraps(func)
0045         def NamedTypeArgsWrapper(*args, **kwargs):
0046             from collections.abc import Iterable
0047 
0048             for k, v in kwargs.items():
0049                 cls = namedTypeArgs.get(k)
0050                 if (
0051                     cls is not None
0052                     and v is not None
0053                     and type(v).__module__ == int.__module__  # is v a 'builtins'?
0054                     and not (
0055                         issubclass(type(v), Iterable) and all(type(e) is cls for e in v)
0056                     )  # not [cls]
0057                 ):
0058                     if issubclass(cls, Iterable):
0059                         kwargs[k] = cls(*v)
0060                     else:
0061                         kwargs[k] = cls(v)
0062 
0063             newargs = []
0064             for i, a in enumerate(args):
0065                 k = namedTypeClasses.get(type(a))
0066                 if k is None:
0067                     newargs.append(a)
0068                     if i > len(newargs):
0069                         types = [type(a).__name__ for a in args]
0070                         raise TypeError(
0071                             f"{func.__name__}() positional argument {i} of type {type(a)} follows named-type arguments, which were converted to keyword arguments. All argument types: {types}"
0072                         )
0073                 elif k in kwargs:
0074                     raise TypeError(f"{func.__name__}() keyword argument repeated: {k}")
0075                 else:
0076                     kwargs[k] = a
0077             return func(*newargs, **kwargs)
0078 
0079         return NamedTypeArgsWrapper
0080 
0081     return NamedTypeArgsDecorator
0082 
0083 
0084 def defaultKWArgs(**kwargs) -> dict:
0085     """Removes keyword arguments that are None or a list of all None (eg. [None,None]).
0086     This keeps the called function's defaults."""
0087     from collections.abc import Iterable
0088 
0089     return {
0090         k: v
0091         for k, v in kwargs.items()
0092         if not (
0093             v is None or (isinstance(v, Iterable) and all([vv is None for vv in v]))
0094         )
0095     }
0096 
0097 
0098 def dump_func_args(func, *args, **kwargs):
0099     def valstr(v, d=set()):
0100         from collections.abc import Callable
0101 
0102         if re.match(r"^<[\w.]+ object at 0x[\da-f]+>$", repr(v)):
0103             name = type(v).__module__ + "." + type(v).__qualname__
0104             if len(d) < 10 and name not in d and type(v).__name__ != "Sequencer":
0105                 try:
0106                     a = [
0107                         k + " = " + valstr(getattr(v, k), set(d | {name}))
0108                         for k in dir(v)
0109                         if not (
0110                             k.startswith("__") or isinstance(getattr(v, k), Callable)
0111                         )
0112                     ]
0113                 except:
0114                     a = []
0115             else:
0116                 a = []
0117             if a:
0118                 return name + "{ " + ", ".join(a) + " }"
0119             else:
0120                 return name + "{}"
0121         else:
0122             return repr(v)
0123 
0124     def keyvalstr(kv):
0125         return "{0} = {1}".format(kv[0], valstr(kv[1]))
0126 
0127     try:
0128         func_kwargs = inspect.signature(func).bind(*args, **kwargs).arguments
0129         func_args = func_kwargs.pop("args", [])
0130         func_args.count(None)  # raise AttributeError if not tuple/list
0131         func_kwargs.update(func_kwargs.pop("kwargs", {}))
0132     except (ValueError, AttributeError):
0133         func_kwargs = kwargs
0134         func_args = args
0135     func_args_str = ", ".join(
0136         list(map(valstr, func_args)) + list(map(keyvalstr, func_kwargs.items()))
0137     )
0138     if not (
0139         func_args_str == ""
0140         and any([a == "Config" for a in func.__qualname__.split(".")])
0141     ):
0142         print(f"{func.__module__}.{func.__qualname__} ( {func_args_str} )")
0143 
0144 
0145 def dump_args(func):
0146     """
0147     Decorator to print function call details.
0148     This includes parameters names and effective values.
0149     https://stackoverflow.com/questions/6200270/decorator-that-prints-function-call-details-parameters-names-and-effective-valu
0150     """
0151     from functools import wraps
0152 
0153     @wraps(func)
0154     def dump_args_wrapper(*args, **kwargs):
0155         dump_func_args(func, *args, **kwargs)
0156         return func(*args, **kwargs)
0157 
0158     # fix up any attributes broken by the wrapping
0159     for name in dir(func):
0160         if not name.startswith("__"):
0161             obj = getattr(func, name)
0162             wrapped = getattr(dump_args_wrapper, name, None)
0163             if type(obj) is not type(wrapped):
0164                 setattr(dump_args_wrapper, name, obj)
0165 
0166     return dump_args_wrapper
0167 
0168 
0169 def dump_args_calls(myLocal=None, mods=None, quiet=False):
0170     """
0171     Wrap all Python bindings calls to acts and its submodules in dump_args.
0172     Specify myLocal=locals() to include imported symbols too.
0173     """
0174     from collections.abc import Callable
0175 
0176     def _allmods(mod, base, found):
0177         import types
0178 
0179         mods = [mod]
0180         found.add(mod)
0181         for name, obj in sorted(
0182             vars(mod).items(),
0183             key=lambda m: (
0184                 (2, m[0])
0185                 if m[0] == "ActsPythonBindings"
0186                 else (1, m[0]) if m[0].startswith("_") else (0, m[0])
0187             ),
0188         ):
0189             if (
0190                 not name.startswith("__")
0191                 and type(obj) is types.ModuleType
0192                 and obj.__name__.startswith(base)
0193                 and f"{mod.__name__}.{name}" in sys.modules
0194                 and obj not in found
0195             ):
0196                 mods += _allmods(obj, base, found)
0197         return mods
0198 
0199     if mods is None:
0200         mods = _allmods(acts, "acts.", set())
0201     elif not isinstance(mods, list):
0202         mods = [mods]
0203 
0204     donemods = []
0205     alldone = 0
0206     for mod in mods:
0207         done = 0
0208         for name in dir(mod):
0209             # if name in {"Config", "Interval", "IMaterialDecorator"}: continue  # skip here if we don't fix up attributes in dump_args and unwrap classes elsewhere
0210             obj = getattr(mod, name, None)
0211             if not (
0212                 not name.startswith("__")
0213                 and isinstance(obj, Callable)
0214                 and hasattr(obj, "__module__")
0215                 and obj.__module__.startswith("acts.ActsPythonBindings")
0216                 and obj.__qualname__ != "_Sequencer.Config"
0217                 and not hasattr(obj, "__wrapped__")
0218             ):
0219                 continue
0220             # wrap class's contained methods
0221             done += dump_args_calls(myLocal, [obj], True)
0222             wrapped = dump_args(obj)
0223             setattr(mod, name, wrapped)
0224             if myLocal and hasattr(myLocal, name):
0225                 setattr(myLocal, name, wrapped)
0226             done += 1
0227         if done:
0228             alldone += done
0229             donemods.append(f"{mod.__name__}:{done}")
0230     if not quiet and donemods:
0231         print("dump_args for module functions:", ", ".join(donemods))
0232     return alldone
0233 
0234 
0235 class CustomLogLevel(Protocol):
0236     def __call__(
0237         self,
0238         minLevel: acts.logging.Level = acts.logging.VERBOSE,
0239         maxLevel: acts.logging.Level = acts.logging.FATAL,
0240     ) -> acts.logging.Level: ...
0241 
0242 
0243 def defaultLogging(
0244     s=None,
0245     logLevel: Optional[acts.logging.Level] = None,
0246 ) -> CustomLogLevel:
0247     """
0248     Establishes a default logging strategy for the python examples interface.
0249 
0250     Returns a function that determines the log level in the following schema:
0251     - if `logLevel` is set use it otherwise use the log level of the sequencer `s.config.logLevel`
0252     - the returned log level is bound between `minLevel` and `maxLevel` provided to `customLogLevel`
0253 
0254     Examples:
0255     - `customLogLevel(minLevel=acts.logging.INFO)` to get a log level that is INFO or higher
0256       (depending on the sequencer and `logLevel` param) which is useful to suppress a component which
0257       produces a bunch of logs below INFO and you are actually more interested in another component
0258     - `customLogLevel(maxLevel=acts.logging.INFO)` to get a log level that is INFO or lower
0259       (depending on the sequencer and `logLevel` param) which is useful to get more details from a
0260       component that will produce logs of interest below the default level
0261     - in summary `minLevel` defines the maximum amount of logging and `maxLevel` defines the minimum amount of logging
0262     """
0263 
0264     def customLogLevel(
0265         minLevel: acts.logging.Level = acts.logging.VERBOSE,
0266         maxLevel: acts.logging.Level = acts.logging.FATAL,
0267     ) -> acts.logging.Level:
0268         l = logLevel if logLevel is not None else s.config.logLevel
0269         return acts.logging.Level(min(maxLevel.value, max(minLevel.value, l.value)))
0270 
0271     return customLogLevel
0272 
0273 
0274 class Sequencer(ActsExamplesPythonBindings._Sequencer):
0275     _autoFpeMasks: Optional[List["FpeMask"]] = None
0276 
0277     def __init__(self, *args, **kwargs):
0278         # if we have the argument already in kwargs, we optionally convert them from tuples
0279         if "fpeMasks" in kwargs:
0280             m = kwargs["fpeMasks"]
0281             if isinstance(m, list) and len(m) > 0 and isinstance(m[0], tuple):
0282                 n = []
0283                 for loc, fpe, count in m:
0284                     file, lines = self.FpeMask.parse_loc(loc)
0285                     t = _fpe_types_to_enum[fpe] if isinstance(fpe, str) else fpe
0286                     n.append(self.FpeMask(file, lines, t, count))
0287                 kwargs["fpeMasks"] = n
0288 
0289         kwargs["fpeMasks"] = kwargs.get("fpeMasks", []) + self._getAutoFpeMasks()
0290 
0291         if self.config.logLevel >= acts.logging.DEBUG:
0292             self._printFpeSummary(kwargs["fpeMasks"])
0293 
0294         cfg = self.Config()
0295         if len(args) == 1 and isinstance(args[0], self.Config):
0296             cfg = args[0]
0297         if "config" in kwargs:
0298             cfg = kwargs.pop("config")
0299 
0300         for k, v in kwargs.items():
0301             if not hasattr(cfg, k):
0302                 raise ValueError(f"Sequencer.Config does not have field {k}")
0303             if isinstance(v, Path):
0304                 v = str(v)
0305 
0306             setattr(cfg, k, v)
0307 
0308         if hasattr(ActsExamplesPythonBindings._Sequencer, "__wrapped__"):
0309             dump_func_args(Sequencer, cfg)
0310         super().__init__(cfg)
0311 
0312     class FpeMask(ActsExamplesPythonBindings._Sequencer._FpeMask):
0313         @classmethod
0314         def fromFile(cls, file: Union[str, Path]) -> List["FpeMask"]:
0315             if isinstance(file, str):
0316                 file = Path(file)
0317 
0318             if file.suffix in (".yml", ".yaml"):
0319                 try:
0320                     return cls.fromYaml(file)
0321                 except ImportError:
0322                     print("FPE mask input file is YAML, but PyYAML is not installed")
0323                     raise
0324 
0325         @classmethod
0326         def fromYaml(cls, file: Union[str, Path]) -> List["FpeMask"]:
0327             import yaml
0328 
0329             with file.open() as fh:
0330                 d = yaml.safe_load(fh)
0331 
0332             return cls.fromDict(d)
0333 
0334         _fpe_types_to_enum = {
0335             v.name: v for v in ActsExamplesPythonBindings.FpeType.values
0336         }
0337 
0338         @staticmethod
0339         def toDict(
0340             masks: List["FpeMask"],
0341         ) -> Dict[str, Dict[str, int]]:
0342             out = {}
0343             for mask in masks:
0344                 loc_str = f"{mask.file}:"
0345                 start, end = mask.lines
0346                 if start == end - 1:
0347                     loc_str += str(start)
0348                 else:
0349                     loc_str += f"({start}, {end}]"
0350                 out.setdefault(loc_str, {})
0351                 out[loc_str][mask.type.name] = mask.count
0352 
0353                 return out
0354 
0355         @staticmethod
0356         def parse_loc(loc: str) -> Tuple[str, Tuple[int, int]]:
0357             file, lines = loc.split(":", 1)
0358 
0359             if m := re.match(r"^\((\d+) ?, ?(\d+)\]$", lines.strip()):
0360                 start, end = map(int, m.groups())
0361             elif m := re.match(r"^(\d+) ?- ?(\d+)$", lines.strip()):
0362                 start, end = map(int, m.groups())
0363                 end += 1  # assumption here is that it's inclusive
0364             else:
0365                 start = int(lines)
0366                 end = start + 1
0367 
0368             return file, (start, end)
0369 
0370         @classmethod
0371         def fromDict(cls, d: Dict[str, Dict[str, int]]) -> List["FpeMask"]:
0372             out = []
0373             for loc, types in d.items():
0374                 file, lines = cls.parse_loc(loc)
0375 
0376                 for fpe, count in types.items():
0377                     out.append(cls(file, lines, cls._fpe_types_to_enum[fpe], count))
0378             return out
0379 
0380     @classmethod
0381     def srcdir(cls) -> Path:
0382         return Path(cls._sourceLocation).parent.parent.parent.parent
0383 
0384     @classmethod
0385     def _getAutoFpeMasks(cls) -> List[FpeMask]:
0386         if cls._autoFpeMasks is not None:
0387             return cls._autoFpeMasks
0388 
0389         srcdir = cls.srcdir()
0390 
0391         cls._autoFpeMasks = []
0392 
0393         for root, _, files in os.walk(srcdir):
0394             root = Path(root)
0395             for f in files:
0396                 if (
0397                     not f.endswith(".hpp")
0398                     and not f.endswith(".cpp")
0399                     and not f.endswith(".ipp")
0400                 ):
0401                     continue
0402                 f = root / f
0403                 #  print(f)
0404                 with f.open("r") as fh:
0405                     lines = fh.readlines()
0406                 for i, line in enumerate(lines):
0407                     if m := re.match(r".*\/\/ ?MARK: ?(fpeMask\(.*)$", line):
0408                         exp = m.group(1)
0409                         for m in re.findall(
0410                             r"fpeMask\( ?(\w+), ?(\d+) ?, ?#(\d+) ?\)", exp
0411                         ):
0412                             fpeType, count, _ = m
0413                             count = int(count)
0414                             rel = f.relative_to(srcdir)
0415                             cls._autoFpeMasks.append(
0416                                 cls.FpeMask(
0417                                     str(rel),
0418                                     (i + 1, i + 2),
0419                                     cls.FpeMask._fpe_types_to_enum[fpeType],
0420                                     count,
0421                                 )
0422                             )
0423 
0424                     if m := re.match(
0425                         r".*\/\/ ?MARK: ?fpeMaskBegin\( ?(\w+), ?(\d+) ?, ?#?(\d+) ?\)",
0426                         line,
0427                     ):
0428                         fpeType, count, _ = m.groups()
0429                         count = int(count)
0430                         rel = f.relative_to(srcdir)
0431 
0432                         start = i + 1
0433                         end = None
0434 
0435                         # look for end marker
0436                         for j, line2 in enumerate(lines[i:]):
0437                             if m := re.match(
0438                                 r".*\/\/ ?MARK: ?fpeMaskEnd\( ?(\w+) ?\)$", line2
0439                             ):
0440                                 endType = m.group(1)
0441                                 if endType == fpeType:
0442                                     end = i + j + 1
0443                                     break
0444 
0445                         if end is None:
0446                             raise ValueError(
0447                                 f"Found fpeMaskBegin but no fpeMaskEnd for {rel}:{start}"
0448                             )
0449                         cls._autoFpeMasks.append(
0450                             cls.FpeMask(
0451                                 str(rel),
0452                                 (start, end + 1),
0453                                 cls.FpeMask._fpe_types_to_enum[fpeType],
0454                                 count,
0455                             )
0456                         )
0457 
0458         return cls._autoFpeMasks
0459 
0460     @classmethod
0461     def _printFpeSummary(cls, masks: List[FpeMask]):
0462         if len(masks) == 0 or "ACTS_SEQUENCER_DISABLE_FPEMON" in os.environ:
0463             return
0464 
0465         # Try to make a nice summary with rich, or fallback to a plain text one
0466         try:
0467             import rich
0468 
0469             have_rich = True
0470         except ImportError:
0471             have_rich = False
0472 
0473         error = False
0474         srcdir = cls.srcdir()
0475 
0476         if not have_rich or not sys.stdout.isatty():
0477             print("FPE masks:")
0478             for mask in masks:
0479                 s = f"{mask.file}:{mask.lines[0]}: {mask.type.name}: {mask.count}"
0480 
0481                 full_path = srcdir / mask.file
0482                 if not full_path.exists():
0483                     print(f"- {s}\n  [File at {full_path} does not exist!]")
0484                     error = True
0485                 else:
0486                     print(f"- {s}")
0487 
0488         else:
0489             import rich
0490             import rich.rule
0491             import rich.panel
0492             from rich.markdown import Markdown as md
0493             import rich.syntax
0494             import rich.table
0495             import rich.text
0496 
0497             rich.print(rich.rule.Rule("FPE masks"))
0498 
0499             for i, mask in enumerate(masks):
0500                 if i > 0:
0501                     rich.print(rich.rule.Rule())
0502                 full_path = srcdir / mask.file
0503                 if not full_path.exists():
0504                     rich.print(
0505                         rich.panel.Panel(
0506                             md(f"File at **{full_path}** does not exist"),
0507                             title=f"{mask}",
0508                             style="red",
0509                         )
0510                     )
0511                     error = True
0512                     continue
0513 
0514                 start, end = mask.lines
0515                 start = max(0, start - 2)
0516                 end += 2
0517                 rich.print(
0518                     rich.panel.Panel(
0519                         rich.syntax.Syntax.from_path(
0520                             full_path,
0521                             line_numbers=True,
0522                             line_range=(start, end),
0523                             highlight_lines=list(range(*mask.lines)),
0524                         ),
0525                         title=f"{mask}",
0526                         subtitle=f"{full_path}",
0527                     )
0528                 )
0529 
0530             rich.print(rich.rule.Rule())
0531 
0532             table = rich.table.Table(title="FPE Summary", expand=True)
0533             table.add_column("File")
0534             table.add_column("Lines")
0535             table.add_column("FPE type")
0536             table.add_column("Mask limit")
0537 
0538             for mask in masks:
0539                 start, end = mask.lines
0540                 if start + 1 == end:
0541                     line_str = str(start)
0542                 else:
0543                     line_str = f"({start}-{end}]"
0544 
0545                 full_path = srcdir / mask.file
0546 
0547                 table.add_row(
0548                     str(mask.file),
0549                     line_str,
0550                     mask.type.name,
0551                     str(mask.count),
0552                     style="red" if not full_path.exists() else None,
0553                 )
0554 
0555             rich.print(table)
0556 
0557         if error:
0558             raise RuntimeError("Sequencer FPE masking configuration has errors")