File indexing completed on 2026-04-20 07:58:59
0001 """
0002 Utility routines associated with Globus.
0003
0004 """
0005 import inspect
0006 import sys
0007 import traceback
0008
0009 from globus_sdk import (
0010 GlobusAPIError,
0011 GlobusConnectionError,
0012 GlobusError,
0013 GlobusTimeoutError,
0014 NativeAppAuthClient,
0015 NetworkError,
0016 RefreshTokenAuthorizer,
0017 TransferAPIError,
0018 TransferClient,
0019 TransferData,
0020 )
0021 from pandaharvester.harvestercore import core_utils
0022 from pandalogger.LogWrapper import LogWrapper
0023 from pandalogger.PandaLogger import PandaLogger
0024
0025
0026
0027
def handle_globus_exception(tmp_log):
    """
    Log the exception currently being handled and classify it.

    Intended to be called from inside an ``except`` block: the exception is
    taken from ``sys.exc_info()`` rather than passed in as an argument.

    :param tmp_log: logger whose ``error`` method receives the message. When it
        is not a ``LogWrapper`` the name of the calling function is prepended
        to the message.
    :return: tuple ``(errStat, errMsg)``. ``errStat`` is ``None`` for Globus
        API, network, connection and timeout errors, and ``False`` for any
        other exception (presumably "temporary" vs. "fatal" -- confirm against
        the callers). ``errMsg`` is the text that was logged.
    """
    if isinstance(tmp_log, LogWrapper):
        caller_prefix = ""
    else:
        # A plain logger carries no method name, so take it from the caller's frame.
        caller_prefix = f"{inspect.stack()[1][3]} : "

    err_type, err_value = sys.exc_info()[:2]
    if err_type is None:
        # Called outside an ``except`` block: report it instead of failing on
        # ``None.__name__``.
        err_msg = f"{caller_prefix} no exception is currently being handled "
        tmp_log.error(err_msg)
        return (False, err_msg)

    err_stat = None
    err_msg = f"{caller_prefix} {err_type.__name__} "

    # Subclasses are tested before their base classes; otherwise the base-class
    # branch would always win and the specific branches could never run
    # (TransferAPIError derives from GlobusAPIError, and the connection and
    # timeout errors derive from NetworkError).
    if isinstance(err_value, TransferAPIError):
        # Public attributes are used rather than the private ``_get_args()``,
        # whose layout is not a stable interface.
        request_id = getattr(err_value, "request_id", None)
        err_msg += (
            f" http_status: {err_value.http_status} code: {err_value.code} "
            f"message: {err_value.message} requestID: {request_id} "
        )
    elif isinstance(err_value, GlobusAPIError):
        err_msg += (
            f"HTTP status code: {err_value.http_status} Error Code: {err_value.code} "
            f"Error Message: {err_value.message} "
        )
    elif isinstance(err_value, GlobusConnectionError):
        err_msg += "A connection error occured while making a REST request. "
    elif isinstance(err_value, GlobusTimeoutError):
        err_msg += "A REST request timeout. "
    elif isinstance(err_value, NetworkError):
        err_msg += "Network Failure. Possibly a firewall or connectivity issue "
    elif isinstance(err_value, GlobusError):
        err_stat = False
        err_msg += "Totally unexpected GlobusError! "
    else:
        err_stat = False
        # Append (not overwrite) so the caller name and exception type are kept.
        err_msg += f"{err_value} "

    tmp_log.error(err_msg)
    return (err_stat, err_msg)
0064
0065
0066
0067
0068
def create_globus_transfer_client(tmpLog, globus_client_id, globus_refresh_token):
    """
    Create a Globus ``TransferClient`` authorized with a refresh token.

    :param tmpLog: logger used for the progress message and for any error
    :param globus_client_id: client id passed to ``NativeAppAuthClient``
    :param globus_refresh_token: refresh token passed to ``RefreshTokenAuthorizer``
    :return: tuple ``(status, tc)``. ``(True, TransferClient)`` on success;
        on failure ``tc`` is ``None`` and ``status`` is the status reported by
        ``handle_globus_exception``.
    """
    tmpLog.info("Creating instance of GlobusTransferClient")

    try:
        client = NativeAppAuthClient(client_id=globus_client_id)
        authorizer = RefreshTokenAuthorizer(refresh_token=globus_refresh_token, auth_client=client)
        tc = TransferClient(authorizer=authorizer)
    except BaseException:
        # Report the handler's status. Previously it was stored under a
        # differently-cased name and the function returned True regardless.
        err_stat, _ = handle_globus_exception(tmpLog)
        return err_stat, None
    return True, tc
