# File indexing completed on 2026-04-25 08:29:08
import argparse
import os
import pprint
import random
import sys
import time
from collections import namedtuple
from datetime import datetime
from pathlib import Path
from typing import overload, List, Union

import pyodbc
0014
def get_parser():
    """Parse the command line for the DB utility CLI.

    NOTE: despite the name, this returns the parsed ``argparse.Namespace``
    (it calls ``parse_args()``), not the parser object itself. Kept as-is
    for backward compatibility with existing callers.

    Subcommands:
        jobstarted  --dbid N [-n|--dryrun]
        jobended    --dbid N [--exit-code C] [-n|--dryrun]
    """
    parser = argparse.ArgumentParser(description='sPHENIX DB utilities')
    subparsers = parser.add_subparsers(dest='command', required=True)

    # 'jobstarted': mark a production_status row as running.
    parser_jobstarted = subparsers.add_parser('jobstarted', help='Mark a job as started')
    parser_jobstarted.add_argument('--dbid', required=True, type=int, help='Database ID of the job.')
    # '-n' short option added for consistency with 'jobended'.
    parser_jobstarted.add_argument('-n', '--dryrun', action='store_true', help='Do not perform database updates.')

    # 'jobended': mark a production_status row as finished/failed.
    parser_jobended = subparsers.add_parser('jobended', help='Mark a job as ended')
    parser_jobended.add_argument('--dbid', required=True, type=int, help='Database ID of the job.')
    parser_jobended.add_argument('--exit-code', required=False, type=int, default=0, help='Exit code of the job.')
    parser_jobended.add_argument('-n', '--dryrun', action='store_true', help='Do not perform database updates.')

    return parser.parse_args()
0031
# Short record describing one produced output file, as reported by the payload.
filedb_info = namedtuple(
    'filedb_info',
    ['dsttype', 'run', 'seg', 'lfn', 'nevents', 'first', 'last', 'md5', 'size', 'ctime'],
)

# Extended record carrying everything needed to populate both the
# 'files' and 'datasets' catalog tables for a single file.
long_filedb_info = namedtuple(
    'long_filedb_info',
    [
        'origfile',
        'lfn', 'full_host_name', 'full_file_path', 'ctime', 'size', 'md5',
        'run', 'seg', 'dataset', 'dsttype', 'nevents', 'first', 'last', 'status', 'tag',
    ],
)
0038
0039 from simpleLogger import WARN, ERROR, DEBUG, INFO, CHATTY
0040
"""
This module provides an interface to the sPHENIX databases.
Used both by submission scripts and by the production payload scripts themselves,
so it should remain lightweight and not depend on any other package modules.
Also, it needs a robust way to establish things like testbed vs. production mode.
"""
# NOTE(review): the string above is placed after the imports, so it is NOT the
# module docstring (it has no effect at runtime beyond evaluation).


# ---- Mode detection (evaluated once, at import time) ------------------------
# Testbed mode is enabled when the current working directory path contains
# 'testbed', or when a marker file '.testbed' or 'SPHNX_TESTBED_MODE' exists
# in the CWD.
test_mode = (
    False
    or 'testbed' in str(Path(".").absolute()).lower()
    or Path(".testbed").exists()
    or Path("SPHNX_TESTBED_MODE").exists()
)

# Production mode is enabled by a 'SPHNX_PRODUCTION_MODE' marker file in the
# CWD and takes precedence over testbed mode.
prod_mode = Path("SPHNX_PRODUCTION_MODE").exists()
if ( prod_mode ):
    dsnprodr = 'Production_read'
    dsnprodw = 'Production_write'
    dsnfilec = 'FileCatalog'
elif ( test_mode ):
    dsnprodr = 'ProductionStatus'
    dsnprodw = 'ProductionStatusWrite'
    dsnfilec = 'FileCatalog'
else:
    # Neither marker present: fall back to the production DSNs (logged).
    INFO("Neither production nor testbed mode set. Default to PRODUCTION. YMMV.")
    dsnprodr = 'Production_read'
    dsnprodw = 'Production_write'
    dsnfilec = 'FileCatalog'


