Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-25 08:29:08

0001 #!/usr/bin/env python
0002 import pyodbc
0003 from pathlib import Path
0004 import pprint # noqa: F401
0005 
0006 import time
0007 from datetime import datetime
0008 import random
0009 import os
0010 import argparse
0011 
0012 from typing import overload, List, Union
0013 from collections import namedtuple
0014 
def get_parser(argv=None):
    """
    Build the command line interface and return the parsed arguments.

    Despite its name, this function returns the parsed ``argparse.Namespace``,
    not the parser object.

    Sub-commands:
        jobstarted  --dbid ID [-n|--dryrun]
        jobended    --dbid ID [--exit-code N] [-n|--dryrun]

    Args:
        argv: Optional list of argument strings. When None (the default),
              argparse falls back to ``sys.argv[1:]``.

    Returns:
        The populated argparse.Namespace; ``command`` holds the sub-command name.
    """
    parser = argparse.ArgumentParser(description='sPHENIX DB utilities')
    subparsers = parser.add_subparsers(dest='command', required=True)

    # jobstarted subcommand
    parser_jobstarted = subparsers.add_parser('jobstarted', help='Mark a job as started')
    parser_jobstarted.add_argument('--dbid', required=True, type=int, help='Database ID of the job.')
    # '-n' is accepted here as well so both sub-commands share the same dry-run spelling.
    parser_jobstarted.add_argument('-n','--dryrun', action='store_true', help='Do not perform database updates.')

    # jobended subcommand
    parser_jobended = subparsers.add_parser('jobended', help='Mark a job as ended')
    parser_jobended.add_argument('--dbid', required=True, type=int, help='Database ID of the job.')
    parser_jobended.add_argument('--exit-code', required=False, type=int, default=0, help='Exit code of the job.')
    parser_jobended.add_argument('-n','--dryrun', action='store_true', help='Do not perform database updates.')

    return parser.parse_args(argv)
0031 
# Compact per-file record.
filedb_info = namedtuple(
    'filedb_info',
    'dsttype run seg lfn nevents first last md5 size ctime',
)

# Extended record; the fields are grouped by what they are needed for.
long_filedb_info = namedtuple(
    'long_filedb_info',
    (
        # for moving
        'origfile',
        # for files
        'lfn', 'full_host_name', 'full_file_path', 'ctime', 'size', 'md5',
        # addtl. for datasets
        'run', 'seg', 'dataset', 'dsttype', 'nevents', 'first', 'last', 'status', 'tag',
    ),
)
0038 
0039 from simpleLogger import WARN, ERROR, DEBUG, INFO, CHATTY  # noqa: E402, F401
0040 
0041 """
0042 This module provides an interface to the sPHENIX databases.
0043 Used both by submission scripts and by the production payload scripts themselves,
0044 so it should remain lightweight and not depend on any other package modules.
0045 Also, it needs a robust way to establish things like testbed vs. production mode.
0046 """
0047 
0048 # ============================================================================
0049 
#################### Test mode? Multiple ways to turn it on
# Any one of these is sufficient:
#   - the absolute path of the current directory contains "testbed" (case-insensitive)
#   - a ".testbed" entry exists in the current directory
#   - a "SPHNX_TESTBED_MODE" entry exists in the current directory
test_mode = (
    'testbed' in str(Path(".").absolute()).lower()
    or Path(".testbed").exists()
    or Path("SPHNX_TESTBED_MODE").exists()
)

# Production mode is requested by a marker entry and takes precedence over test mode.
prod_mode = Path("SPHNX_PRODUCTION_MODE").exists()

# The file catalog DSN is the same in every mode.
dsnfilec = 'FileCatalog'
if test_mode and not prod_mode:
    dsnprodr, dsnprodw = 'ProductionStatus', 'ProductionStatusWrite'
else:
    if not prod_mode:
        # Neither marker found: fall back to production, but say so.
        INFO("Neither production nor testbed mode set. Default to PRODUCTION.  YMMV.")
    dsnprodr, dsnprodw = 'Production_read', 'Production_write'
