# File indexing completed on 2026-04-25 08:29:08
0001
0002
0003 from datetime import datetime
0004 from pathlib import Path
0005 import yaml
0006 import cProfile
0007 import pstats
0008 import re
0009 import os
0010 import sys
0011 import itertools
0012 import random
0013
0014 import pprint
0015 if os.uname().sysname!='Darwin' :
0016 import htcondor
0017
0018 from argparsing import submission_args
0019 from sphenixmisc import setup_rot_handler, should_I_quit, shell_command, lock_file, unlock_file
0020 from simpleLogger import slogger, CHATTY, DEBUG, INFO, WARN, ERROR, CRITICAL
0021 from sphenixprodrules import RuleConfig
0022 from sphenixjobdicts import inputs_from_output
0023 from sphenixmatching import MatchConfig
0024 from eradicate_runs import eradicate_runs
0025 from sphenixcondorjobs import CondorJob
0026 from sphenixdbutils import test_mode as dbutils_test_mode
0027 import importlib.util
0028 from sphenixdbutils import cnxn_string_map, dbQuery
0029 from execute_condorsubmission import locate_submitfiles,execute_submission
0030
def get_queued_jobs(rule):
    """Return the number of condor jobs currently queued for this rule.

    Counts every ClusterId.ProcId pair that condor_q reports for the
    rule's JobBatchName, i.e. one line of output per queued job.
    """
    batch_name = rule.job_config.batch_name
    # Assemble the condor_q command: constrain to this batch, print one
    # "cluster.proc" line per job.
    cq_query = ''.join((
        'condor_q',
        f" -constraint 'JobBatchName==\"{batch_name}\"' ",
        ' -format "%d." ClusterId -format "%d\\n" ProcId',
    ))
    # shell_command returns one entry per output line; its length is the job count.
    return len(shell_command(cq_query))