0087
0088
def check_endpoint_activation(tmpLog, tc, endpoint_id):
    """
    Auto-activate a Globus endpoint and report whether it is active.

    :param tmpLog: logger used for status and error messages
    :param tc: Globus transfer client; a falsy value is reported as an error
    :param endpoint_id: id of the endpoint to look up and auto-activate
    :return: tuple ``(status, message)``. ``status`` is ``True`` when the
        endpoint was auto-activated or was already active, ``False`` when
        activation failed, the returned code is not recognised, or no client
        was given. If a call raises, the result is the status from
        ``handle_globus_exception`` paired with an empty dict.
    """
    if not tc:
        errStr = "failed to get Globus Transfer Client"
        tmpLog.error(errStr)
        return False, errStr

    expires_in = 3600
    try:
        endpoint = tc.get_endpoint(endpoint_id)
        r = tc.endpoint_autoactivate(endpoint_id, if_expires_in=expires_in)
        code = r["code"]

        tmpLog.info(f"Endpoint - {endpoint['display_name']} - activation status code {str(code)}")
        if code == "AutoActivationFailed":
            errStr = f"Endpoint({endpoint_id}) Not Active! Error! Source message: {r['message']}"
            tmpLog.debug(errStr)
            return False, errStr
        if code == "AutoActivated.CachedCredential":
            errStr = f"Endpoint({endpoint_id}) autoactivated using a cached credential."
            tmpLog.debug(errStr)
            return True, errStr
        if code == "AutoActivated.GlobusOnlineCredential":
            errStr = f"Endpoint({endpoint_id}) autoactivated using a built-in Globus "
            tmpLog.debug(errStr)
            return True, errStr
        if code == "AlreadyActivated":
            errStr = f"Endpoint({endpoint_id}) already active until at least {expires_in}"
            tmpLog.debug(errStr)
            return True, errStr

        # Any other code used to fall off the end and return None, which breaks
        # callers that unpack two values. Treat it as "not confirmed active".
        errStr = f"Endpoint({endpoint_id}) returned unexpected activation status code {code}"
        tmpLog.error(errStr)
        return False, errStr
    except BaseException:
        # The empty dict is kept as the second element for backward compatibility.
        err_stat, _ = handle_globus_exception(tmpLog)
        return err_stat, {}
0122
0123
0124
0125
0126
def get_transfer_task_by_id(tmpLog, tc, transferID=None):
    """
    Fetch a single Globus transfer task by its id.

    :param tmpLog: logger used for debug and error messages
    :param tc: Globus transfer client; a falsy value is reported as an error
    :param transferID: id of the task to fetch; ``None`` is reported as an error
    :return: ``(True, {transferID: task})`` on success, ``(False, message)``
        when an argument is missing, or the status from
        ``handle_globus_exception`` with an empty dict when the lookup raises.
    """
    # Validate the arguments first; both problems are reported the same way.
    problem = None
    if not tc:
        problem = "failed to get Globus Transfer Client"
    elif transferID is None:
        problem = "failed to provide transfer task ID "
    if problem is not None:
        tmpLog.error(problem)
        return False, problem

    try:
        found = {transferID: tc.get_task(transferID)}
        tmpLog.debug(f"got {len(found)} tasks")
    except BaseException:
        failure_status, _ = handle_globus_exception(tmpLog)
        return failure_status, {}
    return True, found
0150
0151
0152
0153
0154
def get_transfer_tasks(tmpLog, tc, label=None):
    """
    List Globus transfer tasks, optionally restricted to one label.

    :param tmpLog: logger used for debug and error messages
    :param tc: Globus transfer client; a falsy value is reported as an error
    :param label: when given, it is added to the task filter; when ``None``
        the listing is requested with ``num_results=1000`` instead
    :return: ``(True, tasks)`` where ``tasks`` maps each task's label to its
        data, ``(False, message)`` when no client is given, or the status from
        ``handle_globus_exception`` with an empty dict when the listing raises.
    """
    if not tc:
        message = "failed to get Globus Transfer Client"
        tmpLog.error(message)
        return False, message

    try:
        if label is not None:
            query = {"filter": f"type:TRANSFER/status:SUCCEEDED,INACTIVE,FAILED,SUCCEEDED/label:{label}"}
        else:
            query = {"num_results": 1000, "filter": "type:TRANSFER/status:SUCCEEDED,INACTIVE,FAILED,SUCCEEDED"}
        listing = tc.task_list(**query)

        # Keyed by label: a later task with the same label replaces an earlier one.
        by_label = {entry.data["label"]: entry.data for entry in listing}

        tmpLog.debug(f"got {len(by_label)} tasks")
    except BaseException:
        failure_status, _ = handle_globus_exception(tmpLog)
        return failure_status, {}
    return True, by_label