File indexing completed on 2026-04-10 08:39:06
0001
0002
0003 import sys
0004 import xml.dom.minidom
0005 from urllib.parse import quote
0006
0007
class dom_job:
    """One grid job, as described by a ``<job>`` element of a submission XML.

    Attributes:
        infiles: dict, infiles[inds] = [file1, file2, ...]
        outfiles: list of output file names [file1, file2, ...]
        command: script that will be executed on the grid
        prepend: list of (option, value) prepended to output file name
        forward: list of (option, value) forwarded to the grid job
    """

    def __init__(s, domjob=None, primaryds=None, defaultcmd=None, defaultout=None):
        """Loads <job></job> from a parsed xml element.

        Args:
            domjob: minidom element of the ``<job>``; when falsy an empty job
                is created (only ``defaultcmd`` is applied).
            primaryds: if set, the job must list files for this dataset,
                otherwise the process exits with a non-zero status.
            defaultcmd: command used when the job has no ``<command>``.
            defaultout: output file names added in front of the job's own
                ``<output>`` entries.

        Raises:
            SystemExit: ``primaryds`` is set but missing from the job.
            KeyError: an ``<option>`` has no ``name`` attribute.
            IndexError: an ``<inds>`` has no ``<name>`` child.
        """
        s.infiles = {}
        s.outfiles = []
        s.command = defaultcmd
        s.prepend = []
        s.forward = []
        if not domjob:
            return

        commands = domjob.getElementsByTagName("command")
        if commands:
            s.command = dom_parser.text(commands[0])

        for inds in domjob.getElementsByTagName("inds"):
            name = dom_parser.text(inds.getElementsByTagName("name")[0])
            file_nodes = inds.getElementsByTagName("file")
            if not file_nodes:
                # datasets without files are not recorded at all
                continue
            s.infiles[name] = [dom_parser.text(node) for node in file_nodes]
        if primaryds and primaryds not in s.infiles:
            print(f"ERROR: primaryds={primaryds} must be present in each job")
            # a failed validation must not look like a successful run
            sys.exit(1)

        # De-duplicate while keeping first-seen order, so that the list of
        # output files is identical from one run to the next.
        names = list(defaultout or ())
        names.extend(dom_parser.text(node) for node in domjob.getElementsByTagName("output"))
        s.outfiles = list(dict.fromkeys(names))

        for opt in domjob.getElementsByTagName("option"):
            name = opt.attributes["name"].value
            value = dom_parser.text(opt)
            # getAttribute() yields "" for an absent attribute, which
            # dom_parser.true() reads as False: a missing flag means "off".
            if dom_parser.true(opt.getAttribute("prepend")):
                s.prepend.append((name, value))
            if dom_parser.true(opt.getAttribute("forward")):
                s.forward.append((name, value))

    def to_dom(s):
        """Converts this job to a dom tree branch.

        Options are written in a deterministic order: all ``prepend`` options
        first, then the options that are only forwarded, in the order they
        appear in ``forward`` (duplicates among the latter are dropped).

        Returns:
            The detached ``<job>`` minidom element.
        """
        doc = xml.dom.minidom.Document()

        def _add(parent, tag, text=None):
            # append <tag>text</tag> to parent and hand back the new element
            node = doc.createElement(tag)
            if text is not None:
                node.appendChild(doc.createTextNode(text))
            parent.appendChild(node)
            return node

        job = doc.createElement("job")
        for dsname, files in s.infiles.items():
            inds = _add(job, "inds")
            _add(inds, "name", dsname)
            for fname in files:
                _add(inds, "file", fname)
        for outfile in s.outfiles:
            _add(job, "output", outfile)
        if s.command:
            _add(job, "command", s.command)

        prepended = set(s.prepend)
        forwarded = set(s.forward)
        forward_only = dict.fromkeys(opt for opt in s.forward if opt not in prepended)
        for option in [*s.prepend, *forward_only]:
            node = doc.createElement("option")
            node.setAttribute("name", str(option[0]))
            node.setAttribute("forward", "true" if option in forwarded else "false")
            node.setAttribute("prepend", "true" if option in prepended else "false")
            node.appendChild(doc.createTextNode(str(option[1])))
            job.appendChild(node)
        return job

    def files_in_DS(s, DS):
        """Returns a list of files used in a given job in a given dataset
        (an empty list when the job does not use that dataset)"""
        return s.infiles.get(DS, [])

    def forward_opts(s):
        """passable string of forward options: ``opt1=value1 opt2=value2``"""
        return " ".join(f"{name}={value}" for name, value in s.forward)

    def prepend_string(s):
        """a tag string prepended to output files: ``opt1value1_opt2value2``"""
        return "_".join(f"{name}{value}" for name, value in s.prepend)

    def exec_string(s):
        """exec string for prun.
        If user requested to run script run.sh (via <command>run.sh</command>), it will return
        opt1=value1 opt2=value2 opt3=value3 run.sh
        This way, all options will be set inside run.sh
        """
        return f"{s.forward_opts()} {s.command}"

    def exec_string_enc(s):
        """Same string as exec_string(), percent-encoded with urllib's quote()."""
        return quote(s.exec_string())

    def get_outmap_str(s, outMap):
        """return mapping of original and new filenames

        Args:
            outMap: dict of original file name -> object with an ``lfn``
                attribute holding the new name.

        Returns:
            ``str()`` of the dict {original name: str(new name)}.
        """
        return str({old_lfn: str(spec.lfn) for old_lfn, spec in outMap.items()})

    def outputs_list(s, prepend=False):
        """python list with finalized output file names

        With ``prepend`` set and a non-empty prepend_string(), every name is
        prefixed with ``<prepend_string>.``; otherwise a copy of ``outfiles``
        is returned.
        """
        tag = s.prepend_string() if prepend else ""
        if tag:
            return [f"{tag}.{name}" for name in s.outfiles]
        return list(s.outfiles)

    def outputs(s, prepend=False):
        """Comma-separated list of output files accepted by prun"""
        return ",".join(s.outputs_list(prepend))
