Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-25 08:29:08

0001 #!/bin/env python
0002 
0003 from pathlib import Path
0004 from datetime import datetime
0005 import yaml
0006 import cProfile
0007 import pstats
0008 import sys
0009 import shutil
0010 import os
0011 
0012 import pprint # noqa F401
0013 
0014 from argparsing import submission_args
0015 from sphenixmisc import setup_rot_handler, should_I_quit
0016 from simpleLogger import slogger, CustomFormatter, CHATTY, DEBUG, INFO, WARN, ERROR, CRITICAL  # noqa: F401
0017 from sphenixprodrules import RuleConfig
0018 from sphenixdbutils import dbQuery, cnxn_string_map, list_to_condition
0019 
def process_chunk(chunk, production_status_table, dryrun=False):
    """
    Process a single chunk of rows from the file catalog.

    Each row is a (lfn, time, run, segment, dsttype) tuple. LFNs already
    present in production_status get their 'ended' timestamp refreshed and
    their status forced to 'finished' via one aggregated UPDATE; unknown LFNs
    are added with one aggregated INSERT. Both statements are sent to the
    database in a single round trip.

    Args:
        chunk: sequence of (lfn, time, run, segment, dsttype) tuples.
        production_status_table: name of the production status table to touch.
        dryrun: if True, log the generated SQL instead of executing it.
    """
    if not chunk:
        DEBUG("Empty chunk, nothing to do.")
        return

    DEBUG(f"Processing chunk of {len(chunk)} files...")

    # NOTE(review): values are interpolated into the SQL text directly. The
    # LFNs come from our own file catalog, but we still double up single
    # quotes so a stray quote cannot break (or inject into) the statement.
    # Parameterized queries would be preferable if dbQuery supports them
    # — TODO confirm.
    def _esc(val):
        """Escape a value for inclusion inside a single-quoted SQL literal."""
        return str(val).replace("'", "''")

    lfns_in_chunk = [item[0] for item in chunk]
    lfn_list_for_sql = "','".join(_esc(lfn) for lfn in lfns_in_chunk)
    check_query = f"SELECT dstfile FROM {production_status_table} WHERE dstfile IN ('{lfn_list_for_sql}')"

    existing_files_cursor = dbQuery(cnxn_string_map['statr'], check_query)
    if not existing_files_cursor:
        ERROR("Failed to query production_status for existing files.")
        return

    existing_lfns = {row.dstfile for row in existing_files_cursor.fetchall()}

    insert_values = []
    update_values = []
    for lfn, time, run, seg, dsttype in chunk:
        if lfn in existing_lfns:
            update_values.append(f"('{_esc(lfn)}', '{time}'::timestamp)")
        else:
            # dstname is the LFN up to (not including) the first '-'.
            dstname = lfn.split('-', 1)[0]
            insert_values.append(f"('{_esc(dsttype)}', '{_esc(dstname)}', '{_esc(lfn)}', {run}, {seg}, 0, 'dbquery', 0, 0, 0, 'finished', '{time}')")

    all_statements = []
    if insert_values:
        values_str = ",\n".join(insert_values)
        insert_query = f"""
        INSERT INTO {production_status_table} (
            dsttype, dstname, dstfile, run, segment, nsegments,
            inputs, prod_id, cluster, process, status, ended
        ) VALUES
        {values_str};
        """
        all_statements.append(insert_query)

    if update_values:
        values_str = ",\n".join(update_values)
        update_query = f"""
        UPDATE {production_status_table} AS ps SET
            ended = v.ended,
            status = 'finished'
        FROM (VALUES {values_str}) AS v(dstfile, ended)
        WHERE ps.dstfile = v.dstfile;
        """
        all_statements.append(update_query)

    if all_statements:
        update_query = "\n".join(all_statements)
        CHATTY(update_query)

        if dryrun:
            INFO("Dry run, not updating database.")
            return

        update_cursor = dbQuery(cnxn_string_map['statw'], update_query)
        if update_cursor:
            update_cursor.commit()
            INFO(f"Processed {len(chunk)} entries in production_status.")
        else:
            ERROR("Failed to update/insert into production_status.")