# ODBC connection strings, keyed by short role names:
#   fcw/fcr   - file catalog, write / read-only
#   statr/statw - production status, read-only / write
#   daqr, rawr  - DAQ and raw-data catalogs, read-only
#   testw       - test file catalog, write
cnxn_string_map = {
    'fcw'   : f'DSN={dsnfilec};UID=phnxrc',
    'fcr'   : f'DSN={dsnfilec};READONLY=True;UID=phnxrc',
    'statr' : f'DSN={dsnprodr};READONLY=True;UID=argouser',
    'statw' : f'DSN={dsnprodw};UID=argouser',
    'daqr'  : 'DSN=daq;READONLY=True;UID=phnxrc',
    'rawr'  : 'DSN=RawdataCatalog_read;READONLY=True;UID=phnxrc',
    'testw' : 'DSN=FileCatalogTest;UID=phnxrc',

}


# On macOS (Darwin), override with local development databases --
# presumably a developer's laptop setup; note 'daqr' and 'testw' have no
# local equivalent here.
if os.uname().sysname=='Darwin' :
    cnxn_string_map = {
        'fcw'   : 'DRIVER=PostgreSQL Unicode;SERVER=localhost;DATABASE=filecatalogdb;UID=eickolja',
        'fcr'   : 'DRIVER=PostgreSQL Unicode;SERVER=localhost;DATABASE=filecatalogdb;READONLY=True;UID=eickolja',
        'statr' : 'DRIVER=PostgreSQL Unicode;SERVER=localhost;DATABASE=productiondb;READONLY=True;UID=eickolja',
        'statw' : 'DRIVER=PostgreSQL Unicode;SERVER=localhost;DATABASE=productiondb;UID=eickolja',
        'rawr'  : 'DRIVER=PostgreSQL Unicode;SERVER=localhost;DATABASE=rawdatacatalogdb;READONLY=True;UID=eickolja',
    }
0094
0095
0096
0097
0098
0099
0100
0101
0102
0103
0104
0105
0106
def full_db_info(origfile: str, info: filedb_info, lfn: str, full_file_path: str, dataset: str, tag: str) -> long_filedb_info:
    """Combine a short filedb_info record with catalog metadata into a long_filedb_info.

    The host label is derived from the physical path: any path containing
    'lustre' is tagged 'lustre', everything else 'gpfs'. The status field
    is always set to 1.
    """
    host_label = 'lustre' if 'lustre' in full_file_path else 'gpfs'
    return long_filedb_info(
        origfile=origfile,
        lfn=lfn,
        full_host_name=host_label,
        full_file_path=full_file_path,
        ctime=info.ctime,
        size=info.size,
        md5=info.md5,
        run=info.run,
        seg=info.seg,
        dataset=dataset,
        dsttype=info.dsttype,
        nevents=info.nevents,
        first=info.first,
        last=info.last,
        status=1,
        tag=tag,
    )
0124
0125
# One VALUES tuple for the 'files' table, filled in via str.format.
# NOTE(review): values are spliced into the SQL by string formatting, not
# bound parameters -- acceptable only because the inputs come from trusted
# production metadata, never from user input.
files_db_line = "('{lfn}','{full_host_name}','{full_file_path}','{ctimestamp}',{file_size_bytes},'{md5}')"
# Upsert template for the files table: on a primary-key conflict the
# existing row's time/size/md5 are refreshed from the new values.
insert_files_tmpl="""
insert into {files_table} (lfn,full_host_name,full_file_path,time,size,md5)
values
{files_db_lines}
on conflict
on constraint {files_table}_pkey
do update set
time=EXCLUDED.time,
size=EXCLUDED.size,
md5=EXCLUDED.md5
;
"""
0139
0140
# One VALUES tuple for the 'datasets' table, filled in via str.format.
# Same caveat as files_db_line: values are string-formatted, not bound.
datasets_db_line="('{lfn}',{run},{segment},{file_size_bytes},'{dataset}','{dsttype}',{nevents},{firstevent},{lastevent},'{tag}')"
# Upsert template for the datasets table. NOTE(review): the 'dataset' column
# is NOT refreshed in the do-update clause -- presumably intentional (part of
# the identity of the row); confirm against the table schema.
insert_datasets_tmpl="""
insert into {datasets_table} (filename,runnumber,segment,size,dataset,dsttype,events,firstevent,lastevent,tag)
values
{datasets_db_lines}
on conflict
on constraint {datasets_table}_pkey
do update set
runnumber=EXCLUDED.runnumber,
segment=EXCLUDED.segment,
size=EXCLUDED.size,
dsttype=EXCLUDED.dsttype,
events=EXCLUDED.events,
firstevent=EXCLUDED.firstevent,
lastevent=EXCLUDED.lastevent,
tag=EXCLUDED.tag
;
"""
0159
0160
0161
0162
@overload
def upsert_filecatalog(fullinfos: long_filedb_info, dryrun=True):
    ...
