Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:39:06

0001 #!/usr/bin/env python
0002 
0003 import sys
0004 import xml.dom.minidom
0005 from urllib.parse import quote
0006 
0007 
0008 class dom_job:
0009     """infiles[inds]=[file1,file2...]
0010     outfiles = [file1,file2...]
0011     command  - script that will be executed on the grid
0012     prepend  - list of (option,value) prepended to output file name
0013     forward  - list of (option,value) forwarded to the grid job
0014     """
0015 
0016     def __init__(s, domjob=None, primaryds=None, defaultcmd=None, defaultout=[]):
0017         """Loads <job></job> from xml file.
0018         If primaryds is set, makes sure it is present in job spec"""
0019         s.infiles = {}
0020         s.outfiles = []
0021         s.command = defaultcmd
0022         s.prepend = []
0023         s.forward = []
0024         if not domjob:
0025             return
0026         # script executed on the grid node for this job
0027         if len(domjob.getElementsByTagName("command")) > 0:
0028             s.command = dom_parser.text(domjob.getElementsByTagName("command")[0])
0029         # input files
0030         for inds in domjob.getElementsByTagName("inds"):
0031             name = dom_parser.text(inds.getElementsByTagName("name")[0])
0032             files = inds.getElementsByTagName("file")
0033             if len(files) == 0:
0034                 continue
0035             s.infiles[name] = []
0036             for file in files:
0037                 s.infiles[name].append(dom_parser.text(file))
0038         if primaryds and primaryds not in s.infiles.keys():
0039             print(f"ERROR: primaryds={primaryds} must be present in each job")
0040             sys.exit(0)
0041         # output files (also, drop duplicates within this job)
0042         outfiles = set(defaultout)
0043         [outfiles.add(dom_parser.text(v)) for v in domjob.getElementsByTagName("output")]
0044         s.outfiles = list(outfiles)
0045         # gearing options
0046         for o in domjob.getElementsByTagName("option"):
0047             name = o.attributes["name"].value
0048             value = dom_parser.text(o)
0049             prepend = dom_parser.true(o.attributes["prepend"].value)
0050             forward = dom_parser.true(o.attributes["forward"].value)
0051             if prepend:
0052                 s.prepend.append((name, value))
0053             if forward:
0054                 s.forward.append((name, value))
0055 
0056     def to_dom(s):
0057         """Converts this job to a dom tree branch"""
0058         x = xml.dom.minidom.Document()
0059         job = x.createElement("job")
0060         for inds in s.infiles.keys():
0061             job.appendChild(x.createElement("inds"))
0062             job.childNodes[-1].appendChild(x.createElement("name"))
0063             job.childNodes[-1].childNodes[-1].appendChild(x.createTextNode(inds))
0064             for file in s.infiles[inds]:
0065                 job.childNodes[-1].appendChild(x.createElement("file"))
0066                 job.childNodes[-1].childNodes[-1].appendChild(x.createTextNode(file))
0067         for outfile in s.outfiles:
0068             job.appendChild(x.createElement("output"))
0069             job.childNodes[-1].appendChild(x.createTextNode(outfile))
0070         if s.command:
0071             job.appendChild(x.createElement("command"))
0072             job.childNodes[-1].appendChild(x.createTextNode(s.command))
0073         for option in s.prepend + list(set(s.prepend + s.forward) - set(s.prepend)):
0074             job.appendChild(x.createElement("option"))
0075             job.childNodes[-1].setAttribute("name", str(option[0]))
0076             if option in s.forward:
0077                 job.childNodes[-1].setAttribute("forward", "true")
0078             else:
0079                 job.childNodes[-1].setAttribute("forward", "false")
0080             if option in s.prepend:
0081                 job.childNodes[-1].setAttribute("prepend", "true")
0082             else:
0083                 job.childNodes[-1].setAttribute("prepend", "false")
0084             job.childNodes[-1].appendChild(x.createTextNode(str(option[1])))
0085         return job
0086 
0087     def files_in_DS(s, DS):
0088         """Returns a list of files used in a given job in a given dataset"""
0089         if DS in s.infiles:
0090             return s.infiles[DS]
0091         else:
0092             return []
0093 
0094     def forward_opts(s):
0095         """passable string of forward options"""
0096         return " ".join([f"{v[0]}={v[1]}" for v in s.forward])
0097 
0098     def prepend_string(s):
0099         """a tag string prepended to output files"""
0100         return "_".join([f"{v[0]}{v[1]}" for v in s.prepend])
0101 
0102     def exec_string(s):
0103         """exec string for prun.
0104         If user requested to run script run.sh (via <command>run.sh</command>), it will return
0105         opt1=value1 opt2=value2 opt3=value3 run.sh
0106         This way, all options will be set inside run.sh
0107         """
0108         return f"{s.forward_opts()} {s.command}"
0109 
0110     def exec_string_enc(s):
0111         """exec string for prun.
0112         If user requested to run script run.sh (via <command>run.sh</command>), it will return
0113         opt1=value1 opt2=value2 opt3=value3 run.sh
0114         This way, all options will be set inside run.sh
0115         """
0116         comStr = f"{s.forward_opts()} {s.command}"
0117         return quote(comStr)
0118 
0119     def get_outmap_str(s, outMap):
0120         """return mapping of original and new filenames"""
0121         newMap = {}
0122         for oldLFN, fileSpec in outMap.items():
0123             newMap[oldLFN] = str(fileSpec.lfn)
0124         return str(newMap)
0125 
0126     def outputs_list(s, prepend=False):
0127         """python list with finalized output file names"""
0128         if prepend and s.prepend_string():
0129             return [s.prepend_string() + "." + o for o in s.outfiles]
0130         else:
0131             return [o for o in s.outfiles]
0132 
0133     def outputs(s, prepend=False):
0134         """Comma-separated list of output files accepted by prun"""
0135         return ",".join(s.outputs_list(prepend))
0136 
0137 
0138 class dom_parser:
0139     def __init__(s, fname=None, xmlStr=None):
0140         """creates a dom object out of a text file (if provided)"""
0141         s.fname = fname
0142         s.dom = None
0143         s.title = None
0144         s.tag = None
0145         s.command = None
0146         s.outds = None
0147         s.inds = {}
0148         s.global_outfiles = []
0149         s.jobs = []
0150         s.primaryds = None
0151         if fname:
0152             s.dom = xml.dom.minidom.parse(fname)
0153             s.parse()
0154             s.check()
0155         if xmlStr is not None:
0156             s.dom = xml.dom.minidom.parseString(xmlStr)
0157             s.parse()
0158             s.check()
0159 
0160     @staticmethod
0161     def true(v):
0162         """define True"""
0163         return v in ("1", "true", "True", "TRUE", "yes", "Yes", "YES")
0164 
0165     @staticmethod
0166     def text(pnode):
0167         """extracts the value stored in the node"""
0168         rc = []
0169         for node in pnode.childNodes:
0170             if node.nodeType == node.TEXT_NODE:
0171                 rc.append(str(node.data).strip())
0172         return "".join(rc)
0173 
0174     def parse(s):
0175         """loads submission configuration from an xml file"""
0176         try:
0177             # general settings
0178             if len(s.dom.getElementsByTagName("title")) > 0:
0179                 s.title = dom_parser.text(s.dom.getElementsByTagName("title")[0])
0180             else:
0181                 s.title = "Default title"
0182             if len(s.dom.getElementsByTagName("tag")) > 0:
0183                 s.tag = dom_parser.text(s.dom.getElementsByTagName("tag")[0])
0184             else:
0185                 s.tag = "default_tag"
0186             s.command = None  # can be overridden in subjobs
0187             for elm in s.dom.getElementsByTagName("submission")[0].childNodes:
0188                 if elm.nodeName != "command":
0189                     continue
0190                 s.command = dom_parser.text(elm)
0191                 break
0192             s.global_outfiles = []  # subjobs can append *additional* outputs
0193             for elm in s.dom.getElementsByTagName("submission")[0].childNodes:
0194                 if elm.nodeName != "output":
0195                     continue
0196                 s.global_outfiles.append(dom_parser.text(elm))
0197             s.outds = dom_parser.text(s.dom.getElementsByTagName("outds")[0])
0198             # declaration of all input datasets
0199             primarydss = []
0200             for elm in s.dom.getElementsByTagName("submission")[0].childNodes:
0201                 if elm.nodeName != "inds":
0202                     continue
0203                 if "primary" in elm.attributes.keys() and dom_parser.true(elm.attributes["primary"].value):
0204                     primary = True
0205                 else:
0206                     primary = False
0207                 stream = dom_parser.text(elm.getElementsByTagName("stream")[0])
0208                 name = dom_parser.text(elm.getElementsByTagName("name")[0])
0209                 s.inds[name] = stream
0210                 if primary:
0211                     primarydss.append(name)
0212             # see if one of the input datasets was explicitly labeled as inDS
0213             if len(primarydss) == 1:
0214                 s.primaryds = primarydss[0]
0215             else:
0216                 s.primaryds = None
0217             for job in s.dom.getElementsByTagName("job"):
0218                 s.jobs.append(dom_job(job, primaryds=s.primaryds, defaultcmd=s.command, defaultout=s.global_outfiles))
0219         except Exception:
0220             print("ERROR: failed to parse" + " " + s.fname)
0221             raise
0222 
0223     def to_dom(s):
0224         """Converts this submission to a dom tree branch"""
0225         x = xml.dom.minidom.Document()
0226         submission = x.createElement("submission")
0227         if s.title:
0228             submission.appendChild(x.createElement("title"))
0229             submission.childNodes[-1].appendChild(x.createTextNode(s.title))
0230         if s.tag:
0231             submission.appendChild(x.createElement("tag"))
0232             submission.childNodes[-1].appendChild(x.createTextNode(s.tag))
0233         for name, stream in s.inds.items():
0234             submission.appendChild(x.createElement("inds"))
0235             if name == s.primaryds:
0236                 submission.childNodes[-1].setAttribute("primary", "true")
0237             else:
0238                 submission.childNodes[-1].setAttribute("primary", "false")
0239             submission.childNodes[-1].appendChild(x.createElement("stream"))
0240             submission.childNodes[-1].childNodes[-1].appendChild(x.createTextNode(stream))
0241             submission.childNodes[-1].appendChild(x.createElement("name"))
0242             submission.childNodes[-1].childNodes[-1].appendChild(x.createTextNode(name))
0243         if s.command:
0244             submission.appendChild(x.createElement("command"))
0245             submission.childNodes[-1].appendChild(x.createTextNode(s.command))
0246         for outfile in s.global_outfiles:
0247             submission.appendChild(x.createElement("output"))
0248             submission.childNodes[-1].appendChild(x.createTextNode(outfile))
0249         submission.appendChild(x.createElement("outds"))
0250         submission.childNodes[-1].appendChild(x.createTextNode(s.outds))
0251         for job in s.jobs:
0252             submission.appendChild(job.to_dom())
0253         return submission
0254 
0255     def check(s):
0256         """checks that all output files have unique qualifiers"""
0257         quals = []
0258         for j in s.jobs:
0259             quals += j.outputs_list(True)
0260         if len(list(set(quals))) != len(quals):
0261             print("ERROR: found non-unique output file names across the jobs")
0262             print("(you likely need to review xml options with prepend=true)")
0263             sys.exit(0)
0264 
0265     def input_datasets(s):
0266         """returns a list of all used input datasets"""
0267         DSs = set()
0268         for j in s.jobs:
0269             for ds in j.infiles.keys():
0270                 DSs.add(ds)
0271         return list(DSs)
0272 
0273     def inDS(s):
0274         """chooses a dataset we'll call inDS; others will become secondaryDS"""
0275         # user manually labeled one of datasets as primary, so make it inDS:
0276         if s.primaryds:
0277             return s.primaryds
0278         # OR: choose inDS dataset randomly
0279         else:
0280             return s.input_datasets()[0]
0281 
0282     def secondaryDSs(s):
0283         """returns all secondaryDSs. This excludes inDS, unless inDS is managed by prun"""
0284         return [d for d in s.input_datasets() if d != s.inDS()]
0285 
0286     def writeInputToTxt(s):
0287         """Prepares prun option --writeInputToTxt
0288         comma-separated list of STREAM:STREAM.files.dat
0289         """
0290         out = []
0291         DSs = s.secondaryDSs()
0292         for i, DS in enumerate(DSs):
0293             if DS in s.inds:
0294                 stream = s.inds[DS]
0295             else:
0296                 stream = "IN%d" % (i + 1,)
0297             out.append(f"{stream}:{stream}.files.dat")
0298         out.append("IN:IN.files.dat")
0299         return ",".join(out)
0300 
0301     def files_in_DS(s, DS, regex=False):
0302         """Returns a list of all files from a given dataset
0303         that will be used in at least one job in this submission
0304         If regex==True, the list is converted to a regex string
0305         """
0306         assert DS in s.input_datasets(), f"ERROR: dataset {DS} was not requested in the xml file"
0307         files = []
0308         for j in s.jobs:
0309             if DS in j.infiles.keys():
0310                 files += j.infiles[DS]
0311         if regex:
0312             return "|".join(sorted(list(set(files))))
0313         else:
0314             return sorted(list(set(files)))
0315 
0316     def nJobs(s):
0317         return len(s.jobs)
0318 
0319     def dump(s, verbose=True):
0320         """prints a summary of this submission"""
0321 
0322         def P(key, value=""):
0323             if value == "":
0324                 print(key)
0325             else:
0326                 print((key + ":").ljust(14) + " " + value)
0327 
0328         P("XML FILE LOADED", s.fname)
0329         P("Title", s.title)
0330         P("Command", s.command)
0331         P("InDS", s.inDS())
0332         P("Output DS", s.outds)
0333         P("njobs", s.nJobs())
0334         if verbose:
0335             for i, job in enumerate(s.jobs):
0336                 P("===============> JOB%d" % i)
0337                 P("command", job.exec_string())
0338                 P("outfiles", job.outputs())
0339                 P("INPUTS:")
0340                 j = 0
0341                 for dsname, files in job.infiles.items():
0342                     P("  Dataset%d" % j, dsname)
0343                     for k, fname in enumerate(files):
0344                         P("     File%d" % k, fname)
0345                     j += 1
0346 
0347 
0348 if __name__ == "__main__":
0349     p = dom_parser("./job.xml")
0350     p.dump()