File indexing completed on 2026-04-09 07:48:47
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018
0019
0020
0021 """
0022
0023 """
0024 import os, logging, numpy as np
0025 log = logging.getLogger(__name__)
0026
0027 import sys, codecs
# Python 2/3 text helpers.
#   u_ : native str -> unicode text
#   b_ : native str -> bytes   (latin-1)
#   d_ : bytes      -> native str (latin-1)
if sys.version_info.major > 2:
    def u_(_):
        """Return the argument unchanged (Python 3 str is already unicode)."""
        return _

    def b_(_):
        """Encode a str to bytes using latin-1."""
        return codecs.latin_1_encode(_)[0]

    def d_(_):
        """Decode bytes to str using latin-1."""
        return codecs.latin_1_decode(_)[0]
else:
    def u_(_):
        """Decode a Python 2 byte string as utf-8 into a unicode object."""
        return unicode(_, "utf-8")

    def b_(_):
        """Return the argument unchanged (Python 2 str is already bytes)."""
        return _

    def d_(_):
        """Return the argument unchanged (Python 2 str is already bytes)."""
        return _


def lfilter(*args):
    """filter(...) materialised into a list on both Python 2 and 3."""
    return list(filter(*args))


def lmap(*args):
    """map(...) materialised into a list on both Python 2 and 3."""
    return list(map(*args))