@overload
def upsert_filecatalog(fullinfos: List[long_filedb_info], dryrun=True):
    ...

def upsert_filecatalog(fullinfos: Union[long_filedb_info, List[long_filedb_info]], dryrun=True):
    """Insert or update file catalog entries in the files and datasets tables.

    Accepts a single long_filedb_info or a list of them. In test mode the
    test_files/test_datasets tables (and the test DSN) are targeted instead
    of the production ones.

    Args:
        fullinfos: one record or a list of records to upsert.
        dryrun: if True, only log the generated SQL; no database writes.

    Raises:
        TypeError: if fullinfos is neither a long_filedb_info nor a list.

    Exits the process with status 1 if a database write fails.
    """
    if isinstance(fullinfos, long_filedb_info):
        fullinfos = [fullinfos]
    elif isinstance(fullinfos, list):
        pass
    else:
        raise TypeError("Unsupported data type")

    # Build one VALUES tuple per record for each of the two tables.
    # NOTE(review): plain string formatting, not bound parameters; assumes
    # trusted production metadata in all fields.
    files_db_lines = []
    datasets_db_lines = []
    for fullinfo in fullinfos:
        files_db_lines.append(files_db_line.format(
            lfn=fullinfo.lfn,
            full_host_name=fullinfo.full_host_name,
            full_file_path=fullinfo.full_file_path,
            ctimestamp=datetime.fromtimestamp(fullinfo.ctime),
            file_size_bytes=fullinfo.size,
            md5=fullinfo.md5,
        ))
        datasets_db_lines.append(datasets_db_line.format(
            lfn=fullinfo.lfn,
            md5=fullinfo.md5,
            run=fullinfo.run, segment=fullinfo.seg,
            file_size_bytes=fullinfo.size,
            dataset=fullinfo.dataset,
            dsttype=fullinfo.dsttype,
            nevents=fullinfo.nevents,
            firstevent=fullinfo.first,
            lastevent=fullinfo.last,
            tag=fullinfo.tag,
        ))

    files_db_lines = ",\n".join(files_db_lines)
    insert_files = insert_files_tmpl.format(
        files_table='test_files' if test_mode else 'files',
        files_db_lines=files_db_lines,
    )
    CHATTY(insert_files)

    datasets_db_lines = ",\n".join(datasets_db_lines)
    insert_datasets = insert_datasets_tmpl.format(
        datasets_table='test_datasets' if test_mode else 'datasets',
        datasets_db_lines=datasets_db_lines,
    )
    CHATTY(insert_datasets)

    if not dryrun:
        dbstring = 'testw' if test_mode else 'fcw'
        files_curs = dbQuery(cnxn_string_map[dbstring], insert_files)
        if files_curs:
            files_curs.commit()
        else:
            # Fixed message: was missing the space in "file(s)into".
            ERROR(f"Failed to insert file(s) into database {dbstring}. Line was:")
            ERROR(f"{insert_files}")
            sys.exit(1)
        datasets_curs = dbQuery(cnxn_string_map[dbstring], insert_datasets)
        if datasets_curs:
            datasets_curs.commit()
        else:
            # Fixed message: was missing the space in "dataset(s)into".
            ERROR(f"Failed to insert dataset(s) into database {dbstring}. Line was:")
            ERROR(f"{insert_datasets}")
            sys.exit(1)
