Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:39:02

0001 import argparse
0002 import os
0003 import sys
0004 
0005 from pandacommon.pandalogger.LogWrapper import LogWrapper
0006 from pandacommon.pandautils.thread_utils import GenericThread
0007 from rucio.client import Client as RucioClient
0008 from rucio.common.exception import DataIdentifierNotFound
0009 
0010 from pandaserver.config import panda_config
0011 from pandaserver.dataservice.ddm import rucioAPI
0012 from pandaserver.userinterface import Client
0013 
0014 
# get files from rucio
def get_files_from_rucio(ds_name):
    """
    List the files of a dataset known to rucio.

    :param ds_name: dataset name (may include a scope prefix and a trailing "/").
    :return: (True, set of file names) on success,
             (False, message) when the dataset is unknown to rucio,
             (None, message) on any other error.
    """
    try:
        client = RucioClient()
        scope, name = rucioAPI.extract_scope(ds_name)
        # container names may carry a trailing slash
        name = name.removesuffix("/")
        lfns = {entry["name"] for entry in client.list_files(scope, name)}
        return True, lfns
    except DataIdentifierNotFound:
        return False, f"unknown dataset {ds_name}"
    except Exception as e:
        return None, f"failed to get files from rucio: {str(e)}"
0032 
0033 
# print a message
def print_msg(message: str, log_stream: LogWrapper | None, is_error: bool = False, put_log: str | None = None):
    """
    Print a message to log stream or stdout.

    :param message: Message string.
    :param log_stream: LogWrapper object or None.
    :param is_error: If True, log as error.
    :param put_log: Log filename to dump log to this file in cache_dir.
    """
    if not log_stream:
        # no logger available: fall back to stdout
        if is_error:
            print("ERROR: ", message)
        else:
            print(message)
        return
    if is_error:
        log_stream.error("Error: " + message)
    else:
        log_stream.info(message)
    if put_log:
        # dump the accumulated log into the cache area and mark it done
        dump_path = os.path.join(panda_config.cache_dir, put_log)
        with open(dump_path, "w") as f:
            f.write(log_stream.dumpToString())
            f.write("done\n")
