Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-20 07:58:59

0001 import os
0002 import os.path
0003 
0004 from pilot.api import data
0005 
0006 from pandaharvester.harvestercore import core_utils
0007 from pandaharvester.harvestercore.plugin_base import PluginBase
0008 from pandaharvester.harvestermover import mover_utils
0009 
# module-level base logger for this plugin; trigger_preparation derives a
# per-job logger from it through self.make_logger()
baseLogger = core_utils.setup_logger("pilotmover_preparator")
0012 
0013 
0014 # Preparator plugin based on the Pilot 2.0 Data API.
0015 # Pilot 2.0 must be deployed as a library.
0016 # The default self.basePath comes from the preparator section of the configuration file.
0017 
0018 
class PilotmoverPreparator(PluginBase):
    """
    Preparator that brings files from remote ATLAS/Rucio storage to the local facility.

    Transfers are delegated to the Pilot 2 Data API (``pilot.api.data``).
    Local file paths are constructed under ``self.basePath``, which is
    expected to be provided through the plugin configuration.
    """

    # constructor
    def __init__(self, **kwarg):
        """Forward all keyword arguments to PluginBase."""
        PluginBase.__init__(self, **kwarg)

    # check status
    def check_stage_in_status(self, jobspec):
        """
        Report the stage-in status of a job.

        :param jobspec: job specification (not inspected here)
        :return: always ``(True, "")``; trigger_preparation evaluates the
                 transfer result itself
        """
        return True, ""

    # trigger preparation
    def trigger_preparation(self, jobspec):
        """
        Download the input files of a job that are not yet valid locally.

        A file is skipped when it already exists at its local path and its
        adler32 checksum matches the one recorded for the input file. All
        remaining files are requested with a single Pilot 2 Data API call.

        :param jobspec: job specification; ``PandaID``, ``computingSite`` and
                        ``get_input_file_attributes()`` are used
        :return: ``(True, "")`` when no failure was detected, otherwise
                 ``(False, message)`` with a description of the failure
        """
        # make logger
        tmpLog = self.make_logger(baseLogger, f"PandaID={jobspec.PandaID}", method_name="trigger_preparation")
        tmpLog.debug(f"Start. Trigger data transfer for job: {jobspec.PandaID}")

        # the site is required to set up the stage-in client
        if jobspec.computingSite is None:
            errMsg = "jobspec.computingSite is not defined"
            tmpLog.error(errMsg)
            return False, errMsg
        tmpLog.debug(f"jobspec.computingSite : {jobspec.computingSite}")

        # get input files
        files = []
        inFiles = jobspec.get_input_file_attributes(skip_ready=True)
        # set path to each file
        tmpLog.info("Prepare files to download (construct path and verifiy existing files)")
        for inLFN, inFile in inFiles.items():
            inFile["path"] = mover_utils.construct_file_path(self.basePath, inFile["scope"], inLFN)
            # skip files that are already present with a matching checksum
            if os.path.exists(inFile["path"]):
                checksum = core_utils.calc_adler32(inFile["path"])
                checksum = f"ad:{checksum}"
                if "checksum" in inFile and inFile["checksum"] and inFile["checksum"] == checksum:
                    continue
            dstpath = os.path.dirname(inFile["path"])
            # exist_ok avoids the check-then-create race when several jobs
            # prepare files under the same directory
            os.makedirs(dstpath, exist_ok=True)
            files.append({"scope": inFile["scope"], "name": inLFN, "destination": dstpath})
        tmpLog.info(f"Number of files to dowload: {len(files)} for job: {jobspec.PandaID}")

        tmpLog.info("Setup of Pilot2 API client")
        data_client = data.StageInClient(site=jobspec.computingSite)
        failedNames = []
        if files:
            tmpLog.info(f"Going to transfer {len(files)} of files with one call to Pilot2 Data API")
            try:
                result = data_client.transfer(files)
            except Exception as e:
                # Python 3 exceptions have no .message attribute, and there is
                # no result to inspect after a failure, so report it right away
                errMsg = f"Pilot2 Data API rise error: {e}"
                tmpLog.error(errMsg)
                return False, errMsg
            tmpLog.debug(f"data_client.transfer(files) result:\n{result}")
            tmpLog.info("Transfer call to Pilot2 Data API completed")
            # every per-file answer must report errno 0 for the whole transfer to succeed
            if result:
                failedNames = [answer["name"] for answer in result if answer["errno"] != 0]
            else:
                # NOTE(review): an empty/None result is treated as success, as
                # before - confirm this matches the Data API contract
                tmpLog.info(f"Looks like all files in place. Number of files: {len(files)}")
        # return
        tmpLog.debug(f"Finished data transfer with {len(files)} files for job {jobspec.PandaID}")
        if failedNames:
            return False, "These files failed to download : " + "".join(f" {name} " for name in failedNames)
        return True, ""

    # resolve input file paths
    def resolve_input_paths(self, jobspec):
        """
        Set the local path of every input file of the job.

        :param jobspec: job specification; its input file attributes are
                        updated with a ``path`` entry and handed back through
                        ``set_input_file_paths()``
        :return: always ``(True, "")``
        """
        # get input files
        inFiles = jobspec.get_input_file_attributes()
        # set path to each file
        for inLFN, inFile in inFiles.items():
            inFile["path"] = mover_utils.construct_file_path(self.basePath, inFile["scope"], inLFN)
        # set
        jobspec.set_input_file_paths(inFiles)
        return True, ""