0072 
0073 # ============================================================================
# Maps a short access key to the ODBC connection string handed to pyodbc.connect().
# The DSN names for the 'fc*' and 'stat*' keys come from the mode-dependent
# dsnfilec / dsnprodr / dsnprodw values chosen above; the remaining DSNs are fixed.
# Keys carrying READONLY=True in their string end in 'r'; the others end in 'w'.
cnxn_string_map = {
    'fcw'         : f'DSN={dsnfilec};UID=phnxrc',
    'fcr'         : f'DSN={dsnfilec};READONLY=True;UID=phnxrc',
    'statr'       : f'DSN={dsnprodr};READONLY=True;UID=argouser',
    'statw'       : f'DSN={dsnprodw};UID=argouser',
    'daqr'        :  'DSN=daq;READONLY=True;UID=phnxrc',
    'rawr'        :  'DSN=RawdataCatalog_read;READONLY=True;UID=phnxrc',
    'testw'       :  'DSN=FileCatalogTest;UID=phnxrc',

}

# Hack to test locally on Mac
# The whole map is replaced (not merged) by driver-based strings for a local server.
# NOTE(review): the replacement has no 'daqr' or 'testw' entry, so looking up either
# key on macOS raises KeyError; upsert_filecatalog() uses 'testw' in test mode.
# Confirm whether that is intended.
if os.uname().sysname=='Darwin' :
    cnxn_string_map = {
        'fcw'         : 'DRIVER=PostgreSQL Unicode;SERVER=localhost;DATABASE=filecatalogdb;UID=eickolja',
        'fcr'         : 'DRIVER=PostgreSQL Unicode;SERVER=localhost;DATABASE=filecatalogdb;READONLY=True;UID=eickolja',
        'statr'       : 'DRIVER=PostgreSQL Unicode;SERVER=localhost;DATABASE=productiondb;READONLY=True;UID=eickolja',
        'statw'       : 'DRIVER=PostgreSQL Unicode;SERVER=localhost;DATABASE=productiondb;UID=eickolja',
        'rawr'        : 'DRIVER=PostgreSQL Unicode;SERVER=localhost;DATABASE=rawdatacatalogdb;READONLY=True;UID=eickolja',
    }
0094 
0095 # # Hack to use local PostgreSQL database from inside a docker container
0096 # if Path('/.dockerenv').exists() :
0097 #     driverstring='DRIVER=PostgreSQL;SERVER=host.docker.internal;'
0098 #     cnxn_string_map = {
0099 #         'fcw'         : f'{driverstring}DATABASE=filecatalogdb;UID=eickolja',
0100 #         'fcr'         : f'{driverstring}DATABASE=filecatalogdb;READONLY=True;UID=eickolja',
0101 #         'statr'       : f'{driverstring}DATABASE=productiondb;READONLY=True;UID=eickolja',
0102 #         'statw'       : f'{driverstring}DATABASE=productiondb;UID=eickolja',
0103 #         'rawr'        : f'{driverstring}DATABASE=rawdatacatalogdb;READONLY=True;UID=eickolja',
0104 #     }
0105 
0106 # ============================================================================================
def full_db_info(origfile: str, info: filedb_info, lfn: str, full_file_path: str, dataset: str, tag: str) -> long_filedb_info:
    """
    Expand a short filedb_info record into a long_filedb_info record.

    Args:
        origfile:       Stored unchanged in the ``origfile`` field.
        info:           Source of ctime, size, md5, run, seg, dsttype, nevents, first and last.
        lfn:            Stored unchanged in the ``lfn`` field.
        full_file_path: Stored unchanged; also decides the ``full_host_name`` label.
        dataset:        Stored unchanged in the ``dataset`` field.
        tag:            Stored unchanged in the ``tag`` field.

    Returns:
        A long_filedb_info with ``status`` fixed to 1.
    """
    # The storage label is derived purely from a substring test on the path.
    if 'lustre' in full_file_path:
        storage_label = "lustre"
    else:
        storage_label = 'gpfs'

    return long_filedb_info(
        origfile=origfile,
        lfn=lfn,
        full_host_name=storage_label,
        full_file_path=full_file_path,
        ctime=info.ctime,
        size=info.size,
        md5=info.md5,
        run=info.run,
        seg=info.seg,
        dataset=dataset,
        dsttype=info.dsttype,
        nevents=info.nevents,
        first=info.first,
        last=info.last,
        status=1,
        tag=tag,
    )
