# File indexing completed on 2026-04-20 07:58:59
0001 import os
0002 import sys
0003
0004 from globus_sdk import (
0005 NativeAppAuthClient,
0006 RefreshTokenAuthorizer,
0007 TransferClient,
0008 TransferData,
0009 )
0010
0011 from pandaharvester.harvestercore import core_utils
0012 from pandaharvester.harvestercore.plugin_base import PluginBase
0013 from pandaharvester.harvestermisc import globus_utils
0014 from pandaharvester.harvestermover import mover_utils
0015
0016
# Module-level logger shared by all plugin instances in this module.
_logger = core_utils.setup_logger()
0018
0019
def dump(obj):
    """Print every attribute of *obj* and its current value, one per line.

    Debug helper: iterates ``dir(obj)`` and prints ``obj.<name> = <value>``
    for each attribute that is actually readable.
    """
    for name in dir(obj):
        # dir() can list names whose lookup raises; skip those.
        if not hasattr(obj, name):
            continue
        print(f"obj.{name} = {getattr(obj, name)}")
0024
0025
0026
class GoPreparator(PluginBase):
    """Preparator plugin that stages input files in via the Globus transfer service.

    A Globus TransferClient is built in the constructor from credentials cached
    in PanDA. Each job's not-yet-ready input files are submitted as a single
    transfer task whose label deterministically encodes the computing site and
    the PanDA ID (see :meth:`make_label`), so the task can later be found again
    by label without persisting the task id.
    """

    def __init__(self, **kwarg):
        """Fetch the Globus credentials from the PanDA cache and build a transfer client.

        On any failure ``self.tc`` is left as ``None``; ``trigger_preparation``
        checks for that and fails fast. The constructor itself never raises.
        """
        PluginBase.__init__(self, **kwarg)
        # make logger
        tmpLog = self.make_logger(_logger, method_name="GoPreparator __init__ ")
        try:
            self.tc = None
            # The Globus client id / refresh token pair is distributed through
            # the PanDA cache under the "globus_secret" key.
            tmpLog.debug("about to call dbInterface.get_cache(globus_secret)")
            c_data = self.dbInterface.get_cache("globus_secret")
            if (c_data is not None) and c_data.data["StatusCode"] == 0:
                tmpLog.debug("Got the globus_secrets from PanDA")
                self.client_id = c_data.data["publicKey"]  # Globus client id
                self.refresh_token = c_data.data["privateKey"]  # Globus refresh token
                tmpStat, self.tc = globus_utils.create_globus_transfer_client(tmpLog, self.client_id, self.refresh_token)
                if not tmpStat:
                    self.tc = None
                    errStr = "failed to create Globus Transfer Client"
                    tmpLog.error(errStr)
            else:
                self.client_id = None
                self.refresh_token = None
                self.tc = None
                errStr = "failed to get Globus Client ID and Refresh Token"
                tmpLog.error(errStr)
        except BaseException:
            # swallow everything here so plugin construction never fails;
            # the missing client is detected later via self.tc being None
            core_utils.dump_error_message(tmpLog)
        tmpLog.debug("__init__ finished")

    def check_stage_in_status(self, jobspec):
        """Poll the status of the Globus transfer task staging in *jobspec*'s files.

        :param jobspec: job specification whose input transfer is being tracked
        :return: tuple (status, message) where status is
                 True  - task SUCCEEDED, file group marked done,
                 False - task FAILED or is missing,
                 None  - task still running or status undetermined (retry later).
        """
        # make logger
        tmpLog = self.make_logger(_logger, f"PandaID={jobspec.PandaID}", method_name="check_stage_in_status")
        # the task label is deterministic per job, so look the task up by label
        label = self.make_label(jobspec)
        tmpLog.debug(f"label={label}")
        tmpStat, transferTasks = globus_utils.get_transfer_tasks(tmpLog, self.tc, label)
        if not tmpStat:
            errStr = "failed to get transfer task"
            tmpLog.error(errStr)
            # None -> undetermined; harvester will poll again later
            return None, errStr
        if label not in transferTasks:
            errStr = "transfer task is missing"
            tmpLog.error(errStr)
            return False, errStr
        if transferTasks[label]["status"] == "SUCCEEDED":
            transferID = transferTasks[label]["task_id"]
            # mark the whole file group attached to this transfer task as done
            jobspec.update_group_status_in_files(transferID, "done")
            tmpLog.debug("transfer task succeeded")
            return True, ""
        if transferTasks[label]["status"] == "FAILED":
            errStr = "transfer task failed"
            tmpLog.error(errStr)
            return False, errStr
        # any other status (e.g. ACTIVE) means the transfer is still in progress
        tmpStr = f"transfer task is in {transferTasks[label]['status']}"
        tmpLog.debug(tmpStr)
        return None, ""

    def trigger_preparation(self, jobspec):
        """Submit one Globus transfer task covering all not-yet-ready input files.

        Idempotent: if a task with this job's label already exists, nothing is
        resubmitted. Source/destination endpoints and paths come from the queue
        configuration of ``jobspec.computingSite``.

        :param jobspec: job specification providing the input file attributes
        :return: (True, '') when the transfer was accepted, already queued, or
                 there were no files to transfer; (False, error message) on
                 failure; on a Globus exception, the (status, message) pair
                 produced by ``globus_utils.handle_globus_exception``.
        """
        # make logger
        tmpLog = self.make_logger(_logger, f"PandaID={jobspec.PandaID}", method_name="trigger_preparation")
        tmpLog.debug("start")
        # the computing site is needed to look up the queue configuration
        if jobspec.computingSite is None:
            tmpLog.error("jobspec.computingSite is not defined")
            return False, "jobspec.computingSite is not defined"
        else:
            tmpLog.debug(f"jobspec.computingSite : {jobspec.computingSite}")
        # fail fast when the constructor could not build a transfer client
        if not self.tc:
            errStr = "failed to get Globus Transfer Client"
            tmpLog.error(errStr)
            return False, errStr
        label = self.make_label(jobspec)
        tmpLog.debug(f"label={label}")
        tmpStat, transferTasks = globus_utils.get_transfer_tasks(tmpLog, self.tc, label)
        if not tmpStat:
            errStr = "failed to get transfer tasks"
            tmpLog.error(errStr)
            return False, errStr
        # idempotency: a task with this label was already submitted
        if label in transferTasks:
            tmpLog.debug(f"skip since already queued with {str(transferTasks[label])}")
            return True, ""
        # imported locally to avoid a circular import at module load time
        from pandaharvester.harvestercore.queue_config_mapper import QueueConfigMapper

        queueConfigMapper = QueueConfigMapper()
        queueConfig = queueConfigMapper.get_queue(jobspec.computingSite)
        self.Globus_srcPath = queueConfig.preparator["Globus_srcPath"]
        self.srcEndpoint = queueConfig.preparator["srcEndpoint"]
        # files are staged into the harvester base path
        self.Globus_dstPath = self.basePath
        self.dstEndpoint = queueConfig.preparator["dstEndpoint"]
        # build the (source, destination) path pairs for every input file
        files = []
        lfns = []
        inFiles = jobspec.get_input_file_attributes(skip_ready=True)
        for inLFN, inFile in inFiles.items():
            # local path the file will be staged to
            inFile["path"] = mover_utils.construct_file_path(self.basePath, inFile["scope"], inLFN)
            if not os.access(self.basePath, os.F_OK):
                os.makedirs(self.basePath)
            Globus_srcpath = mover_utils.construct_file_path(self.Globus_srcPath, inFile["scope"], inLFN)
            Globus_dstpath = mover_utils.construct_file_path(self.Globus_dstPath, inFile["scope"], inLFN)
            files.append({"scope": inFile["scope"], "name": inLFN, "Globus_dstPath": Globus_dstpath, "Globus_srcPath": Globus_srcpath})
            lfns.append(inLFN)
        tmpLog.debug(f"files[] {files}")
        try:
            # both endpoints must be activated before a transfer can be submitted
            tmpStatsrc, srcStr = globus_utils.check_endpoint_activation(tmpLog, self.tc, self.srcEndpoint)
            tmpStatdst, dstStr = globus_utils.check_endpoint_activation(tmpLog, self.tc, self.dstEndpoint)
            if tmpStatsrc and tmpStatdst:
                errStr = "source Endpoint and destination Endpoint activated"
                tmpLog.debug(errStr)
            else:
                errStr = ""
                if not tmpStatsrc:
                    errStr += " source Endpoint not activated "
                if not tmpStatdst:
                    errStr += " destination Endpoint not activated "
                tmpLog.error(errStr)
                return False, errStr
            if len(files) > 0:
                tdata = TransferData(self.tc, self.srcEndpoint, self.dstEndpoint, label=label, sync_level="checksum")
                for myfile in files:
                    tdata.add_item(myfile["Globus_srcPath"], myfile["Globus_dstPath"])
                transfer_result = self.tc.submit_transfer(tdata)
                tmpLog.debug(str(transfer_result))
                if transfer_result["code"] == "Accepted":
                    # remember which LFNs belong to this transfer task so
                    # check_stage_in_status can mark the group done later
                    transferID = transfer_result["task_id"]
                    jobspec.set_groups_to_files({transferID: {"lfns": lfns, "groupStatus": "active"}})
                    tmpLog.debug("done")
                    return True, ""
                else:
                    return False, transfer_result["message"]
            return True, "No files to transfer"
        except BaseException:
            errStat, errMsg = globus_utils.handle_globus_exception(tmpLog)
            # bug fix: propagate the error message; previously an empty dict
            # was returned in place of errMsg
            return errStat, errMsg

    def make_label(self, jobspec):
        """Return the deterministic transfer-task label for *jobspec*: ``IN-<site>-<PandaID>``."""
        return f"IN-{jobspec.computingSite}-{jobspec.PandaID}"

    def resolve_input_paths(self, jobspec):
        """Set the local staging path of every input file under ``self.basePath``.

        :param jobspec: job specification whose input file paths are resolved
        :return: (True, '') always
        """
        inFiles = jobspec.get_input_file_attributes()
        for inLFN, inFile in inFiles.items():
            inFile["path"] = mover_utils.construct_file_path(self.basePath, inFile["scope"], inLFN)
        jobspec.set_input_file_paths(inFiles)
        return True, ""