0058 
0059 
0060 # main
def main(taskBuffer=None, exec_options=None, log_stream=None, args_list=None):
    """
    Recover lost files of a task: find files that JEDI believes are finished but
    are no longer in rucio, reset their status in the DB, optionally resurrect
    already-deleted output/log datasets, and retry the task.

    :param taskBuffer: TaskBuffer instance. A local one is instantiated when None.
    :param exec_options: dict whose entries override the parsed options; also
                         carries userName / isProductionManager / logFilename.
    :param log_stream: LogWrapper object, or None to print to stdout.
    :param args_list: list of command-line arguments; the process arguments are
                      parsed when None.
    :return: (status, message). status is True on success, False on a permanent
             error, None on a temporary error (e.g. a rucio failure).
    """
    # options
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--ds",
        action="store",
        dest="ds",
        default=None,
        help="dataset name",
    )
    parser.add_argument(
        "--files",
        action="store",
        dest="files",
        default=None,
        help="comma-separated list of lost file names. The list is deduced if this option is omitted",
    )
    parser.add_argument(
        "--jediTaskID",
        action="store",
        dest="jediTaskID",
        type=int,
        default=None,
        help="JEDI task ID that produced the dataset",
    )
    parser.add_argument(
        "--noChildRetry",
        action="store_const",
        const=True,
        dest="noChildRetry",
        default=False,
        help="not retry child tasks",
    )
    parser.add_argument(
        "--resurrectDS",
        action="store_const",
        const=True,
        dest="resurrectDS",
        default=False,
        help="resurrect output and log datasets if they were already deleted",
    )
    parser.add_argument(
        "--dryRun",
        action="store_const",
        const=True,
        dest="dryRun",
        default=False,
        help="dry run",
    )
    parser.add_argument(
        "--force",
        action="store_const",
        const=True,
        dest="force",
        default=False,
        help="force retry even if no lost files",
    )
    parser.add_argument(
        "--reproduceParent",
        action="store_const",
        const=True,
        dest="reproduceParent",
        default=False,
        help="reproduce the input files from which the lost files were produced. "
        "Typically useful to recover merged files when unmerged files were already deleted",
    )
    parser.add_argument(
        "--reproduceUptoNthGen",
        action="store",
        dest="reproduceUptoNthGen",
        type=int,
        default=0,
        help="reproduce files up to N-th upper generation. --reproduceUptoNthGen=1 corresponds "
        "to --reproduceParent. 0 by default so that any provenance files are not reproduced",
    )
    # parse options. when running embedded (taskBuffer given) without an explicit
    # args_list, tolerate unknown arguments on the caller's command line
    if taskBuffer:
        if args_list:
            options = parser.parse_args(args_list)
        else:
            options, unknown = parser.parse_known_args()
    else:
        if args_list:
            options = parser.parse_args(args_list)
        else:
            options = parser.parse_args()

    requester_id = GenericThread().get_full_id(__name__, sys.modules[__name__].__file__)

    if taskBuffer is None:
        # instantiate TB
        from pandaserver.taskbuffer.TaskBuffer import taskBuffer

        taskBuffer.init(
            panda_config.dbhost,
            panda_config.dbpasswd,
            nDBConnection=1,
            requester=requester_id,
        )

    # set options from dict
    if exec_options is None:
        exec_options = {}
    keys = set(vars(options).keys())
    for k in exec_options:
        if k in keys:
            setattr(options, k, exec_options[k])

    dn = exec_options.get("userName")
    is_production_manager = exec_options.get("isProductionManager", False)
    log_filename = exec_options.get("logFilename")
    print_msg(f"User='{dn}' is_prod={is_production_manager} log_name={log_filename}", log_stream)

    # --reproduceUptoNthGen=N is implemented as --reproduceParent plus a
    # recursive call that handles the remaining N-1 generations
    if options.reproduceUptoNthGen > 0:
        options.reproduceUptoNthGen -= 1
        options.reproduceParent = True

    # check if dataset name or taskID is given
    if options.jediTaskID is None and options.ds is None:
        msg_str = "Dataset name or jediTaskID is required"
        print_msg(msg_str, log_stream, is_error=True, put_log=log_filename)
        return False, msg_str

    # initialize so that the retry step below works even when the discovery
    # loops are skipped (e.g. --force with an explicit --files list): previously
    # jediTaskID could be referenced while unbound
    jediTaskID = options.jediTaskID

    ds_files = {}
    if options.files is not None:
        # lost files explicitly given
        files = options.files.split(",")
        ds_files[options.ds] = files
    else:
        # look for lost files
        if not options.jediTaskID:
            # get files from rucio
            st, files_rucio = get_files_from_rucio(options.ds)
            if st is not True:
                print_msg(files_rucio, log_stream, is_error=True, put_log=log_filename)
                return st, files_rucio
            # get files from panda
            dsName = options.ds.split(":")[-1]
            fd, fo = taskBuffer.querySQLS(
                "SELECT c.lfn FROM ATLAS_PANDA.JEDI_Datasets d,ATLAS_PANDA.JEDI_Dataset_Contents c "
                "WHERE c.jediTaskID=d.jediTaskID AND c.datasetID=d.datasetID AND "
                "d.type=:t1 AND c.status=:s AND d.datasetName=:name ",
                {":s": "finished", ":t1": "output", ":name": dsName},
            )
            # files known to panda but missing in rucio are lost
            for (tmpLFN,) in fo:
                if tmpLFN not in files_rucio:
                    ds_files.setdefault(options.ds, [])
                    ds_files[options.ds].append(tmpLFN)
            # get taskID
            td, to = taskBuffer.querySQLS(
                "SELECT jediTaskID FROM ATLAS_PANDA.JEDI_Datasets " "WHERE datasetName=:datasetName AND type=:t1 ",
                {":t1": "output", ":datasetName": dsName},
            )
            (jediTaskID,) = to[0]
        else:
            # get dataset names
            dd, do = taskBuffer.querySQLS(
                "SELECT datasetName FROM ATLAS_PANDA.JEDI_Datasets " "WHERE jediTaskID=:jediTaskID AND type=:t1 ",
                {":t1": "output", ":jediTaskID": options.jediTaskID},
            )
            # get files from rucio
            files_rucio = set()
            for (tmpDS,) in do:
                st, tmp_files_rucio = get_files_from_rucio(tmpDS)
                if st is None:
                    # temporary rucio failure: give up and report it
                    print_msg(tmp_files_rucio, log_stream, is_error=True, put_log=log_filename)
                    return st, tmp_files_rucio
                # ignore unknown dataset
                if st:
                    files_rucio = files_rucio.union(tmp_files_rucio)
            # get files from PanDA
            fd, fo = taskBuffer.querySQLS(
                "SELECT d.datasetName,c.lfn FROM ATLAS_PANDA.JEDI_Datasets d,ATLAS_PANDA.JEDI_Dataset_Contents c "
                "WHERE d.jediTaskID=:jediTaskID AND c.jediTaskID=d.jediTaskID AND c.datasetID=d.datasetID AND "
                "d.type=:t1 AND c.status=:s ",
                {
                    ":s": "finished",
                    ":t1": "output",
                    ":jediTaskID": options.jediTaskID,
                },
            )
            for tmpDS, tmpLFN in fo:
                if tmpLFN not in files_rucio:
                    ds_files.setdefault(tmpDS, [])
                    ds_files[tmpDS].append(tmpLFN)
        for tmpDS in ds_files:
            files = ds_files[tmpDS]
            msg_str = f"{tmpDS} has {len(files)} lost files -> {','.join(files)}"
            print_msg(msg_str, log_stream)

    # no lost files
    if not ds_files and not options.force:
        msg_str = "No lost files. Use --force to ignore this check"
        print_msg(msg_str, log_stream, put_log=log_filename)
        return True, msg_str

    # reset file status
    s = False
    for tmpDS in ds_files:
        files = ds_files[tmpDS]
        if not is_production_manager:
            ts, jediTaskID, lostInputFiles, error_message = taskBuffer.resetFileStatusInJEDI(dn, False, tmpDS, files, options.reproduceParent, options.dryRun)
        else:
            ts, jediTaskID, lostInputFiles, error_message = taskBuffer.resetFileStatusInJEDI("", True, tmpDS, files, options.reproduceParent, options.dryRun)
        if error_message:
            msg_str = f"Failed to reset file status in the DB since {error_message}"
            print_msg(msg_str, log_stream, is_error=True)
        else:
            msg_str = f"Reset file status for {tmpDS} in the DB: done with {ts} for jediTaskID={jediTaskID}"
            print_msg(msg_str, log_stream)
        s |= ts
        # recover provenance
        if options.reproduceParent:
            # reproduce the inputs of every lost dataset with a recursive call.
            # bug fix: the recursion used to run once after this loop, so only
            # the last lost dataset was reproduced (and a NameError was raised
            # when lostInputFiles was empty)
            for lostDS in lostInputFiles:
                com_args = ["--ds", lostDS, "--noChildRetry", "--resurrectDS"]
                if options.reproduceUptoNthGen > 0:
                    com_args += [
                        "--reproduceUptoNthGen",
                        str(options.reproduceUptoNthGen),
                    ]
                if options.dryRun:
                    com_args.append("--dryRun")
                com_args += ["--files", ",".join(lostInputFiles[lostDS])]
                main(taskBuffer=taskBuffer, log_stream=log_stream, args_list=com_args)

    # go ahead
    if options.dryRun:
        msg_str = f"Done in the dry-run mode with {s}"
        print_msg(msg_str, log_stream, put_log=log_filename)
        return True, msg_str
    if s or options.force:
        if options.resurrectDS:
            # resurrect output and log datasets that rucio already deleted
            sd, so = taskBuffer.querySQLS(
                "SELECT datasetName FROM ATLAS_PANDA.JEDI_Datasets WHERE jediTaskID=:id AND type IN (:t1,:t2)",
                {":id": jediTaskID, ":t1": "output", ":t2": "log"},
            )
            rc = RucioClient()
            for (datasetName,) in so:
                # a few attempts in case the resurrection needs a retry
                for i in range(3):
                    try:
                        scope, name = rucioAPI.extract_scope(datasetName)
                        rc.get_did(scope, name)
                        break
                    except DataIdentifierNotFound:
                        msg_str = f"resurrect {datasetName}"
                        print_msg(msg_str, log_stream)
                        rc.resurrect([{"scope": scope, "name": name}])
                        try:
                            # unset lifetime so the dataset is not deleted again
                            rc.set_metadata(scope, name, "lifetime", None)
                        except Exception:
                            pass
        if not options.reproduceParent:
            tmp_ret = Client.retry_task(jediTaskID, no_child_retry=options.noChildRetry)
        else:
            # inputs were reproduced: make the task reload and pick them up
            tmp_ret = Client.reload_input(jediTaskID)
        msg_str = f"Retried task {jediTaskID}: done with {tmp_ret}"
        print_msg(msg_str, log_stream, put_log=log_filename)
        return True, msg_str
    else:
        msg_str = "Failed"
        print_msg(msg_str, log_stream, is_error=True, put_log=log_filename)
        return False, msg_str