0136
0137
class dom_parser:
    """A whole submission (``<submission>`` xml tree) and the jobs it contains.

    Attributes:
        fname: path of the xml file, or None
        dom: parsed minidom document, or None
        title, tag, command, outds: text of the corresponding xml elements
        inds: dict of input dataset name -> stream name
        global_outfiles: ``<output>`` entries placed directly under ``<submission>``
        jobs: list of dom_job objects
        primaryds: name of the single dataset flagged primary, or None
    """

    def __init__(s, fname=None, xmlStr=None):
        """creates a dom object out of a text file (if provided)

        Args:
            fname: path of an xml file to load.
            xmlStr: xml document given as a string.

        When both are given, both are parsed and the content of ``xmlStr``
        is what remains loaded.
        """
        s.fname = fname
        s.dom = None
        s.title = None
        s.tag = None
        s.command = None
        s.outds = None
        s.inds = {}
        s.global_outfiles = []
        s.jobs = []
        s.primaryds = None
        if fname:
            s.dom = xml.dom.minidom.parse(fname)
            s.parse()
            s.check()
        if xmlStr is not None:
            s.dom = xml.dom.minidom.parseString(xmlStr)
            s.parse()
            s.check()

    @staticmethod
    def true(v):
        """define True: tells whether the string v spells a true value"""
        return v in ("1", "true", "True", "TRUE", "yes", "Yes", "YES")

    @staticmethod
    def text(pnode):
        """extracts the value stored in the node

        The direct text children of ``pnode`` are stripped one by one and
        concatenated.
        """
        return "".join(
            str(node.data).strip()
            for node in pnode.childNodes
            if node.nodeType == node.TEXT_NODE
        )

    def parse(s):
        """loads submission configuration from the parsed xml document

        Everything read from the document is reset first, so parsing again
        replaces the previous content instead of adding to it.

        Raises:
            IndexError: a mandatory element (``<submission>``, ``<outds>``,
                or ``<stream>``/``<name>`` of an ``<inds>``) is missing.
        """
        try:
            titles = s.dom.getElementsByTagName("title")
            s.title = dom_parser.text(titles[0]) if titles else "Default title"
            tags = s.dom.getElementsByTagName("tag")
            s.tag = dom_parser.text(tags[0]) if tags else "default_tag"

            s.command = None
            s.global_outfiles = []
            s.inds = {}
            s.jobs = []
            primarydss = []

            # Only direct children of <submission> count here: <command>,
            # <output> and <inds> also exist inside <job> with another meaning.
            for elm in s.dom.getElementsByTagName("submission")[0].childNodes:
                kind = elm.nodeName
                if kind == "command":
                    if s.command is None:  # the first <command> wins
                        s.command = dom_parser.text(elm)
                elif kind == "output":
                    s.global_outfiles.append(dom_parser.text(elm))
                elif kind == "inds":
                    stream = dom_parser.text(elm.getElementsByTagName("stream")[0])
                    name = dom_parser.text(elm.getElementsByTagName("name")[0])
                    s.inds[name] = stream
                    # an absent attribute reads as "", i.e. not primary
                    if dom_parser.true(elm.getAttribute("primary")):
                        primarydss.append(name)

            s.outds = dom_parser.text(s.dom.getElementsByTagName("outds")[0])

            # a primary dataset is only recognised when exactly one is flagged
            s.primaryds = primarydss[0] if len(primarydss) == 1 else None

            for job in s.dom.getElementsByTagName("job"):
                s.jobs.append(
                    dom_job(job, primaryds=s.primaryds, defaultcmd=s.command, defaultout=s.global_outfiles)
                )
        except Exception:
            # fname is None when the document came from a string; naming the
            # source must not raise and hide the actual parsing error
            source = s.fname if s.fname else "<xml string>"
            print(f"ERROR: failed to parse {source}")
            raise

    def to_dom(s):
        """Converts this submission to a dom tree branch

        Returns:
            The detached ``<submission>`` minidom element, jobs included.
        """
        doc = xml.dom.minidom.Document()

        def _add(parent, tag, text=None):
            # append <tag>text</tag> to parent and hand back the new element
            node = doc.createElement(tag)
            if text is not None:
                node.appendChild(doc.createTextNode(text))
            parent.appendChild(node)
            return node

        submission = doc.createElement("submission")
        if s.title:
            _add(submission, "title", s.title)
        if s.tag:
            _add(submission, "tag", s.tag)
        for name, stream in s.inds.items():
            inds = _add(submission, "inds")
            inds.setAttribute("primary", "true" if name == s.primaryds else "false")
            _add(inds, "stream", stream)
            _add(inds, "name", name)
        if s.command:
            _add(submission, "command", s.command)
        for outfile in s.global_outfiles:
            _add(submission, "output", outfile)
        outds = doc.createElement("outds")
        outds.appendChild(doc.createTextNode(s.outds))
        submission.appendChild(outds)
        for job in s.jobs:
            submission.appendChild(job.to_dom())
        return submission

    def check(s):
        """checks that all output files have unique qualifiers

        Raises:
            SystemExit: (status 1) two jobs produce the same output name.
        """
        quals = []
        for job in s.jobs:
            quals.extend(job.outputs_list(True))
        if len(set(quals)) != len(quals):
            print("ERROR: found non-unique output file names across the jobs")
            print("(you likely need to review xml options with prepend=true)")
            sys.exit(1)

    def input_datasets(s):
        """returns a list of all used input datasets

        Each dataset appears once, in the order it is first met while going
        through the jobs, so the result is the same on every run.
        """
        return list(dict.fromkeys(ds for job in s.jobs for ds in job.infiles))

    def inDS(s):
        """chooses a dataset we'll call inDS; others will become secondaryDS

        The primary dataset when there is one, else the first input dataset.

        Raises:
            IndexError: no primary dataset and no job uses any dataset.
        """
        if s.primaryds:
            return s.primaryds
        return s.input_datasets()[0]

    def secondaryDSs(s):
        """returns all secondaryDSs. This excludes inDS, unless inDS is managed by prun"""
        datasets = s.input_datasets()
        if not datasets:
            # nothing to filter; also avoids asking inDS() for a first element
            return []
        main = s.inDS()
        return [ds for ds in datasets if ds != main]

    def writeInputToTxt(s):
        """Prepares prun option --writeInputToTxt
        comma-separated list of STREAM:STREAM.files.dat

        Secondary datasets without a declared stream get IN1, IN2, ...
        according to their position; the entry for IN itself comes last.
        """
        out = []
        for i, DS in enumerate(s.secondaryDSs()):
            stream = s.inds[DS] if DS in s.inds else "IN%d" % (i + 1,)
            out.append(f"{stream}:{stream}.files.dat")
        out.append("IN:IN.files.dat")
        return ",".join(out)

    def files_in_DS(s, DS, regex=False):
        """Returns a list of all files from a given dataset
        that will be used in at least one job in this submission
        If regex==True, the list is converted to a regex string

        Raises:
            AssertionError: DS is not used by any job.
        """
        if DS not in s.input_datasets():
            # raised explicitly so the check is still made under ``python -O``
            raise AssertionError(f"ERROR: dataset {DS} was not requested in the xml file")
        unique = sorted({fname for job in s.jobs for fname in job.infiles.get(DS, ())})
        return "|".join(unique) if regex else unique

    def nJobs(s):
        """number of jobs in this submission"""
        return len(s.jobs)

    def dump(s, verbose=True):
        """prints a summary of this submission

        With ``verbose`` the command, output files and input files of every
        job are listed as well.
        """

        def P(key, value=""):
            if value == "":
                print(key)
            else:
                # values may be int (njobs) or None (no file name / command)
                print((key + ":").ljust(14) + " " + str(value))

        P("XML FILE LOADED", s.fname)
        P("Title", s.title)
        P("Command", s.command)
        P("InDS", s.inDS())
        P("Output DS", s.outds)
        P("njobs", s.nJobs())
        if verbose:
            for i, job in enumerate(s.jobs):
                P("===============> JOB%d" % i)
                P("command", job.exec_string())
                P("outfiles", job.outputs())
                P("INPUTS:")
                for j, (dsname, files) in enumerate(job.infiles.items()):
                    P(" Dataset%d" % j, dsname)
                    for k, fname in enumerate(files):
                        P(" File%d" % k, fname)
0346
0347
if __name__ == "__main__":
    # Run as a script: load ./job.xml from the current directory and print
    # the verbose summary of the submission it describes.
    submission = dom_parser("./job.xml")
    submission.dump()