File indexing completed on 2025-01-18 09:12:06
0001 import re
0002 import copy
0003 from itertools import cycle
0004
0005 import numpy as np
0006
0007
0008 try:
0009 from matplotlib.pyplot import subplots
0010 except:
0011
0012 def subplots(*args, **kwargs):
0013 raise RuntimeError("Matplotlib could not be imported")
0014
0015
0016 class AverageTrackPlotter:
0017 def __init__(self, view_drawers, annotate_steps=False):
0018 self.annotate_steps = annotate_steps
0019
0020 self.positions = []
0021 self.ibackward = None
0022
0023 self.view_drawers = view_drawers
0024
0025 def parse_step(self, step):
0026 for line in step:
0027 if line.count("Do backward propagation") == 1:
0028 self.ibackward = len(self.positions)
0029
0030 elif line.count("at mean position") == 1:
0031 line = re.sub(r"^.*position", "", line)
0032 line = re.sub(r"with.*$", "", line)
0033
0034 self.positions.append(np.array([float(item) for item in line.split()]))
0035
0036 def name(self):
0037 return "Average track"
0038
0039 def get_figure_axes(self):
0040 return subplots(1, len(self.view_drawers))
0041
0042 def draw(self, fig, axes, step):
0043 for ax, drawer in zip(axes, self.view_drawers):
0044 ax = drawer.draw_detector(ax)
0045
0046 fwd_to_plot = self.positions[: self.ibackward]
0047 bwd_to_plot = self.positions[self.ibackward :]
0048
0049 if step < len(fwd_to_plot):
0050 fwd_to_plot = fwd_to_plot[:step]
0051 bwd_to_plot = []
0052 else:
0053 bwd_steps = step - len(fwd_to_plot)
0054 bwd_to_plot = bwd_to_plot[:bwd_steps]
0055
0056 positions = [fwd_to_plot, bwd_to_plot]
0057 names = ["forward", "backward"]
0058
0059 for positions, direction in zip(positions, names):
0060 if len(positions) == 0:
0061 continue
0062
0063 positions = np.vstack(positions)
0064
0065 for ax, drawer in zip(axes, self.view_drawers):
0066 ax = drawer.plot(ax, positions, label=direction, marker="x", lw=1)
0067
0068 if self.annotate_steps:
0069 for i, p in enumerate(positions):
0070 ax = drawer.annotate(ax, p, str(i))
0071
0072 ax = drawer.set_title(ax)
0073 ax.legend()
0074
0075 fig.tight_layout()
0076 return fig, axes
0077
0078
0079 class ComponentsPlotter:
0080 def __init__(self, view_drawers):
0081 self.current_direction = "forward"
0082 self.steps = []
0083
0084 self.view_drawers = view_drawers
0085
0086 self.colors = [
0087 "red",
0088 "orangered",
0089 "orange",
0090 "gold",
0091 "olive",
0092 "forestgreen",
0093 "lime",
0094 "teal",
0095 "cyan",
0096 "blue",
0097 "indigo",
0098 "magenta",
0099 "brown",
0100 ]
0101
0102 def parse_step(self, step):
0103 surface_name = None
0104 component_sets = []
0105 components = []
0106
0107 for line in step:
0108 if line.count("Step is at surface") == 1:
0109 surface_name = line[line.find("vol") :]
0110
0111 elif line.count("Do backward propagation") == 1:
0112 self.current_direction = "backward"
0113
0114 elif re.match(r"^.*#[0-9]+\spos", line):
0115 line = line.replace(",", "")
0116 splits = line.split()
0117
0118 current_cmp = int(splits[3][1:])
0119 pos = np.array([float(part) for part in splits[5:8]])
0120
0121
0122 if current_cmp < len(components):
0123 component_sets.append(copy.deepcopy(components))
0124 components = []
0125
0126 components.append(pos)
0127
0128 component_sets.append(copy.deepcopy(components))
0129 assert len(component_sets) <= 2
0130
0131 self.steps.append(
0132 (
0133 surface_name,
0134 self.current_direction,
0135 component_sets,
0136 )
0137 )
0138
0139 def name(self):
0140 return "Components"
0141
0142 def get_figure_axes(self):
0143 return subplots(1, len(self.view_drawers))
0144
0145 def number_steps(self):
0146 return sum([len(s[3]) for s in self.stages])
0147
0148 def draw(self, fig, axes, requested_step):
0149 kSurface = 0
0150 kDirection = 1
0151 kComponents = 2
0152
0153 for ax, drawer in zip(axes, self.view_drawers):
0154 ax = drawer.draw_detector(ax)
0155
0156 if requested_step >= len(self.steps):
0157 fig.suptitle("Step out of range")
0158 return fig, axes
0159
0160 n_components = len(self.steps[requested_step][kComponents][-1])
0161
0162
0163 start_surface = "<unknown>"
0164 positions = []
0165 for si in range(requested_step, 0, -1):
0166 if len(self.steps[si][kComponents][-1]) != n_components:
0167 fig.suptitle("error: component number mismatch")
0168 return fig, axes
0169 positions.append(self.steps[si][kComponents][-1])
0170
0171 if self.steps[si][kSurface] is not None:
0172 start_surface = self.steps[si][kSurface]
0173 break
0174
0175 fig.suptitle(
0176 "Stepping {} with {} components from {}".format(
0177 self.steps[requested_step][kDirection],
0178 n_components,
0179 start_surface.strip(),
0180 )
0181 )
0182
0183 n_steps = len(positions)
0184 if n_steps == 0:
0185 return fig, axes
0186
0187 try:
0188 positions = np.array(positions)
0189 assert len(positions.shape) == 3
0190
0191 assert positions.shape[0] == n_steps
0192 assert positions.shape[1] == n_components
0193 assert positions.shape[2] == 3
0194
0195 for i, color in zip(range(n_components), cycle(self.colors)):
0196 for ax, drawer in zip(axes, self.view_drawers):
0197 ax = drawer.plot(ax, positions[:, i, :], c=color, marker="x", lw=1)
0198 except:
0199 fig.suptitle("Error drawing")
0200 import pprint
0201
0202 pprint.pprint(positions)
0203
0204 fig.tight_layout()
0205 return fig, axes
0206
0207
0208 class GsfMomentumRecorder:
0209 def __init__(self):
0210
0211 self.gsf_started_backwards = False
0212 self.gsf_accumulated_pathlength = 0
0213 self.printed_qop_warning = False
0214
0215
0216 self.gsf_momenta = []
0217 self.gsf_cmp_data = []
0218 self.gsf_pathlengths = []
0219
0220
0221 self.gsf_current_step_cmp_momenta = []
0222 self.gsf_current_step_cmp_weights = []
0223 self.gsf_have_momentum = False
0224 self.gsf_last_step_number = None
0225
0226 def parse_line(self, line):
0227 if line.count("Gsf step") == 1:
0228
0229 if int(line.split()[5]) == self.gsf_last_step_number:
0230 return
0231
0232 self.gsf_last_step_number = int(line.split()[5])
0233
0234
0235 if len(self.gsf_current_step_cmp_momenta) > 0:
0236 self.gsf_cmp_data.append(
0237 (
0238 self.gsf_current_step_cmp_momenta,
0239 self.gsf_current_step_cmp_weights,
0240 )
0241 )
0242 self.gsf_current_step_cmp_momenta = []
0243 self.gsf_current_step_cmp_weights = []
0244
0245
0246 assert len(self.gsf_momenta) == len(self.gsf_pathlengths)
0247 self.gsf_momenta.append(float(line.split()[-4]))
0248 self.gsf_have_momentum = True
0249
0250 elif re.match(r"^.*#[0-9]+\spos", line) and not self.gsf_started_backwards:
0251 line = line.replace(",", "")
0252 qop = float(line.split()[-3])
0253 p = abs(1 / qop)
0254 w = float(line.split()[-7])
0255 self.gsf_current_step_cmp_momenta.append(p)
0256 self.gsf_current_step_cmp_weights.append(w)
0257
0258 elif line.count("Step with size") == 1:
0259 self.gsf_accumulated_pathlength += float(line.split()[-2])
0260
0261 if self.gsf_have_momentum:
0262 self.gsf_have_momentum = False
0263 self.gsf_pathlengths.append(self.gsf_accumulated_pathlength)
0264 assert len(self.gsf_pathlengths) == len(self.gsf_momenta)
0265
0266
0267 class MomentumGraph:
0268 def __init__(self):
0269 self._flipped = False
0270
0271 self.momenta = []
0272 self.pathlenghts = []
0273 self.iBackward = None
0274
0275 def parse_step(self, step):
0276 mom = None
0277 pl = None
0278
0279 for line in step:
0280 if line.count("Gsf step") == 1:
0281 mom = float(line.split()[-4])
0282
0283 elif line.count("Step with size") == 1:
0284 pl = float(line.split()[-2])
0285
0286 if line.count("Do backward") == 1:
0287 self.iBackward = len(self.momenta)
0288
0289 if mom is None or pl is None:
0290 return
0291
0292 self.momenta.append(mom)
0293 self.pathlenghts.append(pl)
0294
0295 def name(self):
0296 return "Momentum"
0297
0298 def get_figure_axes(self):
0299 return subplots()
0300
0301 def draw(self, fig, ax, requested_step):
0302 fwd_mom, bwd_mom, fwd_pls, bwd_pls = [], [], [], []
0303
0304 if self.iBackward is None:
0305 fwd_mom = self.momenta
0306 fwd_pls = self.pathlenghts
0307 else:
0308 fwd_mom = self.momenta[: self.iBackward]
0309 fwd_pls = self.pathlenghts[: self.iBackward]
0310
0311 bwd_mom = self.momenta[self.iBackward :]
0312 bwd_pls = self.pathlenghts[self.iBackward :]
0313
0314 fwd_pls = np.cumsum(fwd_pls)
0315 ax.plot(fwd_pls, fwd_mom, color="tab:blue")
0316
0317 bwd_pls = np.cumsum(bwd_pls)
0318 bwd_pls = max(abs(bwd_pls)) + bwd_pls
0319 ax.plot(bwd_pls, bwd_mom, color="tab:orange")
0320
0321 if requested_step < self.iBackward:
0322 ax.vlines(
0323 fwd_pls[requested_step], *ax.get_ylim(), alpha=0.5, color="tab:blue"
0324 )
0325 else:
0326 s = requested_step - self.iBackward
0327 if s < len(bwd_pls):
0328 ax.vlines(bwd_pls[s], *ax.get_ylim(), alpha=0.5, color="tab:orange")
0329
0330
0331 class BoundParametersProcessor:
0332 def __init__(self, key):
0333 assert key == "Filtered" or key == "Predicted"
0334 self.key = key
0335 self.pars_pattern = "^.*" + self.key + " parameters:(.*)$"
0336 self.cov_preamble = "^.*" + self.key + " covariance:$"
0337
0338 self.step_data = []
0339
0340 def parse_step(self, step):
0341 weights = []
0342 parameters = []
0343 covs = []
0344 for i in range(len(step)):
0345 line = step[i]
0346
0347 m = re.match(r"^.*weight: (.*), status:.*$", line)
0348 if m:
0349 weights.append(float(m[1]))
0350
0351 if not (
0352 f"{self.key} parameters" in line or f"{self.key} covariance" in line
0353 ):
0354 continue
0355
0356 m = re.match(self.pars_pattern, line)
0357 if m:
0358 pars = np.array([float(f) for f in m[1].split()])
0359 assert len(pars) == 6
0360 parameters.append(pars)
0361 continue
0362
0363 m = re.match(self.cov_preamble, line)
0364
0365 if m:
0366 cov = []
0367 for j in range(i + 1, i + 7):
0368 cov.append([float(f) for f in step[j].split()])
0369 covs.append(np.array(cov))
0370 assert covs[-1].shape == (6, 6)
0371 i = i + 6
0372
0373 assert len(parameters) == len(covs)
0374 assert len(weights) >= len(parameters)
0375 if len(parameters) > 0:
0376 self.step_data.append((weights[: len(parameters)], parameters, covs))
0377 else:
0378 self.step_data.append(None)
0379
0380 def name(self):
0381 return f"{self.key} state"
0382
0383 def get_figure_axes(self):
0384 return subplots(2, 3)
0385
0386 def draw(self, fig, axes, requested_step):
0387 import scipy
0388
0389 if requested_step >= len(self.step_data):
0390 fig.suptitle("Error: Step out of bound")
0391 return fig, axes
0392
0393 if self.step_data[requested_step] is None:
0394 fig.suptitle("nothing to draw, not on surface")
0395 return fig, axes
0396
0397 ws, pars, covs = self.step_data[requested_step]
0398
0399 w_str = ", ".join([f"{w:.2f}" for w in ws])
0400 fig.suptitle(f"draw {len(ws)} components\nweights: {w_str}")
0401
0402 j_max = np.argmax(ws)
0403
0404 for i, (ax, title) in enumerate(
0405 zip(axes.flatten(), ["l0", "l1", "theta", "phi", "qop", "t"])
0406 ):
0407 cmps = [(ws[j], pars[j][i], covs[j][i, i]) for j in range(len(ws))]
0408
0409
0410
0411 minx = cmps[j_max][1] - 3 * np.sqrt(cmps[j_max][2])
0412 maxx = cmps[j_max][1] + 3 * np.sqrt(cmps[j_max][2])
0413
0414 minx = min(minx, min([m[1] for m in cmps if m[0] > 0.05]))
0415 maxx = max(maxx, max([m[1] for m in cmps if m[0] > 0.05]))
0416
0417
0418 mixture = lambda x: sum(
0419 [
0420 w * scipy.stats.norm(loc=mu, scale=np.sqrt(var)).pdf(x)
0421 for w, mu, var in cmps
0422 ]
0423 )
0424
0425 x = np.linspace(minx, maxx, 200)
0426 y = sum(
0427 [
0428 w * scipy.stats.norm(loc=mu, scale=np.sqrt(var)).pdf(x)
0429 for w, mu, var in cmps
0430 ]
0431 )
0432
0433 if len(ws) > 1:
0434 for w, mu, var in cmps:
0435 ax.plot(
0436 x, w * scipy.stats.norm(loc=mu, scale=np.sqrt(var)).pdf(x), lw=1
0437 )
0438
0439 ax.plot(x, y, lw=3, color="black")
0440
0441 ax.set_title(title)
0442
0443 fig.tight_layout()
0444 return fig, ax