Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-09 07:48:47

0001 #!/usr/bin/env python
0002 #
0003 # Copyright (c) 2019 Opticks Team. All Rights Reserved.
0004 #
0005 # This file is part of Opticks
0006 # (see https://bitbucket.org/simoncblyth/opticks).
0007 #
0008 # Licensed under the Apache License, Version 2.0 (the "License"); 
0009 # you may not use this file except in compliance with the License.  
0010 # You may obtain a copy of the License at
0011 #
0012 #   http://www.apache.org/licenses/LICENSE-2.0
0013 #
0014 # Unless required by applicable law or agreed to in writing, software 
0015 # distributed under the License is distributed on an "AS IS" BASIS, 
0016 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  
0017 # See the License for the specific language governing permissions and 
0018 # limitations under the License.
0019 #
0020 
0021 """
0022 
0023 """
0024 import os, logging, numpy as np
0025 log = logging.getLogger(__name__)
0026 
0027 import sys, codecs
# String/bytes conversion helpers with the same names under py2 and py3:
#   u_ : native string -> unicode text
#   b_ : native string -> bytes
#   d_ : bytes         -> native string
if sys.version_info.major > 2:
    def u_(_):
        """py3 strings are unicode already, so hand the argument straight back."""
        return _

    def b_(_):
        """Encode a py3 unicode string to bytes using latin-1."""
        return codecs.latin_1_encode(_)[0]

    def d_(_):
        """Decode bytes to a py3 unicode string using latin-1."""
        return codecs.latin_1_decode(_)[0]
else:
    def u_(_):
        """py2 strings are bytes, so decode them as utf-8 to get unicode."""
        return unicode(_, "utf-8")

    def b_(_):
        """py2 strings are bytes already, so hand the argument straight back."""
        return _

    def d_(_):
        """py2 strings are bytes already, so hand the argument straight back."""
        return _


def lfilter(*args):
    """Same arguments as the builtin filter, but always returns a list."""
    return list(filter(*args))


def lmap(*args):
    """Same arguments as the builtin map, but always returns a list."""
    return list(map(*args))
