# File indexing completed on 2026-04-20 07:58:59
0001 import json
0002 import os
0003 import subprocess
0004 import traceback
0005
0006 from pandaharvester.harvestercore import core_utils
0007 from pandaharvester.harvestercore.plugin_base import PluginBase
0008 from pandaharvester.harvestermover import mover_utils
0009
0010
0011 baseLogger = core_utils.setup_logger("rucio_preparator")
0012
0013
0014 def get_num_files(logs):
0015 total_files = None
0016 total_filtered_files = None
0017 downloaded_files = 0
0018 already_downloaded_files = None
0019 cannot_download_files = None
0020 for line in logs.split("\n"):
0021 if "Total files (DID):" in line:
0022 total_files = int(line.replace("Total files (DID):", "").strip())
0023 if "Total files (filtered):" in line:
0024 total_filtered_files = int(line.replace("Total files (filtered):", "").strip())
0025 if "Downloaded files:" in line:
0026 downloaded_files = int(line.replace("Downloaded files:", "").strip())
0027 if "Files already found locally:" in line:
0028 already_downloaded_files = int(line.replace("Files already found locally:", "").strip())
0029 if "Files that cannot be downloaded:" in line:
0030 cannot_download_files = int(line.replace("Files that cannot be downloaded:", "").strip())
0031 if total_filtered_files:
0032 total_files = total_filtered_files
0033 if already_downloaded_files:
0034 downloaded_files += already_downloaded_files
0035 return total_files, downloaded_files, cannot_download_files
0036
0037
0038 class RucioPreparator(PluginBase):
0039 """
0040 Praparator bring files from remote ATLAS/Rucio storage to local facility.
0041 """
0042
0043
0044 def __init__(self, **kwarg):
0045 PluginBase.__init__(self, **kwarg)
0046 if not hasattr(self, "rucioEnv"):
0047 self.rucioEnv = None
0048 if not hasattr(self, "timeout"):
0049 self.timeout = 30 * 60
0050
0051
0052 try:
0053 self.x509UserProxy
0054 except AttributeError:
0055 self.x509UserProxy = os.getenv("X509_USER_PROXY")
0056
0057 try:
0058 self.cacheDir
0059 except AttributeError:
0060 self.cacheDir = "/tmp"
0061
0062 try:
0063 self.defaultDest
0064 except AttributeError:
0065 self.defaultDest = None
0066
0067
0068 def check_stage_in_status(self, jobspec):
0069 return True, ""
0070
0071
0072 def trigger_preparation(self, jobspec):
0073
0074 tmpLog = self.make_logger(baseLogger, f"PandaID={jobspec.PandaID}", method_name="trigger_preparation")
0075 tmpLog.debug(f"Start. Trigger data transfer for job: {jobspec.PandaID}")
0076
0077 try:
0078 params = json.loads(jobspec.jobParams["jobPars"])
0079 if "input_datasets" not in params or "input_location" not in params:
0080 errMsg = "input_datasets or input_location not in job parameters"
0081 tmpLog.error(errMsg)
0082 return True, errMsg
0083
0084 datasets = params["input_datasets"]
0085 datasets = datasets.split(",")
0086 base_dir = params["input_location"]
0087
0088 if not base_dir:
0089 tmpLog.debug(f"input_location is not defined. will use harvester defaultDest: {self.defaultDest}")
0090 base_dir = self.defaultDest
0091
0092
0093 total_datasets = len(datasets)
0094 downloaded_datasets = 0
0095 final_exit_code = 0
0096
0097 for dataset in datasets:
0098 upload_src_dir = os.path.join(self.cacheDir, dataset)
0099 if self.rucioEnv:
0100 command = "%s; export X509_USER_PROXY=%s; rucio download --dir %s %s; gfal-copy -f -r -v %s %s" % (
0101 self.rucioEnv,
0102 self.x509UserProxy,
0103 self.cacheDir,
0104 dataset,
0105 upload_src_dir,
0106 base_dir,
0107 )
0108 else:
0109
0110 command = "export X509_USER_PROXY=%s; rucio download --dir %s %s; gfal-copy -f -r -v %s %s" % (
0111 self.x509UserProxy,
0112 self.cacheDir,
0113 dataset,
0114 upload_src_dir,
0115 base_dir,
0116 )
0117 tmpLog.debug("execute: " + command)
0118 exit_code = 0
0119 try:
0120 p = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, encoding="utf-8", errors="replace")
0121 stdout, stderr = p.communicate(timeout=self.timeout)
0122 exit_code = p.poll()
0123 except subprocess.TimeoutExpired:
0124 p.kill()
0125 stdout, stderr = p.communicate()
0126 exit_code = -1
0127 tmpLog.warning("command timeout")
0128
0129 tmpLog.debug(f"stdout: {stdout}")
0130 tmpLog.debug(f"stderr: {stderr}")
0131
0132 if exit_code != 0:
0133 final_exit_code = exit_code
0134
0135 total_files, downloaded_files, cannot_download_files = get_num_files(stdout)
0136 if total_files is None or downloaded_files is None:
0137 errMsg = f"Failed to download dataset {dataset}: cannot parse total files or downloaded files: stdout: {stdout}, stderr: {stderr}"
0138 tmpLog.error(errMsg)
0139 elif total_files > downloaded_files or cannot_download_files:
0140 errMsg = f"Not all files are downloaded for dataset {dataset}: stdout: {stdout}, stderr: {stderr}"
0141 tmpLog.error(errMsg)
0142 else:
0143 tmpLog.info(f"All files are downloaded for dataset {dataset}: stdout: {stdout}, stderr: {stderr}")
0144 downloaded_datasets += 1
0145 if final_exit_code == 0 and total_datasets == downloaded_datasets:
0146 tmpLog.info("All datasets have been downloaded")
0147 return True, ""
0148 else:
0149 errMsg = "Not all datasets have been downloaded"
0150 tmpLog.error(errMsg)
0151 return False, errMsg
0152 except Exception as ex:
0153 tmpLog.error(ex)
0154 tmpLog.error(traceback.format_exc())
0155 return False, str(ex)
0156
0157
0158 def resolve_input_paths(self, jobspec):
0159
0160 params = json.loads(jobspec.jobParams["jobPars"])
0161 base_dir = params["input_location"]
0162
0163
0164 inFiles = jobspec.get_input_file_attributes()
0165
0166 for inLFN, inFile in inFiles.items():
0167 inFile["path"] = mover_utils.construct_file_path(base_dir, inFile["scope"], inLFN)
0168
0169 jobspec.set_input_file_paths(inFiles)
0170 return True, ""