File indexing completed on 2026-04-20 07:58:59
0001 import os
0002 import shutil
0003 import subprocess
0004
0005 import requests
0006 import requests.exceptions
0007
0008 from pandaharvester.harvestercore import core_utils
0009 from pandaharvester.harvestercore.plugin_base import PluginBase
0010 from pandaharvester.harvestermover import mover_utils
0011
0012
# Module-level logger; every method derives its per-job logger from it via
# self.make_logger(baseLogger, ...).
baseLogger = core_utils.setup_logger(
    "analysis_aux_preparator"
)
0014
0015
0016
class AnalysisAuxPreparator(PluginBase):
    """Preparator plugin that stages auxiliary input files of analysis jobs.

    The ``url`` of each input file is handled by the first matching mechanism:

    * a protocol prefix configured in ``externalCommand``: files are batched
      per protocol and handed to that protocol's "trigger" command in one call.
      If the protocol also defines a "check" command, completion is polled
      later in ``check_stage_in_status``.
    * URLs starting with ``http``: downloaded with ``requests``.
    * URLs starting with ``docker``: handled by the configured
      ``containerRuntime`` (``docker``, ``singularity`` or ``shifter``).
    * absolute local paths (starting with ``/``): copied with
      ``shutil.copyfile``.

    Files fetched by this plugin itself are written to ``<path>.tmp`` first and
    renamed to ``<path>`` only on success.

    Configuration attributes:
      containerRuntime: name of the container runtime, or None to disable
          container downloading.
      externalCommand: ``{protocol: {"trigger": {"args": [...]},
          "check": {"args": [...]}}}``. "trigger" args may contain the
          placeholders ``{src}``/``{dst}``, "check" args ``{id}``/``{dst}``.
          "check" is optional.
      maxAttempts: give up once an input file's ``attemptNr`` reaches this.
      localBasePath: base directory for staged files. It is not set in
          ``__init__``; presumably supplied through the plugin configuration
          (TODO confirm).
    """

    def __init__(self, **kwarg):
        # Defaults are assigned before the base initializer, presumably so that
        # values coming from the plugin configuration (kwarg) override them.
        self.containerRuntime = None
        self.externalCommand = {}
        self.maxAttempts = 3
        PluginBase.__init__(self, **kwarg)

    def trigger_preparation(self, jobspec):
        """Start staging every input file of *jobspec* into ``localBasePath``.

        Files already present at their local access path are skipped. Files
        matching an ``externalCommand`` protocol are collected and triggered in
        one bulk call per protocol. All other files are fetched synchronously.

        :param jobspec: job whose ``inFiles`` are to be staged.
        :return: ``(status, errMsg)``. ``status`` is one of:
            - ``True``: every file was staged or successfully triggered.
            - ``False``: something failed and some input file's ``attemptNr``
              has reached ``maxAttempts``.
            - ``None``: something failed and a retry is still allowed.
        """
        tmpLog = self.make_logger(baseLogger, f"PandaID={jobspec.PandaID}", method_name="trigger_preparation")
        tmpLog.debug("start")

        allDone = True
        bulkExtCommand = {}
        tmpLog.debug(f"number of inFiles : {len(jobspec.inFiles)}")
        for tmpFileSpec in jobspec.inFiles:
            url = tmpFileSpec.url
            accPath = self.make_local_access_path(tmpFileSpec.scope, tmpFileSpec.lfn)
            # Fetch to a temporary name so a partial file never satisfies the
            # os.path.exists() check below.
            accPathTmp = accPath + ".tmp"
            tmpLog.debug(f"url : {url} accPath : {accPath}")

            # Already staged (e.g. by a previous attempt).
            if os.path.exists(accPath):
                continue

            # exist_ok: a concurrent worker may create the directory between
            # a check and the creation.
            os.makedirs(os.path.dirname(accPath), exist_ok=True)

            # Files served by an external command are only queued here. They
            # are triggered in one bulk call per protocol after this loop.
            extCommand = None
            for protocol in self.externalCommand:
                if url.startswith(protocol):
                    extCommand = self.externalCommand[protocol]
                    bulk = bulkExtCommand.setdefault(protocol, {"command": extCommand, "url": [], "dst": [], "lfn": []})
                    bulk["url"].append(url)
                    bulk["dst"].append(accPath)
                    bulk["lfn"].append(tmpFileSpec.lfn)
                    break
            if extCommand is not None:
                continue

            return_code = self._fetch_to_tmp(jobspec, tmpLog, url, accPathTmp)
            if return_code is None:
                # Deliberately skipped (container downloading disabled).
                continue

            # An empty result counts as a failure and is discarded.
            if os.path.exists(accPathTmp) and os.path.getsize(accPathTmp) == 0:
                return_code = 1
                tmpLog.debug(f"remove empty file - {accPathTmp}")
                try:
                    os.remove(accPathTmp)
                except Exception:
                    core_utils.dump_error_message(tmpLog)

            # Publish under the final name only after a successful fetch.
            if return_code == 0:
                try:
                    os.rename(accPathTmp, accPath)
                except Exception:
                    return_code = 1
                    core_utils.dump_error_message(tmpLog)
            if return_code != 0:
                allDone = False

        execIdMap = {}
        tmpLog.debug(f"bulkExtCommand : {bulkExtCommand}")
        for protocol, bulk in bulkExtCommand.items():
            if not self._run_trigger_command(tmpLog, protocol, bulk, execIdMap):
                allDone = False

        tmpLog.debug(f"execIdMap : {execIdMap}")
        if execIdMap:
            jobspec.set_groups_to_files(execIdMap)

        if allDone:
            tmpLog.debug("succeeded")
            return True, ""

        errMsg = "failed"
        tmpLog.error(errMsg)
        # Give up for good once any input file has exhausted its attempts.
        for tmpFileSpec in jobspec.inFiles:
            if tmpFileSpec.attemptNr >= self.maxAttempts:
                errMsg = "gave up due to max attempts"
                tmpLog.error(errMsg)
                return (False, errMsg)
        return None, errMsg

    def _fetch_to_tmp(self, jobspec, tmpLog, url, accPathTmp):
        """Fetch *url* into *accPathTmp* with one of the built-in mechanisms.

        :return: 0 on success, non-zero on failure, or ``None`` when the file
            is intentionally skipped because container downloading is disabled.
        """
        return_code = 1
        if url.startswith("http"):
            try:
                tmpLog.debug(f"getting via http from {url} to {accPathTmp}")
                # NOTE(review): TLS certificate verification is disabled here.
                res = requests.get(url, timeout=180, verify=False)
                if res.status_code == 200:
                    with open(accPathTmp, "wb") as f:
                        f.write(res.content)
                    tmpLog.debug(f"Successfully fetched file - {accPathTmp}")
                    return_code = 0
                else:
                    errMsg = f"failed to get {url} with StatusCode={res.status_code} {res.text}"
                    tmpLog.error(errMsg)
            except requests.exceptions.ReadTimeout:
                tmpLog.error(f"read timeout when getting data from {url}")
            except Exception:
                core_utils.dump_error_message(tmpLog)
        elif url.startswith("docker"):
            if self.containerRuntime is None:
                tmpLog.debug("container downloading is disabled")
                return None
            if self.containerRuntime == "docker":
                args = ["docker", "save", "-o", accPathTmp, url.split("://")[-1]]
            elif self.containerRuntime == "singularity":
                args = ["singularity", "build", "--sandbox", accPathTmp, url]
            elif self.containerRuntime == "shifter":
                # NOTE(review): this command does not write accPathTmp, so the
                # later rename in trigger_preparation is expected to fail. Confirm
                # the intended handling for shifter.
                args = ["shifterimg", "pull", url]
            else:
                tmpLog.error(f"unsupported container runtime : {self.containerRuntime}")
                return return_code
            return_code = self.make_image(jobspec, args)
        elif url.startswith("/"):
            try:
                shutil.copyfile(url, accPathTmp)
                return_code = 0
            except Exception:
                core_utils.dump_error_message(tmpLog)
        else:
            tmpLog.error(f"unsupported protocol in {url}")
        return return_code

    def _run_trigger_command(self, tmpLog, protocol, bulk, execIdMap):
        """Run the external "trigger" command for all files queued under *protocol*.

        In the configured arguments, ``{src}`` is replaced by the comma-joined
        source URLs and ``{dst}`` by the comma-joined destination paths.

        If the protocol also defines a "check" command and the trigger
        succeeds, the last non-empty stdout line is taken as the execution ID.
        A transfer group ``<protocol>:<executionID>:<dst>`` is then added to
        *execIdMap*.

        :return: True if the command ran and exited with code 0, else False.
        """
        command = bulk["command"]
        args = []
        for arg in command["trigger"]["args"]:
            if arg == "{src}":
                arg = ",".join(bulk["url"])
            elif arg == "{dst}":
                arg = ",".join(bulk["dst"])
            args.append(arg)

        try:
            tmpLog.debug("executing external command: " + " ".join(args))
            p = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)
            stdout, stderr = p.communicate()
            return_code = p.returncode
            if stdout is None:
                stdout = ""
            if stderr is None:
                stderr = ""

            executionID = None
            if return_code == 0 and "check" in command:
                # An empty stdout raises IndexError, which is handled below as
                # a failure.
                executionID = [s for s in stdout.split("\n") if s][-1]
                dst = ",".join(bulk["dst"])
                # NOTE(review): check_stage_in_status splits this on ":" (at
                # most twice), so a protocol key that itself contains ":" would
                # be mis-parsed.
                executionID = f"{protocol}:{executionID}:{dst}"
                tmpLog.debug(f"executionID - {executionID}")
                execIdMap[executionID] = {"lfns": bulk["lfn"], "groupStatus": "active"}
            stdout = stdout.replace("\n", " ")
            stderr = stderr.replace("\n", " ")
            tmpLog.debug(f"stdout: {stdout}")
            tmpLog.debug(f"stderr: {stderr}")
            if executionID is not None:
                tmpLog.debug(f"execution ID: {executionID}")
        except Exception:
            core_utils.dump_error_message(tmpLog)
            return False

        # A failed trigger must not be reported as success. Otherwise the files
        # would never be staged and no group would be left to check.
        if return_code != 0:
            tmpLog.error(f"external command for {protocol} failed with return code {return_code}")
            return False
        return True

    def check_stage_in_status(self, jobspec):
        """Poll the external "check" command for each pending transfer group.

        Group IDs have the form ``<protocol>:<executionID>:<dst>`` (as
        registered by trigger_preparation). In the protocol's "check"
        arguments, ``{id}`` is replaced by the execution ID and ``{dst}`` by
        the destination string. Polling stops at the first group whose check
        fails.

        :param jobspec: job whose input-file transfer groups are checked.
        :return: ``(True, "")`` when every check exits with 0, otherwise
            ``(None, errMsg)`` so the check is retried later.
        """
        tmpLog = self.make_logger(baseLogger, f"PandaID={jobspec.PandaID}", method_name="check_stage_in_status")
        tmpLog.debug("start")
        allDone = True
        errMsg = ""
        transferGroups = jobspec.get_groups_of_input_files(skip_ready=True)
        for tmpGroupID in transferGroups:
            if tmpGroupID is None:
                continue
            tmpGroupID_parts = tmpGroupID.split(":", 2)
            tmpLog.debug(f"transfer group ID : {tmpGroupID} components: {tmpGroupID_parts}")
            protocol, executionID, dst = tmpGroupID_parts
            args = []
            for arg in self.externalCommand[protocol]["check"]["args"]:
                if arg == "{id}":
                    arg = executionID
                elif arg == "{dst}":
                    arg = dst
                args.append(arg)

            try:
                tmpLog.debug("executing external command: " + " ".join(args))
                p = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)
                stdout, stderr = p.communicate()
                return_code = p.returncode
                if stdout is None:
                    stdout = ""
                if stderr is None:
                    stderr = ""
                stdout = stdout.replace("\n", " ")
                stderr = stderr.replace("\n", " ")
                tmpLog.debug(f"return_code: {return_code}")
                tmpLog.debug(f"stdout: {stdout}")
                tmpLog.debug(f"stderr: {stderr}")
                if return_code != 0:
                    errMsg = f"{tmpGroupID} is not ready"
                    allDone = False
                    break
            except Exception:
                errMsg = core_utils.dump_error_message(tmpLog)
                allDone = False
                break
        if not allDone:
            tmpLog.debug(f"check_stage_in_status: Return : None errMsg : {errMsg}")
            return None, errMsg
        tmpLog.debug("check_stage_in_status: Return : True")
        return True, ""

    def resolve_input_paths(self, jobspec):
        """Record the local access path of every input file on *jobspec*.

        :return: ``(True, "")`` always.
        """
        tmpLog = self.make_logger(baseLogger, f"PandaID={jobspec.PandaID}", method_name="resolve_input_paths")
        pathInfo = dict()
        for tmpFileSpec in jobspec.inFiles:
            lfn = tmpFileSpec.lfn
            accPath = self.make_local_access_path(tmpFileSpec.scope, lfn)
            pathInfo[lfn] = {"path": accPath}
            tmpLog.debug(f"lfn: {lfn} scope : {tmpFileSpec.scope} accPath : {accPath} pathInfo : {pathInfo}")
        jobspec.set_input_file_paths(pathInfo)
        return True, ""

    def make_local_access_path(self, scope, lfn):
        """Return the local path of (*scope*, *lfn*) under ``localBasePath``."""
        return mover_utils.construct_file_path(self.localBasePath, scope, lfn)

    def make_image(self, jobspec, args):
        """Run a container-image command and return its exit code.

        :param jobspec: job, used only to tag log messages.
        :param args: command line as an argv list (no shell).
        :return: the process exit code, or 1 if the command could not be run.
        """
        tmpLog = self.make_logger(baseLogger, f"PandaID={jobspec.PandaID}", method_name="make_image")
        tmpLog.debug("start")
        return_code = 1
        try:
            tmpLog.debug("executing " + " ".join(args))
            p = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)
            stdout, stderr = p.communicate()
            return_code = p.returncode
            if stdout is not None:
                stdout = stdout.replace("\n", " ")
            if stderr is not None:
                stderr = stderr.replace("\n", " ")
            tmpLog.debug(f"stdout: {stdout}")
            # Previously "stderr: [0}".format(...), which raised ValueError
            # (lone "}") and hid stderr.
            tmpLog.debug(f"stderr: {stderr}")
        except Exception:
            core_utils.dump_error_message(tmpLog)
        tmpLog.debug(f"end with return code {return_code}")
        return return_code