# File indexing completed on 2026-04-20 07:58:59
0001 import logging
0002 import os
0003 import os.path
0004 import threading
0005 import time
0006 import traceback
0007
0008 from pilot.api import data
0009 from pilot.info import infosys
0010 from pilot.info.filespec import FileSpec as PilotFileSpec
0011
0012 from pandaharvester.harvestercore import core_utils
0013 from pandaharvester.harvestercore.plugin_base import PluginBase
0014 from pandaharvester.harvestermover import mover_utils
0015
0016
# Module-level logger; per-job loggers are derived from it via make_logger().
baseLogger = core_utils.setup_logger("pilotmover_mt_preparator")
0018
0019
0020
0021
0022
0023
0024
class PilotmoverMTPreparator(PluginBase):
    """
    Preparator that brings input files from remote ATLAS/Rucio storage to the
    local facility, using the Pilot 2 data API with multiple stage-in threads.
    """

    def __init__(self, **kwarg):
        # Default number of parallel stage-in threads; PluginBase may override
        # this from the plugin configuration (a "n_threads" attribute).
        self.n_threads = 3
        PluginBase.__init__(self, **kwarg)
        if self.n_threads < 1:
            self.n_threads = 1

    def check_stage_in_status(self, jobspec):
        """Stage-in runs synchronously in trigger_preparation, so it is always complete.

        :param jobspec: job specification (unused)
        :return: (True, "")
        """
        return True, ""

    def stage_in(self, tmpLog, jobspec, files):
        """Transfer one batch of files with the pilot data API (thread worker).

        :param tmpLog: logger to report progress to
        :param jobspec: job specification; computingSite selects the queue data
        :param files: list of pilot FileSpec objects to transfer (status_code
                      is updated in place by the transfer call)
        :return: (bool success, str error message)
        """
        try:
            tmpLog.debug(f"To stagein files[] {files}")
            infosys.init(jobspec.computingSite, infosys.confinfo, infosys.extinfo)
            # Force copy-to-scratch; direct access is not wanted for this mover.
            infosys.queuedata.direct_access_lan = False
            infosys.queuedata.direct_access_wan = False

            data_client = data.StageInClient(infosys, acopytools={"default": ["rucio"]}, default_copytools="rucio")

            allChecked = True
            ErrMsg = "These files failed to download : "
            if len(files) > 0:
                result = data_client.transfer(files, use_vp=False)
                tmpLog.debug(f"pilot.api data.StageInClient.transfer(files) result: {result}")
                # result is a list of FileSpec objects with status_code set;
                # collect the LFNs of every failed transfer.
                if result:
                    for answer in result:
                        if answer.status_code != 0:
                            allChecked = False
                            ErrMsg = ErrMsg + f" {answer.lfn} "
            else:
                tmpLog.info(f"Looks like all files already inplace: {files}")

            tmpLog.debug("stop thread")
            if allChecked:
                return True, ""
            return False, ErrMsg
        except Exception as ex:
            tmpLog.error(ex)
            tmpLog.error(traceback.format_exc())
            return False, str(ex)

    def trigger_preparation(self, jobspec):
        """Stage in all not-yet-ready input files of a job, split across threads.

        Files already present with a matching adler32 checksum are skipped.
        The remaining files are divided into roughly equal batches and each
        batch is handed to a stage_in worker thread; success is then verified
        per file via its status_code and its presence on disk.

        :param jobspec: job specification
        :return: (bool success, str error message)
        """
        tmpLog = self.make_logger(baseLogger, f"PandaID={jobspec.PandaID}", method_name="trigger_preparation")
        tmpLog.debug("start")

        # Bug fix: initialize before the try block so the final check below is
        # well defined even when an exception occurs early (previously this
        # raised NameError on allChecked/ErrMsg).
        allChecked = True
        ErrMsg = "These files failed to download : "
        try:
            if jobspec.computingSite is None:
                tmpLog.error("jobspec.computingSite is not defined")
                return False, "jobspec.computingSite is not defined"
            tmpLog.debug(f"jobspec.computingSite : {jobspec.computingSite}")

            files = []
            inFiles = jobspec.get_input_file_attributes(skip_ready=True)

            for inLFN, inFile in inFiles.items():
                inFile["path"] = mover_utils.construct_file_path(self.basePath, inFile["scope"], inLFN)
                tmpLog.debug(f"To check file: {inFile}")
                if os.path.exists(inFile["path"]):
                    # Skip files that are already on disk with the expected checksum.
                    checksum = core_utils.calc_adler32(inFile["path"])
                    checksum = f"ad:{checksum}"
                    tmpLog.debug(f"checksum for file {inFile['path']} is {checksum}")
                    if "checksum" in inFile and inFile["checksum"] and inFile["checksum"] == checksum:
                        tmpLog.debug(f"File {inLFN} already exists at {inFile['path']}")
                        continue
                dstpath = os.path.dirname(inFile["path"])
                # Race-safe replacement for os.access(dstpath, os.F_OK) + makedirs.
                os.makedirs(dstpath, exist_ok=True)
                file_data = {
                    "scope": inFile["scope"],
                    "dataset": inFile.get("dataset"),
                    "lfn": inLFN,
                    "ddmendpoint": inFile.get("endpoint"),
                    "guid": inFile.get("guid"),
                    "workdir": dstpath,
                }
                pilotfilespec = PilotFileSpec(type="input", **file_data)
                files.append(pilotfilespec)

            tmpLog.debug(f"files[] {files}")

            if files:
                threads = []
                # Ceiling division: spread the files over at most n_threads batches.
                n_files_per_thread = (len(files) + self.n_threads - 1) // self.n_threads
                tmpLog.debug(f"num files per thread: {n_files_per_thread}")
                for i in range(0, len(files), n_files_per_thread):
                    sub_files = files[i : i + n_files_per_thread]
                    thread = threading.Thread(target=self.stage_in, kwargs={"tmpLog": tmpLog, "jobspec": jobspec, "files": sub_files})
                    threads.append(thread)
                [t.start() for t in threads]
                tmpLog.debug(f"threads: {str(threads)}")
                while len(threads) > 0:
                    time.sleep(1)
                    threads = [t for t in threads if t and t.is_alive()]

            # Verify every transfer: status_code set by the workers, plus an
            # independent on-disk existence check for all requested inputs.
            tmpLog.info(f"Checking all files: {files}")
            for pfile in files:
                if pfile.status_code != 0:
                    allChecked = False
                    ErrMsg = ErrMsg + f" {pfile.lfn} "
            for inLFN, inFile in inFiles.items():
                if not os.path.isfile(inFile["path"]):
                    allChecked = False
                    # Bug fix: report the missing file's own LFN (was file.lfn,
                    # a wrong — possibly undefined — variable).
                    ErrMsg = ErrMsg + f" {inLFN} "
        except Exception as ex:
            tmpLog.error(ex)
            tmpLog.error(traceback.format_exc())
            allChecked = False
            ErrMsg = ErrMsg + f" (exception: {ex}) "

        tmpLog.debug("stop")
        if allChecked:
            tmpLog.info("Looks like all files are successfully downloaded.")
            return True, ""
        return False, ErrMsg

    def resolve_input_paths(self, jobspec):
        """Set the local access path for every input file of the job.

        :param jobspec: job specification; paths are written back into it
        :return: (True, "")
        """
        inFiles = jobspec.get_input_file_attributes()
        for inLFN, inFile in inFiles.items():
            inFile["path"] = mover_utils.construct_file_path(self.basePath, inFile["scope"], inLFN)
        jobspec.set_input_file_paths(inFiles)
        return True, ""