0039 
0040 
0041 
class Ctx(dict):
    """
    Utility providing conversions between various ways of addressing comparison histograms.

    A Ctx is a plain dict whose recognised keys are "det", "tag", "seq0",
    "irec", "qwn" and "suptitle".  Keys that are absent fall back to the
    class level defaults (DET, TAG, SEQ0, IREC, QWNS) when read through the
    properties below.

    Three textual forms are converted to and from Ctx instances:

    reclab
        record label with one step highlighted, eg "TO BT [BT] SA"
    qctx
        short path context, eg "TO_BT_BT_SA/2/X"
    pctx
        full path context, eg "concentric/1/TO_BT_BT_SA/2/X"
    """
    DET = "concentric"
    TAG = "1"
    BASE = "$TMP/CFH"
    SEQ0 = "TO AB"
    IREC = 0

    def __init__(self, *args, **kwa):
        dict.__init__(self, *args, **kwa)

    # Default quantity letters, overridable via the OPTICKS_MAIN_QWNS envvar
    # which is read once when the class body is executed.
    QWNS = os.environ.get("OPTICKS_MAIN_QWNS", "XYZTABCR")

    qwn = property(lambda self: self.get("qwn", self.QWNS))
    det = property(lambda self: self.get("det", self.DET))
    tag = property(lambda self: self.get("tag", self.TAG))
    irec = property(lambda self: int(self.get("irec", self.IREC)))
    srec = property(lambda self: self.srec_(self.irec))            # irec as single hex char
    seq0 = property(lambda self: self.get("seq0", self.SEQ0))      # seq0 is space delimited eg "TO AB"
    sqs = property(lambda self: self.seq0.split(" "))
    sseq0 = property(lambda self: "_".join(self.sqs))              # underscore delimited eg "TO_AB"
    qctx = property(lambda self: os.path.join(self.sseq0, self.srec, self.qwn))
    pctx = property(lambda self: os.path.join(self.det, self.tag, self.sseq0, self.srec, self.qwn))
    reclab = property(lambda self: " ".join([("[%s]" if i == self.irec else "%s") % sq for i, sq in enumerate(self.sqs)]))

    @classmethod
    def reclabs_(cls, seq0):
        """
        :param seq0: sequence label such as 'TO RE RE RE RE BT BT BT SC BR BR BR BR BR BR BR'
        :return reclabel list: with each irec highlighted sequentially

        Example::

            In [9]: rl[:5]
            Out[9]:
            ['[TO] BT BT BT BT SA',
             'TO [BT] BT BT BT SA',
             'TO BT [BT] BT BT SA',
             'TO BT BT [BT] BT SA',
             'TO BT BT BT [BT] SA']

        A real list is returned (not a lazy iterator) so that the result can
        be sliced and indexed as in the example under both py2 and py3.
        Labels are built from the python strings directly, so there is no
        truncation of step codes and no fixed limit on the number of steps.
        """
        sqs = seq0.split()
        return [
            " ".join(("[%s]" % sq) if i == ir else sq for i, sq in enumerate(sqs))
            for ir in range(len(sqs))
        ]

    def qsub(self):
        """
        :return subs: list of Ctx copies of this ctx, one per letter of QWNS,
                      each with "qwn" set to that single letter
        """
        return [Ctx(self, qwn=q) for q in self.QWNS]

    def _get_suptitle(self):
        # the default title is computed on first access and then cached in the dict
        if "suptitle" not in self:
            self["suptitle"] = " %s/%s %s " % (self.det, self.tag, self.reclab)
        return self["suptitle"]

    def _set_suptitle(self, suptitle):
        self["suptitle"] = suptitle
    suptitle = property(_get_suptitle, _set_suptitle)

    @classmethod
    def srec_(cls, irec):
        """
        :param irec: decimal int
        :return srec: single char hexint

        An AssertionError is raised when irec does not fit in one hex char.
        """
        srec = "%x" % irec
        assert len(srec) == 1, (irec, srec, "expecting single char hexint string")
        return srec

    @classmethod
    def base(cls):
        """
        :return base: BASE with environment variables expanded
        """
        return os.path.expandvars(cls.BASE)

    @classmethod
    def tagdir_(cls, ctx):
        """
        :param ctx: Ctx or dict, "det" and "tag" keys are used when present
        :return tagdir: expanded directory BASE/det/tag

        Missing "det" or "tag" keys fall back to the class defaults, the same
        defaults used by the det and tag properties, so that the dir of a
        Ctx created without them agrees with its pctx.
        """
        det = ctx.get("det", cls.DET)
        tag = ctx.get("tag", cls.TAG)
        return os.path.expandvars(os.path.join(cls.BASE, det, tag))

    @classmethod
    def irec_(cls, srec):
        """
        :param srec: one or more single char hexint
        :return irec: one or more ints
        """
        return [int(c, 16) for c in srec]

    def _get_dir(self):
        return os.path.join(self.tagdir_(self), self.qctx)
    dir = property(_get_dir)

    def path(self, name):
        """
        :param name: file name
        :return path: name joined onto the directory of this ctx
        """
        return os.path.join(self.dir, name)

    @classmethod
    def debase_(cls, dir_):
        """
        :param dir_: absolute directory beneath the base, or relative path context
        :return pctx, adir: path string context, absolute dir
        """
        base = cls.base()
        if dir_.startswith(base):
            pctx = dir_[len(base)+1:]   # +1 skips the separator following the base
        else:
            pctx = dir_
        pass
        adir = os.path.join(base, pctx)
        return pctx, adir

    @classmethod
    def det_tag_seq0s_(cls, ctxs):
        """
        :param ctxs: flat list of ctx
        :return det, tag, seq0s: the single det, the single tag and the unique list of seq0

        An AssertionError is raised when the ctxs do not share one det and one tag.
        """
        ctxs = list(ctxs)    # scanned three times, so a one-shot iterator must be materialized
        dets = list(set(ctx.get("det", None) for ctx in ctxs))
        tags = list(set(ctx.get("tag", None) for ctx in ctxs))
        seq0s = list(set(ctx.get("seq0", None) for ctx in ctxs))

        assert len(dets) == 1, (dets, "multiple dets in ctx list not supported")
        assert len(tags) == 1, (tags, "multiple tags in ctx list not supported")
        assert len(seq0s) >= 1, (seq0s, "unexpected seq0 in ctx list ")

        return dets[0], tags[0], seq0s

    @classmethod
    def filter_ctx_(cls, ctxs, seq0):
        """
        :param ctxs: flat list of ctx dicts
        :param seq0: string
        :return fctx: filtered list ctx dicts

        A real list is returned under both py2 and py3.
        """
        return [ctx for ctx in ctxs if ctx.get("seq0", None) == seq0]

    @classmethod
    def reclab_(cls, ctxs, irec=None):
        """
        :param ctxs: flat list of Ctx instances, or a seq0 string when irec is given
        :param irec: optional int index of the step to highlight
        :return reclab: label like 'TO BT BT BT BT [SC] SA'

        Two call forms are supported::

            Ctx.reclab_(ctxs)         # the single reclab shared by all the ctxs
            Ctx.reclab_(seq0, irec)   # reclab built directly from a seq0 string

        With a list of ctxs an AssertionError is raised unless all of them
        yield the same reclab.
        """
        if irec is not None:
            seq0 = ctxs
            return " ".join([("[%s]" if i == irec else "%s") % sq for i, sq in enumerate(seq0.split())])
        pass

        ctxs = list(ctxs)    # iterated again below when reporting a mismatch
        rls = list(set(ctx.reclab for ctx in ctxs))
        n_rls = len(rls)
        if n_rls > 1:
            log.fatal("n_rls %d ", n_rls)
            for ictx, ctx in enumerate(ctxs):
                log.fatal("ictx %d  ctx %s ", ictx, repr(ctx))
            pass
        pass
        assert n_rls == 1, rls
        return rls[0]

    @classmethod
    def dir2ctx_(cls, dir_, **kwa):
        """
        :param dir_:
        :return list of ctx:

        Expect absolute or relative directory paths such as::

            dir_ = "/tmp/blyth/opticks/CFH/concentric/1/TO_BT_BT_BT_BT_SA/0/X"
            dir_ = "concentric/1/TO_BT_BT_BT_BT_SA/0/X"

        Last element "X"
           quantity or quantities, with one or more of "XYZTABCR".

        Penultimate element "0"
           irec index within the sequence, with one or more single chars from "0123456789abcdef".
           For example "0" points to the "TO" step for seq0 of "TO_BT_BT_BT_BT_SA".

        """
        pctx, _ = cls.debase_(dir_)
        return cls.pctx2ctx_(pctx, **kwa)

    @classmethod
    def reclab2ctx_(cls, arg, **kwa):
        """
        Requires reclab of below form, with a single point highlighted.
        Applies reclab into Ctx conversion, eg::

             "[TO] BT BT BT BT SA"                             ->  Ctx(seq0="TO BT BT BT BT SA", irec=0)
             "TO BT BT BT BT DR BT BT BT BT BT BT BT BT [SA]"  ->  Ctx(seq0="TO BT BT BT BT DR BT BT BT BT BT BT BT BT SA", irec=14)

        :param arg: reclab string containing at least one space
        :param kwa: extra keys such as det and tag passed on to the Ctx
        :return ctx: Ctx with seq0 and irec set

        An AssertionError is raised when the argument has no spaces or does
        not have exactly one highlighted element.
        """
        arg_ = arg.strip()
        assert arg_.find(" ") > -1, (arg_, "expecting reclab argument with spaces ")

        elem = arg_.split()
        # collect into a list : len() is needed and py3 filter objects have none
        recs = [i for i, sq in enumerate(elem) if sq.startswith("[") and sq.endswith("]")]
        assert len(recs) == 1, (arg_, "expecting exactly one [highlighted] element")
        irec = recs[0]

        seq0 = " ".join(sq.replace("[", "").replace("]", "") for sq in elem)

        return Ctx(seq0=seq0, irec=irec, **kwa)

    @classmethod
    def pctx2ctx_(cls, pctx, **kwa):
        """
        :param pctx: path context
        :param kwa: extra keys applied to each ctx, only for the 3 and 2 elem forms
        :return list of ctx: one per (irec char, qwn letter) pair, empty for unexpected pctx

        Full pctx has 5 elem::

            pctx = "concentric/1/TO_BT_BT_BT_BT_SA/0/X"

        Shorter qctx has 3 elem or 2 elem::

            qctx = "TO_BT_BT_BT_BT_SA/0/X"
            qctx = "TO_BT_BT_BT_BT_SA/0"

        Not using os.listdir for this as want to
        work more generally prior to persisting and with
        path contexts that do not directly correspond to file system paths.

        NOTE(review): the seq0 stored in each ctx is the path element as given,
        ie underscore delimited, whereas the seq0 property documents a space
        delimited form -- confirm that this is intended.
        """
        e = pctx.split("/")
        ne = len(e)

        # element count -> (seq index, irec hexstring index, qwns index or None)
        layout = {
            5: (2, 3, 4),
            3: (0, 1, 2),
            2: (0, 1, None),
        }
        if ne not in layout:
            log.warning("unexpected path context %s ", pctx)
            return []
        pass
        ks, kr, kq = layout[ne]

        qwns = cls.QWNS if kq is None else e[kq]

        log.info("pctx2ctx_ ne %d qwns %s e[kr] %s ", ne, qwns, e[kr])

        ctxs = []
        for r in e[kr]:         # over the chars of the hexstring
            ir = int(r, 16)     # int("a",16) = 10
            for q in qwns:
                ctx = dict(seq0=e[ks], irec=ir, qwn=q)
                if ne == 5:
                    # the full form carries its own det and tag, kwa is not applied
                    ctx.update({"det": e[0], "tag": e[1]})
                else:
                    ctx.update(kwa)
                pass
                ctxs.append(Ctx(ctx))
            pass
        pass
        return ctxs
