Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-01-18 09:12:06

0001 import re
0002 import copy
0003 from itertools import cycle
0004 
0005 import numpy as np
0006 
0007 # This should allow to run the non-gui mode in the CI
0008 try:
0009     from matplotlib.pyplot import subplots
0010 except:
0011 
0012     def subplots(*args, **kwargs):
0013         raise RuntimeError("Matplotlib could not be imported")
0014 
0015 
0016 class AverageTrackPlotter:
0017     def __init__(self, view_drawers, annotate_steps=False):
0018         self.annotate_steps = annotate_steps
0019 
0020         self.positions = []
0021         self.ibackward = None
0022 
0023         self.view_drawers = view_drawers
0024 
0025     def parse_step(self, step):
0026         for line in step:
0027             if line.count("Do backward propagation") == 1:
0028                 self.ibackward = len(self.positions)
0029 
0030             elif line.count("at mean position") == 1:
0031                 line = re.sub(r"^.*position", "", line)
0032                 line = re.sub(r"with.*$", "", line)
0033 
0034                 self.positions.append(np.array([float(item) for item in line.split()]))
0035 
0036     def name(self):
0037         return "Average track"
0038 
0039     def get_figure_axes(self):
0040         return subplots(1, len(self.view_drawers))
0041 
0042     def draw(self, fig, axes, step):
0043         for ax, drawer in zip(axes, self.view_drawers):
0044             ax = drawer.draw_detector(ax)
0045 
0046         fwd_to_plot = self.positions[: self.ibackward]
0047         bwd_to_plot = self.positions[self.ibackward :]
0048 
0049         if step < len(fwd_to_plot):
0050             fwd_to_plot = fwd_to_plot[:step]
0051             bwd_to_plot = []
0052         else:
0053             bwd_steps = step - len(fwd_to_plot)
0054             bwd_to_plot = bwd_to_plot[:bwd_steps]
0055 
0056         positions = [fwd_to_plot, bwd_to_plot]
0057         names = ["forward", "backward"]
0058 
0059         for positions, direction in zip(positions, names):
0060             if len(positions) == 0:
0061                 continue
0062 
0063             positions = np.vstack(positions)
0064 
0065             for ax, drawer in zip(axes, self.view_drawers):
0066                 ax = drawer.plot(ax, positions, label=direction, marker="x", lw=1)
0067 
0068                 if self.annotate_steps:
0069                     for i, p in enumerate(positions):
0070                         ax = drawer.annotate(ax, p, str(i))
0071 
0072                 ax = drawer.set_title(ax)
0073                 ax.legend()
0074 
0075         fig.tight_layout()
0076         return fig, axes
0077 
0078 
0079 class ComponentsPlotter:
0080     def __init__(self, view_drawers):
0081         self.current_direction = "forward"
0082         self.steps = []
0083 
0084         self.view_drawers = view_drawers
0085 
0086         self.colors = [
0087             "red",
0088             "orangered",
0089             "orange",
0090             "gold",
0091             "olive",
0092             "forestgreen",
0093             "lime",
0094             "teal",
0095             "cyan",
0096             "blue",
0097             "indigo",
0098             "magenta",
0099             "brown",
0100         ]
0101 
0102     def parse_step(self, step):
0103         surface_name = None
0104         component_sets = []
0105         components = []
0106 
0107         for line in step:
0108             if line.count("Step is at surface") == 1:
0109                 surface_name = line[line.find("vol") :]
0110 
0111             elif line.count("Do backward propagation") == 1:
0112                 self.current_direction = "backward"
0113 
0114             elif re.match(r"^.*#[0-9]+\spos", line):
0115                 line = line.replace(",", "")
0116                 splits = line.split()
0117 
0118                 current_cmp = int(splits[3][1:])
0119                 pos = np.array([float(part) for part in splits[5:8]])
0120 
0121                 # There can be two sets of components printed in a step
0122                 if current_cmp < len(components):
0123                     component_sets.append(copy.deepcopy(components))
0124                     components = []
0125 
0126                 components.append(pos)
0127 
0128         component_sets.append(copy.deepcopy(components))
0129         assert len(component_sets) <= 2
0130 
0131         self.steps.append(
0132             (
0133                 surface_name,
0134                 self.current_direction,
0135                 component_sets,
0136             )
0137         )
0138 
0139     def name(self):
0140         return "Components"
0141 
0142     def get_figure_axes(self):
0143         return subplots(1, len(self.view_drawers))
0144 
0145     def number_steps(self):
0146         return sum([len(s[3]) for s in self.stages])
0147 
0148     def draw(self, fig, axes, requested_step):
0149         kSurface = 0
0150         kDirection = 1
0151         kComponents = 2
0152 
0153         for ax, drawer in zip(axes, self.view_drawers):
0154             ax = drawer.draw_detector(ax)
0155 
0156         if requested_step >= len(self.steps):
0157             fig.suptitle("Step out of range")
0158             return fig, axes
0159 
0160         n_components = len(self.steps[requested_step][kComponents][-1])
0161 
0162         # go back to last surface
0163         start_surface = "<unknown>"
0164         positions = []
0165         for si in range(requested_step, 0, -1):
0166             if len(self.steps[si][kComponents][-1]) != n_components:
0167                 fig.suptitle("error: component number mismatch")
0168                 return fig, axes
0169             positions.append(self.steps[si][kComponents][-1])
0170 
0171             if self.steps[si][kSurface] is not None:
0172                 start_surface = self.steps[si][kSurface]
0173                 break
0174 
0175         fig.suptitle(
0176             "Stepping {} with {} components from {}".format(
0177                 self.steps[requested_step][kDirection],
0178                 n_components,
0179                 start_surface.strip(),
0180             )
0181         )
0182 
0183         n_steps = len(positions)
0184         if n_steps == 0:
0185             return fig, axes
0186 
0187         try:
0188             positions = np.array(positions)
0189             assert len(positions.shape) == 3
0190 
0191             assert positions.shape[0] == n_steps
0192             assert positions.shape[1] == n_components
0193             assert positions.shape[2] == 3
0194 
0195             for i, color in zip(range(n_components), cycle(self.colors)):
0196                 for ax, drawer in zip(axes, self.view_drawers):
0197                     ax = drawer.plot(ax, positions[:, i, :], c=color, marker="x", lw=1)
0198         except:
0199             fig.suptitle("Error drawing")
0200             import pprint
0201 
0202             pprint.pprint(positions)
0203 
0204         fig.tight_layout()
0205         return fig, axes
0206 
0207 
0208 class GsfMomentumRecorder:
0209     def __init__(self):
0210         # Global state
0211         self.gsf_started_backwards = False
0212         self.gsf_accumulated_pathlength = 0
0213         self.printed_qop_warning = False
0214 
0215         # Recordings
0216         self.gsf_momenta = []
0217         self.gsf_cmp_data = []
0218         self.gsf_pathlengths = []
0219 
0220         # Current step
0221         self.gsf_current_step_cmp_momenta = []
0222         self.gsf_current_step_cmp_weights = []
0223         self.gsf_have_momentum = False
0224         self.gsf_last_step_number = None
0225 
0226     def parse_line(self, line):
0227         if line.count("Gsf step") == 1:
0228             # Last step appears twice in log, prevent this
0229             if int(line.split()[5]) == self.gsf_last_step_number:
0230                 return
0231 
0232             self.gsf_last_step_number = int(line.split()[5])
0233 
0234             # Update component states if not in first step
0235             if len(self.gsf_current_step_cmp_momenta) > 0:
0236                 self.gsf_cmp_data.append(
0237                     (
0238                         self.gsf_current_step_cmp_momenta,
0239                         self.gsf_current_step_cmp_weights,
0240                     )
0241                 )
0242                 self.gsf_current_step_cmp_momenta = []
0243                 self.gsf_current_step_cmp_weights = []
0244 
0245             # Save momentum
0246             assert len(self.gsf_momenta) == len(self.gsf_pathlengths)
0247             self.gsf_momenta.append(float(line.split()[-4]))
0248             self.gsf_have_momentum = True
0249 
0250         elif re.match(r"^.*#[0-9]+\spos", line) and not self.gsf_started_backwards:
0251             line = line.replace(",", "")
0252             qop = float(line.split()[-3])
0253             p = abs(1 / qop)
0254             w = float(line.split()[-7])
0255             self.gsf_current_step_cmp_momenta.append(p)
0256             self.gsf_current_step_cmp_weights.append(w)
0257 
0258         elif line.count("Step with size") == 1:
0259             self.gsf_accumulated_pathlength += float(line.split()[-2])
0260 
0261             if self.gsf_have_momentum:
0262                 self.gsf_have_momentum = False
0263                 self.gsf_pathlengths.append(self.gsf_accumulated_pathlength)
0264                 assert len(self.gsf_pathlengths) == len(self.gsf_momenta)
0265 
0266 
0267 class MomentumGraph:
0268     def __init__(self):
0269         self._flipped = False
0270 
0271         self.momenta = []
0272         self.pathlenghts = []
0273         self.iBackward = None
0274 
0275     def parse_step(self, step):
0276         mom = None
0277         pl = None
0278 
0279         for line in step:
0280             if line.count("Gsf step") == 1:
0281                 mom = float(line.split()[-4])
0282 
0283             elif line.count("Step with size") == 1:
0284                 pl = float(line.split()[-2])
0285 
0286             if line.count("Do backward") == 1:
0287                 self.iBackward = len(self.momenta)
0288 
0289         if mom is None or pl is None:
0290             return
0291 
0292         self.momenta.append(mom)
0293         self.pathlenghts.append(pl)
0294 
0295     def name(self):
0296         return "Momentum"
0297 
0298     def get_figure_axes(self):
0299         return subplots()
0300 
0301     def draw(self, fig, ax, requested_step):
0302         fwd_mom, bwd_mom, fwd_pls, bwd_pls = [], [], [], []
0303 
0304         if self.iBackward is None:
0305             fwd_mom = self.momenta
0306             fwd_pls = self.pathlenghts
0307         else:
0308             fwd_mom = self.momenta[: self.iBackward]
0309             fwd_pls = self.pathlenghts[: self.iBackward]
0310 
0311             bwd_mom = self.momenta[self.iBackward :]
0312             bwd_pls = self.pathlenghts[self.iBackward :]
0313 
0314         fwd_pls = np.cumsum(fwd_pls)
0315         ax.plot(fwd_pls, fwd_mom, color="tab:blue")
0316 
0317         bwd_pls = np.cumsum(bwd_pls)
0318         bwd_pls = max(abs(bwd_pls)) + bwd_pls
0319         ax.plot(bwd_pls, bwd_mom, color="tab:orange")
0320 
0321         if requested_step < self.iBackward:
0322             ax.vlines(
0323                 fwd_pls[requested_step], *ax.get_ylim(), alpha=0.5, color="tab:blue"
0324             )
0325         else:
0326             s = requested_step - self.iBackward
0327             if s < len(bwd_pls):
0328                 ax.vlines(bwd_pls[s], *ax.get_ylim(), alpha=0.5, color="tab:orange")
0329 
0330 
0331 class BoundParametersProcessor:
0332     def __init__(self, key):
0333         assert key == "Filtered" or key == "Predicted"
0334         self.key = key
0335         self.pars_pattern = "^.*" + self.key + " parameters:(.*)$"
0336         self.cov_preamble = "^.*" + self.key + " covariance:$"
0337 
0338         self.step_data = []
0339 
0340     def parse_step(self, step):
0341         weights = []
0342         parameters = []
0343         covs = []
0344         for i in range(len(step)):
0345             line = step[i]
0346 
0347             m = re.match(r"^.*weight: (.*), status:.*$", line)
0348             if m:
0349                 weights.append(float(m[1]))
0350 
0351             if not (
0352                 f"{self.key} parameters" in line or f"{self.key} covariance" in line
0353             ):
0354                 continue
0355 
0356             m = re.match(self.pars_pattern, line)
0357             if m:
0358                 pars = np.array([float(f) for f in m[1].split()])
0359                 assert len(pars) == 6
0360                 parameters.append(pars)
0361                 continue
0362 
0363             m = re.match(self.cov_preamble, line)
0364 
0365             if m:
0366                 cov = []
0367                 for j in range(i + 1, i + 7):
0368                     cov.append([float(f) for f in step[j].split()])
0369                 covs.append(np.array(cov))
0370                 assert covs[-1].shape == (6, 6)
0371                 i = i + 6
0372 
0373         assert len(parameters) == len(covs)
0374         assert len(weights) >= len(parameters)
0375         if len(parameters) > 0:
0376             self.step_data.append((weights[: len(parameters)], parameters, covs))
0377         else:
0378             self.step_data.append(None)
0379 
0380     def name(self):
0381         return f"{self.key} state"
0382 
0383     def get_figure_axes(self):
0384         return subplots(2, 3)
0385 
0386     def draw(self, fig, axes, requested_step):
0387         import scipy
0388 
0389         if requested_step >= len(self.step_data):
0390             fig.suptitle("Error: Step out of bound")
0391             return fig, axes
0392 
0393         if self.step_data[requested_step] is None:
0394             fig.suptitle("nothing to draw, not on surface")
0395             return fig, axes
0396 
0397         ws, pars, covs = self.step_data[requested_step]
0398 
0399         w_str = ", ".join([f"{w:.2f}" for w in ws])
0400         fig.suptitle(f"draw {len(ws)} components\nweights: {w_str}")
0401 
0402         j_max = np.argmax(ws)
0403 
0404         for i, (ax, title) in enumerate(
0405             zip(axes.flatten(), ["l0", "l1", "theta", "phi", "qop", "t"])
0406         ):
0407             cmps = [(ws[j], pars[j][i], covs[j][i, i]) for j in range(len(ws))]
0408 
0409             # minx = min([ m[1]-3*np.sqrt(m[2]) for m in cmps ])
0410             # maxx = max([ m[1]+3*np.sqrt(m[2]) for m in cmps ])
0411             minx = cmps[j_max][1] - 3 * np.sqrt(cmps[j_max][2])
0412             maxx = cmps[j_max][1] + 3 * np.sqrt(cmps[j_max][2])
0413 
0414             minx = min(minx, min([m[1] for m in cmps if m[0] > 0.05]))
0415             maxx = max(maxx, max([m[1] for m in cmps if m[0] > 0.05]))
0416 
0417             # minx = pars[0][i]-3*covs[0]
0418             mixture = lambda x: sum(
0419                 [
0420                     w * scipy.stats.norm(loc=mu, scale=np.sqrt(var)).pdf(x)
0421                     for w, mu, var in cmps
0422                 ]
0423             )
0424 
0425             x = np.linspace(minx, maxx, 200)
0426             y = sum(
0427                 [
0428                     w * scipy.stats.norm(loc=mu, scale=np.sqrt(var)).pdf(x)
0429                     for w, mu, var in cmps
0430                 ]
0431             )
0432 
0433             if len(ws) > 1:
0434                 for w, mu, var in cmps:
0435                     ax.plot(
0436                         x, w * scipy.stats.norm(loc=mu, scale=np.sqrt(var)).pdf(x), lw=1
0437                     )
0438 
0439             ax.plot(x, y, lw=3, color="black")
0440             # ax.plot(x, [ scipy.stats.norm(loc=pars[0][i], scale=np.sqrt(covs[0][i,i])).pdf(xx) for xx in x])
0441             ax.set_title(title)
0442 
0443         fig.tight_layout()
0444         return fig, ax