0124 
0125 # ============================================================================================
# One parenthesised VALUES row for the files table. All placeholders except
# {file_size_bytes} are wrapped in single quotes by the template itself;
# the values are inserted verbatim by str.format (no escaping is done here).
files_db_line = "('{lfn}','{full_host_name}','{full_file_path}','{ctimestamp}',{file_size_bytes},'{md5}')"
# Upsert statement for the files table.
#   {files_table}    - table name; the conflict target is the constraint "<table>_pkey"
#   {files_db_lines} - one or more comma-separated rows built from files_db_line
# On a conflict only time, size and md5 are overwritten.
insert_files_tmpl="""
insert into {files_table} (lfn,full_host_name,full_file_path,time,size,md5)
values
{files_db_lines}
on conflict
on constraint {files_table}_pkey
do update set
time=EXCLUDED.time,
size=EXCLUDED.size,
md5=EXCLUDED.md5
;
"""

# ---------------------------------------------------------------------------------------------
# One parenthesised VALUES row for the datasets table. String-like placeholders
# ({lfn}, {dataset}, {dsttype}, {tag}) are quoted by the template; the rest are bare.
datasets_db_line="('{lfn}',{run},{segment},{file_size_bytes},'{dataset}','{dsttype}',{nevents},{firstevent},{lastevent},'{tag}')"
# Upsert statement for the datasets table.
#   {datasets_table}    - table name; the conflict target is the constraint "<table>_pkey"
#   {datasets_db_lines} - one or more comma-separated rows built from datasets_db_line
# On a conflict the columns listed after "do update set" are overwritten.
# NOTE(review): filename and dataset are not in that list - presumably because they
# are part of the key; confirm against the table definition.
insert_datasets_tmpl="""
insert into {datasets_table} (filename,runnumber,segment,size,dataset,dsttype,events,firstevent,lastevent,tag)
values
{datasets_db_lines}
on conflict
on constraint {datasets_table}_pkey
do update set
runnumber=EXCLUDED.runnumber,
segment=EXCLUDED.segment,
size=EXCLUDED.size,
dsttype=EXCLUDED.dsttype,
events=EXCLUDED.events,
firstevent=EXCLUDED.firstevent,
lastevent=EXCLUDED.lastevent,
tag=EXCLUDED.tag
;
"""
0159 
0160 # ---------------------------------------------------------------------------------------------
0161 #def upsert_filecatalog(lfn: str, info: filedb_info, full_file_path: str, dataset: str, tag: str, filestat=None, dryrun=True ):
0162 # ---------------------------------------------------------------------------------------------
@overload
def upsert_filecatalog(fullinfos: long_filedb_info, dryrun=True ):
    ...
@overload
def upsert_filecatalog(fullinfos: List[long_filedb_info], dryrun=True ):
    ...