0039
0040
0041
class Ctx(dict):
    """
    Utility providing conversions between various ways of addressing comparison histograms

    A Ctx is a plain dict; the keys read by this class are "det", "tag",
    "seq0", "irec", "qwn" and "suptitle".  Keys that are absent fall back
    to the class level defaults (DET, TAG, SEQ0, IREC, QWNS) when accessed
    through the properties below.
    """
    DET = "concentric"
    TAG = "1"
    BASE = "$TMP/CFH"
    SEQ0 = "TO AB"
    IREC = 0

    def __init__(self, *args, **kwa):
        dict.__init__(self, *args, **kwa)

    # default quantity letters, overridable with envvar OPTICKS_MAIN_QWNS
    QWNS = os.environ.get("OPTICKS_MAIN_QWNS", "XYZTABCR")

    @staticmethod
    def _highlight(sqs, irec):
        """
        :param sqs: list of step labels
        :param irec: index of the step to wrap in square brackets
        :return: space joined label, eg 'TO [BT] SA' for irec=1
        """
        return " ".join([("[%s]" if i == irec else "%s") % sq for i, sq in enumerate(sqs)])

    qwn = property(lambda self: self.get("qwn", self.QWNS))
    det = property(lambda self: self.get("det", self.DET))
    tag = property(lambda self: self.get("tag", self.TAG))
    irec = property(lambda self: int(self.get("irec", self.IREC)))
    srec = property(lambda self: self.srec_(self.irec))       # irec as single hex char
    seq0 = property(lambda self: self.get("seq0", self.SEQ0))
    sqs = property(lambda self: self.seq0.split(" "))         # step labels
    sseq0 = property(lambda self: "_".join(self.sqs))         # underscore joined seq0
    qctx = property(lambda self: os.path.join(self.sseq0, self.srec, self.qwn))
    pctx = property(lambda self: os.path.join(self.det, self.tag, self.sseq0, self.srec, self.qwn))
    reclab = property(lambda self: self._highlight(self.sqs, self.irec))

    @classmethod
    def reclab_(cls, ctxs, irec=None):
        """
        Two call forms are supported::

            Ctx.reclab_(ctxs)         # common reclab of a list of Ctx
            Ctx.reclab_(seq0, irec)   # label of seq0 with step irec highlighted

        Previously the class defined ``reclab_`` twice, so the (seq0, irec)
        form was silently replaced by the (ctxs) form and could not be called.

        :param ctxs: flat list of Ctx, or a seq0 string when irec is given
        :param irec: step index to highlight, only used with a seq0 string
        :return reclab: label like 'TO BT BT BT BT [SC] SA'
        """
        if irec is not None:
            return cls._highlight(ctxs.split(), irec)

        # materialise so the diagnostic loop below still sees the items
        ctxs = list(ctxs)
        rls = list(set(ctx.reclab for ctx in ctxs))
        n_rls = len(rls)
        if n_rls > 1:
            log.critical("n_rls %d ", n_rls)
            for ictx, ctx in enumerate(ctxs):
                log.critical("ictx %d ctx %s ", ictx, repr(ctx))
        assert n_rls == 1, rls
        return rls[0]

    @classmethod
    def reclabs_(cls, seq0):
        """
        :param seq0: sequence label such as 'TO RE RE RE RE BT BT BT SC BR BR BR BR BR BR BR'
        :return reclabel list: with each irec highlighted sequentially

        Example::

            In [9]: rl[:5]
            Out[9]:
            ['[TO] BT BT BT BT SA',
             'TO [BT] BT BT BT SA',
             'TO BT [BT] BT BT SA',
             'TO BT BT [BT] BT SA',
             'TO BT BT BT [BT] SA']

        A real list is returned (not a lazy ``map``) so the result can be
        indexed and sliced on Python 3 as in the example.  Labels are built
        from python strings, so they are not truncated to 4 bytes.
        """
        sqs = seq0.split()
        return [cls._highlight(sqs, ir) for ir in range(len(sqs))]

    def qsub(self):
        """
        :return subs: list of Ctx copies of self, one per letter of QWNS, each with qwn set to that letter
        """
        return [Ctx(self, qwn=q) for q in self.QWNS]

    def _get_suptitle(self):
        # computed lazily and cached in the dict on first access
        if "suptitle" not in self:
            self["suptitle"] = " %s/%s %s " % (self.det, self.tag, self.reclab)
        return self["suptitle"]

    def _set_suptitle(self, suptitle):
        self["suptitle"] = suptitle
    suptitle = property(_get_suptitle, _set_suptitle)

    @classmethod
    def srec_(cls, irec):
        """
        :param irec: decimal int
        :return srec: single char hexint
        """
        srec = "%x" % irec
        assert len(srec) == 1, (irec, srec, "expecting single char hexint string")
        return srec

    @classmethod
    def base(cls):
        """
        :return: BASE with environment variables expanded
        """
        return os.path.expandvars(cls.BASE)

    @classmethod
    def tagdir_(cls, ctx):
        """
        :param ctx: dict-like, "det" and "tag" keys are read
        :return: BASE/det/tag with environment variables expanded

        Missing "det" or "tag" fall back to the class defaults, matching
        what the pctx property does, instead of raising KeyError.
        """
        det = ctx.get("det", cls.DET)
        tag = ctx.get("tag", cls.TAG)
        return os.path.expandvars(os.path.join(cls.BASE, det, tag))

    @classmethod
    def irec_(cls, srec):
        """
        :param srec: one or more single char hexint
        :return irec: one or more ints
        """
        return [int(c, 16) for c in srec]

    def _get_dir(self):
        return os.path.join(self.tagdir_(self), self.qctx)
    dir = property(_get_dir)

    def path(self, name):
        """
        :param name: file name
        :return: name joined onto the directory of this ctx
        """
        return os.path.join(self.dir, name)

    @classmethod
    def debase_(cls, dir_):
        """
        :param dir_:
        :return pctx, adir: path string context, absolute dir
        """
        base = cls.base()
        if dir_.startswith(base):
            pctx = dir_[len(base)+1:]    # +1 skips the separator following base
        else:
            pctx = dir_
        adir = os.path.join(base, pctx)
        return pctx, adir

    @classmethod
    def det_tag_seq0s_(cls, ctxs):
        """
        :param ctxs: flat list of ctx
        :return det, tag, seq0s: the single det, the single tag and the unique list of seq0
        """
        ctxs = list(ctxs)
        dets = list(set(ctx.get("det", None) for ctx in ctxs))
        tags = list(set(ctx.get("tag", None) for ctx in ctxs))
        seq0s = list(set(ctx.get("seq0", None) for ctx in ctxs))

        assert len(dets) == 1, (dets, "multiple dets in ctx list not supported")
        assert len(tags) == 1, (tags, "multiple tags in ctx list not supported")
        assert len(seq0s) >= 1, (seq0s, "unexpected seq0 in ctx list ")

        return dets[0], tags[0], seq0s

    @classmethod
    def filter_ctx_(cls, ctxs, seq0):
        """
        :param ctxs: flat list of ctx dicts
        :param seq0: string
        :return fctx: filtered list ctx dicts

        Returns a real list on Python 3 too, rather than a lazy filter object.
        """
        return [ctx for ctx in ctxs if ctx.get("seq0", None) == seq0]

    @classmethod
    def dir2ctx_(cls, dir_, **kwa):
        """
        :param dir_:
        :return list of ctx:

        Expect absolute or relative directory paths such as::

            dir_ = "/tmp/blyth/opticks/CFH/concentric/1/TO_BT_BT_BT_BT_SA/0/X"
            dir_ = "concentric/1/TO_BT_BT_BT_BT_SA/0/X"

        Last element "X"
             quantity or quantities, with one or more of "XYZTABCR".

        Penultimate element "0"
             irec index within the sequence, with one or more single chars from "0123456789abcdef".
             For example "0" points to the "TO" step for seq0 of "TO_BT_BT_BT_BT_SA".

        """
        pctx, adir = cls.debase_(dir_)
        return cls.pctx2ctx_(pctx, **kwa)

    @classmethod
    def reclab2ctx_(cls, arg, **kwa):
        """
        Requires reclab of below form, with a single point highlighted.
        Applies reclab into Ctx conversion, eg::

             "[TO] BT BT BT BT SA"  ->  Ctx(seq0="TO BT BT BT BT SA", irec=0)
             "TO BT BT BT BT DR BT BT BT BT BT BT BT BT [SA]" -> Ctx(seq0="TO BT BT BT BT DR BT BT BT BT BT BT BT BT SA", irec=14)

        :param arg: reclab string containing spaces and exactly one [bracketed] step
        :param kwa: extra items passed into the returned Ctx
        :return: Ctx with seq0 and irec set
        """
        arg_ = arg.strip()
        assert arg_.find(" ") > -1, (arg_, "expecting reclab argument with spaces ")

        # a list is needed : len() of a filter object raises TypeError on py3
        recs = [i for i, tok in enumerate(arg_.split())
                if tok.startswith("[") and tok.endswith("]")]
        assert len(recs) == 1, (arg_, "expecting exactly one highlighted step")
        irec = recs[0]

        elem = arg_.replace("[", "").replace("]", "").split()
        seq0 = " ".join(elem)

        return Ctx(seq0=seq0, irec=irec, **kwa)

    @classmethod
    def pctx2ctx_(cls, pctx, **kwa):
        """
        :param pctx: path context
        :return list of ctx:

        Full pctx has 5 elem::

            pctx = "concentric/1/TO_BT_BT_BT_BT_SA/0/X"

        Shorter qctx has 3 elem or 2 elem::

            qctx = "TO_BT_BT_BT_BT_SA/0/X"
            qctx = "TO_BT_BT_BT_BT_SA/0"

        Not using os.listdir for this as want to
        work more generally prior to persisting and with
        path contexts that do not directly correspond to file system paths.

        One Ctx is produced for every (irec char, qwn char) combination.
        The kwa are only applied for the 3 and 2 elem forms; the 5 elem
        form takes det and tag from the path.  Other lengths log a warning
        and return an empty list.
        """
        # number of elem -> indices of (seq0, irec chars, qwn chars)
        layout = {
            5: (2, 3, 4),
            3: (0, 1, 2),
            2: (0, 1, None),
        }
        e = pctx.split("/")
        ne = len(e)
        if ne not in layout:
            log.warning("unexpected path context %s ", pctx)
            return []

        ks, kr, kq = layout[ne]
        qwns = cls.QWNS if kq is None else e[kq]

        log.info("pctx2ctx_ ne %d qwns %s e[kr] %s ", ne, qwns, e[kr])

        ctxs = []
        for r in e[kr]:
            ir = int(r, 16)
            for q in qwns:
                ctx = dict(seq0=e[ks], irec=ir, qwn=q)
                if ne == 5:
                    ctx.update({"det": e[0], "tag": e[1]})
                else:
                    ctx.update(kwa)
                ctxs.append(Ctx(ctx))
        return ctxs