0085 
def main():
    """
    Sync the production_status table with the file catalog.

    Pages through recently-added catalog files that match the configured
    rule (dsttype, tag, run selection), handing each page to process_chunk,
    which inserts new entries or refreshes existing ones.
    """
    ### digest arguments
    args = submission_args()

    #################### Test mode?
    test_mode = args.test_mode

    # Set up submission logging before going any further
    sublogdir = setup_rot_handler(args)
    slogger.setLevel(args.loglevel)

    # Exit without fuss if we are already running
    if should_I_quit(args=args, myname=sys.argv[0]):
        DEBUG("Stop.")
        sys.exit(0)
    INFO(f"Logging to {sublogdir}, level {args.loglevel}")

    if args.profile:
        DEBUG("Profiling is ENABLED.")
        profiler = cProfile.Profile()
        profiler.enable()

    INFO(f"Starting {sys.argv[0]}.")
    INFO(sys.argv)

    if test_mode:
        INFO("Running in testbed mode.")
        args.mangle_dirpath = 'production-testbed'
    else:
        INFO("Running in production mode.")

    param_overrides = {}
    param_overrides["runs"] = args.runs
    param_overrides["runlist"] = args.runlist
    param_overrides["nevents"] = 0  # Not relevant, but needed for the RuleConfig ctor

    if args.physicsmode is not None:
        param_overrides["physicsmode"] = args.physicsmode  # e.g. physics

    param_overrides["prodmode"] = "production"
    if args.mangle_dirpath:
        param_overrides["prodmode"] = args.mangle_dirpath

    CHATTY(f"Rule substitutions: {param_overrides}")
    INFO("Now loading and building rule configuration.")

    try:
        rule = RuleConfig.from_yaml_file( yaml_file=args.config, rule_name=args.rulename, param_overrides=param_overrides )
        INFO(f"Successfully loaded rule configuration: {args.rulename}")
    except (ValueError, FileNotFoundError) as e:
        ERROR(f"Error: {e}")
        sys.exit(1)

    CHATTY("Rule configuration:")
    CHATTY(yaml.dump(rule.dict))

    # Test mode reads from the sandbox catalog tables.
    files_table = 'test_files' if test_mode else 'files'
    datasets_table = 'test_datasets' if test_mode else 'datasets'
    production_status_table = 'production_status'

    run_condition = list_to_condition(rule.runlist_int, name="d.runnumber")

    recency_interval = '2 DAYS' # args.recent if args.recent else '7 DAYS'
    base_query = f"""
    SELECT f.lfn, f.time, d.runnumber, d.segment, d.dsttype
    FROM {files_table} f
    JOIN {datasets_table} d ON f.lfn = d.filename
    WHERE d.dsttype like '{rule.dsttype}%'
    AND d.tag = '{rule.outtriplet}'
    AND {run_condition}
    AND f.time > (NOW() - INTERVAL '{recency_interval}')
    """
#    AND d.dataset = '{rule.dataset}'

    INFO("Querying file catalog in chunks...")

    # Keyset pagination on f.lfn: each pass fetches the next chunk_size
    # files strictly after the last LFN seen.
    last_lfn = ""
    chunk_size = 100000

    while True:

        paginated_query = base_query
        if last_lfn:
            paginated_query += f" AND f.lfn > '{last_lfn}'"

        query = f"""
        {paginated_query}
        ORDER BY f.lfn
        LIMIT {chunk_size}
        """

        DEBUG(f"Using query:\n{query}")
        files_cursor = dbQuery(cnxn_string_map['fcr'], query)
        if not files_cursor:
            ERROR("Failed to query file catalog.")
            break

        results = files_cursor.fetchall()
        if not results:
            INFO("No more files to update.")
            break

        INFO(f"Found {len(results)} files in this chunk.")
        process_chunk(results, production_status_table, args.dryrun)

        # A short chunk means the catalog is exhausted; skip the extra
        # empty-result round trip.
        if len(results) < chunk_size:
            INFO("Last (partial) chunk processed.")
            break

        last_lfn = results[-1][0]

    if args.profile:
        profiler.disable()
        DEBUG("Profiling finished. Printing stats...")
        stats = pstats.Stats(profiler)
        stats.strip_dirs().sort_stats('time').print_stats(10)

    INFO(f"{Path(sys.argv[0]).name} DONE.")
0201 
if __name__ == '__main__':
    # Use sys.exit: the bare exit() builtin is injected by the `site` module
    # and is not guaranteed to exist in all execution contexts (e.g. -S).
    main()
    sys.exit(0)