Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-25 08:29:08

0001 from dataclasses import dataclass, make_dataclass, field, asdict
0002 from typing import Optional, ClassVar, List, Any
0003 import math
0004 import pprint # noqa: F401
0005 
0006 from simpleLogger import ERROR, WARN, CHATTY, INFO, DEBUG  # noqa: F401
0007 
0008 # ============================================================================
0009 # CondorJobConfig. Members that are modified by individual jobs are tagged _tmpl.
0010 # Implemented via list and make_dataclass for simple and dynamic use of the available fields.
0011 # The list of (name, type, Field) tuples for make_dataclass
0012 #    echo "Usage: $0 <buildarg> <dataset> <intriplet> <indsttype> <run> <seg> <daqhost> <inputs>"
0013 #    echo "       <nevents> <outdir> <histdir> <outbase> <neventsper> <dbtag>"
0014 #    echo "       <logbase> <logdir> <condor_rsync> [dbid]"
0015 
# Positional-argument template mirroring the wrapper script's interface
# (see the "Usage:" echo above); split across literals purely for readability.
glob_arguments_tmpl = (
    "{buildarg} {dataset} {intriplet} {indsttype_str} {run} {seg} {daqhost} {inputs} "
    "{nevents} {outdir} {histdir} {outbase} {neventsper} {dbtag} "
    "{logbase} {logdir} {payload} "
)
# (name, type, Field) tuples consumed by make_dataclass below.
# Fields that may legitimately stay unset default to None and are annotated
# Optional[...] accordingly (matching the pre-existing batch_name style);
# condor_dict() later filters None entries out of the submit dictionary.
CondorJobConfig_fields = [
    # --- Core Job Attributes ---
    ('universe',               str,            field(default="vanilla")),
    ('getenv',                 str,            field(default="False")),
    ('environment',            Optional[str],  field(default=None)),
    ('executable',             str,            field(default="/bin/echo")),
    ('comment',                Optional[str],  field(default=None)),
    ('user_job_wrapper',       Optional[str],  field(default=None)),
    ('batch_name',             Optional[str],  field(default=None)),

    # --- Resource Requests ---
    ('request_cpus',           str,            field(default="1")),
    ('request_xferslots',      Optional[str],  field(default=None)),
    ('request_memory',         str,            field(default="4000MB")),
    ('request_disk',           str,            field(default="10GB")),
    ('priority',               str,            field(default="3500")),
    ('job_lease_duration',     str,            field(default="3600")),
    ('max_retries',            Optional[str],  field(default=None)),

    # --- Job Lifecycle and Policy ---
    ('requirements',           Optional[str],  field(default=None)),
    # Hold restarted-but-idle jobs, unless the eviction memory check fired.
    ('periodichold',           str,            field(default="(NumJobStarts>=1 && JobStatus == 1 && !(ON_EVICT_CHECK_RequestMemory_REQUIREMENTS))")),
    ('periodicremove',         Optional[str],  field(default=None)),
    ('on_exit_hold',           Optional[str],  field(default=None)),
    ('on_exit_remove',         Optional[str],  field(default=None)),
    ('concurrency_limits',     Optional[str],  field(default=None)),
    ('notification',           Optional[str],  field(default=None)),
    ('notify_user',            Optional[str],  field(default=None)),

    # --- Accounting and File Transfer ---
    ('accounting_group',       Optional[str],  field(default=None)),
    ('accounting_group_user',  Optional[str],  field(default=None)),
    ('initialdir',             Optional[str],  field(default=None)),
    ('transfer_output_remaps', Optional[str],  field(default=None)),
    ('transferout',            str,            field(default="false")),
    ('transfererr',            str,            field(default="false")),
    ('transfer_input_files',   Optional[str],  field(default=None)),

    # --- Non-Condor Data Members for Steering ---
    # These steer job creation only and are excluded from condor_dict().
    ('neventsper',             Optional[int],  field(default=None)),
    ('filesystem',             Optional[dict], field(default=None)),
    ('arguments_tmpl',         Optional[str],  field(default=None)),
    ('log_tmpl',               Optional[str],  field(default=None)),
    ('rungroup_tmpl',          str,            field(default="run_{a:08d}_{b:08d}")),
    ('max_jobs',               int,            field(default=0)),
    ('max_queued_jobs',        int,            field(default=0)),
]
# All declared field names, for quick membership checks.
CondorJobConfig_fieldnames = { f[0] for f in CondorJobConfig_fields }
0068 
0069 # ----------------------------------------------------------------------------
def condor_dict(self):
    """
    Return an ordered dict of HTCondor submit keys for this config.

    Entries whose value is None are dropped, as are the steering-only
    members that are not valid HTCondor submit-file keys.
    - Injected as a namespace method into make_dataclass
    - A plain dict keeps the declaration order intact (for readability)
    """
    steering_only = { 'neventsper','filesystem','arguments_tmpl','log_tmpl','rungroup_tmpl', 'max_jobs', 'max_queued_jobs' }

    condor_items = {}
    for key, value in asdict(self).items():
        if key in steering_only:
            continue
        if value is None:
            continue
        condor_items[key] = value
    return condor_items