0321
0322
0323
0324
def test_reclab2ctx_():
    """Check reclab -> Ctx -> qctx for labels highlighting the first, a middle and the last step."""
    expectations = [
        ("[TO] BT BT BT BT SA",
         "TO_BT_BT_BT_BT_SA/0/%s" % Ctx.QWNS),
        ("TO BT BT BT BT DR BT BT BT BT BT BT BT [BT] SA",
         "TO_BT_BT_BT_BT_DR_BT_BT_BT_BT_BT_BT_BT_BT_SA/d/%s" % Ctx.QWNS),
        ("TO BT BT BT BT DR BT BT BT BT BT BT BT BT [SA]",
         "TO_BT_BT_BT_BT_DR_BT_BT_BT_BT_BT_BT_BT_BT_SA/e/%s" % Ctx.QWNS),
    ]

    for reclab, expected in expectations:
        ctx = Ctx.reclab2ctx_(reclab)
        got = ctx.qctx
        log.info(" %50s -> %50r -> %s " % (reclab, ctx, got))
        assert got == expected, (got, expected)
0337
def test_reclab2ctx_2(ok):
    """
    Print the Ctx built from a fixed reclab plus det and tag taken from ok.

    :param ok: object providing det and tag attributes
    """
    print(Ctx.reclab2ctx_("[TO] AB", det=ok.det, tag=ok.tag))
