# File indexing completed on 2026-04-25 08:29:08
0001 from dataclasses import dataclass, make_dataclass, field, asdict
0002 from typing import Optional, ClassVar, List, Any
0003 import math
0004 import pprint
0005
0006 from simpleLogger import ERROR, WARN, CHATTY, INFO, DEBUG
0007
0008
0009
0010
0011
0012
0013
0014
0015
# Global argument-order template: the positional argument string handed to the
# job executable. Field names must match the keywords supplied at .format() time.
glob_arguments_tmpl = (
    "{buildarg} {dataset} {intriplet} {indsttype_str} {run} {seg} {daqhost} {inputs} "
    "{nevents} {outdir} {histdir} {outbase} {neventsper} {dbtag} "
    "{logbase} {logdir} {payload} "
)
# Field specification for the dynamically created CondorJobConfig dataclass
# (see make_dataclass below). Each entry is (name, type, field(...)).
# Most values are kept as strings because they are written verbatim into an
# HTCondor submit description; a default of None marks "unset" and is dropped
# by condor_dict().
CondorJobConfig_fields = [
    # --- submit-description basics (names mirror HTCondor submit commands) ---
    ('universe', str, field(default="vanilla")),
    ('getenv', str, field(default="False")),
    ('environment', str, field(default=None)),
    ('executable', str, field(default="/bin/echo")),
    ('comment', str, field(default=None)),
    ('user_job_wrapper', str, field(default=None)),
    ('batch_name', Optional[str], field(default=None)),

    # --- resource requests and scheduling knobs ---
    ('request_cpus', str, field(default="1")),
    ('request_xferslots', str, field(default=None)),
    ('request_memory', str, field(default="4000MB")),
    ('request_disk', str, field(default="10GB")),
    ('priority', str, field(default="3500")),
    ('job_lease_duration', str, field(default="3600")),
    ('max_retries', str, field(default=None)),

    # --- matchmaking requirements ---
    ('requirements', str, field(default=None)),

    # --- job policy expressions (hold/remove/notify) ---
    ('periodichold', str, field(default="(NumJobStarts>=1 && JobStatus == 1 && !(ON_EVICT_CHECK_RequestMemory_REQUIREMENTS))")),
    ('periodicremove', str, field(default=None)),
    ('on_exit_hold', str, field(default=None)),
    ('on_exit_remove', str, field(default=None)),
    ('concurrency_limits', str, field(default=None)),
    ('notification', str, field(default=None)),
    ('notify_user', str, field(default=None)),

    # --- accounting and file-transfer settings ---
    ('accounting_group', str, field(default=None)),
    ('accounting_group_user', str, field(default=None)),
    ('initialdir', str, field(default=None)),
    ('transfer_output_remaps', str, field(default=None)),
    ('transferout', str, field(default="false")),
    ('transfererr', str, field(default="false")),
    ('transfer_input_files', str, field(default=None)),

    # --- internal/template fields: NOT condor submit keys; excluded by
    #     condor_dict() (see ignore_fields there) ---
    ('neventsper', int, field(default=None)),
    ('filesystem', dict, field(default=None)),
    ('arguments_tmpl', str, field(default=None)),
    ('log_tmpl', str, field(default=None)),
    ('rungroup_tmpl', str, field(default="run_{a:08d}_{b:08d}")),
    ('max_jobs', int, field(default=0)),
    ('max_queued_jobs', int, field(default=0)),
]
# Convenience set of all declared field names, e.g. for validating user-supplied keys.
CondorJobConfig_fieldnames= { f[0] for f in CondorJobConfig_fields }
0068
0069
def condor_dict(self):
    """
    Return this config as a plain dict suitable for a base HTCondor job
    description: template/internal bookkeeping fields are dropped, as are
    any fields whose value is None (i.e. "unset").
    - Attached as a method to the dynamically built CondorJobConfig
      (via the namespace argument of make_dataclass).
    - Insertion order of the remaining fields is preserved for readability.
    """
    # Fields that configure this package rather than HTCondor itself.
    skip = {
        'neventsper', 'filesystem', 'arguments_tmpl',
        'log_tmpl', 'rungroup_tmpl', 'max_jobs', 'max_queued_jobs',
    }

    result = {}
    for name, value in asdict(self).items():
        if name in skip:
            continue
        if value is None:
            continue
        result[name] = value
    return result