0081 # ----------------------------------------------------------------------------
# Assemble the CondorJobConfig dataclass from the field list above,
# attaching condor_dict() as a method via the namespace argument.
_condor_namespace = {'condor_dict': condor_dict}
CondorJobConfig = make_dataclass(
    'CondorJobConfig',
    CondorJobConfig_fields,
    namespace=_condor_namespace,
)
0087 
0088 # ============================================================================
@dataclass( frozen = True )
class CondorJob:
    """ One fully specified condor job, ready for submission.
    Configured via JobConfig and RuleConfig.
    Each instance carries
    - an output file
    - a list of input files
    - a per-job customized argument string
    Key logic is to create dicts for htcondor and then either dump them to files or submit them directly
    Goal: Create chunks or batches of jobs that can be submitted in one go
    Idea: This package fills directories with job files, condor_submit is run as a separate daemon
    """

    # --- Class Variable (Shared across all instances) ---
    job_config:            ClassVar[Any]  # CondorJobConfig created dynamically via make_dataclass

    # --- Instance Variables (Specific to each job) ---
    arguments:              str
    outdir:                 str  # where the DST files are written to
    finaldir:               str  # where the DST files are eventually moved to by a spider - currently unused, the spider should know
    histdir:                str  # where histograms go
    output:                 str  # Stdout file for condor
    error:                  str  # Stderr file for condor
    log:                    str  # Log file for condor
    output_file:            str           # Output file for the job --> not used directly except for bookkeeping
    inputs:                 List[Any]     # Can be list of input files for the job; usually holds a steering string or flag though.
    outbase:                str           # Base name for the output file
    logbase:                str           # Base name for the log file
    run:                    int
    seg:                    int
    daqhost:                str

    # ------------------------------------------------
    @classmethod
    def make_job(cls,
                output_file: str,
                run: int,
                seg: int,
                daqhost: str,
                inputs: List[str],
                leafdir: str,
                outbase: str,
                logbase: str,
                ) -> 'CondorJob':
        """
        Build a CondorJob from per-job parameters plus the shared job_config.
        """
        cfg = cls.job_config

        # Bucket run numbers into blocks of 100 to keep directory sizes in check.
        lo = 100 * math.floor(run / 100)
        hi = 100 * math.ceil((run + 1) / 100)
        rungroup = cfg.rungroup_tmpl.format(a=lo, b=hi)

        job_args = cfg.arguments_tmpl.format(
            outbase=outbase,
            logbase=logbase,
            run=run,
            seg=seg,
            daqhost=daqhost,
            leafdir=leafdir,
            rungroup=rungroup,
            neventsper=cfg.neventsper,
            inputs=",".join(inputs),
        )

        # All filesystem locations share the same two placeholders.
        place = {'rungroup': rungroup, 'leafdir': leafdir}
        outdir   = cfg.filesystem['outdir'].format(**place)
        finaldir = cfg.filesystem['finaldir'].format(**place)
        logdir   = cfg.filesystem['logdir'].format(**place)
        histdir  = cfg.filesystem['histdir'].format(**place)
        log      = cfg.log_tmpl.format(logbase=logbase, **place)

        return cls(
            arguments           = job_args,
            outdir              = outdir,
            finaldir            = finaldir,
            histdir             = histdir,
            log                 = log,
            output              = f'{logdir}/{logbase}.out',
            error               = f'{logdir}/{logbase}.err',
            outbase             = outbase,
            logbase             = logbase,
            output_file         = output_file,
            inputs              = inputs,
            run                 = run,
            seg                 = seg,
            daqhost             = daqhost,
        )

    # ------------------------------------------------
    def dict(self):
        """
        Returns a dictionary representation suitable for htcondor.Submit,
        excluding None values. All values are stringified.
        """
        per_job = {
            'arguments':    self.arguments,
            'outdir':       self.outdir,
            'finaldir':     self.finaldir,
            'log':          self.log,
            'output':       self.output,
            'error':        self.error,
        }
        # Drop any None values defensively; current fields are all populated
        # by make_job, so this is mostly belt-and-suspenders.
        return {key: str(val) for key, val in per_job.items() if val is not None}

    # ------------------------------------------------
    def condor_row(self):
        """
        Returns a one line string suitable for queue a,b,... from jobrows.in
        'arguments' must come last: it can contain spaces, and condor's
        multi-queue-from-file mechanism only accepts that in the final,
        catch-all column.
        FIXME: None values?
        """
        columns = (self.log, self.output, self.error, self.arguments)
        return ",".join(str(col) for col in columns)
0217 
0218 # ============================================================================