Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-20 07:58:59

0001 import os
0002 import sys
0003 
0004 from globus_sdk import (
0005     NativeAppAuthClient,
0006     RefreshTokenAuthorizer,
0007     TransferClient,
0008     TransferData,
0009 )
0010 
0011 from pandaharvester.harvestercore import core_utils
0012 from pandaharvester.harvestercore.plugin_base import PluginBase
0013 from pandaharvester.harvestermisc import globus_utils
0014 from pandaharvester.harvestermover import mover_utils
0015 
# module-level logger shared by all plugin methods via self.make_logger()
_logger = core_utils.setup_logger()
0018 
0019 
def dump(obj):
    """Print every attribute of *obj* as ``obj.<name> = <value>`` lines.

    Attributes whose access raises (``hasattr`` returns False) are skipped.
    """
    for name in dir(obj):
        if not hasattr(obj, name):
            # e.g. a property that raises on access — skip it
            continue
        value = getattr(obj, name)
        print(f"obj.{name} = {value}")
0024 
0025 
# preparator with Globus Online
class GoPreparator(PluginBase):
    """Preparator plugin that stages in job input files via Globus Online.

    The Globus credentials (client_id / refresh_token) are fetched from the
    PanDA server through the harvester cache mechanism under the key
    "globus_secret"; transfer endpoints and source path come from the queue
    configuration of the job's computing site.
    """

    # constructor
    def __init__(self, **kwarg):
        PluginBase.__init__(self, **kwarg)
        # create Globus Transfer Client
        tmpLog = self.make_logger(_logger, method_name="GoPreparator __init__ ")
        try:
            self.tc = None
            # need to get client_id and refresh_token from PanDA server via harvester cache mechanism
            tmpLog.debug("about to call dbInterface.get_cache(globus_secret)")
            c_data = self.dbInterface.get_cache("globus_secret")
            if (c_data is not None) and c_data.data["StatusCode"] == 0:
                tmpLog.debug("Got the globus_secrets from PanDA")
                self.client_id = c_data.data["publicKey"]  # client_id
                self.refresh_token = c_data.data["privateKey"]  # refresh_token
                tmpStat, self.tc = globus_utils.create_globus_transfer_client(tmpLog, self.client_id, self.refresh_token)
                if not tmpStat:
                    self.tc = None
                    errStr = "failed to create Globus Transfer Client"
                    tmpLog.error(errStr)
            else:
                self.client_id = None
                self.refresh_token = None
                self.tc = None
                errStr = "failed to get Globus Client ID and Refresh Token"
                tmpLog.error(errStr)
        except BaseException:
            # best effort: log the problem and leave self.tc as None so that
            # trigger_preparation() reports a clean error later
            core_utils.dump_error_message(tmpLog)
        tmpLog.debug("__init__ finished")

    # check status
    def check_stage_in_status(self, jobspec):
        """Check the state of the stage-in transfer task for *jobspec*.

        Returns a (status, message) tuple:
          (True, "")    transfer task succeeded; file groups marked 'done'
          (False, msg)  fatal error (task missing or FAILED)
          (None, msg)   temporary error or transfer still in progress
        """
        # get logger
        tmpLog = self.make_logger(_logger, f"PandaID={jobspec.PandaID}", method_name="check_stage_in_status")
        # get groups of input files except ones already in ready state
        # NOTE(review): result is currently unused; kept in case the getter
        # has side effects on the jobspec — confirm before removing
        transferGroups = jobspec.get_groups_of_input_files(skip_ready=True)
        # get label identifying this job's transfer task
        label = self.make_label(jobspec)
        tmpLog.debug(f"label={label}")
        # get transfer task
        tmpStat, transferTasks = globus_utils.get_transfer_tasks(tmpLog, self.tc, label)
        # return a temporary error when failed to get task
        if not tmpStat:
            errStr = "failed to get transfer task"
            tmpLog.error(errStr)
            return None, errStr
        # return a fatal error when task is missing # FIXME retry instead?
        if label not in transferTasks:
            errStr = "transfer task is missing"
            tmpLog.error(errStr)
            return False, errStr
        # succeeded
        if transferTasks[label]["status"] == "SUCCEEDED":
            transferID = transferTasks[label]["task_id"]
            jobspec.update_group_status_in_files(transferID, "done")
            tmpLog.debug("transfer task succeeded")
            return True, ""
        # failed
        if transferTasks[label]["status"] == "FAILED":
            errStr = "transfer task failed"
            tmpLog.error(errStr)
            return False, errStr
        # another status - transfer still in progress
        tmpStr = f"transfer task is in {transferTasks[label]['status']}"
        tmpLog.debug(tmpStr)
        return None, ""

    # trigger preparation
    def trigger_preparation(self, jobspec):
        """Submit a Globus transfer task for all not-yet-ready input files.

        Returns a (status, message) tuple:
          (True, "")                      transfer submitted and accepted
          (True, "No files to transfer")  nothing to do
          (False, msg)                    error (no client, endpoint not
                                          activated, submission rejected, ...)
        """
        # get logger
        tmpLog = self.make_logger(_logger, f"PandaID={jobspec.PandaID}", method_name="trigger_preparation")
        tmpLog.debug("start")
        # check that jobspec.computingSite is defined
        if jobspec.computingSite is None:
            # not found
            tmpLog.error("jobspec.computingSite is not defined")
            return False, "jobspec.computingSite is not defined"
        else:
            tmpLog.debug(f"jobspec.computingSite : {jobspec.computingSite}")
        # test we have a Globus Transfer Client
        if not self.tc:
            errStr = "failed to get Globus Transfer Client"
            tmpLog.error(errStr)
            return False, errStr
        # get label
        label = self.make_label(jobspec)
        tmpLog.debug(f"label={label}")
        # get transfer tasks
        tmpStat, transferTasks = globus_utils.get_transfer_tasks(tmpLog, self.tc, label)
        if not tmpStat:
            errStr = "failed to get transfer tasks"
            tmpLog.error(errStr)
            return False, errStr
        # check if already queued
        if label in transferTasks:
            tmpLog.debug(f"skip since already queued with {str(transferTasks[label])}")
            return True, ""
        # set the Globus destination Endpoint id and path will get them from Agis eventually
        from pandaharvester.harvestercore.queue_config_mapper import QueueConfigMapper

        queueConfigMapper = QueueConfigMapper()
        queueConfig = queueConfigMapper.get_queue(jobspec.computingSite)
        self.Globus_srcPath = queueConfig.preparator["Globus_srcPath"]
        self.srcEndpoint = queueConfig.preparator["srcEndpoint"]
        self.Globus_dstPath = self.basePath
        # self.Globus_dstPath = queueConfig.preparator['Globus_dstPath']
        self.dstEndpoint = queueConfig.preparator["dstEndpoint"]
        # get input files
        files = []
        lfns = []
        inFiles = jobspec.get_input_file_attributes(skip_ready=True)
        for inLFN, inFile in inFiles.items():
            # set path to each file
            inFile["path"] = mover_utils.construct_file_path(self.basePath, inFile["scope"], inLFN)
            # make sure the local base path exists; exist_ok avoids the
            # TOCTOU race of the old os.access()-then-makedirs() pattern
            os.makedirs(self.basePath, exist_ok=True)
            # create the file paths for the Globus source and destination endpoints
            Globus_srcpath = mover_utils.construct_file_path(self.Globus_srcPath, inFile["scope"], inLFN)
            Globus_dstpath = mover_utils.construct_file_path(self.Globus_dstPath, inFile["scope"], inLFN)
            files.append({"scope": inFile["scope"], "name": inLFN, "Globus_dstPath": Globus_dstpath, "Globus_srcPath": Globus_srcpath})
            lfns.append(inLFN)
        tmpLog.debug(f"files[] {files}")
        try:
            # Test endpoints for activation
            tmpStatsrc, srcStr = globus_utils.check_endpoint_activation(tmpLog, self.tc, self.srcEndpoint)
            tmpStatdst, dstStr = globus_utils.check_endpoint_activation(tmpLog, self.tc, self.dstEndpoint)
            if tmpStatsrc and tmpStatdst:
                errStr = "source Endpoint and destination Endpoint activated"
                tmpLog.debug(errStr)
            else:
                errStr = ""
                if not tmpStatsrc:
                    errStr += " source Endpoint not activated "
                if not tmpStatdst:
                    errStr += " destination Endpoint not activated "
                tmpLog.error(errStr)
                return False, errStr
            # both endpoints activated now prepare to transfer data
            if len(files) > 0:
                tdata = TransferData(self.tc, self.srcEndpoint, self.dstEndpoint, label=label, sync_level="checksum")
                # loop over all input files and add
                for myfile in files:
                    tdata.add_item(myfile["Globus_srcPath"], myfile["Globus_dstPath"])
                # submit
                transfer_result = self.tc.submit_transfer(tdata)
                # check status code and message
                tmpLog.debug(str(transfer_result))
                if transfer_result["code"] == "Accepted":
                    # succeeded
                    # set transfer ID which are used for later lookup
                    transferID = transfer_result["task_id"]
                    jobspec.set_groups_to_files({transferID: {"lfns": lfns, "groupStatus": "active"}})
                    tmpLog.debug("done")
                    return True, ""
                else:
                    return False, transfer_result["message"]
            # if no files to transfer return True
            return True, "No files to transfer"
        except BaseException:
            errStat, errMsg = globus_utils.handle_globus_exception(tmpLog)
            # bug fix: return the error-message string (was an empty dict,
            # inconsistent with every other (status, str) return path)
            return errStat, errMsg

    # make label for transfer task

    def make_label(self, jobspec):
        """Return the unique transfer-task label for *jobspec*."""
        return f"IN-{jobspec.computingSite}-{jobspec.PandaID}"

    # resolve input file paths
    def resolve_input_paths(self, jobspec):
        """Set the local path of every input file on *jobspec*.

        Always returns (True, "").
        """
        # get input files
        inFiles = jobspec.get_input_file_attributes()
        # set path to each file
        for inLFN, inFile in inFiles.items():
            inFile["path"] = mover_utils.construct_file_path(self.basePath, inFile["scope"], inLFN)
        # set
        jobspec.set_input_file_paths(inFiles)
        return True, ""