Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-20 07:58:59

0001 import logging
0002 import os
0003 import os.path
0004 import threading
0005 import time
0006 import traceback
0007 
0008 from pilot.api import data
0009 from pilot.info import infosys
0010 from pilot.info.filespec import FileSpec as PilotFileSpec
0011 
0012 from pandaharvester.harvestercore import core_utils
0013 from pandaharvester.harvestercore.plugin_base import PluginBase
0014 from pandaharvester.harvestermover import mover_utils
0015 
# module-level base logger; trigger_preparation derives its per-job logger from it
baseLogger = core_utils.setup_logger("pilotmover_mt_preparator")
0018 
0019 
0020 # plugin for preparator based on Pilot2.0 Data API, MultipleThreads
0021 # Pilot 2.0 should be deployed as library
0022 # default self.basePath came from preparator section of configuration file
0023 
0024 
class PilotmoverMTPreparator(PluginBase):
    """
    Preparator that brings input files from remote ATLAS/Rucio storage to the
    local facility through the Pilot 2 data API, using several threads.

    ``n_threads`` defaults to 3 and is clamped to at least 1 after
    ``PluginBase.__init__`` has processed the keyword arguments (presumably
    that is where a configured value overrides the default).
    """

    # constructor
    def __init__(self, **kwarg):
        # set the default before PluginBase.__init__ so it is in place when the
        # keyword arguments are processed
        self.n_threads = 3
        PluginBase.__init__(self, **kwarg)
        self.n_threads = max(self.n_threads, 1)

    @staticmethod
    def _failure_message(lfns):
        """Build the error message listing the LFNs that failed to download."""
        return "These files failed to download : " + "".join(f" {lfn} " for lfn in lfns)

    # check status
    def check_stage_in_status(self, jobspec):
        """Report stage-in status; this plugin always answers ``(True, "")``."""
        return True, ""

    def stage_in(self, tmpLog, jobspec, files):
        """
        Download ``files`` with the Pilot stage-in client (rucio copytool).

        Used as the thread target by trigger_preparation, which ignores the
        return value and inspects ``status_code`` on the file specs instead.

        :param tmpLog: logger to write to
        :param jobspec: job specification; ``computingSite`` is used to initialise infosys
        :param files: list of pilot file specs to transfer
        :return: ``(True, "")`` on success, ``(False, message)`` if any file
                 failed or an exception was raised
        """
        try:
            tmpLog.debug(f"To stagein files[] {files}")
            # get infosys
            infosys.init(jobspec.computingSite, infosys.confinfo, infosys.extinfo)
            # always disable remote/direct io
            infosys.queuedata.direct_access_lan = False
            infosys.queuedata.direct_access_wan = False
            # set data client, always use rucio
            data_client = data.StageInClient(infosys, acopytools={"default": ["rucio"]}, default_copytools="rucio")

            failed_lfns = []
            if files:
                result = data_client.transfer(files, use_vp=False)
                tmpLog.debug(f"pilot.api data.StageInClient.transfer(files) result: {result}")

                # every returned entry must have status_code 0 for overall success
                if result:
                    failed_lfns = [answer.lfn for answer in result if answer.status_code != 0]
                else:
                    tmpLog.info(f"Looks like all files already inplace: {files}")

            # return
            tmpLog.debug("stop thread")
            if failed_lfns:
                return False, self._failure_message(failed_lfns)
            return True, ""
        except Exception as ex:
            tmpLog.error(ex)
            tmpLog.error(traceback.format_exc())
            return False, str(ex)

    def _files_to_transfer(self, tmpLog, inFiles):
        """
        Set the local ``path`` on every entry of ``inFiles`` and return pilot
        file specs for those that still need to be downloaded.

        A file is skipped when it already exists locally and its adler32
        checksum matches the ``checksum`` attribute of the entry.
        """
        files = []
        for inLFN, inFile in inFiles.items():
            inFile["path"] = mover_utils.construct_file_path(self.basePath, inFile["scope"], inLFN)
            tmpLog.debug(f"To check file: {inFile}")
            if os.path.exists(inFile["path"]):
                checksum = f"ad:{core_utils.calc_adler32(inFile['path'])}"
                tmpLog.debug(f"checksum for file {inFile['path']} is {checksum}")
                if inFile.get("checksum") and inFile["checksum"] == checksum:
                    tmpLog.debug(f"File {inLFN} already exists at {inFile['path']}")
                    continue
            dstpath = os.path.dirname(inFile["path"])
            # exist_ok avoids the check-then-create race when the directory
            # appears between the test and the creation
            os.makedirs(dstpath, exist_ok=True)
            file_data = {
                "scope": inFile["scope"],
                "dataset": inFile.get("dataset"),
                "lfn": inLFN,
                "ddmendpoint": inFile.get("endpoint"),
                "guid": inFile.get("guid"),
                "workdir": dstpath,
            }
            files.append(PilotFileSpec(type="input", **file_data))
        return files

    # trigger preparation
    def trigger_preparation(self, jobspec):
        """
        Download the not-yet-ready input files of ``jobspec`` in parallel.

        The files are split into at most ``self.n_threads`` chunks, each chunk
        is handed to stage_in in its own thread, and after all threads finish
        every file spec's ``status_code`` and every expected local path are
        checked.

        :param jobspec: job specification
        :return: ``(True, "")`` when all files are in place, otherwise
                 ``(False, message)``; an unexpected exception is logged and
                 reported as ``(False, str(exception))``
        """
        # make logger
        tmpLog = self.make_logger(baseLogger, f"PandaID={jobspec.PandaID}", method_name="trigger_preparation")
        tmpLog.debug("start")

        failed_lfns = []
        try:
            # check that jobspec.computingSite is defined
            if jobspec.computingSite is None:
                tmpLog.error("jobspec.computingSite is not defined")
                return False, "jobspec.computingSite is not defined"
            tmpLog.debug(f"jobspec.computingSite : {jobspec.computingSite}")

            # get input files and work out which ones must be transferred
            inFiles = jobspec.get_input_file_attributes(skip_ready=True)
            files = self._files_to_transfer(tmpLog, inFiles)
            tmpLog.debug(f"files[] {files}")

            if files:
                # ceiling division so that at most n_threads chunks are produced
                n_files_per_thread = (len(files) + self.n_threads - 1) // self.n_threads
                tmpLog.debug(f"num files per thread: {n_files_per_thread}")
                threads = [
                    threading.Thread(
                        target=self.stage_in,
                        kwargs={"tmpLog": tmpLog, "jobspec": jobspec, "files": files[i : i + n_files_per_thread]},
                    )
                    for i in range(0, len(files), n_files_per_thread)
                ]
                for thread in threads:
                    thread.start()
                tmpLog.debug(f"threads: {str(threads)}")
                for thread in threads:
                    thread.join()

                tmpLog.info(f"Checking all files: {files}")
                for file_spec in files:
                    if file_spec.status_code != 0:
                        failed_lfns.append(file_spec.lfn)
                for inLFN, inFile in inFiles.items():
                    # report the LFN of the missing file itself, and only once
                    if not os.path.isfile(inFile["path"]) and inLFN not in failed_lfns:
                        failed_lfns.append(inLFN)
        except Exception as ex:
            # an unexpected error must not be reported as a successful download
            tmpLog.error(ex)
            tmpLog.error(traceback.format_exc())
            tmpLog.debug("stop")
            return False, str(ex)

        # return
        tmpLog.debug("stop")
        if failed_lfns:
            return False, self._failure_message(failed_lfns)
        tmpLog.info("Looks like all files are successfully downloaded.")
        return True, ""

    # resolve input file paths
    def resolve_input_paths(self, jobspec):
        """
        Set the local path of every input file of ``jobspec`` and store the
        result back on the job spec.

        :param jobspec: job specification
        :return: always ``(True, "")``
        """
        # get input files
        inFiles = jobspec.get_input_file_attributes()
        # set path to each file
        for inLFN, inFile in inFiles.items():
            inFile["path"] = mover_utils.construct_file_path(self.basePath, inFile["scope"], inLFN)
        # set
        jobspec.set_input_file_paths(inFiles)
        return True, ""