0081
# Build the CondorJobConfig dataclass dynamically from the field list above,
# attaching condor_dict() as a method via the namespace argument.
CondorJobConfig = make_dataclass(
    'CondorJobConfig',
    CondorJobConfig_fields,
    namespace={'condor_dict': condor_dict}
)
0087
0088
@dataclass( frozen = True )
class CondorJob:
    """One concrete condor job, fully resolved for submission.

    Configured via JobConfig and RuleConfig (the shared settings live in the
    class-level job_config). Each instance carries:
    - an output file
    - a list of input files
    - an argument string customized for this job
    The core workflow is to turn instances into dicts for htcondor and then
    either dump them to files or submit them directly.
    Goal: create chunks/batches of jobs that can be submitted in one go.
    Idea: this package fills directories with job files; condor_submit runs
    as a separate daemon.
    """

    # Shared configuration (CondorJobConfig-like) set once on the class.
    job_config: ClassVar[Any]

    # Per-job resolved values (all strings unless noted).
    arguments: str
    outdir: str
    finaldir: str
    histdir: str
    output: str
    error: str
    log: str
    output_file: str
    inputs: List[Any]
    outbase: str
    logbase: str
    run: int
    seg: int
    daqhost: str

    @classmethod
    def make_job(cls,
                 output_file: str,
                 run: int,
                 seg: int,
                 daqhost: str,
                 inputs: List[str],
                 leafdir: str,
                 outbase: str,
                 logbase: str,
                 ) -> 'CondorJob':
        """Build a fully-resolved CondorJob from the shared job_config templates."""
        cfg = cls.job_config

        # Group runs into blocks of 100 for the directory layout,
        # e.g. run 123 -> run_00000100_00000200 (with the default template).
        lower = 100 * math.floor(run / 100)
        upper = 100 * math.ceil((run + 1) / 100)
        rungroup = cfg.rungroup_tmpl.format(a=lower, b=upper)

        argstring = cfg.arguments_tmpl.format(
            outbase=outbase,
            logbase=logbase,
            run=run,
            seg=seg,
            daqhost=daqhost,
            leafdir=leafdir,
            rungroup=rungroup,
            neventsper=cfg.neventsper,
            inputs=",".join(inputs),
        )

        # Resolve all filesystem locations from their templates in one pass.
        paths = {
            key: cfg.filesystem[key].format(rungroup=rungroup, leafdir=leafdir)
            for key in ('outdir', 'finaldir', 'logdir', 'histdir')
        }
        logfile = cfg.log_tmpl.format(rungroup=rungroup, leafdir=leafdir, logbase=logbase)

        return cls(
            arguments   = argstring,
            outdir      = paths['outdir'],
            finaldir    = paths['finaldir'],
            histdir     = paths['histdir'],
            log         = logfile,
            output      = f"{paths['logdir']}/{logbase}.out",
            error       = f"{paths['logdir']}/{logbase}.err",
            outbase     = outbase,
            logbase     = logbase,
            output_file = output_file,
            inputs      = inputs,
            run         = run,
            seg         = seg,
            daqhost     = daqhost,
        )

    def dict(self):
        """Dictionary representation suitable for htcondor.Submit;
        None values are excluded, everything else is stringified."""
        ordered = (
            ('arguments', self.arguments),
            ('outdir',    self.outdir),
            ('finaldir',  self.finaldir),
            ('log',       self.log),
            ('output',    self.output),
            ('error',     self.error),
        )
        return {key: str(val) for key, val in ordered if val is not None}

    def condor_row(self):
        """One comma-separated line suitable for `queue a,b,...` from jobrows.in.
        FIXME: None values?"""
        columns = (self.log, self.output, self.error, self.arguments)
        return ",".join(str(col) for col in columns)
0218