0057
0058
0059
def main():
    """Create (and optionally submit) condor jobs for one sPHENIX production rule.

    Workflow:
      1. Parse arguments, set up logging; bail out early on an external stop
         request unless --force is given.
      2. Assemble the worker payload file list and parameter overrides, then
         build the RuleConfig and MatchConfig.
      3. In chunks of runs, write condor .sub/.in files, register every job
         in the production_status DB, and submit right away if --andgo.

    A per-rule lock file serializes concurrent invocations; it is always
    released in the finally block.
    """
    args = submission_args()
    # --force-delete implies --force
    args.force = args.force_delete or args.force

    # Testbed mode if either the DB utilities or the command line request it
    test_mode = (
        dbutils_test_mode
        or args.test_mode
    )

    sublogdir=setup_rot_handler(args)
    slogger.setLevel(args.loglevel)

    # Honor an external "please stop" signal unless the user forces the run
    if should_I_quit(args=args, myname=sys.argv[0]) and not args.force:
        DEBUG("Stop.")
        exit(0)

    lock_file_path = None
    try:
        if args.force:
            WARN('Got "--force": Override existing output in files, datasets, and production_status DBs.')
            WARN(' Note that it\'s YOUR job to ensure there\'s no job in the queue or file in the DST lake which will overwrite this later!')
            if args.force_delete:
                WARN(' Also got "--force-delete": Deleting existing files that are reproduced.')
            WARN("Here we go then.")

        INFO(f"Logging to {sublogdir}, level {args.loglevel}")

        if args.profile:
            DEBUG( "Profiling is ENABLED.")
            profiler = cProfile.Profile()
            profiler.enable()

        if test_mode:
            INFO("Running in testbed mode.")
            args.mangle_dirpath = 'production-testbed'
        else:
            INFO("Running in production mode.")

        # Files rsync'ed to the worker node together with the job
        payload_list=[]

        # Ship our own DB and logging helper modules to the worker
        sphenixdbutils_spec = importlib.util.find_spec('sphenixdbutils')
        if sphenixdbutils_spec and sphenixdbutils_spec.origin:
            payload_list += [sphenixdbutils_spec.origin]
        else:
            ERROR("sphenixdbutils module not found.")
            exit(1)

        simplelogger_spec = importlib.util.find_spec('simpleLogger')
        if simplelogger_spec and simplelogger_spec.origin:
            payload_list += [simplelogger_spec.origin]
        else:
            ERROR("simpleLogger module not found.")
            exit(1)

        # Helper scripts that live next to this file
        script_path = Path(__file__).parent.resolve()
        payload_list += [ f"{script_path}/stageout.sh" ]
        payload_list += [ f"{script_path}/GetEntriesAndEventNr.C" ]
        payload_list += [ f"{script_path}/common_runscript_prep.sh" ]
        payload_list += [ f"{script_path}/create_filelist_run_daqhost.py" ]
        payload_list += [ f"{script_path}/create_filelist_run_seg.py" ]
        payload_list += [ f"{script_path}/create_full_filelist_run_seg.py" ]

        # Marker file that flags testbed mode on the worker
        if Path(".testbed").exists():
            payload_list += [str(Path('.testbed').resolve())]

        if args.append2rsync:
            # BUGFIX: was payload_list.insert(args.append2rsync), which raises
            # TypeError (list.insert needs an index AND an item). Intent is to
            # add the extra resource to the payload.
            payload_list.append(args.append2rsync)
        DEBUG(f"Additional resources to be copied to the worker: {payload_list}")

        # Parameter overrides handed to the rule construction
        param_overrides = {}
        param_overrides["script_path"] = script_path
        param_overrides["payload_list"] = payload_list
        param_overrides["runs"] = args.runs
        param_overrides["runlist"] = args.runlist
        param_overrides["nevents"] = args.nevents
        param_overrides["combine_seg0_only"] = args.onlyseg0
        param_overrides["choose20"] = args.choose20
        param_overrides["prodmode"] = "production"

        if args.mangle_dirpath:
            param_overrides["prodmode"] = args.mangle_dirpath

        if args.physicsmode is not None:
            param_overrides["physicsmode"] = args.physicsmode

        if args.mem:
            DEBUG(f"Setting memory to {args.mem}")
            param_overrides['request_memory']=args.mem

        if args.priority:
            DEBUG(f"Setting priority to {args.priority}")
            param_overrides['priority']=args.priority

        if args.maxjobs is not None:
            DEBUG(f"Setting maxjobs to {args.maxjobs}")
            param_overrides['max_jobs']=args.maxjobs

        if args.maxqueued is not None:
            DEBUG(f"Setting max_queued_jobs to {args.maxqueued}")
            param_overrides['max_queued_jobs']=args.maxqueued

        CHATTY(f"Rule substitutions: {param_overrides}")
        INFO("Now loading and building rule configuration.")

        try:
            rule = RuleConfig.from_yaml_file( yaml_file=args.config,
                                              rule_name=args.rulename,
                                              param_overrides=param_overrides )
            INFO(f"Successfully loaded rule configuration: {args.rulename}")
        except (ValueError, FileNotFoundError) as e:
            ERROR(f"Error: {e}")
            exit(1)

        CHATTY("Rule configuration:")
        CHATTY(yaml.dump(rule.dict))

        submitdir = Path(f'{args.submitdir}').resolve()
        if not args.dryrun:
            Path( submitdir).mkdir( parents=True, exist_ok=True )

        # One lock per rule; if we cannot acquire it, another submission is running
        lock_file_path = f"{submitdir}/{args.rulename}"
        if not lock_file(lock_file_path, args.dryrun):
            exit(0)

        # Early abort if the queue is already full for this batch
        max_queued_jobs=rule.job_config.max_queued_jobs
        currently_queued_jobs = get_queued_jobs(rule)
        if max_queued_jobs>0 and currently_queued_jobs >= max_queued_jobs:
            WARN(f"There are already {currently_queued_jobs} jobs in the queue, which meets or exceeds the maximum of {max_queued_jobs}.")
            WARN("Aborting submission.")
            exit(0)

        INFO("Rule construction complete. Now constructing corresponding match configuration.")

        match_config = MatchConfig.from_rule_config(rule)
        CHATTY("Match configuration:")
        CHATTY(yaml.dump(match_config.dict))

        # --force: wipe existing output for the matched runs first
        if args.force:
            eradicate_runs(match_config=match_config, dryrun=args.dryrun, delete_files=args.force_delete)

        # Optional chunking of the run list (0 or negative means "all at once")
        chunk_size = args.chunk_size if args.chunk_size > 0 else None

        full_runlist = rule.runlist_int

        if chunk_size:
            INFO(f"Processing {len(full_runlist)} runs in chunks of {chunk_size}")
        else:
            INFO(f"Processing all {len(full_runlist)} runs at once (no chunking)")

        if chunk_size:
            # Newest runs first, split into equal-size chunks
            sorted_runlist = sorted(full_runlist, reverse=True)
            run_chunks = [sorted_runlist[i:i + chunk_size] for i in range(0, len(sorted_runlist), chunk_size)]
        else:
            run_chunks = [full_runlist]

        INFO(f"Will process {len(run_chunks)} chunk(s) of runs")

        # NOTE: submitdir was already resolved and created above; the exact
        # duplicate of that block which used to sit here was removed.
        subbase = f'{rule.dsttype}_{rule.dataset}_{rule.outtriplet}'
        INFO(f'Submission files based on {subbase}')

        # Template condor job; per-job values are filled in via $(...) macros
        CondorJob.job_config = rule.job_config
        base_job = htcondor.Submit(CondorJob.job_config.condor_dict())

        DEBUG(f"Maximum allowed queued jobs: {max_queued_jobs}")

        # Re-query right before the submission loop; the queue may have
        # changed since the early abort check above
        currently_queued_jobs = get_queued_jobs(rule)
        DEBUG(f"Currently queued jobs at start: {currently_queued_jobs}")

        for chunk_idx, run_chunk in enumerate(run_chunks, 1):
            INFO(f"===== Processing chunk {chunk_idx}/{len(run_chunks)} with {len(run_chunk)} runs =====")

            # Find input->output matches for this chunk of runs
            rule_matches = match_config.devmatches(subset_runlist=run_chunk)
            INFO(f"Chunk {chunk_idx}: Matching complete. {len(rule_matches)} jobs to be submitted.")

            if len(rule_matches) == 0:
                INFO(f"Chunk {chunk_idx}: No jobs to submit, moving to next chunk")
                continue

            # Group matches by run number (field 3 of the value tuple)
            matchlist=list(rule_matches.items())

            def keyfunc(item):
                return item[1][3]
            matchlist=sorted(matchlist, key=keyfunc)  # groupby needs sorted input
            matches_by_run = {k : list(g) for k, g in itertools.groupby(matchlist,key=keyfunc)}
            submittable_runs=list(matches_by_run.keys())
            # Newest runs first
            submittable_runs=sorted(submittable_runs, reverse=True)

            INFO(f"Chunk {chunk_idx}: Creating submission for {len(submittable_runs)} runs")

            DEBUG(f"Currently queued/pending jobs (including previous chunks): {currently_queued_jobs}")
            for submit_run in submittable_runs:
                # NOTE(review): this uses ">" while the early abort above uses
                # ">="; kept as-is since it may deliberately allow one run
                # over the limit -- confirm
                if max_queued_jobs>0 and currently_queued_jobs>max_queued_jobs:
                    WARN(f"Reached maximum of {max_queued_jobs} queued, held, or running jobs, stopping here.")
                    break

                matches=matches_by_run[submit_run]
                INFO(f"Creating {len(matches)} submission files for run {submit_run}.")
                currently_queued_jobs += len(matches)
                INFO(f"Total jobs waiting for submission: {currently_queued_jobs}")

                condor_subfile=f'{submitdir}/{subbase}_{submit_run}.sub'
                condor_infile =f'{submitdir}/{subbase}_{submit_run}.in'
                if not args.dryrun:
                    # Fresh .sub file: job template plus a queue statement over the .in file
                    Path(condor_subfile).unlink(missing_ok=True)
                    with open(condor_subfile, "w") as f:
                        f.write(str(base_job))
                        f.write(
f"""
log = $(log)
output = $(output)
error = $(error)
arguments = $(arguments)
queue log,output,error,arguments from {condor_infile}
""")

                # One row per job, both for the .in file and for the status DB
                prod_state_rows=[]
                condor_rows=[]
                for out_file,(in_files, outbase, logbase, run, seg, daqhost, dsttype) in matches:

                    condor_job = CondorJob.make_job( output_file=out_file,
                                                     inputs=in_files,
                                                     outbase=outbase,
                                                     logbase=logbase,
                                                     leafdir=dsttype,
                                                     run=run,
                                                     seg=seg,
                                                     daqhost=daqhost,
                                                     )
                    condor_rows.append(condor_job.condor_row())

                    if not args.dryrun :
                        # Make sure all output, histogram, and log directories exist
                        Path(condor_job.outdir).mkdir( parents=True, exist_ok=True )
                        Path(condor_job.histdir).mkdir( parents=True, exist_ok=True )

                        for file_in_dir in condor_job.output, condor_job.error, condor_job.log :
                            Path(file_in_dir).parent.mkdir( parents=True, exist_ok=True )

                    # Derive the DB fields for this job
                    dsttype=logbase.split(f'_{rule.dataset}')[0]
                    dstfile=out_file

                    prodstate='submitting'

                    prod_state_rows.append ("('{dsttype}','{dstname}','{dstfile}',{run},{segment},{nsegments},'{inputs}',{prod_id},{cluster},{process},'{status}','{timestamp}','{host}')".format(
                        dsttype=dsttype,
                        dstname=outbase,
                        dstfile=dstfile,
                        run=run, segment=seg,
                        nsegments=0,
                        inputs='dbquery',
                        prod_id=0,
                        cluster=0, process=0,
                        status=prodstate,
                        timestamp=str(datetime.now().replace(microsecond=0)),
                        host=os.uname().nodename.split('.')[0]
                    ))

                # Register all jobs of this run in production_status in one insert
                comma_prod_state_rows=',\n'.join(prod_state_rows)
                insert_prod_state = f"""
insert into production_status
( dsttype, dstname, dstfile, run, segment, nsegments, inputs, prod_id, cluster, process, status, submitting, submission_host )
values
{comma_prod_state_rows}
returning id
"""

                if not args.dryrun:
                    prod_curs = dbQuery( cnxn_string_map['statw'], insert_prod_state )
                    prod_curs.commit()
                    # Tack the returned DB ids onto the condor rows so each
                    # job can later update its own status row
                    ids=[str(id) for (id,) in prod_curs.fetchall()]
                    CHATTY(f"Inserted {len(ids)} rows into production_status, IDs: {ids}")
                    condor_rows=[ f"{x} {y}" for x,y in zip(condor_rows, ids)]

                if not args.dryrun :
                    with open(condor_infile, "a") as f:
                        f.writelines(row+'\n' for row in condor_rows)

            # Optionally hand the files straight to condor_submit
            if args.andgo:
                INFO(f"Chunk {chunk_idx}: Submitting jobs to condor")
                execute_submission(rule, args, True)
                # Refresh the queue count for the next chunk's throttling
                currently_queued_jobs = get_queued_jobs(rule)
                DEBUG(f"After submission, currently queued jobs: {currently_queued_jobs}")

            INFO(f"===== Completed chunk {chunk_idx}/{len(run_chunks)} =====")

        if args.profile:
            profiler.disable()
            DEBUG("Profiling finished. Printing stats...")
            stats = pstats.Stats(profiler)
            stats.strip_dirs().sort_stats('time').print_stats(10)

        # Summarize where things went
        prettyfs=pprint.pformat(rule.job_config.filesystem)
        input_stem=inputs_from_output[rule.dsttype]
        if isinstance(input_stem, list):
            prettyfs=prettyfs.replace('{leafdir}',rule.dsttype)

        INFO(f"Submission directory is {submitdir}")
        INFO(f"Other location templates:\n{prettyfs}")
        INFO( "KTHXBYE!" )

    finally:
        # Always release the rule lock, even on error or early exit()
        if lock_file_path is not None:
            unlock_file(lock_file_path, args.dryrun)
0452
0453
0454
if __name__ == '__main__':
    main()
    # sys.exit is the canonical way to terminate a script; the bare exit()
    # builtin is a site.py convenience intended for interactive sessions.
    sys.exit(0)