File indexing completed on 2026-04-20 07:58:59
0001 import os
0002 import os.path
0003
0004 from pilot.api import data
0005
0006 from pandaharvester.harvestercore import core_utils
0007 from pandaharvester.harvestercore.plugin_base import PluginBase
0008 from pandaharvester.harvestermover import mover_utils
0009
0010
# Module-level logger for this plugin, named "pilotmover_preparator".
# trigger_preparation derives its per-job (PandaID-tagged) logger from it via make_logger.
0011 baseLogger = core_utils.setup_logger("pilotmover_preparator")
0012
0013
0014
0015
0016
0017
0018
class PilotmoverPreparator(PluginBase):
    """
    Preparator that brings files from remote ATLAS/Rucio storage to the local facility.

    Transfers are delegated to the Pilot2 data API (``data.StageInClient``).
    Local file locations are built from ``self.basePath``, which is not set in
    this class -- presumably it is injected by ``PluginBase`` from the plugin
    configuration (TODO confirm).
    """

    def __init__(self, **kwarg):
        """Forward all keyword arguments to ``PluginBase.__init__``."""
        PluginBase.__init__(self, **kwarg)

    def check_stage_in_status(self, jobspec):
        """
        Report the stage-in status of a job.

        Always returns ``(True, "")``; ``jobspec`` is not inspected.
        ``trigger_preparation`` evaluates the transfer result itself, so
        presumably nothing is left to poll here.
        """
        return True, ""

    def trigger_preparation(self, jobspec):
        """
        Download the input files of a job that are not yet available locally.

        Files that already exist at their destination with a matching adler32
        checksum are skipped; all remaining files are handed to the Pilot2
        data API in a single ``transfer`` call.

        :param jobspec: job specification; ``PandaID``, ``computingSite`` and
                        ``get_input_file_attributes`` are used.
        :return: ``(True, "")`` when nothing had to be downloaded or every
                 transferred file reported ``errno == 0``; otherwise
                 ``(False, message)`` where ``message`` describes the failure.
        """
        tmpLog = self.make_logger(baseLogger, f"PandaID={jobspec.PandaID}", method_name="trigger_preparation")
        tmpLog.debug(f"Start. Trigger data transfer for job: {jobspec.PandaID}")

        # The stage-in client is bound to a site, so there is nothing to do without one.
        if jobspec.computingSite is None:
            tmpLog.error("jobspec.computingSite is not defined")
            return False, "jobspec.computingSite is not defined"
        tmpLog.debug(f"jobspec.computingSite : {jobspec.computingSite}")

        inFiles = jobspec.get_input_file_attributes(skip_ready=True)
        tmpLog.info("Prepare files to download (construct path and verifiy existing files)")
        files = self._select_files_to_download(inFiles)
        tmpLog.info(f"Number of files to dowload: {len(files)} for job: {jobspec.PandaID}")

        tmpLog.info("Setup of Pilot2 API client")
        data_client = data.StageInClient(site=jobspec.computingSite)

        failed_names = []
        if files:
            tmpLog.info(f"Going to transfer {len(files)} of files with one call to Pilot2 Data API")
            try:
                result = data_client.transfer(files)
            except Exception as e:
                # Python 3 exceptions have no ``.message`` attribute; format the
                # exception itself. Bail out here because ``result`` is unbound.
                err_msg = f"Pilot2 Data API rise error: {e}"
                tmpLog.error(err_msg)
                return False, err_msg
            tmpLog.debug(f"data_client.transfer(files) result:\n{result}")
            tmpLog.info("Transfer call to Pilot2 Data API completed")

            # Every transferred file must report errno == 0 for the job to succeed.
            if result:
                failed_names = [answer["name"] for answer in result if answer["errno"] != 0]
        else:
            tmpLog.info(f"Looks like all files in place. Number of files: {len(files)}")

        tmpLog.debug(f"Finished data transfer with {len(files)} files for job {jobspec.PandaID}")
        if failed_names:
            return False, "These files failed to download : " + "".join(f" {name} " for name in failed_names)
        return True, ""

    def _select_files_to_download(self, inFiles):
        """
        Set the local ``path`` of every input file and list those needing a transfer.

        :param inFiles: mapping of LFN to a file-attribute dict; each dict is
                        updated in place with its ``path``.
        :return: list of ``{"scope", "name", "destination"}`` dicts, one per
                 file that is missing locally or whose checksum does not match.
        """
        files = []
        for inLFN, inFile in inFiles.items():
            path = mover_utils.construct_file_path(self.basePath, inFile["scope"], inLFN)
            inFile["path"] = path

            # Skip files that were already downloaded intact.
            if os.path.exists(path):
                checksum = f"ad:{core_utils.calc_adler32(path)}"
                # A missing or empty expected checksum never equals "ad:...",
                # so such files are always downloaded again.
                if inFile.get("checksum") == checksum:
                    continue

            dstpath = os.path.dirname(path)
            if not os.access(dstpath, os.F_OK):
                # exist_ok guards against another worker creating the directory
                # between the check above and this call.
                os.makedirs(dstpath, exist_ok=True)
            files.append({"scope": inFile["scope"], "name": inLFN, "destination": dstpath})
        return files

    def resolve_input_paths(self, jobspec):
        """
        Attach the local path to every input file of the job.

        :param jobspec: job specification; its input file attributes are read
                        and written back via ``set_input_file_paths``.
        :return: always ``(True, "")``.
        """
        inFiles = jobspec.get_input_file_attributes()
        for inLFN, inFile in inFiles.items():
            inFile["path"] = mover_utils.construct_file_path(self.basePath, inFile["scope"], inLFN)
        jobspec.set_input_file_paths(inFiles)
        return True, ""
0103 return True, ""