0321 
0322 
0323 
0324 
def test_reclab2ctx_():
    """
    Convert reclab strings into Ctx with Ctx.reclab2ctx_ and check that
    the qctx of each result matches the expected short path context.
    """
    qwns = Ctx.QWNS
    expectations = (
        ("[TO] BT BT BT BT SA",
         "TO_BT_BT_BT_BT_SA/0/%s" % qwns),
        ("TO BT BT BT BT DR BT BT BT BT BT BT BT [BT] SA",
         "TO_BT_BT_BT_BT_DR_BT_BT_BT_BT_BT_BT_BT_BT_SA/d/%s" % qwns),
        ("TO BT BT BT BT DR BT BT BT BT BT BT BT BT [SA]",
         "TO_BT_BT_BT_BT_DR_BT_BT_BT_BT_BT_BT_BT_BT_SA/e/%s" % qwns),
    )

    for reclab, qctx_x in expectations:
        ctx = Ctx.reclab2ctx_(reclab)
        qctx = ctx.qctx
        log.info(" %50s -> %50r -> %s " % (reclab, ctx, qctx))
        assert qctx == qctx_x, (qctx, qctx_x)
0337 
def test_reclab2ctx_2(ok):
    """
    Print the Ctx obtained from the reclab "[TO] AB", with det and tag
    taken from the attributes of the ok argument.
    """
    extras = dict(det=ok.det, tag=ok.tag)
    print(Ctx.reclab2ctx_("[TO] AB", **extras))
