File indexing completed on 2026-04-10 08:39:02
0001 import argparse
0002 import os
0003 import sys
0004
0005 from pandacommon.pandalogger.LogWrapper import LogWrapper
0006 from pandacommon.pandautils.thread_utils import GenericThread
0007 from rucio.client import Client as RucioClient
0008 from rucio.common.exception import DataIdentifierNotFound
0009
0010 from pandaserver.config import panda_config
0011 from pandaserver.dataservice.ddm import rucioAPI
0012 from pandaserver.userinterface import Client
0013
0014
0015
def get_files_from_rucio(ds_name):
    """
    Collect the names of the files that Rucio lists for a dataset.

    :param ds_name: Dataset name; it is split into scope and name with
                    rucioAPI.extract_scope, and one trailing "/" is removed
                    from the name before the lookup.
    :return: Tuple of (status, payload):
             (True, set of file names) on success,
             (False, message) when Rucio raises DataIdentifierNotFound,
             (None, message) when any other exception is raised.
    """
    try:
        client = RucioClient()
        scope, did_name = rucioAPI.extract_scope(ds_name)
        # drop a single trailing slash, if any
        did_name = did_name[:-1] if did_name.endswith("/") else did_name
        return True, {entry["name"] for entry in client.list_files(scope, did_name)}
    except DataIdentifierNotFound:
        return False, f"unknown dataset {ds_name}"
    except Exception as exc:
        return None, f"failed to get files from rucio: {str(exc)}"
0032
0033
0034
def print_msg(message: str, log_stream: LogWrapper | None, is_error: bool = False, put_log: str | None = None):
    """
    Emit a message through the log stream, or through stdout when there is none.

    :param message: Message string.
    :param log_stream: LogWrapper object or None. When falsy the message is printed.
    :param is_error: If True, the message is reported as an error.
    :param put_log: Name of a file in panda_config.cache_dir to which the log stream
                    content is dumped. Only used when a log stream is given.
    """
    # no log stream: plain stdout
    if not log_stream:
        if is_error:
            print("ERROR: ", message)
        else:
            print(message)
        return

    # pick the logging method and the text to send in one go
    if is_error:
        emit, text = log_stream.error, "Error: " + message
    else:
        emit, text = log_stream.info, message
    emit(text)

    if not put_log:
        return

    # dump everything logged so far, followed by a "done" marker line
    dump_path = os.path.join(panda_config.cache_dir, put_log)
    with open(dump_path, "w") as dump_file:
        dump_file.write(log_stream.dumpToString())
        dump_file.write("done\n")
0058
0059
0060
def _build_arg_parser():
    """Build the command-line parser holding all lost-file recovery options."""
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--ds",
        action="store",
        dest="ds",
        default=None,
        help="dataset name",
    )
    parser.add_argument(
        "--files",
        action="store",
        dest="files",
        default=None,
        help="comma-separated list of lost file names. The list is deduced if this option is omitted",
    )
    parser.add_argument(
        "--jediTaskID",
        action="store",
        dest="jediTaskID",
        type=int,
        default=None,
        help="JEDI task ID that produced the dataset",
    )
    parser.add_argument(
        "--noChildRetry",
        action="store_true",
        dest="noChildRetry",
        help="not retry child tasks",
    )
    parser.add_argument(
        "--resurrectDS",
        action="store_true",
        dest="resurrectDS",
        help="resurrect output and log datasets if they were already deleted",
    )
    parser.add_argument(
        "--dryRun",
        action="store_true",
        dest="dryRun",
        help="dry run",
    )
    parser.add_argument(
        "--force",
        action="store_true",
        dest="force",
        help="force retry even if no lost files",
    )
    parser.add_argument(
        "--reproduceParent",
        action="store_true",
        dest="reproduceParent",
        help="reproduce the input files from which the lost files were produced. "
        "Typically useful to recover merged files when unmerged files were already deleted",
    )
    parser.add_argument(
        "--reproduceUptoNthGen",
        action="store",
        dest="reproduceUptoNthGen",
        type=int,
        default=0,
        help="reproduce files up to N-th upper generation. --reproduceUptoNthGen=1 corresponds "
        "to --reproduceParent. 0 by default so that any provenance files are not reproduced",
    )
    return parser