def upsert_filecatalog(fullinfos: Union[long_filedb_info,List[long_filedb_info]], dryrun=True ):
    """
    Insert or update file catalog entries for one or many files.

    For every record one row is generated for the files table and one for the
    datasets table. Both statements are logged via CHATTY. In test mode the
    tables 'test_files' / 'test_datasets' and the 'testw' connection are used,
    otherwise 'files' / 'datasets' and 'fcw'.

    The two statements are sent through two separate dbQuery() calls; the files
    statement is committed before the datasets statement is attempted.

    Args:
        fullinfos: A single long_filedb_info or a list of them.
        dryrun:    When True (default) the SQL is only built and logged.

    Raises:
        TypeError: if fullinfos is neither a long_filedb_info nor a list.

    Exits the process with status 1 if dbQuery() returns a falsy cursor.
    """
    if isinstance(fullinfos, long_filedb_info):
        fullinfos=[fullinfos]
    elif not isinstance(fullinfos, list):
        raise TypeError("Unsupported data type")

    if not fullinfos:
        # An empty row list would render "values" followed directly by "on conflict",
        # which is not valid SQL and would make dbQuery() terminate the process.
        DEBUG("upsert_filecatalog: empty input, nothing to do.")
        return

    files_db_lines = []
    datasets_db_lines=[]
    for fullinfo in fullinfos:
        files_db_lines.append( files_db_line.format(
            lfn=fullinfo.lfn,
            full_host_name = fullinfo.full_host_name,
            full_file_path = fullinfo.full_file_path,
            ctimestamp = datetime.fromtimestamp(fullinfo.ctime),
            file_size_bytes = fullinfo.size,
            md5=fullinfo.md5,
        ))
        # The datasets row template has no md5 placeholder, so md5 is not passed here.
        datasets_db_lines.append( datasets_db_line.format(
            lfn=fullinfo.lfn,
            run=fullinfo.run, segment=fullinfo.seg,
            file_size_bytes=fullinfo.size,
            dataset=fullinfo.dataset,
            dsttype=fullinfo.dsttype,
            nevents=fullinfo.nevents,
            firstevent=fullinfo.first,
            lastevent=fullinfo.last,
            tag=fullinfo.tag,
        ))

    insert_files=insert_files_tmpl.format(
        files_table='test_files' if test_mode else 'files',
        files_db_lines = ",\n".join(files_db_lines),
    )
    CHATTY(insert_files)

    insert_datasets=insert_datasets_tmpl.format(
        datasets_table='test_datasets' if test_mode else 'datasets',
        datasets_db_lines=",\n".join(datasets_db_lines),
    )
    CHATTY(insert_datasets)

    if not dryrun:
        dbstring = 'testw' if test_mode else 'fcw'
        files_curs = dbQuery( cnxn_string_map[ dbstring ], insert_files )
        if files_curs:
            files_curs.commit()
        else:
            ERROR(f"Failed to insert file(s)into database {dbstring}. Line was:")
            ERROR(f"{insert_files}")
            exit(1)
        datasets_curs = dbQuery( cnxn_string_map[ dbstring ], insert_datasets )
        if datasets_curs:
            datasets_curs.commit()
        else:
            ERROR(f"Failed to insert dataset(s)into database {dbstring}. Line was:")
            ERROR(f"{insert_datasets}")
            exit(1)
0231 
0232 # ============================================================================================
0233 
# Statement template used by update_proddb(); filled with status, ended and dbid.
update_prodstate_tmpl = """
update production_status
set status='{status}', ended='{ended}'
where
id={dbid}
;
"""
# ---------------------------------------------------------------------------------------------
def update_proddb( dbid: int, filestat=None, dryrun=True ):
    """
    Set a production_status row to 'finished' and record its end time.

    Args:
        dbid:     Value matched against the ``id`` column.
        filestat: Optional object with an ``st_ctime`` attribute. When truthy,
                  that timestamp becomes the end time; otherwise the current
                  time (microseconds dropped) is used.
        dryrun:   When True (default) the statement is only built and logged.

    Exits the process with status 1 if dbQuery() returns a falsy cursor.
    """
    if filestat:
        ended_at = datetime.fromtimestamp(filestat.st_ctime)
    else:
        ended_at = str(datetime.now().replace(microsecond=0))

    statement = update_prodstate_tmpl.format(
        dbid=dbid,
        status='finished',
        ended=ended_at,
    )
    CHATTY(statement)

    if dryrun:
        return

    dbstring = 'statw'
    cursor = dbQuery( cnxn_string_map[ dbstring ], statement )
    if not cursor:
        ERROR(f"Failed to update production status for {dbid} in database {dbstring}")
        exit(1)
    cursor.commit()