0342  
0343 
0344 
def test_pctx2ctx_5():
    """
    Check that a full 5 element pctx converts via Ctx.pctx2ctx_ into
    exactly one Ctx with the expected qctx.
    """
    expectations = (
        ("concentric/1/TO_BT_BT_BT_BT_SA/0/X", 'TO_BT_BT_BT_BT_SA/0/X'),
    )

    for pctx, qctx_x in expectations:
        ctxs = Ctx.pctx2ctx_(pctx)
        assert len(ctxs) == 1
        qctx = ctxs[0].qctx
        log.info(" %50s -> %r " % (pctx, qctx))
        assert qctx == qctx_x
0358 
0359 
def test_pctx2ctx_2():
    """
    Check that a 2 element qctx, which has no quantity element, expands
    via Ctx.pctx2ctx_ into one Ctx per quantity letter of Ctx.QWNS.

    The expected count is derived from Ctx.QWNS rather than fixed at 8,
    because QWNS can be changed with the OPTICKS_MAIN_QWNS envvar.
    """
    qctxs = [
        "TO_BT_BT_BT_BT_SA/0",
    ]

    for k in qctxs:
        ctxs = Ctx.pctx2ctx_(k)
        # single irec char "0" times the number of quantity letters
        assert len(ctxs) == len(Ctx.QWNS), (len(ctxs), Ctx.QWNS)

        for ctx in ctxs:
            log.info(" %r -> %s " % (ctx, ctx.pctx))
