Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-20 07:58:59

0001 import os
0002 import os.path
0003 import threading
0004 import time
0005 
0006 from pilot.api import data
0007 from pilot.info import infosys
0008 from pilot.info.filespec import FileSpec as PilotFileSpec
0009 
0010 from pandaharvester.harvestercore import core_utils
0011 from pandaharvester.harvestercore.plugin_base import PluginBase
0012 from pandaharvester.harvestermover import mover_utils
0013 
0014 # logger
0015 baseLogger = core_utils.setup_logger("pilotmover_mt_preparator_kari")
0016 
0017 
0018 # plugin for preparator based on Pilot2.0 Data API, MultipleThreads
0019 # Pilot 2.0 should be deployed as library
0020 # default self.basePath came from preparator section of configuration file
0021 
0022 # Modified by FaHui Lin to be compatible with current pilot 2 code
0023 
0024 
class PilotmoverMTPreparator(PluginBase):
    """
    Preparator that brings files from remote ATLAS/Rucio storage to the local
    facility using the Pilot 2 Data API, with multiple download threads.

    Pilot 2 must be deployed as a library. ``self.basePath`` comes from the
    preparator section of the harvester configuration file; ``n_threads``
    (default 3, clamped to >= 1) controls how many parallel stage-in threads
    are spawned per job.
    """

    # constructor
    def __init__(self, **kwarg):
        # set the default first so a value from the configuration file
        # (applied by PluginBase.__init__) can override it
        self.n_threads = 3
        PluginBase.__init__(self, **kwarg)
        # guard against a non-positive configured thread count
        if self.n_threads < 1:
            self.n_threads = 1

    # check status
    def check_stage_in_status(self, jobspec):
        # stage-in runs synchronously inside trigger_preparation, so by the
        # time harvester polls here the transfers have already completed
        return True, ""

    def stage_in(self, tmpLog, jobspec, files):
        """
        Download one batch of files with the Pilot 2 StageInClient.

        Runs in a worker thread; several instances may execute concurrently.

        :param tmpLog: logger shared with the caller
        :param jobspec: job spec whose computingSite selects the PanDA queue
        :param files: list of pilot FileSpec objects to transfer
        :return: (True, "") on success, (False, error message) on failure
        :raises: re-raises any exception from the pilot transfer call
        """
        tmpLog.debug(f"To stagein files[] {files}")
        # initialize the pilot information service for this queue
        infosys.init(jobspec.computingSite, infosys.confinfo, infosys.extinfo)
        # always disable remote/direct io — files must land on local disk
        infosys.queuedata.direct_access_lan = False
        infosys.queuedata.direct_access_wan = False
        # set data client, always use rucio
        data_client = data.StageInClient(infosys, acopytools={"default": ["rucio"]}, default_copytools="rucio")
        allChecked = True
        ErrMsg = "These files failed to download : "
        # change directory to basePath for input to pass pilot check_availablespace
        # NOTE(review): os.chdir is process-wide, so concurrent stage_in threads
        # share this working directory — harmless while all use self.basePath
        os.chdir(self.basePath)
        # transfer
        if len(files) > 0:
            try:
                result = data_client.transfer(files)
            except Exception as e:
                tmpLog.error(f"error when stage_in: {e.__class__.__name__} ; {e}")
                raise
            else:
                tmpLog.debug(f"pilot.api data.StageInClient.transfer(files) result: {result}")
                # every per-file status_code must be 0 for the batch to succeed
                if result:
                    for answer in result:
                        if answer.status_code != 0:
                            allChecked = False
                            ErrMsg = ErrMsg + f" {answer.lfn} "
                else:
                    # an empty result means pilot found nothing to transfer
                    tmpLog.info(f"Looks like all files already inplace: {files}")
        # return
        tmpLog.debug("stop thread")
        if allChecked:
            return True, ""
        else:
            return False, ErrMsg

    # trigger preparation
    def trigger_preparation(self, jobspec):
        """
        Stage in all input files of a job, splitting them over n_threads
        parallel stage_in() workers, then verify every file landed on disk.

        :param jobspec: job specification with input file attributes
        :return: (True, "") when all files are present,
                 (None, error message) to make harvester retry,
                 (False, error message) when computingSite is undefined
        """
        # make logger
        tmpLog = self.make_logger(baseLogger, f"PandaID={jobspec.PandaID}", method_name="trigger_preparation")
        tmpLog.debug("start")

        # check that jobspec.computingSite is defined
        if jobspec.computingSite is None:
            # not found
            tmpLog.error("jobspec.computingSite is not defined")
            return False, "jobspec.computingSite is not defined"
        else:
            tmpLog.debug(f"jobspec.computingSite : {jobspec.computingSite}")
        # get input files
        files = []
        inFiles = jobspec.get_input_file_attributes(skip_ready=True)
        # set path to each file
        for inLFN, inFile in inFiles.items():
            inFile["path"] = mover_utils.construct_file_path(self.basePath, inFile["scope"], inLFN)
            tmpLog.debug(f"To check file: {inFile}")
            if os.path.exists(inFile["path"]):
                # lazy but unsafe check to be faster: compare size only,
                # not checksum, to decide whether the file is already staged
                file_size = os.stat(inFile["path"]).st_size
                tmpLog.debug(f"file size for file {inFile['path']} is {file_size}")
                if "fsize" in inFile and inFile["fsize"] and inFile["fsize"] == file_size:
                    tmpLog.debug(f"File {inLFN} already exists at {inFile['path']}")
                    continue
            dstpath = os.path.dirname(inFile["path"])
            # create the destination directory if needed; exist_ok avoids the
            # check-then-create race when several processes prepare the same area
            os.makedirs(dstpath, exist_ok=True)
            file_data = {
                "scope": inFile["scope"],
                "dataset": inFile.get("dataset"),
                "lfn": inLFN,
                "ddmendpoint": inFile.get("endpoint"),
                "guid": inFile.get("guid"),
                "workdir": dstpath,
            }
            pilotfilespec = PilotFileSpec(type="input", **file_data)
            files.append(pilotfilespec)
        tmpLog.debug("path set")

        allChecked = True
        ErrMsg = "These files failed to download : "
        if files:
            threads = []
            # ceil-divide so at most n_threads batches are created
            n_files_per_thread = (len(files) + self.n_threads - 1) // self.n_threads
            tmpLog.debug(f"num files per thread: {n_files_per_thread}")
            for i in range(0, len(files), n_files_per_thread):
                sub_files = files[i : i + n_files_per_thread]
                thread = threading.Thread(
                    target=self.stage_in,
                    kwargs={
                        "tmpLog": tmpLog,
                        "jobspec": jobspec,
                        "files": sub_files,
                    },
                )
                threads.append(thread)
            for t in threads:
                t.start()
            # join instead of sleep-polling; the former isAlive() spelling was
            # removed in Python 3.9 and crashed on modern interpreters
            for t in threads:
                t.join()

            tmpLog.info(f"Checking all files: {files}")
            # a non-zero status_code on any FileSpec means its transfer failed
            for file in files:
                if file.status_code != 0:
                    allChecked = False
                    ErrMsg = ErrMsg + f" {file.lfn} "
            # double-check on disk; report the missing file's own LFN
            # (previously reported the stale loop variable from the loop above)
            for inLFN, inFile in inFiles.items():
                if not os.path.isfile(inFile["path"]):
                    allChecked = False
                    ErrMsg = ErrMsg + f" {inLFN} "
        # return
        tmpLog.debug("stop")
        if allChecked:
            tmpLog.info("Looks like all files are successfully downloaded.")
            return True, ""
        else:
            # return None (not False) so harvester keeps retrying the stage-in
            return None, ErrMsg

    # resolve input file paths
    def resolve_input_paths(self, jobspec):
        """
        Set the local access path of every input file on the job spec.

        :param jobspec: job specification to update in place
        :return: (True, "")
        """
        # get input files
        inFiles = jobspec.get_input_file_attributes()
        # set path to each file
        for inLFN, inFile in inFiles.items():
            inFile["path"] = mover_utils.construct_file_path(self.basePath, inFile["scope"], inLFN)
        # set
        jobspec.set_input_file_paths(inFiles)
        return True, ""