File indexing completed on 2026-04-25 08:29:08
0001
0002
0003 from pathlib import Path
0004 from datetime import datetime
0005 import yaml
0006 import cProfile
0007 import pstats
0008 import sys
0009 import shutil
0010 import os
0011
0012 import pprint
0013
0014 from argparsing import submission_args
0015 from sphenixmisc import setup_rot_handler, should_I_quit
0016 from simpleLogger import slogger, CustomFormatter, CHATTY, DEBUG, INFO, WARN, ERROR, CRITICAL
0017 from sphenixprodrules import RuleConfig
0018 from sphenixdbutils import dbQuery, cnxn_string_map, list_to_condition
0019
def _sql_literal(value):
    """Render *value* as a single-quoted SQL string literal.

    Embedded single quotes are doubled so that a quote inside a file name,
    dataset type or timestamp cannot terminate the literal early.
    """
    return "'" + str(value).replace("'", "''") + "'"


def process_chunk(chunk, production_status_table, dryrun=False):
    """
    Mirror one chunk of file-catalog rows into the production status table.

    Files already present in the table (matched on ``dstfile``) get their
    ``ended`` timestamp refreshed and their status set to 'finished';
    files not yet present are inserted as new 'finished' rows. All rows
    of the chunk are folded into at most one INSERT and one UPDATE
    statement, which are sent to the database together.

    Args:
        chunk: Sized sequence of 5-item rows unpacked as
            ``(lfn, time, run, seg, dsttype)``.
        production_status_table: Name of the table to query and modify;
            interpolated verbatim into the SQL text.
        dryrun: When true, the statements are built and logged but not
            executed.

    Returns:
        None. Failures are reported through ERROR log messages only.
    """
    DEBUG(f"Processing chunk of {len(chunk)} files...")

    # Nothing to look up or write; also avoids building an empty IN () list.
    if not chunk:
        return

    lfn_list_for_sql = ", ".join(_sql_literal(row[0]) for row in chunk)
    check_query = (
        f"SELECT dstfile FROM {production_status_table} "
        f"WHERE dstfile IN ({lfn_list_for_sql})"
    )

    # A falsy result from dbQuery is treated as a failed query.
    existing_files_cursor = dbQuery(cnxn_string_map['statr'], check_query)
    if not existing_files_cursor:
        ERROR("Failed to query production_status for existing files.")
        return

    existing_lfns = {row.dstfile for row in existing_files_cursor.fetchall()}

    insert_values = []
    update_values = []
    for lfn, ended, run, seg, dsttype in chunk:
        if lfn in existing_lfns:
            update_values.append(
                f"({_sql_literal(lfn)}, {_sql_literal(ended)}::timestamp)"
            )
        else:
            # dstname is everything in the lfn before its first '-'.
            dstname = lfn.split('-', 1)[0]
            # Fixed column values: nsegments=0, inputs='dbquery',
            # prod_id=cluster=process=0, status='finished'.
            insert_values.append(
                f"({_sql_literal(dsttype)}, {_sql_literal(dstname)}, "
                f"{_sql_literal(lfn)}, {run}, {seg}, 0, 'dbquery', "
                f"0, 0, 0, 'finished', {_sql_literal(ended)})"
            )

    all_statements = []
    if insert_values:
        values_str = ",\n".join(insert_values)
        insert_query = f"""
INSERT INTO {production_status_table} (
dsttype, dstname, dstfile, run, segment, nsegments,
inputs, prod_id, cluster, process, status, ended
) VALUES
{values_str};
"""
        all_statements.append(insert_query)

    if update_values:
        values_str = ",\n".join(update_values)
        update_query = f"""
UPDATE {production_status_table} AS ps SET
ended = v.ended,
status = 'finished'
FROM (VALUES {values_str}) AS v(dstfile, ended)
WHERE ps.dstfile = v.dstfile;
"""
        all_statements.append(update_query)

    if not all_statements:
        return

    combined_query = "\n".join(all_statements)
    CHATTY(combined_query)

    if dryrun:
        # The statement text was already logged above; no need to repeat it.
        INFO("Dry run, not updating database.")
        return

    update_cursor = dbQuery(cnxn_string_map['statw'], combined_query)
    if update_cursor:
        update_cursor.commit()
        INFO(f"Processed {len(chunk)} entries in production_status.")
    else:
        ERROR("Failed to update/insert into production_status.")
