# File indexing completed on 2026-04-20 07:58:59
0001 import os
0002 import os.path
0003 import threading
0004 import time
0005
0006 from pilot.api import data
0007 from pilot.info import infosys
0008 from pilot.info.filespec import FileSpec as PilotFileSpec
0009
0010 from pandaharvester.harvestercore import core_utils
0011 from pandaharvester.harvestercore.plugin_base import PluginBase
0012 from pandaharvester.harvestermover import mover_utils
0013
0014
# Module-level logger; per-job child loggers are derived from it via make_logger.
baseLogger = core_utils.setup_logger("pilotmover_mt_preparator_kari")
0016
0017
0018
0019
0020
0021
0022
0023
0024
class PilotmoverMTPreparator(PluginBase):
    """
    Preparator that brings input files from remote ATLAS/Rucio storage to the
    local facility using the pilot data API, spreading the downloads over a
    small pool of worker threads.
    """

    def __init__(self, **kwarg):
        """Set the default thread count, then let PluginBase apply the plugin
        configuration (which may override ``n_threads``)."""
        self.n_threads = 3
        PluginBase.__init__(self, **kwarg)
        # Guard against a non-positive value coming from the configuration.
        if self.n_threads < 1:
            self.n_threads = 1

    def check_stage_in_status(self, jobspec):
        """Stage-in runs synchronously inside trigger_preparation, so there is
        never an asynchronous transfer left to check — always report success."""
        return True, ""

    def stage_in(self, tmpLog, jobspec, files):
        """Download one batch of files with the pilot StageInClient.

        :param tmpLog: logger used for progress and error reporting
        :param jobspec: job specification (provides the computing site name)
        :param files: list of pilot FileSpec objects to transfer; they are
                      mutated in place (status_code is set by the transfer)
        :return: (True, "") on success, (False, error message) otherwise
        """
        tmpLog.debug(f"To stagein files[] {files}")

        # Initialize the pilot information service for this queue and force
        # copy-based access so the files are actually downloaded locally.
        infosys.init(jobspec.computingSite, infosys.confinfo, infosys.extinfo)
        infosys.queuedata.direct_access_lan = False
        infosys.queuedata.direct_access_wan = False

        data_client = data.StageInClient(infosys, acopytools={"default": ["rucio"]}, default_copytools="rucio")
        allChecked = True
        ErrMsg = "These files failed to download : "

        # NOTE(review): chdir is process-wide, so concurrent stage_in threads
        # share this working directory — presumably the rucio copytool only
        # needs it as a download base; confirm before changing.
        os.chdir(self.basePath)

        if files:
            try:
                result = data_client.transfer(files)
            except Exception as e:
                tmpLog.error(f"error when stage_in: {e.__class__.__name__} ; {e}")
                raise
            else:
                tmpLog.debug(f"pilot.api data.StageInClient.transfer(files) result: {result}")

            # transfer() returns the (mutated) FileSpec list; a non-zero
            # status_code marks a failed download.
            if result:
                for answer in result:
                    if answer.status_code != 0:
                        allChecked = False
                        ErrMsg = ErrMsg + f" {answer.lfn} "
        else:
            tmpLog.info(f"Looks like all files already inplace: {files}")

        tmpLog.debug("stop thread")
        if allChecked:
            return True, ""
        return False, ErrMsg

    def trigger_preparation(self, jobspec):
        """Download all not-yet-ready input files of a job.

        Files missing from the local cache are partitioned into at most
        ``self.n_threads`` batches, each handed to :meth:`stage_in` in its own
        thread; afterwards every file is verified on disk.

        :param jobspec: job specification with input file attributes
        :return: (True, "") when every file is in place,
                 (False, message) for a fatal configuration error,
                 (None, message) for a failure that harvester may retry
        """
        tmpLog = self.make_logger(baseLogger, f"PandaID={jobspec.PandaID}", method_name="trigger_preparation")
        tmpLog.debug("start")

        if jobspec.computingSite is None:
            tmpLog.error("jobspec.computingSite is not defined")
            return False, "jobspec.computingSite is not defined"
        tmpLog.debug(f"jobspec.computingSite : {jobspec.computingSite}")

        files = []
        inFiles = jobspec.get_input_file_attributes(skip_ready=True)

        for inLFN, inFile in inFiles.items():
            inFile["path"] = mover_utils.construct_file_path(self.basePath, inFile["scope"], inLFN)
            tmpLog.debug(f"To check file: {inFile}")
            if os.path.exists(inFile["path"]):
                # Skip files that are already present with the expected size.
                file_size = os.stat(inFile["path"]).st_size
                tmpLog.debug(f"file size for file {inFile['path']} is {file_size}")
                if "fsize" in inFile and inFile["fsize"] and inFile["fsize"] == file_size:
                    tmpLog.debug(f"File {inLFN} already exists at {inFile['path']}")
                    continue
            dstpath = os.path.dirname(inFile["path"])
            # exist_ok avoids the check-then-create race when several
            # jobs/threads share a destination directory.
            os.makedirs(dstpath, exist_ok=True)
            file_data = {
                "scope": inFile["scope"],
                "dataset": inFile.get("dataset"),
                "lfn": inLFN,
                "ddmendpoint": inFile.get("endpoint"),
                "guid": inFile.get("guid"),
                "workdir": dstpath,
            }
            pilotfilespec = PilotFileSpec(type="input", **file_data)
            files.append(pilotfilespec)

        tmpLog.debug("path set")

        allChecked = True
        ErrMsg = "These files failed to download : "
        if files:
            # Ceiling division so <= n_threads batches cover every file.
            n_files_per_thread = (len(files) + self.n_threads - 1) // self.n_threads
            tmpLog.debug(f"num files per thread: {n_files_per_thread}")
            threads = []
            for i in range(0, len(files), n_files_per_thread):
                sub_files = files[i : i + n_files_per_thread]
                thread = threading.Thread(
                    target=self.stage_in,
                    kwargs={
                        "tmpLog": tmpLog,
                        "jobspec": jobspec,
                        "files": sub_files,
                    },
                )
                threads.append(thread)
            for thread in threads:
                thread.start()
            # Thread.isAlive() was removed in Python 3.9; join() waits for
            # completion without the 1-second polling loop.
            for thread in threads:
                thread.join()

        tmpLog.info(f"Checking all files: {files}")
        for pfile in files:
            # stage_in mutates the FileSpec objects; non-zero means failure.
            if pfile.status_code != 0:
                allChecked = False
                ErrMsg = ErrMsg + f" {pfile.lfn} "
        for inLFN, inFile in inFiles.items():
            if not os.path.isfile(inFile["path"]):
                allChecked = False
                # Bug fix: this used to report the stale loop variable from the
                # previous loop instead of the missing file's own LFN.
                ErrMsg = ErrMsg + f" {inLFN} "

        tmpLog.debug("stop")
        if allChecked:
            tmpLog.info("Looks like all files are successfully downloaded.")
            return True, ""
        # None (rather than False) lets harvester retry the stage-in later.
        return None, ErrMsg

    def resolve_input_paths(self, jobspec):
        """Record the local path of every input file back into the jobspec.

        :param jobspec: job specification whose input file attributes are
                        updated with their local cache paths
        :return: (True, "")
        """
        inFiles = jobspec.get_input_file_attributes()
        for inLFN, inFile in inFiles.items():
            inFile["path"] = mover_utils.construct_file_path(self.basePath, inFile["scope"], inLFN)
        jobspec.set_input_file_paths(inFiles)
        return True, ""
0182 return True, ""