Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-20 07:58:59

0001 import os
0002 import shutil
0003 import subprocess
0004 
0005 import requests
0006 import requests.exceptions
0007 
0008 from pandaharvester.harvestercore import core_utils
0009 from pandaharvester.harvestercore.plugin_base import PluginBase
0010 from pandaharvester.harvestermover import mover_utils
0011 
0012 # logger
0013 baseLogger = core_utils.setup_logger("analysis_aux_preparator")
0014 
0015 
0016 # preparator plugin for analysis auxiliary inputs
class AnalysisAuxPreparator(PluginBase):
    """Preparator plugin that stages auxiliary input files of analysis jobs locally.

    Each input file of a job is placed at the path built by
    ``make_local_access_path``.  Depending on the URL of the file it is

    * handed to an external command configured in ``externalCommand`` (keyed by
      URL prefix), optionally followed up later through ``check_stage_in_status``;
    * downloaded via HTTP(S);
    * turned into a container image with the configured ``containerRuntime``;
    * or copied from a local absolute path.

    Attributes:
        containerRuntime: ``None`` (container download disabled), ``"docker"``,
            ``"singularity"`` or ``"shifter"``.
        externalCommand: mapping of URL prefix to a dict with a ``"trigger"``
            entry and, optionally, a ``"check"`` entry; each carries an
            ``"args"`` list with ``{src}``/``{dst}``/``{id}`` placeholders.
        maxAttempts: attempt number at which ``trigger_preparation`` gives up.
    """

    # constructor
    def __init__(self, **kwarg):
        """Set default settings and then run the PluginBase constructor with ``kwarg``."""
        # defaults are assigned first; presumably PluginBase.__init__ overrides
        # them with values given in kwarg -- TODO confirm against PluginBase
        self.containerRuntime = None
        self.externalCommand = {}
        self.maxAttempts = 3
        PluginBase.__init__(self, **kwarg)

    # run a command and capture its output
    def _run_command(self, args):
        """Run ``args`` as a subprocess and wait for it to finish.

        Args:
            args: command and arguments as a list of strings.

        Returns:
            Tuple ``(return_code, stdout, stderr)`` where the two streams are
            text and never ``None``.
        """
        p = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)
        stdout, stderr = p.communicate()
        return p.returncode, stdout or "", stderr or ""

    # trigger preparation
    def trigger_preparation(self, jobspec):
        """Fetch, or trigger the fetching of, all input files of the job.

        Files already present at their local access path are skipped.  Files
        whose URL matches a prefix in ``externalCommand`` are collected and
        passed in bulk to the trigger command of that prefix; when the command
        also defines ``"check"``, the last non-empty line of its stdout is used
        as execution ID and registered as a file group on ``jobspec``.  All
        other files are fetched right away into ``<path>.tmp`` and renamed on
        success.

        Args:
            jobspec: job specification providing ``PandaID`` and ``inFiles``.

        Returns:
            ``(True, "")`` when everything succeeded, ``(False, message)`` when
            a file reached ``maxAttempts``, and ``(None, message)`` when the
            preparation failed but may be attempted again.
        """
        # make logger
        tmpLog = self.make_logger(baseLogger, f"PandaID={jobspec.PandaID}", method_name="trigger_preparation")
        tmpLog.debug("start")
        # loop over all inputs
        allDone = True
        bulkExtCommand = {}
        tmpLog.debug(f"number of inFiles : {len(jobspec.inFiles)}")
        for tmpFileSpec in jobspec.inFiles:
            # local access path
            url = tmpFileSpec.url
            accPath = self.make_local_access_path(tmpFileSpec.scope, tmpFileSpec.lfn)
            accPathTmp = accPath + ".tmp"
            tmpLog.debug(f"url : {url} accPath : {accPath}")
            # check if already exists
            if os.path.exists(accPath):
                continue
            # make directories if needed; exist_ok avoids failing when the
            # directory shows up between a check and the creation
            os.makedirs(os.path.dirname(accPath), exist_ok=True)
            # check if use an external command
            extCommand = None
            for protocol in self.externalCommand:
                if url.startswith(protocol):
                    extCommand = self.externalCommand[protocol]
                    # collect file info to execute the command later
                    bulkExtCommand.setdefault(protocol, {"command": extCommand, "url": [], "dst": [], "lfn": []})
                    bulkExtCommand[protocol]["url"].append(url)
                    bulkExtCommand[protocol]["dst"].append(accPath)
                    bulkExtCommand[protocol]["lfn"].append(tmpFileSpec.lfn)
                    break
            # execute the command later
            if extCommand is not None:
                continue
            # execute
            return_code = 1
            if url.startswith("http"):
                try:
                    tmpLog.debug(f"getting via http from {url} to {accPathTmp}")
                    # NOTE(review): verify=False turns off TLS certificate
                    # verification -- confirm that this is intended
                    res = requests.get(url, timeout=180, verify=False)
                    if res.status_code == 200:
                        with open(accPathTmp, "wb") as f:
                            f.write(res.content)
                        tmpLog.debug(f"Successfully fetched file - {accPathTmp}")
                        return_code = 0
                    else:
                        errMsg = f"failed to get {url} with StatusCode={res.status_code} {res.text}"
                        tmpLog.error(errMsg)
                except requests.exceptions.ReadTimeout:
                    tmpLog.error(f"read timeout when getting data from {url}")
                except Exception:
                    core_utils.dump_error_message(tmpLog)
            elif url.startswith("docker"):
                if self.containerRuntime is None:
                    # skipped without marking the preparation as failed
                    tmpLog.debug("container downloading is disabled")
                    continue
                if self.containerRuntime == "docker":
                    args = ["docker", "save", "-o", accPathTmp, url.split("://")[-1]]
                    return_code = self.make_image(jobspec, args)
                elif self.containerRuntime == "singularity":
                    args = ["singularity", "build", "--sandbox", accPathTmp, url]
                    return_code = self.make_image(jobspec, args)
                elif self.containerRuntime == "shifter":
                    # NOTE(review): this command is not given accPathTmp, so the
                    # rename below looks bound to fail for shifter -- confirm
                    args = ["shifterimg", "pull", url]
                    return_code = self.make_image(jobspec, args)
                else:
                    tmpLog.error(f"unsupported container runtime : {self.containerRuntime}")
            elif url.startswith("/"):
                try:
                    shutil.copyfile(url, accPathTmp)
                    return_code = 0
                except Exception:
                    core_utils.dump_error_message(tmpLog)
            else:
                tmpLog.error(f"unsupported protocol in {url}")
            # remove empty files
            if os.path.exists(accPathTmp) and os.path.getsize(accPathTmp) == 0:
                return_code = 1
                tmpLog.debug(f"remove empty file - {accPathTmp}")
                try:
                    os.remove(accPathTmp)
                except Exception:
                    core_utils.dump_error_message(tmpLog)
            # rename
            if return_code == 0:
                try:
                    os.rename(accPathTmp, accPath)
                except Exception:
                    return_code = 1
                    core_utils.dump_error_message(tmpLog)
            if return_code != 0:
                allDone = False
        # execute external command
        execIdMap = {}
        tmpLog.debug(f"bulkExtCommand : {bulkExtCommand}")
        for protocol, bulkInfo in bulkExtCommand.items():
            # sources and destinations are passed as comma-separated lists
            placeholders = {"{src}": ",".join(bulkInfo["url"]), "{dst}": ",".join(bulkInfo["dst"])}
            args = [placeholders.get(arg, arg) for arg in bulkInfo["command"]["trigger"]["args"]]
            # execute
            try:
                tmpLog.debug("executing external command: " + " ".join(args))
                return_code, stdout, stderr = self._run_command(args)
                # get ID of command execution such as transfer ID and batch job ID
                executionID = None
                if return_code == 0 and "check" in bulkInfo["command"]:
                    # the last non-empty line of stdout is taken as the ID;
                    # empty stdout raises IndexError, handled as a failure below
                    executionID = [s for s in stdout.split("\n") if s][-1]
                    executionID = f"{protocol}:{executionID}:{placeholders['{dst}']}"
                    tmpLog.debug(f"executionID - {executionID}")
                    execIdMap[executionID] = {"lfns": bulkInfo["lfn"], "groupStatus": "active"}
                stdout = stdout.replace("\n", " ")
                stderr = stderr.replace("\n", " ")
                tmpLog.debug(f"return_code: {return_code}")
                tmpLog.debug(f"stdout: {stdout}")
                tmpLog.debug(f"stderr: {stderr}")
                if executionID is not None:
                    tmpLog.debug(f"execution ID: {executionID}")
                if return_code != 0:
                    # a failed trigger must not be reported as success
                    tmpLog.error(f"external command for {protocol} failed with return code {return_code}")
                    allDone = False
            except Exception:
                core_utils.dump_error_message(tmpLog)
                allDone = False
        # keep execution ID to check later
        tmpLog.debug(f"execIdMap : {execIdMap}")
        if execIdMap:
            jobspec.set_groups_to_files(execIdMap)
        # done
        if allDone:
            tmpLog.debug("succeeded")
            return True, ""
        errMsg = "failed"
        tmpLog.error(errMsg)
        # check attemptNr
        for tmpFileSpec in jobspec.inFiles:
            if tmpFileSpec.attemptNr >= self.maxAttempts:
                errMsg = "gave up due to max attempts"
                tmpLog.error(errMsg)
                return (False, errMsg)
        return None, errMsg

    # check status
    def check_stage_in_status(self, jobspec):
        """Check the external commands triggered for input files of the job.

        For every non-ready transfer group of ``jobspec`` the group ID is split
        into ``protocol:executionID:dst`` and the ``"check"`` command of the
        protocol is run.  Checking stops at the first group that is not ready.

        Args:
            jobspec: job specification providing ``PandaID`` and the file groups.

        Returns:
            ``(True, "")`` when every check command exited with 0, otherwise
            ``(None, message)``.
        """
        # make logger
        tmpLog = self.make_logger(baseLogger, f"PandaID={jobspec.PandaID}", method_name="check_stage_in_status")
        tmpLog.debug("start")
        allDone = True
        errMsg = ""
        transferGroups = jobspec.get_groups_of_input_files(skip_ready=True)
        for tmpGroupID in transferGroups:
            if tmpGroupID is None:
                continue
            # at most two splits: dst is the remainder and may contain colons
            tmpGroupID_parts = tmpGroupID.split(":", 2)
            tmpLog.debug(f"transfer group ID : {tmpGroupID} components: {tmpGroupID_parts}")
            protocol, executionID, dst = tmpGroupID_parts
            placeholders = {"{id}": executionID, "{dst}": dst}
            args = [placeholders.get(arg, arg) for arg in self.externalCommand[protocol]["check"]["args"]]
            # execute
            try:
                tmpLog.debug("executing external command: " + " ".join(args))
                return_code, stdout, stderr = self._run_command(args)
                stdout = stdout.replace("\n", " ")
                stderr = stderr.replace("\n", " ")
                tmpLog.debug(f"return_code: {return_code}")
                tmpLog.debug(f"stdout: {stdout}")
                tmpLog.debug(f"stderr: {stderr}")
                if return_code != 0:
                    errMsg = f"{tmpGroupID} is not ready"
                    allDone = False
                    break
            except Exception:
                errMsg = core_utils.dump_error_message(tmpLog)
                allDone = False
                break
        if not allDone:
            tmpLog.debug(f"check_stage_in_status: Return : None errMsg : {errMsg}")
            return None, errMsg
        tmpLog.debug("check_stage_in_status: Return : True")
        return True, ""

    # resolve input file paths
    def resolve_input_paths(self, jobspec):
        """Set the local access path of every input file on ``jobspec``.

        Args:
            jobspec: job specification providing ``PandaID`` and ``inFiles``.

        Returns:
            Always ``(True, "")``.
        """
        # make logger
        tmpLog = self.make_logger(baseLogger, f"PandaID={jobspec.PandaID}", method_name="resolve_input_paths")
        pathInfo = dict()
        for tmpFileSpec in jobspec.inFiles:
            lfn = tmpFileSpec.lfn
            accPath = self.make_local_access_path(tmpFileSpec.scope, lfn)
            pathInfo[lfn] = {"path": accPath}
            tmpLog.debug(f"lfn: {lfn} scope : {tmpFileSpec.scope} accPath : {accPath} pathInfo : {pathInfo}")
        jobspec.set_input_file_paths(pathInfo)
        return True, ""

    # make local access path
    def make_local_access_path(self, scope, lfn):
        """Return the local path for ``scope`` and ``lfn`` under ``self.localBasePath``."""
        return mover_utils.construct_file_path(self.localBasePath, scope, lfn)

    # run the command to create the image
    def make_image(self, jobspec, args):
        """Run the command that creates a container image.

        Args:
            jobspec: job specification; only ``PandaID`` is used, for logging.
            args: command and arguments as a list of strings.

        Returns:
            Exit code of the command, or 1 when the command could not be run.
        """
        # make logger
        tmpLog = self.make_logger(baseLogger, f"PandaID={jobspec.PandaID}", method_name="make_image")
        tmpLog.debug("start")
        return_code = 1
        try:
            tmpLog.debug("executing " + " ".join(args))
            return_code, stdout, stderr = self._run_command(args)
            stdout = stdout.replace("\n", " ")
            stderr = stderr.replace("\n", " ")
            tmpLog.debug(f"stdout: {stdout}")
            tmpLog.debug(f"stderr: {stderr}")
        except Exception:
            core_utils.dump_error_message(tmpLog)
        tmpLog.debug(f"end with return code {return_code}")
        return return_code