Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-20 07:58:59

0001 import json
0002 import os
0003 import subprocess
0004 import traceback
0005 
0006 from pandaharvester.harvestercore import core_utils
0007 from pandaharvester.harvestercore.plugin_base import PluginBase
0008 from pandaharvester.harvestermover import mover_utils
0009 
0010 # logger
0011 baseLogger = core_utils.setup_logger("rucio_preparator")
0012 
0013 
def get_num_files(logs):
    """Parse the summary that ``rucio download`` prints and extract file counts.

    Scans *logs* line by line for the counter lines rucio emits at the end of
    a transfer ("Total files (DID):", "Total files (filtered):",
    "Downloaded files:", "Files already found locally:",
    "Files that cannot be downloaded:").

    :param logs: captured stdout of a ``rucio download`` invocation
    :return: tuple ``(total_files, downloaded_files, cannot_download_files)``
        where ``total_files`` is the filtered total when reported (falling
        back to the DID total, or ``None`` if neither summary line was seen),
        ``downloaded_files`` is newly downloaded plus already-local files
        (0 when no such lines were seen), and ``cannot_download_files`` is
        ``None`` when its summary line was absent.
    """
    total_files = None
    total_filtered_files = None
    downloaded_files = 0
    already_downloaded_files = None
    cannot_download_files = None
    for line in logs.split("\n"):
        if "Total files (DID):" in line:
            total_files = int(line.replace("Total files (DID):", "").strip())
        if "Total files (filtered):" in line:
            total_filtered_files = int(line.replace("Total files (filtered):", "").strip())
        if "Downloaded files:" in line:
            downloaded_files = int(line.replace("Downloaded files:", "").strip())
        if "Files already found locally:" in line:
            already_downloaded_files = int(line.replace("Files already found locally:", "").strip())
        if "Files that cannot be downloaded:" in line:
            cannot_download_files = int(line.replace("Files that cannot be downloaded:", "").strip())
    # Use "is not None" (not truthiness) so a reported value of 0 is honored:
    # a filtered total of 0 must override the DID total, otherwise a dataset
    # whose files are all filtered out would be flagged as incompletely downloaded.
    if total_filtered_files is not None:
        total_files = total_filtered_files
    if already_downloaded_files is not None:
        downloaded_files += already_downloaded_files
    return total_files, downloaded_files, cannot_download_files