def _resurrect_datasets(taskBuffer, jediTaskID, log_stream):
    """Resurrect the output and log datasets of a task that Rucio no longer knows."""
    _, rows = taskBuffer.querySQLS(
        "SELECT datasetName FROM ATLAS_PANDA.JEDI_Datasets WHERE jediTaskID=:id AND type IN (:t1,:t2)",
        {":id": jediTaskID, ":t1": "output", ":t2": "log"},
    )
    rc = RucioClient()
    for (datasetName,) in rows:
        scope, name = rucioAPI.extract_scope(datasetName)
        # up to three attempts: look the dataset up, resurrect it when it is missing
        for _ in range(3):
            try:
                rc.get_did(scope, name)
                break
            except DataIdentifierNotFound:
                print_msg(f"resurrect {datasetName}", log_stream)
                rc.resurrect([{"scope": scope, "name": name}])
                try:
                    rc.set_metadata(scope, name, "lifetime", None)
                except Exception as e:
                    # best effort: report the failure but keep going
                    print_msg(f"failed to reset lifetime of {datasetName}: {e}", log_stream)


def main(taskBuffer=None, exec_options=None, log_stream=None, args_list=None):
    """
    Find lost files of a dataset or task, reset their status in JEDI and retry the task.

    :param taskBuffer: Task buffer object. When None, the one from
                       pandaserver.taskbuffer.TaskBuffer is imported and initialised here.
    :param exec_options: Optional dict. Keys that match parsed option names override
                         those options; "userName", "isProductionManager" and
                         "logFilename" are also read from it.
    :param log_stream: Log stream handed to print_msg, or None to print to stdout.
    :param args_list: Argument list to parse instead of the process command line.
    :return: Tuple of (status, message). status is True when there is nothing to do,
             after a dry run, or after the task retry was requested; False when
             required arguments are missing, the dataset is unknown, or nothing was
             reset; None when the Rucio lookup failed unexpectedly.
    """
    parser = _build_arg_parser()
    if args_list:
        options = parser.parse_args(args_list)
    elif taskBuffer:
        # a task buffer was handed in without an explicit argument list:
        # tolerate unrelated arguments on the command line
        options, _ = parser.parse_known_args()
    else:
        options = parser.parse_args()

    requester_id = GenericThread().get_full_id(__name__, sys.modules[__name__].__file__)

    if taskBuffer is None:
        from pandaserver.taskbuffer.TaskBuffer import taskBuffer

        taskBuffer.init(
            panda_config.dbhost,
            panda_config.dbpasswd,
            nDBConnection=1,
            requester=requester_id,
        )

    # let exec_options override the parsed options that share their name
    if exec_options is None:
        exec_options = {}
    option_names = set(vars(options))
    for key, value in exec_options.items():
        if key in option_names:
            setattr(options, key, value)

    dn = exec_options.get("userName")
    is_production_manager = exec_options.get("isProductionManager", False)
    log_filename = exec_options.get("logFilename")
    print_msg(f"User='{dn}' is_prod={is_production_manager} log_name={log_filename}", log_stream)

    # one generation is consumed at this level; the rest is passed to the recursive calls
    if options.reproduceUptoNthGen > 0:
        options.reproduceUptoNthGen -= 1
        options.reproduceParent = True

    if options.jediTaskID is None and options.ds is None:
        msg_str = "Dataset name or jediTaskID is required"
        print_msg(msg_str, log_stream, is_error=True, put_log=log_filename)
        return False, msg_str

    # Start from the task ID given by the caller so that it is always bound,
    # e.g. with --jediTaskID and --force when no lost files are found.
    jediTaskID = options.jediTaskID

    ds_files = {}
    if options.files is not None:
        ds_files[options.ds] = options.files.split(",")
    else:
        # deduce lost files: finished in JEDI but not listed in Rucio
        if not options.jediTaskID:
            st, files_rucio = get_files_from_rucio(options.ds)
            if st is not True:
                print_msg(files_rucio, log_stream, is_error=True, put_log=log_filename)
                return st, files_rucio

            dsName = options.ds.split(":")[-1]
            _, fo = taskBuffer.querySQLS(
                "SELECT c.lfn FROM ATLAS_PANDA.JEDI_Datasets d,ATLAS_PANDA.JEDI_Dataset_Contents c "
                "WHERE c.jediTaskID=d.jediTaskID AND c.datasetID=d.datasetID AND "
                "d.type=:t1 AND c.status=:s AND d.datasetName=:name ",
                {":s": "finished", ":t1": "output", ":name": dsName},
            )
            for (tmpLFN,) in fo:
                if tmpLFN not in files_rucio:
                    ds_files.setdefault(options.ds, []).append(tmpLFN)

            # task that produced the dataset
            _, to = taskBuffer.querySQLS(
                "SELECT jediTaskID FROM ATLAS_PANDA.JEDI_Datasets "
                "WHERE datasetName=:datasetName AND type=:t1 ",
                {":t1": "output", ":datasetName": dsName},
            )
            if not to:
                msg_str = f"no task found in JEDI for output dataset {dsName}"
                print_msg(msg_str, log_stream, is_error=True, put_log=log_filename)
                return False, msg_str
            (jediTaskID,) = to[0]
        else:
            _, do = taskBuffer.querySQLS(
                "SELECT datasetName FROM ATLAS_PANDA.JEDI_Datasets "
                "WHERE jediTaskID=:jediTaskID AND type=:t1 ",
                {":t1": "output", ":jediTaskID": options.jediTaskID},
            )

            files_rucio = set()
            for (tmpDS,) in do:
                st, tmp_files_rucio = get_files_from_rucio(tmpDS)
                if st is None:
                    print_msg(tmp_files_rucio, log_stream, is_error=True, put_log=log_filename)
                    return st, tmp_files_rucio
                # st is False for a dataset unknown to Rucio: it contributes no files
                if st:
                    files_rucio |= tmp_files_rucio

            _, fo = taskBuffer.querySQLS(
                "SELECT d.datasetName,c.lfn FROM ATLAS_PANDA.JEDI_Datasets d,ATLAS_PANDA.JEDI_Dataset_Contents c "
                "WHERE d.jediTaskID=:jediTaskID AND c.jediTaskID=d.jediTaskID AND c.datasetID=d.datasetID AND "
                "d.type=:t1 AND c.status=:s ",
                {
                    ":s": "finished",
                    ":t1": "output",
                    ":jediTaskID": options.jediTaskID,
                },
            )
            for tmpDS, tmpLFN in fo:
                if tmpLFN not in files_rucio:
                    ds_files.setdefault(tmpDS, []).append(tmpLFN)

        for tmpDS, files in ds_files.items():
            msg_str = f"{tmpDS} has {len(files)} lost files -> {','.join(files)}"
            print_msg(msg_str, log_stream)

    if not ds_files and not options.force:
        msg_str = "No lost files. Use --force to ignore this check"
        print_msg(msg_str, log_stream, put_log=log_filename)
        return True, msg_str

    # reset the file status per dataset; any_reset records whether any reset succeeded
    any_reset = False
    for tmpDS, files in ds_files.items():
        if is_production_manager:
            reset_args = ("", True)
        else:
            reset_args = (dn, False)
        ts, jediTaskID, lostInputFiles, error_message = taskBuffer.resetFileStatusInJEDI(
            *reset_args, tmpDS, files, options.reproduceParent, options.dryRun
        )
        if error_message:
            msg_str = f"Failed to reset file status in the DB since {error_message}"
            print_msg(msg_str, log_stream, is_error=True)
        else:
            msg_str = f"Reset file status for {tmpDS} in the DB: done with {ts} for jediTaskID={jediTaskID}"
            print_msg(msg_str, log_stream)
        any_reset |= ts

        if options.reproduceParent:
            # recover the lost input files by running this procedure on each parent dataset
            for lostDS in lostInputFiles:
                com_args = ["--ds", lostDS, "--noChildRetry", "--resurrectDS"]
                if options.reproduceUptoNthGen > 0:
                    com_args += [
                        "--reproduceUptoNthGen",
                        str(options.reproduceUptoNthGen),
                    ]
                if options.dryRun:
                    com_args.append("--dryRun")
                com_args += ["--files", ",".join(lostInputFiles[lostDS])]
                # NOTE(review): exec_options is not forwarded, so the nested call runs with
                # userName=None and isProductionManager=False - confirm this is intended.
                main(taskBuffer=taskBuffer, log_stream=log_stream, args_list=com_args)

    if options.dryRun:
        msg_str = f"Done in the dry-run mode with {any_reset}"
        print_msg(msg_str, log_stream, put_log=log_filename)
        return True, msg_str

    if not (any_reset or options.force):
        msg_str = "Failed"
        print_msg(msg_str, log_stream, is_error=True, put_log=log_filename)
        return False, msg_str

    if options.resurrectDS:
        _resurrect_datasets(taskBuffer, jediTaskID, log_stream)
    if not options.reproduceParent:
        tmp_ret = Client.retry_task(jediTaskID, no_child_retry=options.noChildRetry)
    else:
        tmp_ret = Client.reload_input(jediTaskID)
    msg_str = f"Retried task {jediTaskID}: done with {tmp_ret}"
    print_msg(msg_str, log_stream, put_log=log_filename)
    return True, msg_str