File indexing completed on 2026-04-09 07:49:43
0001 """
0002 opticks/sysrap/sphotonlite.py
0003 ==============================
0004
0005 ::
0006
0007 f = Fold.Load(symbol="f")
0008 from opticks.sysrap.sphotonlite import SPhotonLite
0009 p = SPhotonLite.view(f.photonlite) # photonlite shape (N,4) uint32
0010
0011 """
0012 from __future__ import annotations
0013 import logging
0014 log = logging.getLogger(__name__)
0015
0016 import numpy as np
0017 from opticks.ana.fold import append_fields
0018
0019 EFFICIENCY_COLLECT = 0x1 << 13
0020
0021
0022 class SPhotonLite:
0023 """
0024 Structured view of the 16-byte ``sphotonlite`` record.
0025
0026 The binary layout (little-endian, 16 B total) is:
0027
0028 0-1 : identity (u2)
0029 2-3 : hitcount (u2)
0030 4-7 : time (f4)
0031 8-9 : lposfphi (u2) → normalised to lposfphi_f (f4)
0032 10-11 : lposcost (u2) → normalised to lposcost_f (f4)
0033 12-15 : flagmask (u4)
0034
0035 All public API is provided as **class-methods** – no instance needed.
0036 """
0037
0038
0039
0040
0041 DTYPE = np.dtype(
0042 [
0043 ("identity", "<u2"),
0044 ("hitcount", "<u2"),
0045 ("time", "<f4"),
0046 ("lposfphi", "<u2"),
0047 ("lposcost", "<u2"),
0048 ("flagmask", "<u4"),
0049 ],
0050 align=True,
0051 )
0052 assert DTYPE.itemsize == 16, f"expected 16 B, got {DTYPE.itemsize}"
0053
0054
0055
0056
0057 @classmethod
0058 def unpack_uint16_to_float(cls, u16: np.ndarray) -> np.ndarray:
0059 """0xffff → 1.0, 0 → 0.0 (exact, no rounding error)"""
0060 return u16.astype(np.float32) / 0xFFFF
0061
0062
0063
0064
0065 @classmethod
0066 def view_(cls, buf: np.ndarray) -> np.ndarray:
0067 """
0068 Convert a ``uint32`` buffer of shape ``(..., 4)`` into a flat
0069 structured array with the extra float fields ``lposfphi_f``
0070 and ``lposcost_f``.
0071
0072 Parameters
0073 ----------
0074 buf : np.ndarray
0075 Must be ``dtype=uint32`` and ``buf.shape[-1] == 4``.
0076
0077 Returns
0078 -------
0079 np.ndarray
0080 Flat (1-D) structured array.
0081 """
0082 if buf.dtype != np.uint32:
0083 raise TypeError(f"buf must be uint32, got {buf.dtype}")
0084 if buf.shape[-1] != 4:
0085 raise ValueError(f"last dimension must be 4, got {buf.shape[-1]}")
0086
0087 rec = buf.view(cls.DTYPE).reshape(-1)
0088
0089
0090 fphi = cls.unpack_uint16_to_float(rec["lposfphi"])
0091 cost = cls.unpack_uint16_to_float(rec["lposcost"])
0092
0093 phi = ( fphi * 2.0 - 1. ) * np.pi
0094 cos = np.arccos(cost)
0095
0096 rec = append_fields(
0097 rec
0098 ,
0099 [
0100 "lposfphi_f",
0101 "lposcost_f",
0102 "phi",
0103 "cost",
0104 "cos"
0105 ]
0106 ,
0107 [
0108 fphi,
0109 cost,
0110 phi,
0111 cost,
0112 cos
0113 ]
0114 ,
0115 [
0116 np.float32,
0117 np.float32,
0118 np.float32,
0119 np.float32,
0120 np.float32,
0121 ]
0122 ,
0123 usemask=False,
0124 )
0125 return rec
0126
0127 @classmethod
0128 def view(cls, buf: np.ndarray) -> np.recarray:
0129 rec = cls.view_(buf)
0130 return rec.view(np.recarray)
0131
0132
0133
0134
0135
0136
0137 @classmethod
0138 def pprint(cls, rec: np.ndarray, n: int = 5) -> None:
0139 """Print the first *n* records in a readable table."""
0140 print(rec[:n])
0141
0142
0143
0144
0145 @classmethod
0146 def empty(cls, size: int) -> np.ndarray:
0147 """Return a zero-filled structured array of length *size*."""
0148 return np.zeros(size, dtype=cls.DTYPE)
0149
0150
0151
0152
0153
0154
0155 class SPhotonLite_Merge:
0156 """
0157 Select hits and merge them using a NumPy implementation
0158 that follows the thrust::reduce_by_key based approach
0159 of sysrap/SPM.cu SPM::merge_partial_select
0160
0161 For a small number of photons (eg M1)
0162 this precisely duplicates in python the hit selection and merging
0163 done with the CUDA impl
0164
0165 """
0166
0167 def __init__(self, tw=1.0, select_mask = EFFICIENCY_COLLECT ):
0168 self.tw = tw
0169 self.select_mask = select_mask
0170
0171 def __call__(self, _pl):
0172 """
0173 :param _pl: raw photonlite input array
0174 :return hlm: raw hitlitemerged array
0175
0176 Q: If this is applied twice or more, does the hlm array stay the same ?
0177 A: It should do. YES IT DOES STAY THE SAME
0178 """
0179 log.info("[ photonlite.shape %s " % (repr(_pl.shape),) )
0180
0181 tw = self.tw
0182 select_mask = self.select_mask
0183
0184
0185 flagmask = _pl[:,3]
0186
0187 select = (flagmask & select_mask) != 0
0188 if not np.any(select): return np.zeros(0, dtype=_pl.dtype )
0189
0190 _hl = _pl[select]
0191
0192
0193
0194 identity = _hl[:,0] & 0xFFFF
0195 time = _hl[:,1].view(np.float32)
0196 bucket = np.floor(time / tw).astype(np.uint64)
0197 key = (identity.astype(np.uint64) << 48) | bucket
0198
0199
0200
0201
0202
0203 log.info("-argsort[")
0204 sort_idx = np.argsort(key, kind='stable')
0205 log.info("-argsort]")
0206 key_sorted = key[sort_idx]
0207 _hl_sorted = _hl[sort_idx]
0208
0209
0210
0211 log.info("-diff[")
0212 key_diff = np.diff(key_sorted, prepend=key_sorted[0]-1)
0213 log.info("-diff]")
0214 group_start = np.where(key_diff != 0)[0]
0215
0216
0217 n_groups = len(group_start)
0218
0219
0220 hlm = np.zeros( (n_groups,4), dtype=_pl.dtype )
0221
0222
0223 first_in_group = group_start
0224
0225 hlm[:] = _hl_sorted[first_in_group]
0226
0227
0228
0229 log.info("-reducing n_groups %d [" % (n_groups,) )
0230 for i in range(n_groups):
0231 start = group_start[i]
0232 end = group_start[i+1] if i+1 < n_groups else len(key)
0233
0234 if end - start == 1:
0235 continue
0236
0237 group_slice = slice(start, end)
0238
0239
0240 hlm[i,1] = np.min(_hl_sorted[group_slice,1].view(np.float32)).view(np.uint32)
0241
0242
0243 hlm[i,3] |= np.bitwise_or.reduce(_hl_sorted[group_slice,3])
0244
0245 all_identity = _hl_sorted[group_slice,0] & 0xffff
0246
0247
0248 sum_hitcount = np.sum(_hl_sorted[group_slice,0] >> 16, dtype=np.uint32)
0249 hlm[i,0] = sum_hitcount << 16 | all_identity[0]
0250 pass
0251 log.info("-reducing n_groups %d ]" % (n_groups,) )
0252 log.info("] hlm.shape %s " % ( repr(hlm.shape), ))
0253 return hlm
0254 pass
0255 pass
0256
0257
0258