0036 
0037 
class RucioPreparator(PluginBase):
    """
    Preparator that brings input files from remote ATLAS/Rucio storage to the
    local facility: each dataset is fetched with ``rucio download`` into a
    local cache directory and then copied to the destination with ``gfal-copy``.

    Optional configuration attributes (may be pre-set by PluginBase from the
    queue configuration; defaults are filled in by ``__init__`` otherwise):
      rucioEnv      -- shell snippet to run before rucio (default: None)
      timeout       -- seconds to wait for the download command (default: 30 min)
      x509UserProxy -- proxy path (default: $X509_USER_PROXY)
      cacheDir      -- local staging dir for rucio downloads (default: /tmp)
      defaultDest   -- destination used when the job has no input_location
    """

    # constructor
    def __init__(self, **kwarg):
        PluginBase.__init__(self, **kwarg)
        # fill in defaults only for attributes not already set via the config
        if not hasattr(self, "rucioEnv"):
            self.rucioEnv = None
        if not hasattr(self, "timeout"):
            self.timeout = 30 * 60

        # Default x509 proxy for a queue
        try:
            self.x509UserProxy
        except AttributeError:
            self.x509UserProxy = os.getenv("X509_USER_PROXY")

        # local scratch area where rucio downloads land before gfal-copy
        try:
            self.cacheDir
        except AttributeError:
            self.cacheDir = "/tmp"

        # fallback destination when the job does not specify input_location
        try:
            self.defaultDest
        except AttributeError:
            self.defaultDest = None

    # check status
    def check_stage_in_status(self, jobspec):
        """Always report success.

        ``trigger_preparation`` below runs the transfer synchronously, so by
        the time this is polled there is nothing left in flight.

        :param jobspec: job specification (unused)
        :return: (True, "")
        """
        return True, ""

    # trigger preparation
    def trigger_preparation(self, jobspec):
        """Download the job's input datasets and copy them to the destination.

        Reads ``input_datasets`` (comma-separated dataset names) and
        ``input_location`` (destination directory, e.g. in EOS) from the job
        parameters.  For each dataset a shell pipeline is run:
        ``rucio download`` into ``cacheDir`` followed by ``gfal-copy`` of the
        downloaded directory to the destination.  The rucio stdout summary is
        parsed via ``get_num_files`` to verify completeness.

        :param jobspec: job specification carrying ``jobParams["jobPars"]``
        :return: (True, "") when every dataset was fully downloaded,
                 (False, errMsg) otherwise
        """
        # make logger
        tmpLog = self.make_logger(baseLogger, f"PandaID={jobspec.PandaID}", method_name="trigger_preparation")
        tmpLog.debug(f"Start. Trigger data transfer for job: {jobspec.PandaID}")

        try:
            params = json.loads(jobspec.jobParams["jobPars"])
            if "input_datasets" not in params or "input_location" not in params:
                errMsg = "input_datasets or input_location not in job parameters"
                tmpLog.error(errMsg)
                # NOTE(review): returns True (success) despite the error, so jobs
                # without these parameters are not blocked — confirm intentional
                return True, errMsg

            datasets = params["input_datasets"]  # a comma-separated string
            datasets = datasets.split(",")
            base_dir = params["input_location"]  # dir name in EOS

            if not base_dir:
                tmpLog.debug(f"input_location is not defined. will use harvester defaultDest: {self.defaultDest}")
                base_dir = self.defaultDest

            total_datasets = len(datasets)
            downloaded_datasets = 0
            final_exit_code = 0

            for dataset in datasets:
                # rucio downloads into cacheDir/<dataset>; that directory is then
                # copied recursively to base_dir
                upload_src_dir = os.path.join(self.cacheDir, dataset)
                if self.rucioEnv:
                    # prepend the configured environment setup before the transfer
                    command = "%s; export X509_USER_PROXY=%s; rucio download --dir %s %s; gfal-copy -f -r -v %s %s" % (
                        self.rucioEnv,
                        self.x509UserProxy,
                        self.cacheDir,
                        dataset,
                        upload_src_dir,
                        base_dir,
                    )
                else:
                    command = "export X509_USER_PROXY=%s; rucio download --dir %s %s; gfal-copy -f -r -v %s %s" % (
                        self.x509UserProxy,
                        self.cacheDir,
                        dataset,
                        upload_src_dir,
                        base_dir,
                    )
                tmpLog.debug("execute: " + command)
                exit_code = 0
                try:
                    p = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, encoding="utf-8", errors="replace")
                    stdout, stderr = p.communicate(timeout=self.timeout)
                    exit_code = p.poll()
                except subprocess.TimeoutExpired:
                    # kill the hung pipeline and collect whatever output it produced
                    p.kill()
                    stdout, stderr = p.communicate()
                    exit_code = -1
                    tmpLog.warning("command timeout")

                tmpLog.debug(f"stdout: {stdout}")
                tmpLog.debug(f"stderr: {stderr}")

                # remember any failure but keep processing the remaining datasets
                if exit_code != 0:
                    final_exit_code = exit_code

                # verify against the rucio summary that all files are accounted for
                total_files, downloaded_files, cannot_download_files = get_num_files(stdout)
                if total_files is None or downloaded_files is None:
                    errMsg = f"Failed to download dataset {dataset}: cannot parse total files or downloaded files: stdout: {stdout}, stderr: {stderr}"
                    tmpLog.error(errMsg)
                elif total_files > downloaded_files or cannot_download_files:
                    errMsg = f"Not all files are downloaded for dataset {dataset}: stdout: {stdout}, stderr: {stderr}"
                    tmpLog.error(errMsg)
                else:
                    tmpLog.info(f"All files are downloaded for dataset {dataset}: stdout: {stdout}, stderr: {stderr}")
                    downloaded_datasets += 1
            if final_exit_code == 0 and total_datasets == downloaded_datasets:
                tmpLog.info("All datasets have been downloaded")
                return True, ""
            else:
                errMsg = "Not all datasets have been downloaded"
                tmpLog.error(errMsg)
                return False, errMsg
        except Exception as ex:
            tmpLog.error(ex)
            tmpLog.error(traceback.format_exc())
            return False, str(ex)

    # resolve input file paths
    def resolve_input_paths(self, jobspec):
        """Set the local access path of every input file on the job spec.

        Each file's path is built from the job's ``input_location``, the
        file's scope and its LFN via ``mover_utils.construct_file_path``.

        :param jobspec: job specification to update in place
        :return: (True, "")
        """
        # get input base location
        params = json.loads(jobspec.jobParams["jobPars"])
        base_dir = params["input_location"]  # dir name in EOS

        # get input files
        inFiles = jobspec.get_input_file_attributes()
        # set path to each file
        for inLFN, inFile in inFiles.items():
            inFile["path"] = mover_utils.construct_file_path(base_dir, inFile["scope"], inLFN)
        # set
        jobspec.set_input_file_paths(inFiles)
        return True, ""