0258 
0259 # ============================================================================================
def jobstarted(dbid: int, dryrun: bool = False):
    """
    Marks a job as started in the production database.
    This includes setting the status to 'running', recording the start time,
    and capturing the execution node from the Condor environment.

    The execution node is read from the ``RemoteHost`` attribute of the file
    named by the ``_CONDOR_JOB_AD`` environment variable; it stays "UNKNOWN"
    when the variable, the file, or the attribute is missing.

    This update is best-effort: a database failure is logged but never
    terminates the calling process.

    Args:
        dbid:   Value matched against the ``id`` column of production_status.
        dryrun: When True the statement is only built and logged.
    """
    execution_node = "UNKNOWN"
    condor_job_ad_file = os.getenv("_CONDOR_JOB_AD")
    if condor_job_ad_file and os.path.exists(condor_job_ad_file):
        try:
            with open(condor_job_ad_file, 'r') as f:
                for line in f:
                    # Split on the first '=' only and compare the exact attribute
                    # name, so "RemoteHost=..." (no spaces) works and longer
                    # attribute names that merely start with "RemoteHost" are skipped.
                    key, sep, value = line.partition("=")
                    if not sep or key.strip() != "RemoteHost":
                        continue
                    # RemoteHost = "slot1@bnl-fn1.local" -> bnl-fn1.local
                    host = value.strip().strip('"').split('@')[-1]
                    if host:
                        execution_node = host
                    break
        except OSError as E:
            # An unreadable job ad must not prevent the job from being marked as started.
            WARN(f"Could not read Condor job ad {condor_job_ad_file}: {E}")

    # The value is embedded in a quoted SQL literal; double any single quote.
    node_literal = execution_node.replace("'", "''")
    update_sql = f"""
        UPDATE production_status
        SET
            status = 'running',
            started = '{datetime.now().replace(microsecond=0)}',
            execution_node = '{node_literal}'
        WHERE id = {dbid};
    """
    DEBUG(update_sql)
    if not dryrun:
        dbstring = 'statw'
        try:
            prodstate_curs = dbQuery(cnxn_string_map[dbstring], update_sql)
        except SystemExit:
            # dbQuery() calls exit() on failure. Intercept that here:
            # the job might still be able to run and we don't want to cause
            # a job failure just for a DB update failure.
            prodstate_curs = None
        if prodstate_curs:
            prodstate_curs.commit()
        else:
            ERROR(f"Failed to update production status for {dbid} in database {dbstring}")
0296 
0297 
def jobended(dbid: int, exit_code: int, dryrun: bool = False):
    """
    Marks a job as ended in the production database.
    The final status is determined by the exit_code: 0 gives 'finished',
    anything else gives 'failed'. The end time is the current time with
    microseconds dropped.

    This update is best-effort: a database failure is logged but never
    terminates the calling process.

    Args:
        dbid:      Value matched against the ``id`` column of production_status.
        exit_code: Exit code of the job.
        dryrun:    When True the statement is only built and logged.
    """
    status = 'finished' if exit_code == 0 else 'failed'
    update_sql = f"""
        UPDATE production_status
        SET
            status = '{status}',
            ended = '{datetime.now().replace(microsecond=0)}'
        WHERE id = {dbid};
    """
    CHATTY(update_sql)
    if not dryrun:
        dbstring = 'statw'
        try:
            prodstate_curs = dbQuery(cnxn_string_map[dbstring], update_sql)
        except SystemExit:
            # dbQuery() calls exit() on failure. Intercept that here:
            # we want to avoid causing further issues at the end of a job.
            prodstate_curs = None
        if prodstate_curs:
            prodstate_curs.commit()
        else:
            ERROR(f"Failed to update production status for {dbid} in database {dbstring}")
0322 
def printDbInfo( cnxn_string, title ):
    """
    Connect with the given connection string and print what was reached.

    Prints the connection string, the data source name and the server name
    reported by the driver, and the caller-supplied title.

    Args:
        cnxn_string: ODBC connection string passed to pyodbc.connect().
        title:       Free-form label included in the printed line.
    """
    conn = pyodbc.connect( cnxn_string )
    try:
        name=conn.getinfo(pyodbc.SQL_DATA_SOURCE_NAME)
        serv=conn.getinfo(pyodbc.SQL_SERVER_NAME)
    finally:
        # The connection is only needed for the two lookups; release it right away.
        conn.close()
    print(f"with {cnxn_string}\n   connected {name} from {serv} as {title}")
