Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-25 08:29:08

0001 #!/usr/bin/env python
0002 
0003 from datetime import datetime
0004 from pathlib import Path
0005 import yaml
0006 import cProfile
0007 import pstats
0008 import re
0009 import os
0010 import sys
0011 import itertools
0012 import random
0013 
0014 import pprint # noqa F401
0015 if os.uname().sysname!='Darwin' :
0016     import htcondor # type: ignore
0017 
0018 from argparsing import submission_args
0019 from sphenixmisc import setup_rot_handler, should_I_quit, shell_command, lock_file, unlock_file
0020 from simpleLogger import slogger, CHATTY, DEBUG, INFO, WARN, ERROR, CRITICAL  # noqa: F401
0021 from sphenixprodrules import RuleConfig
0022 from sphenixjobdicts import inputs_from_output
0023 from sphenixmatching import MatchConfig
0024 from eradicate_runs import eradicate_runs
0025 from sphenixcondorjobs import CondorJob
0026 from sphenixdbutils import test_mode as dbutils_test_mode
0027 import importlib.util # to resolve the path of sphenixdbutils without importing it as a whole
0028 from sphenixdbutils import cnxn_string_map, dbQuery
0029 from execute_condorsubmission import locate_submitfiles,execute_submission
0030 
0031 def get_queued_jobs(rule):
0032     """
0033     Determines the number of jobs currently in the condor queue for a given rule.
0034     """
0035     # Determine what's already in "idle"
0036     # Note: For this, we cannot use runnumber cuts, too difficult (and expensive) to get from condor.
0037     # Bit of a clunky method. But it works and doesn't get called all that often.
0038     cq_query  =  'condor_q'
0039     cq_query += f" -constraint \'JobBatchName==\"{rule.job_config.batch_name}\"' "  # Select our batch
0040     cq_query +=  ' -format "%d." ClusterId -format "%d\\n" ProcId'                  # any kind of one-line-per-job output. e.g. 6398.10
0041 
0042     # # Detailed method: requires three queries
0043     # run_procs = shell_command(cq_query + ' -run' )
0044     # idle_procs = shell_command(cq_query + ' -idle' )
0045     # held_procs = shell_command(cq_query + ' -held' )
0046     # if len(run_procs) > 0:
0047     #     INFO(f"We already have {len(run_procs)} jobs in the queue running.")
0048     # if len(idle_procs) > 0:
0049     #     INFO(f"We already have {len(idle_procs)} jobs in the queue waiting for execution.")
0050     # if len(held_procs) > 0:
0051     #     WARN(f"There are {len(held_procs)} held jobs that should be removed and/or resubmitted.")
0052     # currently_queued_jobs= len(run_procs) + len(idle_procs) + len(held_procs)
0053     
0054     all_procs = shell_command(cq_query)
0055     currently_queued_jobs=len(all_procs)
0056     return currently_queued_jobs
0057 
0058 # ============================================================================================
0059 
0060 def main():
0061     ### digest arguments
0062     args = submission_args()
0063     args.force = args.force_delete or args.force # -fd implies -f
0064 
0065     #################### Test mode?
0066     test_mode = (
0067             dbutils_test_mode
0068             or args.test_mode
0069             # or ( hasattr(rule, 'test_mode') and rule.test_mode ) ## allow in the yaml file?
0070         )
0071 
0072     #################### Set up submission logging before going any further
0073     # Set up submission logging before going any further
0074     sublogdir=setup_rot_handler(args)
0075     slogger.setLevel(args.loglevel)
0076 
0077     # Exit without fuss if we are already running
0078     if should_I_quit(args=args, myname=sys.argv[0]) and not args.force:
0079         DEBUG("Stop.")
0080         exit(0)
0081 
0082     lock_file_path = None
0083     try:
0084         if args.force:
0085             #### For --force, we could do the file and database deletion in RuleConfig.
0086             # Would be kinda nice because only then we'll know what's _really_ affected, and we could use the logic there.
0087             # Instead, ensure that the rule logic needs no special cases, set everything up here.
0088             WARN('Got "--force": Override existing output in files, datasets, and production_status DBs.')
0089             WARN('               Note that it\'s YOUR job to ensure there\'s no job in the queue or file in the DST lake which will overwrite this later!')
0090             if args.force_delete:
0091                 WARN('               Also got "--force-delete": Deleting existing files that are reproduced.')
0092             # answer = input("Do you want to continue? (yes/no): ")
0093             # if answer.lower() != "yes":
0094             #     print("Exiting. Smart.")
0095             #     exit(0)
0096             WARN("Here we go then.")
0097 
0098         INFO(f"Logging to {sublogdir}, level {args.loglevel}")
0099 
0100         if args.profile:
0101             DEBUG( "Profiling is ENABLED.")
0102             profiler = cProfile.Profile()
0103             profiler.enable()
0104 
0105         if test_mode:
0106             INFO("Running in testbed mode.")
0107             args.mangle_dirpath = 'production-testbed'
0108         else:
0109             INFO("Running in production mode.")
0110 
0111         #################### Rule has steering parameters and two subclasses for input and job specifics
0112         # Rule is instantiated via the yaml reader.
0113 
0114         # Files to copy to the worker - can be added to later by yaml and args
0115         payload_list=[]
0116 
0117         # Safely add module origins
0118         sphenixdbutils_spec = importlib.util.find_spec('sphenixdbutils')
0119         if sphenixdbutils_spec and sphenixdbutils_spec.origin:
0120             payload_list += [sphenixdbutils_spec.origin]
0121         else:
0122             ERROR("sphenixdbutils module not found.")
0123             exit(1)
0124 
0125         simplelogger_spec = importlib.util.find_spec('simpleLogger')
0126         if simplelogger_spec and simplelogger_spec.origin:
0127             payload_list += [simplelogger_spec.origin]
0128         else:
0129             ERROR("simpleLogger module not found.")
0130             exit(1)
0131 
0132         script_path = Path(__file__).parent.resolve()
0133         payload_list += [ f"{script_path}/stageout.sh" ]
0134         # payload_list += [ f"{script_path}/GetNumbers.C" ]
0135         payload_list += [ f"{script_path}/GetEntriesAndEventNr.C" ]
0136         payload_list += [ f"{script_path}/common_runscript_prep.sh" ]
0137         payload_list += [ f"{script_path}/create_filelist_run_daqhost.py" ]
0138         payload_list += [ f"{script_path}/create_filelist_run_seg.py" ]
0139         payload_list += [ f"{script_path}/create_full_filelist_run_seg.py" ]
0140 
0141         # .testbed: indicate test mode -- Search in the _submission_ directory
0142         if Path(".testbed").exists():
0143             payload_list += [str(Path('.testbed').resolve())]
0144 
0145         # from command line - the order means these can overwrite the default files from above
0146         if args.append2rsync:
0147             payload_list.insert(args.append2rsync)
0148         DEBUG(f"Addtional resources to be copied to the worker: {payload_list}")
0149 
0150         ### Parse command line arguments into a substitution dictionary
0151         # This dictionary is passed to the ctor to override/customize yaml file parameters
0152         param_overrides = {}
0153         param_overrides["script_path"]       = script_path
0154         param_overrides["payload_list"]      = payload_list
0155         param_overrides["runs"]              = args.runs
0156         param_overrides["runlist"]           = args.runlist
0157         param_overrides["nevents"]           = args.nevents
0158         param_overrides["combine_seg0_only"] = args.onlyseg0  # "None" if not explicitly given, to allow precedence of the yaml in that case
0159         param_overrides["choose20"]          = args.choose20  # default is False
0160         param_overrides["prodmode"]          = "production"
0161         # For testing, "production" (close to the root of all paths) in the default filesystem) can be replaced
0162         if args.mangle_dirpath:
0163             param_overrides["prodmode"] = args.mangle_dirpath
0164 
0165         # Rest of the input substitutions
0166         if args.physicsmode is not None:
0167             param_overrides["physicsmode"] = args.physicsmode # e.g. physics
0168 
0169         if args.mem:
0170             DEBUG(f"Setting memory to {args.mem}")
0171             param_overrides['request_memory']=args.mem
0172 
0173         if args.priority:
0174             DEBUG(f"Setting priority to {args.priority}")
0175             param_overrides['priority']=args.priority
0176 
0177         if args.maxjobs is not None:
0178             DEBUG(f"Setting maxjobs to {args.maxjobs}")
0179             param_overrides['max_jobs']=args.maxjobs
0180 
0181         if args.maxqueued is not None:
0182             DEBUG(f"Setting max_queued_jobs to {args.maxqueued}")
0183             param_overrides['max_queued_jobs']=args.maxqueued
0184      
0185         CHATTY(f"Rule substitutions: {param_overrides}")
0186         INFO("Now loading and building rule configuration.")
0187 
0188         #################### Load specific rule from the given yaml file.
0189         try:
0190             rule =  RuleConfig.from_yaml_file( yaml_file=args.config,
0191                                                rule_name=args.rulename,
0192                                                param_overrides=param_overrides )
0193             INFO(f"Successfully loaded rule configuration: {args.rulename}")
0194         except (ValueError, FileNotFoundError) as e:
0195             ERROR(f"Error: {e}")
0196             exit(1)
0197 
0198         CHATTY("Rule configuration:")
0199         CHATTY(yaml.dump(rule.dict))
0200 
0201         submitdir = Path(f'{args.submitdir}').resolve()
0202         if not args.dryrun:
0203             Path( submitdir).mkdir( parents=True, exist_ok=True )
0204 
0205         # TODO: submitdir is crude, this won't catch simultaneous submissions using different submission dirs
0206         lock_file_path = f"{submitdir}/{args.rulename}"
0207         if not lock_file(lock_file_path, args.dryrun):
0208             exit(0)
0209 
0210         max_queued_jobs=rule.job_config.max_queued_jobs
0211         currently_queued_jobs = get_queued_jobs(rule)
0212         if max_queued_jobs>0 and currently_queued_jobs >= max_queued_jobs:
0213             WARN(f"There are already {currently_queued_jobs} jobs in the queue, which meets or exceeds the maximum of {max_queued_jobs}.")
0214             WARN("Aborting submission.")
0215             exit(0)
0216 
0217         #################### Rule and its subfields for input and job details now have all the information needed for submitting jobs
0218         INFO("Rule construction complete. Now constructing corresponding match configuration.")
0219 
0220         # Create a match configuration from the rule
0221         match_config = MatchConfig.from_rule_config(rule)
0222         CHATTY("Match configuration:")
0223         CHATTY(yaml.dump(match_config.dict))
0224 
0225         # #################### With the matching rules constructed, first remove all traces of the given runs
0226         if args.force:
0227             eradicate_runs(match_config=match_config, dryrun=args.dryrun, delete_files=args.force_delete)
0228 
0229         # #################### Now proceed with submission
0230         # Determine chunk size for processing runs
0231         chunk_size = args.chunk_size if args.chunk_size > 0 else None
0232         
0233         # Get the full runlist from the rule (already filtered by RuleConfig)
0234         full_runlist = rule.runlist_int
0235         
0236         if chunk_size:
0237             INFO(f"Processing {len(full_runlist)} runs in chunks of {chunk_size}")
0238         else:
0239             INFO(f"Processing all {len(full_runlist)} runs at once (no chunking)")
0240         
0241         # Create chunks of runs to process
0242         if chunk_size:
0243             # Sort runs newest first for chunking
0244             sorted_runlist = sorted(full_runlist, reverse=True)
0245             run_chunks = [sorted_runlist[i:i + chunk_size] for i in range(0, len(sorted_runlist), chunk_size)]
0246         else:
0247             # Process all runs at once
0248             run_chunks = [full_runlist]
0249         
0250         INFO(f"Will process {len(run_chunks)} chunk(s) of runs")
0251 
0252         submitdir = Path(f'{args.submitdir}').resolve()
0253         if not args.dryrun:
0254             Path( submitdir).mkdir( parents=True, exist_ok=True )
0255         subbase = f'{rule.dsttype}_{rule.dataset}_{rule.outtriplet}'
0256         INFO(f'Submission files based on {subbase}')
0257 
0258         # Header for all submission files
0259         CondorJob.job_config = rule.job_config
0260         base_job = htcondor.Submit(CondorJob.job_config.condor_dict())
0261 
0262         # Track queued jobs across all chunks
0263         max_queued_jobs = rule.job_config.max_queued_jobs
0264         DEBUG(f"Maximum allowed queued jobs: {max_queued_jobs}")
0265         
0266         # Get initial queue status once before processing chunks
0267         currently_queued_jobs = get_queued_jobs(rule)
0268         DEBUG(f"Currently queued jobs at start: {currently_queued_jobs}")
0269 
0270         # Process each chunk
0271         for chunk_idx, run_chunk in enumerate(run_chunks, 1):
0272             INFO(f"===== Processing chunk {chunk_idx}/{len(run_chunks)} with {len(run_chunk)} runs =====")
0273             
0274             # Match for this chunk of runs
0275             rule_matches = match_config.devmatches(subset_runlist=run_chunk)
0276             INFO(f"Chunk {chunk_idx}: Matching complete. {len(rule_matches)} jobs to be submitted.")
0277             
0278             if len(rule_matches) == 0:
0279                 INFO(f"Chunk {chunk_idx}: No jobs to submit, moving to next chunk")
0280                 continue
0281 
0282             ## Instead of same-size chunks, group submission files by runnumber
0283             matchlist=list(rule_matches.items())
0284             ## Brittle! Assumes value[key][3] == runnumber
0285             def keyfunc(item):
0286                 return item[1][3]  # x[0] is outfilename, x[1] is tuple, 4th field is runnumber
0287             matchlist=sorted(matchlist, key=keyfunc)
0288             matches_by_run = {k : list(g) for k, g in itertools.groupby(matchlist,key=keyfunc)}
0289             submittable_runs=list(matches_by_run.keys())
0290             # Newest first
0291             submittable_runs=sorted(submittable_runs, reverse=True)
0292 
0293             INFO(f"Chunk {chunk_idx}: Creating submission for {len(submittable_runs)} runs")
0294 
0295             ### Limit number of job files lying around
0296             ##### CHANGE Dec 16 2025: This should no longer be needed, as we check max_jobs in sphenixmatching.py
0297             # max_jobs=rule.job_config.max_jobs
0298             # # Count up what we already have
0299                 # existing_jobs=0
0300             # sub_files=locate_submitfiles(rule,args)
0301             # for sub_file in sub_files:
0302             #     in_file=re.sub(r".sub$",".in",str(sub_file))
0303             #     if not Path(in_file).is_file():
0304             #         continue
0305             #     with open(in_file,'r') as f:
0306             #         existing_jobs += len(f.readlines())
0307             # if existing_jobs>0:
0308             #     INFO(f"We already have {existing_jobs} jobs waiting for submission.")
0309 
0310             ## Instead, matching has limited the number of jobs to max_jobs 
0311             ## Here, we further make sure we don't exceed the number of already queued jobs
0312             ## Note: currently_queued_jobs is tracked across all chunks to avoid exceeding max_queued_jobs
0313             
0314             DEBUG(f"Currently queued/pending jobs (including previous chunks): {currently_queued_jobs}")
0315             for submit_run in submittable_runs:
0316                 if max_queued_jobs>0 and currently_queued_jobs>max_queued_jobs:
0317                     WARN(f"Reached maximum of {max_queued_jobs} queued, held, or running jobs, stopping here.")
0318                     break
0319 
0320                 ### Make the decision here whether to skip this run
0321                 ### This will be recorded in the prod db, so subsequent calls
0322                 ### will continue to skip the same runs unless and until their rows are deleted.
0323                 # keep_this_run=True
0324                 # random.seed()
0325                 # if rule.input_config.choose20:
0326                 #         if random.uniform(0,1) > 0.21: # Nudge a bit above 20. Tests indicated we land significantly lower otherwise
0327                 #         DEBUG(f"Run {submit_run} will be skipped.")
0328                 #         keep_this_run=False
0329                 #     else:
0330                 #         DEBUG(f"Producing run {submit_run}")
0331 
0332                 matches=matches_by_run[submit_run]
0333                 INFO(f"Creating {len(matches)} submission files for run {submit_run}.")
0334                 currently_queued_jobs += len(matches)
0335                 INFO(f"Total jobs waiting for submission: {currently_queued_jobs}")
0336 
0337                 condor_subfile=f'{submitdir}/{subbase}_{submit_run}.sub'
0338                 condor_infile =f'{submitdir}/{subbase}_{submit_run}.in'
0339                 if not args.dryrun: # Note: Deletion of skipped submission files is handled in execute_condorsubmission.py
0340                     # (Re-) create the "header" - common job parameters
0341                     Path(condor_subfile).unlink(missing_ok=True)
0342                     with open(condor_subfile, "w") as f:
0343                         f.write(str(base_job))
0344                         f.write(
0345         f"""
0346         log = $(log)
0347         output = $(output)
0348         error = $(error)
0349         arguments = $(arguments)
0350         queue log,output,error,arguments from {condor_infile}
0351         """)
0352 
0353                 # individual lines per job
0354                 prod_state_rows=[]
0355                 condor_rows=[]
0356                 for out_file,(in_files, outbase, logbase, run, seg, daqhost, dsttype) in matches:
0357                     # Create .in file row
0358                     condor_job = CondorJob.make_job( output_file=out_file,
0359                                                      inputs=in_files,
0360                                                      outbase=outbase,
0361                                                      logbase=logbase,
0362                                                      leafdir=dsttype,
0363                                                      run=run,
0364                                                      seg=seg,
0365                                                      daqhost=daqhost,
0366                                                     )
0367                     condor_rows.append(condor_job.condor_row())
0368 
0369                     # Make sure directories exist
0370                     if not args.dryrun : #  and keep_this_run:
0371                         Path(condor_job.outdir).mkdir( parents=True, exist_ok=True ) # dstlake on lustre
0372                         Path(condor_job.histdir).mkdir( parents=True, exist_ok=True ) # dstlake on lustre
0373 
0374                         # stdout, stderr, and condorlog locations, usually on sphenix02:
0375                         for file_in_dir in condor_job.output, condor_job.error, condor_job.log :
0376                             Path(file_in_dir).parent.mkdir( parents=True, exist_ok=True )
0377 
0378                     # Add to production database
0379                     dsttype=logbase.split(f'_{rule.dataset}')[0]
0380                     dstfile=out_file # this is much more robust and correct
0381                     # Following is fragile, don't add spaces
0382                     prodstate='submitting'
0383                     # if not keep_this_run:
0384                     #     prodstate='skipped'
0385 
0386                     prod_state_rows.append ("('{dsttype}','{dstname}','{dstfile}',{run},{segment},{nsegments},'{inputs}',{prod_id},{cluster},{process},'{status}','{timestamp}','{host}')".format(
0387                         dsttype=dsttype,
0388                         dstname=outbase,
0389                         dstfile=dstfile,
0390                         run=run, segment=seg,
0391                         nsegments=0, # CHECKME
0392                         inputs='dbquery',
0393                         prod_id=0, # CHECKME
0394                         cluster=0, process=0,
0395                         status=prodstate,
0396                         timestamp=str(datetime.now().replace(microsecond=0)),
0397                         host=os.uname().nodename.split('.')[0]
0398                     ))
0399                     # end of collecting job lines for this run
0400 
0401                 comma_prod_state_rows=',\n'.join(prod_state_rows)
0402                 insert_prod_state = f"""
0403         insert into production_status
0404         ( dsttype, dstname, dstfile, run, segment, nsegments, inputs, prod_id, cluster, process, status, submitting, submission_host )
0405         values
0406         {comma_prod_state_rows}
0407         returning id
0408         """
0409                 # Commit "submitting" or "skipped" to db
0410                 if not args.dryrun:
0411                     # Register in the db, hand the ids the condor job (for faster db access; usually passed through to head node daemons)
0412                     prod_curs = dbQuery( cnxn_string_map['statw'], insert_prod_state )
0413                     prod_curs.commit()
0414                     ids=[str(id) for (id,) in prod_curs.fetchall()]
0415                     CHATTY(f"Inserted {len(ids)} rows into production_status, IDs: {ids}")
0416                     condor_rows=[ f"{x} {y}" for x,y in list(zip(condor_rows, ids))]
0417 
0418                 # Write or update job line file
0419                 if not args.dryrun : #  and keep_this_run:
0420                     with open(condor_infile, "a") as f:
0421                         f.writelines(row+'\n' for row in condor_rows)
0422 
0423             # After processing this chunk, optionally submit if --andgo is specified
0424             if args.andgo:
0425                 INFO(f"Chunk {chunk_idx}: Submitting jobs to condor")
0426                 execute_submission(rule, args, True)
0427                 # Refresh the queue count after submission since jobs are now in the queue
0428                 currently_queued_jobs = get_queued_jobs(rule)
0429                 DEBUG(f"After submission, currently queued jobs: {currently_queued_jobs}")
0430             
0431             INFO(f"===== Completed chunk {chunk_idx}/{len(run_chunks)} =====")
0432 
0433 
0434         if args.profile:
0435             profiler.disable()
0436             DEBUG("Profiling finished. Printing stats...")
0437             stats = pstats.Stats(profiler)
0438             stats.strip_dirs().sort_stats('time').print_stats(10)
0439 
0440         prettyfs=pprint.pformat(rule.job_config.filesystem)
0441         input_stem=inputs_from_output[rule.dsttype]
0442         if isinstance(input_stem, list):
0443             prettyfs=prettyfs.replace('{leafdir}',rule.dsttype)
0444 
0445         INFO(f"Submission directory is {submitdir}")
0446         INFO(f"Other location templates:\n{prettyfs}")
0447         INFO( "KTHXBYE!" )
0448 
0449     finally:
0450         if lock_file_path is not None:
0451             unlock_file(lock_file_path, args.dryrun)
0452 
0453 # ============================================================================================
0454 
0455 if __name__ == '__main__':
0456     main()
0457     exit(0)