Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-20 07:58:59

0001 """
0002 utilities routines associated with globus
0003 
0004 """
0005 import inspect
0006 import sys
0007 import traceback
0008 
0009 from globus_sdk import (
0010     GlobusAPIError,
0011     GlobusConnectionError,
0012     GlobusError,
0013     GlobusTimeoutError,
0014     NativeAppAuthClient,
0015     NetworkError,
0016     RefreshTokenAuthorizer,
0017     TransferAPIError,
0018     TransferClient,
0019     TransferData,
0020 )
0021 from pandaharvester.harvestercore import core_utils
0022 from pandalogger.LogWrapper import LogWrapper
0023 from pandalogger.PandaLogger import PandaLogger
0024 
0025 # handle exception from globus software
0026 
0027 
0028 def handle_globus_exception(tmp_log):
0029     if not isinstance(tmp_log, LogWrapper):
0030         methodName = f"{inspect.stack()[1][3]} : "
0031     else:
0032         methodName = ""
0033     # extract errtype and check if it a GlobusError Class
0034     errtype, errvalue = sys.exc_info()[:2]
0035     errStat = None
0036     errMsg = f"{methodName} {errtype.__name__} "
0037     if isinstance(errvalue, GlobusAPIError):
0038         # Error response from the REST service, check the code and message for
0039         # details.
0040         errStat = None
0041         errMsg += f"HTTP status code: {errvalue.http_status} Error Code: {errvalue.code} Error Message: {errvalue.message} "
0042     elif isinstance(errvalue, TransferAPIError):
0043         err_args = list(errvalue._get_args())
0044         errStat = None
0045         errMsg += f" http_status: {err_args[0]} code: {err_args[1]} message: {err_args[2]} requestID: {err_args[3]} "
0046     elif isinstance(errvalue, NetworkError):
0047         errStat = None
0048         errMsg += "Network Failure. Possibly a firewall or connectivity issue "
0049     elif isinstance(errvalue, GlobusConnectionError):
0050         errStat = None
0051         errMsg += "A connection error occured while making a REST request. "
0052     elif isinstance(errvalue, GlobusTimeoutError):
0053         errStat = None
0054         errMsg += "A REST request timeout. "
0055     elif isinstance(errvalue, GlobusError):
0056         errStat = False
0057         errMsg += "Totally unexpected GlobusError! "
0058     else:  # some other error
0059         errStat = False
0060         errMsg = f"{errvalue} "
0061     # errMsg += traceback.format_exc()
0062     tmp_log.error(errMsg)
0063     return (errStat, errMsg)
0064 
0065 
0066 # Globus create transfer client
0067 
0068 
0069 def create_globus_transfer_client(tmpLog, globus_client_id, globus_refresh_token):
0070     """
0071     create Globus Transfer Client and return the transfer client
0072     """
0073     # get logger
0074     tmpLog.info("Creating instance of GlobusTransferClient")
0075     # start the Native App authentication process
0076     # use the refresh token to get authorizer
0077     # create the Globus Transfer Client
0078     tc = None
0079     ErrStat = True
0080     try:
0081         client = NativeAppAuthClient(client_id=globus_client_id)
0082         authorizer = RefreshTokenAuthorizer(refresh_token=globus_refresh_token, auth_client=client)
0083         tc = TransferClient(authorizer=authorizer)
0084     except BaseException:
0085         errStat, errMsg = handle_globus_exception(tmpLog)
0086     return ErrStat, tc
0087 
0088 
0089 def check_endpoint_activation(tmpLog, tc, endpoint_id):
0090     """
0091     check if endpoint is activated
0092     """
0093     # test we have a Globus Transfer Client
0094     if not tc:
0095         errStr = "failed to get Globus Transfer Client"
0096         tmpLog.error(errStr)
0097         return False, errStr
0098     try:
0099         endpoint = tc.get_endpoint(endpoint_id)
0100         r = tc.endpoint_autoactivate(endpoint_id, if_expires_in=3600)
0101 
0102         tmpLog.info(f"Endpoint - {endpoint['display_name']} - activation status code {str(r['code'])}")
0103         if r["code"] == "AutoActivationFailed":
0104             errStr = f"Endpoint({endpoint_id}) Not Active! Error! Source message: {r['message']}"
0105             tmpLog.debug(errStr)
0106             return False, errStr
0107         elif r["code"] == "AutoActivated.CachedCredential":
0108             errStr = f"Endpoint({endpoint_id}) autoactivated using a cached credential."
0109             tmpLog.debug(errStr)
0110             return True, errStr
0111         elif r["code"] == "AutoActivated.GlobusOnlineCredential":
0112             errStr = f"Endpoint({endpoint_id}) autoactivated using a built-in Globus "
0113             tmpLog.debug(errStr)
0114             return True, errStr
0115         elif r["code"] == "AlreadyActivated":
0116             errStr = f"Endpoint({endpoint_id}) already active until at least {3600}"
0117             tmpLog.debug(errStr)
0118             return True, errStr
0119     except BaseException:
0120         errStat, errMsg = handle_globus_exception(tmpLog)
0121         return errStat, {}
0122 
0123 
0124 # get transfer tasks
0125 
0126 
0127 def get_transfer_task_by_id(tmpLog, tc, transferID=None):
0128     # test we have a Globus Transfer Client
0129     if not tc:
0130         errStr = "failed to get Globus Transfer Client"
0131         tmpLog.error(errStr)
0132         return False, errStr
0133     if transferID is None:
0134         # error need to have task ID
0135         errStr = "failed to provide transfer task ID "
0136         tmpLog.error(errStr)
0137         return False, errStr
0138     try:
0139         # execute
0140         gRes = tc.get_task(transferID)
0141         # parse output
0142         tasks = {}
0143         tasks[transferID] = gRes
0144         # return
0145         tmpLog.debug(f"got {len(tasks)} tasks")
0146         return True, tasks
0147     except BaseException:
0148         errStat, errMsg = handle_globus_exception(tmpLog)
0149         return errStat, {}
0150 
0151 
0152 # get transfer tasks
0153 
0154 
0155 def get_transfer_tasks(tmpLog, tc, label=None):
0156     # test we have a Globus Transfer Client
0157     if not tc:
0158         errStr = "failed to get Globus Transfer Client"
0159         tmpLog.error(errStr)
0160         return False, errStr
0161     try:
0162         # execute
0163         if label is None:
0164             params = {"filter": "type:TRANSFER/status:SUCCEEDED,INACTIVE,FAILED,SUCCEEDED"}
0165             gRes = tc.task_list(num_results=1000, **params)
0166         else:
0167             params = {"filter": f"type:TRANSFER/status:SUCCEEDED,INACTIVE,FAILED,SUCCEEDED/label:{label}"}
0168             gRes = tc.task_list(**params)
0169         # parse output
0170         tasks = {}
0171         for res in gRes:
0172             reslabel = res.data["label"]
0173             tasks[reslabel] = res.data
0174         # return
0175         tmpLog.debug(f"got {len(tasks)} tasks")
0176         return True, tasks
0177     except BaseException:
0178         errStat, errMsg = handle_globus_exception(tmpLog)
0179         return errStat, {}