0342
0343
0344
def test_pctx2ctx_5():
    """Check that a full 5 element pctx yields exactly one Ctx with the expected qctx."""
    expectations = [
        ("concentric/1/TO_BT_BT_BT_BT_SA/0/X", "TO_BT_BT_BT_BT_SA/0/X"),
    ]

    for pctx, expected in expectations:
        found = Ctx.pctx2ctx_(pctx)
        assert len(found) == 1
        got = found[0].qctx
        log.info(" %50s -> %r " % (pctx, got))
        assert got == expected
0358
0359
def test_pctx2ctx_2():
    """Check that a 2 element qctx yields 8 Ctx, then log the pctx of each."""
    for qctx in ["TO_BT_BT_BT_BT_SA/0"]:
        ctxs = Ctx.pctx2ctx_(qctx)
        assert len(ctxs) == 8

    for ctx in ctxs:
        log.info(" %r -> %s " % (ctx, ctx.pctx))
0372
0373
def test_reclabs_0():
    """
    Build the highlighted labels with numpy bytes, dropping the empty
    slots with a python level filter.

    Works in py2+py3 but is ugly, as py3 makes switching between numpy
    bytes and strings awkward.  The remedy is to stay in numpy bytes
    throughout and convert to python strings only as the final step.
    """
    steps = "AA BB CC DD EE FF GG HH".split()
    count = len(steps)

    slots = np.zeros((16), dtype="|S4")
    slots[:count] = steps

    # one row per step, then bracket the diagonal element of each row
    grid = np.tile(slots, count).reshape(count, -1)
    for k in range(count):
        grid[k, k] = b_("[") + grid[k, k] + b_("]")

    labels = []
    for k in range(count):
        kept = lfilter(lambda tok: tok != b_(""), grid[k])
        labels.append(" ".join(lmap(d_, kept)))

    for label in labels:
        print(label)
0401 pass
0402
0403
def test_reclabs_1():
    """Build the highlighted labels, dropping empty slots with a numpy boolean mask per row."""
    steps = "AA BB CC DD EE FF GG HH".split()
    count = len(steps)

    slots = np.zeros((16), dtype="|S4")
    slots[:count] = steps

    # one row per step, then bracket the diagonal element of each row
    grid = np.tile(slots, count).reshape(count, -1)
    for k in range(count):
        grid[k, k] = b_("[") + grid[k, k] + b_("]")

    labels = [" ".join(lmap(d_, row[row != b_("")])) for row in grid]

    for label in labels:
        print(label)
0429 pass
0430
0431
def test_reclabs_2():
    """Same label construction as a lazily evaluated one-liner using a numpy boolean mask."""
    steps = "AA BB CC DD EE FF GG HH".split()
    count = len(steps)

    slots = np.zeros((16), dtype="|S4")
    slots[:count] = steps

    # one row per step, then bracket the diagonal element of each row
    grid = np.tile(slots, count).reshape(count, -1)
    for k in range(count):
        grid[k, k] = b_("[") + grid[k, k] + b_("]")

    # generator : each label is only assembled when the print loop reaches it
    labels = (" ".join(lmap(d_, grid[k][grid[k] != b_("")])) for k in range(count))

    for label in labels:
        print(label)
0449 pass
0450
0451
0452
if __name__ == "__main__":
    # Script entry : only obtains the ok object, none of the test_ functions are invoked here.
    from opticks.ana.main import opticks_main

    ok = opticks_main()
0456
0457
0458
0459
0460
0461
0462
0463
0464
0465
0466
0467
0468
0469