0372 
0373 
def test_reclabs_0():
    """
    Print the reclabs of a fixed sequence, one step highlighted per line.

    This works in py2+py3 but its ugly, because py3 makes it
    awkward to flip back and forth between numpy bytes and strings.

    Solution is to avoid the flipping : do everything in numpy bytes
    and then flip back to python just as the last step.
    """
    # longer alternative : "AA BB CC DD EE FF GG HH II JJ KK LL MM NN OO PP"
    seq0 = "AA BB CC DD EE FF GG HH"

    labels = seq0.split()
    count = len(labels)

    slots = np.zeros((16), dtype="|S4")
    slots[:count] = labels

    grid = np.tile(slots, count).reshape(count, -1)
    for k in range(count):
        grid[k, k] = b_("[") + grid[k, k] + b_("]")

    empty = b_("")
    rlabs = []
    for k in range(count):
        # drop the unused slots, then flip the bytes back to python strings
        filled = [d_(item) for item in grid[k] if item != empty]
        rlabs.append(" ".join(filled))

    for rlab in rlabs:
        print(rlab)
0402 
0403 
def test_reclabs_1():
    """
    Print the reclabs of a fixed sequence, one step highlighted per line,
    selecting the used slots of each row with a numpy boolean mask.
    """
    seq0 = "AA BB CC DD EE FF GG HH"

    labels = seq0.split()
    count = len(labels)

    slots = np.zeros((16), dtype="|S4")
    slots[:count] = labels

    grid = np.tile(slots, count).reshape(count, -1)
    for k in range(count):
        grid[k, k] = b_("[") + grid[k, k] + b_("]")

    # one label per row : mask away the empty slots and decode what remains
    rlabs = [" ".join(lmap(d_, row[row != b_("")])) for row in grid]

    for rlab in rlabs:
        print(rlab)
0430 
0431 
def test_reclabs_2():
    """
    Print the reclabs of a fixed sequence, one step highlighted per line,
    forming each label just before it is printed.
    """
    seq0 = "AA BB CC DD EE FF GG HH"

    labels = seq0.split()
    count = len(labels)

    slots = np.zeros((16), dtype="|S4")
    slots[:count] = labels

    grid = np.tile(slots, (count, 1))     # count rows, each a copy of the 16 slots
    for k in range(count):
        grid[k, k] = b_("[") + grid[k, k] + b_("]")

    for k in range(count):
        row = grid[k]
        used = row[row != b_("")]
        print(" ".join(lmap(d_, used)))
0450 
0451 
0452     
if __name__ == '__main__':
    # Script entry point : opticks_main is only imported when this file is run
    # directly, so importing the module does not pull in opticks.ana.main.
    from opticks.ana.main import opticks_main
    ok = opticks_main()   # ok is the argument expected by test_reclab2ctx_2

    # The checks below are all disabled, uncomment the ones to run.
    #test_reclab2ctx_()
    #test_reclab2ctx_2(ok)
    #test_pctx2ctx_5()
    #test_pctx2ctx_2()


    #test_reclabs_0() 
    #test_reclabs_1() 
0465 
0466 
0467 
0468 
0469