0328 
0329 # ============================================================================================
def dbQuery( cnxn_string, query, ntries=10 ):
    """
    Open a connection, execute one query, and return the cursor.

    Nothing is committed here; callers that modify data call ``commit()`` on
    the returned cursor.

    An attempt that fails with SQLSTATE '40001' (serialization failure) is
    retried after a random delay that grows with the attempt number. Any other
    error, or running out of attempts, ends the process with exit status 11.

    Args:
        cnxn_string: ODBC connection string passed to pyodbc.connect().
        query:       SQL text to execute.
        ntries:      Maximum number of attempts. This value is honoured;
                     it is no longer overridden by a hard-coded 5.

    Returns:
        The pyodbc cursor on which the query was executed.
    """
    CHATTY(f'[cnxn_string] {cnxn_string}')
    CHATTY(f'[query      ]\n{query}')

    start=datetime.now()
    curs=None
    # Attempt to connect up to ntries
    for itry in range(ntries):
        conn = None
        try:
            conn = pyodbc.connect( cnxn_string )
            curs = conn.cursor()
            curs.execute( query )
            break
        except pyodbc.Error as E:
            ERROR(f"Warning: Attempt {itry} failed: {E}")
            curs = None
            if conn is not None:
                # Do not leave a half-used connection behind for every failed attempt.
                try:
                    conn.close()
                except pyodbc.Error:
                    # Closing is best-effort; the original error is what matters.
                    pass
            # Some errors carry no args; treat those as non-retryable.
            sqlstate = E.args[0] if E.args else None
            if sqlstate == '40001':  # replica needs updating; formally: "serialization failure" (deadlock)
                delay = (itry + 1 ) * random.random()
                time.sleep(delay)
                continue
            ERROR(f"Non-retryable odbc error encountered: {E}")
            exit(11)
        except Exception as E:
            ERROR(f"Non-retryable other error encountered during database query: {E}")
            exit(11)
    else:
        ERROR("Too many attempts. Stop.")
        # allow the mother process to just move on
        exit(11)
    CHATTY(f'[query time ] {(datetime.now() - start).total_seconds():.2f} seconds' )

    return curs
0369 
0370 # ============================================================================================
def list_to_condition(lst: List[int], name: str="runnumber")  -> str :
    """
    Turn a list of values into a SQL condition fragment for one column.

    The fragment carries no leading "and"/"where"; the caller adds that.
    The number of entries decides the form:

        0 entries   -> ""
        1 entry     -> "<name>=<value>"
        2 entries   -> inclusive range "<name>>=<low> and <name><=<high>"
                       (the two values are sorted first)
        3 or more   -> "<name> in  ( v1,v2,... )" in the given order

    A bare int is accepted and treated like a one-element list.

    Args:
        lst:  List of integers (or a single int). Usually runnumbers.
        name: Column name to use in the condition.

    Returns:
        The condition string, or "" for an empty list.

    Exits the process with status 1 if lst is neither an int nor a list,
    and with status -1 if it has more than 20000 entries.

    Examples:
        - list_to_condition([123], "runnumber") returns "runnumber=123"
        - list_to_condition([200, 100], "runnumber") returns "runnumber>=100 and runnumber<=200"
        - list_to_condition([1, 2, 3], "runnumber") returns "runnumber in  ( 1,2,3 )"
        - list_to_condition([], "runnumber") returns ""
    """
    if isinstance(lst, int):
        lst = [lst]
    elif not isinstance(lst, list):
        ERROR(f"list_to_condition: input argument is {type(lst)}")
        exit(1)

    count = len(lst)
    if count == 0:
        return ""

    if count > 20000:
        ERROR(f"Run list has {count} entries. Not a good idea. Bailing out.")
        exit(-1)

    if count == 1:
        only = lst[0]
        return f"{name}={only}"

    if count == 2:
        # Two entries mean a range; sort so a reversed pair still works.
        low, high = sorted(lst)
        return f"{name}>={low} and {name}<={high}"

    # Three or more entries: explicit list, possibly with gaps.
    joined = ','.join(str(value) for value in lst)
    return f"{name} in  ( {joined} )"
0419 
def main():
    """
    Command line entry point: parse the arguments and run the chosen sub-command.

    'jobstarted' calls jobstarted(dbid, dryrun).
    'jobended' calls jobended(dbid, exit_code, dryrun); if the parsed arguments
    carry no ``exit_code`` attribute, 1 is used.
    """
    args = get_parser()
    command = args.command

    if command == 'jobstarted':
        jobstarted(args.dbid, args.dryrun)
        return

    if command == 'jobended':
        jobended(args.dbid, getattr(args, 'exit_code', 1), args.dryrun)
0427 
0428 
0429 if __name__ == '__main__':
0430     main()
0431