0085
def main():
    """
    Copy recently catalogued files into the production status table.

    Steps, in order:
      1. Parse command-line arguments, set up logging, and exit early
         with status 0 if ``should_I_quit`` says so.
      2. Optionally start a cProfile profiler (``args.profile``).
      3. Build the rule configuration from the YAML config with a set of
         parameter overrides; exit with status 1 if loading raises
         ValueError or FileNotFoundError.
      4. Page through the file catalog (files joined with datasets) for
         entries matching the rule's dsttype prefix, tag and run
         condition and newer than two days, ordered by lfn, and hand each
         page to ``process_chunk``.
      5. Print the top profiler entries if profiling was enabled.

    A failed catalog query ends the paging loop with an ERROR message;
    the function still runs to its final "DONE" log line in that case.
    """
    args = submission_args()
    test_mode = args.test_mode

    sublogdir = setup_rot_handler(args)
    slogger.setLevel(args.loglevel)

    if should_I_quit(args=args, myname=sys.argv[0]):
        DEBUG("Stop.")
        # sys.exit instead of the site-provided exit(), which is meant for
        # interactive sessions and is not guaranteed to exist in scripts.
        sys.exit(0)
    INFO(f"Logging to {sublogdir}, level {args.loglevel}")

    if args.profile:
        DEBUG( "Profiling is ENABLED.")
        profiler = cProfile.Profile()
        profiler.enable()

    INFO(f"Starting {sys.argv[0]}.")
    INFO(sys.argv)

    if test_mode:
        INFO("Running in testbed mode.")
        # Test mode forces the testbed value, which becomes prodmode below.
        args.mangle_dirpath = 'production-testbed'
    else:
        INFO("Running in production mode.")

    # Overrides handed to the rule builder. nevents is pinned to 0 here.
    param_overrides = {
        "runs": args.runs,
        "runlist": args.runlist,
        "nevents": 0,
    }
    if args.physicsmode is not None:
        param_overrides["physicsmode"] = args.physicsmode
    param_overrides["prodmode"] = args.mangle_dirpath if args.mangle_dirpath else "production"

    CHATTY(f"Rule substitutions: {param_overrides}")
    INFO("Now loading and building rule configuration.")

    try:
        rule = RuleConfig.from_yaml_file(
            yaml_file=args.config,
            rule_name=args.rulename,
            param_overrides=param_overrides,
        )
        INFO(f"Successfully loaded rule configuration: {args.rulename}")
    except (ValueError, FileNotFoundError) as e:
        ERROR(f"Error: {e}")
        sys.exit(1)

    CHATTY("Rule configuration:")
    CHATTY(yaml.dump(rule.dict))

    # Test mode reads from the test_* catalog tables; the status table
    # name is the same in both modes.
    files_table = 'test_files' if test_mode else 'files'
    datasets_table = 'test_datasets' if test_mode else 'datasets'
    production_status_table = 'production_status'

    run_condition = list_to_condition(rule.runlist_int, name="d.runnumber")

    # Only files whose catalog time falls within this window are considered.
    recency_interval = '2 DAYS'
    base_query = f"""
SELECT f.lfn, f.time, d.runnumber, d.segment, d.dsttype
FROM {files_table} f
JOIN {datasets_table} d ON f.lfn = d.filename
WHERE d.dsttype like '{rule.dsttype}%'
AND d.tag = '{rule.outtriplet}'
AND {run_condition}
AND f.time > (NOW() - INTERVAL '{recency_interval}')
"""

    INFO("Querying file catalog in chunks...")

    # Keyset pagination: each page resumes strictly after the last lfn
    # seen, with rows ordered by lfn.
    last_lfn = ""
    chunk_size = 100000

    while True:
        paginated_query = base_query
        if last_lfn:
            # Double any single quote so the lfn cannot break the literal.
            escaped_lfn = last_lfn.replace("'", "''")
            paginated_query += f" AND f.lfn > '{escaped_lfn}'"

        query = f"""
{paginated_query}
ORDER BY f.lfn
LIMIT {chunk_size}
"""

        DEBUG(f"Using query:\n{query}")
        files_cursor = dbQuery(cnxn_string_map['fcr'], query)
        if not files_cursor:
            ERROR("Failed to query file catalog.")
            break

        results = files_cursor.fetchall()
        if not results:
            INFO("No more files to update.")
            break

        INFO(f"Found {len(results)} files in this chunk.")
        process_chunk(results, production_status_table, args.dryrun)

        last_lfn = results[-1][0]

        # A short page means the ordered result set is exhausted, so the
        # follow-up query would come back empty; skip it.
        if len(results) < chunk_size:
            INFO("No more files to update.")
            break

    if args.profile:
        profiler.disable()
        DEBUG("Profiling finished. Printing stats...")
        stats = pstats.Stats(profiler)
        stats.strip_dirs().sort_stats('time').print_stats(10)

    INFO(f"{Path(sys.argv[0]).name} DONE.")
0201
# Script entry point: run main() and report success to the caller.
if __name__ == '__main__':
    main()
    # sys.exit rather than the site-provided exit(), which is intended for
    # interactive use and is absent when the site module is not loaded.
    sys.exit(0)