0231
0232
0233
# Template to close out a production_status row, filled in via str.format.
# NOTE(review): values are string-formatted into the SQL, not bound; assumes
# trusted inputs (integer dbid, internally generated timestamps/status).
update_prodstate_tmpl = """
update production_status
set status='{status}', ended='{ended}'
where
id={dbid}
;
"""
0241
def update_proddb( dbid: int, filestat=None, dryrun=True ):
    """Mark production_status row `dbid` as 'finished'.

    Args:
        dbid: primary key of the production_status row to update.
        filestat: optional os.stat_result of the produced file; if given,
            its st_ctime is used as the 'ended' timestamp, otherwise the
            current wall-clock time (truncated to whole seconds).
        dryrun: if True, only log the SQL; no database update.

    Exits the process with status 1 if the database update fails.
    """
    update_prodstate = update_prodstate_tmpl.format(
        dbid=dbid,
        status='finished',
        ended=datetime.fromtimestamp(filestat.st_ctime) if filestat else str(datetime.now().replace(microsecond=0)),
    )
    CHATTY(update_prodstate)
    if dryrun:
        return

    dbstring = 'statw'
    prodstate_curs = dbQuery( cnxn_string_map[ dbstring ], update_prodstate )
    if prodstate_curs:
        prodstate_curs.commit()
    else:
        ERROR(f"Failed to update production status for {dbid} in database {dbstring}")
        sys.exit(1)
0258
0259
def _condor_execution_node() -> str:
    """Best-effort extraction of the worker node name from the HTCondor job ad.

    Reads the classad file pointed to by _CONDOR_JOB_AD and returns the host
    part of the first RemoteHost entry ('slot1@node.example' -> 'node.example').
    Returns "UNKNOWN" if the ad is absent or has no RemoteHost line.
    """
    execution_node = "UNKNOWN"
    condor_job_ad_file = os.getenv("_CONDOR_JOB_AD")
    if condor_job_ad_file and os.path.exists(condor_job_ad_file):
        with open(condor_job_ad_file, 'r') as f:
            for line in f:
                if line.startswith("RemoteHost"):
                    execution_node = line.split(" = ")[1].strip().strip('"')
                    # Strip the slot prefix, keep only the host name.
                    execution_node = execution_node.split('@')[-1]
                    break
    return execution_node


def jobstarted(dbid: int, dryrun: bool = False):
    """
    Marks a job as started in the production database.

    This includes setting the status to 'running', recording the start time,
    and capturing the execution node from the Condor environment.

    Args:
        dbid: primary key of the production_status row.
        dryrun: if True, only log the SQL; no database update.
    """
    execution_node = _condor_execution_node()

    update_sql = f"""
    UPDATE production_status
    SET
        status = 'running',
        started = '{datetime.now().replace(microsecond=0)}',
        execution_node = '{execution_node}'
    WHERE id = {dbid};
    """
    DEBUG(update_sql)
    if not dryrun:
        dbstring = 'statw'
        prodstate_curs = dbQuery(cnxn_string_map[dbstring], update_sql)
        if prodstate_curs:
            prodstate_curs.commit()
        else:
            # NOTE(review): unlike update_proddb, a failure here does not abort
            # the process -- presumably the payload should still run even if
            # bookkeeping fails; confirm that this asymmetry is intended.
            ERROR(f"Failed to update production status for {dbid} in database {dbstring}")
0296
0297
def jobended(dbid: int, exit_code: int, dryrun: bool = False):
    """
    Marks a job as ended in the production database.

    The final status is determined by the exit_code:
    'finished' for 0, 'failed' for anything else.

    Args:
        dbid: primary key of the production_status row.
        exit_code: job exit code; 0 means success.
        dryrun: if True, only log the SQL; no database update.
    """
    status = 'finished' if exit_code == 0 else 'failed'
    update_sql = f"""
    UPDATE production_status
    SET
        status = '{status}',
        ended = '{datetime.now().replace(microsecond=0)}'
    WHERE id = {dbid};
    """
    CHATTY(update_sql)
    if not dryrun:
        dbstring = 'statw'
        prodstate_curs = dbQuery(cnxn_string_map[dbstring], update_sql)
        if prodstate_curs:
            prodstate_curs.commit()
        else:
            # NOTE(review): a failure here does not abort the process --
            # presumably deliberate (job teardown continues); confirm.
            ERROR(f"Failed to update production status for {dbid} in database {dbstring}")
0322
def printDbInfo( cnxn_string, title ):
    """Connect with the given ODBC connection string and print DSN and server info."""
    connection = pyodbc.connect( cnxn_string )
    dsn_name = connection.getinfo(pyodbc.SQL_DATA_SOURCE_NAME)
    server_name = connection.getinfo(pyodbc.SQL_SERVER_NAME)
    print(f"with {cnxn_string}\n connected {dsn_name} from {server_name} as {title}")
0328
0329
def dbQuery( cnxn_string, query, ntries=10 ):
    """Execute `query` against the database described by `cnxn_string`.

    Retries up to `ntries` times on SQLSTATE '40001' (serialization failure)
    with a randomized, growing backoff. Any other error terminates the
    process with exit status 11.

    BUGFIX: the ntries parameter was previously overwritten by a hard-coded
    `ntries = 5`, silently ignoring the caller's value; it is now honored.

    Args:
        cnxn_string: ODBC connection string.
        query: SQL text to execute.
        ntries: maximum number of attempts (default 10).

    Returns:
        The open pyodbc cursor after successful execution (caller commits).
    """
    CHATTY(f'[cnxn_string] {cnxn_string}')
    CHATTY(f'[query ]\n{query}')

    start = datetime.now()
    curs = None

    for itry in range(ntries):
        try:
            conn = pyodbc.connect( cnxn_string )
            curs = conn.cursor()
            curs.execute( query )
            break
        except pyodbc.Error as E:
            ERROR(f"Warning: Attempt {itry} failed: {E}")
            # SQLSTATE 40001 == serialization failure: retry after a
            # jittered delay that grows with the attempt number.
            if E.args[0] == '40001':
                delay = (itry + 1) * random.random()
                time.sleep(delay)
                continue
            ERROR(f"Non-retryable odbc error encountered: {E}")
            sys.exit(11)
        except Exception as E:
            ERROR(f"Non-retryable other error encountered during database query: {E}")
            sys.exit(11)
    else:
        # for/else: the loop exhausted all attempts without a break.
        ERROR("Too many attempts. Stop.")
        sys.exit(11)

    CHATTY(f'[query time ] {(datetime.now() - start).total_seconds():.2f} seconds' )
    return curs
0369
0370
def list_to_condition(lst: Union[int, List[int]], name: str="runnumber") -> str :
    """
    Generates a condition string usable in a SQL query from a list of values.

    This function takes a list (`lst`) and a field name (`name`) and constructs a
    string that can be used as a `WHERE` clause condition in a SQL query.
    A bare int is accepted and treated as a one-element list.

    Note the special case for exactly two values: they are interpreted as an
    inclusive range, not as two discrete values.

    Args:
        lst: A list of positive integers (or a single int). Usually runnumbers.
        name: The name of the field/column in the database (usually runnumber)

    Returns:
        A string with the SQL condition, or "" if the list is empty.

    Examples:
        - list_to_condition([123], "runnumber")      returns "runnumber=123"
        - list_to_condition([100, 200], "runnumber") returns "runnumber>=100 and runnumber<=200"
        - list_to_condition([1, 2, 3], "runnumber")  returns "runnumber in ( 1,2,3 )"
        - list_to_condition([], "runnumber")         returns ""

    Exits the process on a non-int/non-list argument, or on lists with more
    than 20000 entries (guards against absurdly long IN clauses).
    """
    if isinstance(lst, int):
        lst = [ lst ]
    elif isinstance(lst, list):
        pass
    else:
        ERROR(f"list_to_condition: input argument is {type(lst)}")
        exit(1)

    length = len( lst )
    if length == 0:
        return ""

    if length > 20000:
        ERROR(f"Run list has {length} entries. Not a good idea. Bailing out.")
        exit(-1)

    # Single value: simple equality.
    if length == 1:
        return f"{name}={lst[0]}"

    # Exactly two values: inclusive range between min and max.
    if length == 2:
        lst = sorted(lst)
        return f"{name}>={lst[0]} and {name}<={lst[-1]}"

    # Three or more values: explicit IN list.
    strlist = map(str, lst)
    return f"{name} in ( {','.join(strlist)} )"
0419
def main():
    """Command-line entry point: dispatch to the jobstarted/jobended handlers."""
    args = get_parser()

    if args.command == 'jobstarted':
        jobstarted(args.dbid, args.dryrun)
    elif args.command == 'jobended':
        # The parser defines --exit-code with default=0, so the attribute is
        # always present; the previous getattr(..., 1) fallback was dead code
        # (and contradicted the parser's default).
        jobended(args.dbid, args.exit_code, args.dryrun)


if __name__ == '__main__':
    main()
0431