Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-20 07:59:01

0001 import datetime
0002 import hashlib
0003 import logging
0004 import os
0005 import os.path
0006 import random
0007 import string
0008 import sys
0009 import threading
0010 import time
0011 import uuid
0012 
0013 from globus_sdk import (
0014     NativeAppAuthClient,
0015     RefreshTokenAuthorizer,
0016     TransferClient,
0017     TransferData,
0018 )
0019 
0020 from pandaharvester.harvesterbody.cacher import Cacher
0021 from pandaharvester.harvesterconfig import harvester_config
0022 from pandaharvester.harvestercore import core_utils
0023 from pandaharvester.harvestercore.communicator_pool import CommunicatorPool
0024 from pandaharvester.harvestercore.db_proxy_pool import DBProxyPool as DBProxy
0025 from pandaharvester.harvestercore.file_spec import FileSpec
0026 from pandaharvester.harvestercore.job_spec import JobSpec
0027 from pandaharvester.harvestercore.plugin_factory import PluginFactory
0028 from pandaharvester.harvestercore.queue_config_mapper import QueueConfigMapper
0029 from pandaharvester.harvestermisc import globus_utils
0030 
0031 
def dump(obj):
    """Print every readable attribute of *obj* as "obj.<name> = <value>" lines."""
    for name in dir(obj):
        # dir() can list names whose lookup raises; skip those defensively.
        if not hasattr(obj, name):
            continue
        print(f"obj.{name} = {getattr(obj, name)}")
0036 
0037 
# --- command-line arguments -------------------------------------------------
# Usage: <script> [queueName [PandaID [globus_sleep_time]]]
print(len(sys.argv))
queueName = "ALCF_Theta"  # default PanDA queue to exercise
job_id = 1111  # default (dummy) PandaID for the test JobSpec
globus_sleep_time = 15  # seconds to wait between Globus status polls

if len(sys.argv) > 1:
    queueName = sys.argv[1]
if len(sys.argv) > 2:
    job_id = int(sys.argv[2])
if len(sys.argv) > 3:
    globus_sleep_time = int(sys.argv[3])
0049 
0050 
# Load the queue configuration and force the preparator plugin to the
# Globus-based GoPreparator so this test always exercises that module,
# regardless of what the queue is normally configured with.
queueConfigMapper = QueueConfigMapper()
queueConfig = queueConfigMapper.get_queue(queueName)
initial_queueConfig_preparator = queueConfig.preparator  # kept only for logging below
queueConfig.preparator["module"] = "pandaharvester.harvesterpreparator.go_preparator"
queueConfig.preparator["name"] = "GoPreparator"
modified_queueConfig_preparator = queueConfig.preparator  # same dict object, mutated in place

pluginFactory = PluginFactory()
# instantiate the (stage-in) preparator plugin from the modified config
preparatorCore = pluginFactory.get_plugin(queueConfig.preparator)
0061 
0062 # logger
# logger
_logger = core_utils.setup_logger("stageInTest_go_preparator")
tmpLog = core_utils.make_logger(_logger, method_name="stageInTest_go_preparator")
tmpLog.debug("start")

# Mirror every panda.log.* logger that already has a handler to stdout so the
# test output is visible on the console (db_proxy is skipped — presumably too
# noisy; confirm if its output is ever needed).
for loggerName, loggerObj in logging.Logger.manager.loggerDict.items():
    if loggerName.startswith("panda.log"):
        if len(loggerObj.handlers) == 0:
            continue
        if loggerName.split(".")[-1] in ["db_proxy"]:
            continue
        stdoutHandler = logging.StreamHandler(sys.stdout)
        # reuse the existing handler's formatter for consistent output
        stdoutHandler.setFormatter(loggerObj.handlers[0].formatter)
        loggerObj.addHandler(stdoutHandler)

msgStr = f"plugin={preparatorCore.__class__.__name__}"
tmpLog.debug(msgStr)
msgStr = f"Initial queueConfig.preparator = {initial_queueConfig_preparator}"
tmpLog.debug(msgStr)
msgStr = f"Modified queueConfig.preparator = {modified_queueConfig_preparator}"
tmpLog.debug(msgStr)
0084 
scope = "panda"  # scope used when building Rucio-style file paths below

# Harvester service objects: DB proxy, PanDA communicator, and a one-shot
# cacher run so cached entries (e.g. the "globus_secret" fetched below) exist.
proxy = DBProxy()
communicator = CommunicatorPool()
cacher = Cacher(communicator, single_mode=True)
cacher.run()

# Globus endpoint/path settings from the (modified) preparator configuration.
Globus_srcPath = queueConfig.preparator["Globus_srcPath"]
srcEndpoint = queueConfig.preparator["srcEndpoint"]
basePath = queueConfig.preparator["basePath"]
Globus_dstPath = queueConfig.preparator["Globus_dstPath"]
dstEndpoint = queueConfig.preparator["dstEndpoint"]
0097 
# need to get client_id and refresh_token from PanDA server via harvester cache mechanism;
# the credentials are transported in the generic publicKey/privateKey fields.
c_data = preparatorCore.dbInterface.get_cache("globus_secret")
client_id = None
refresh_token = None
if (c_data is not None) and c_data.data["StatusCode"] == 0:
    client_id = c_data.data["publicKey"]  # client_id
    refresh_token = c_data.data["privateKey"]  # refresh_token
else:
    # no usable cache entry -> cannot talk to Globus, abort the test
    client_id = None
    refresh_token = None
    tc = None
    errStr = "failed to get Globus Client ID and Refresh Token"
    tmpLog.error(errStr)
    sys.exit(1)
0112 
# create Globus transfer client to send initial files to remote Globus source
tmpStat, tc = globus_utils.create_globus_transfer_client(tmpLog, client_id, refresh_token)
if not tmpStat:
    tc = None
    errStr = "failed to create Globus Transfer Client"
    tmpLog.error(errStr)
    sys.exit(1)
try:
    # We are sending test files from our destination machine to the source machine,
    # so the roles are intentionally swapped here: the "src" status is checked on
    # dstEndpoint and the "dst" status on srcEndpoint.
    tmpStatsrc, srcStr = globus_utils.check_endpoint_activation(tmpLog, tc, dstEndpoint)
    tmpStatdst, dstStr = globus_utils.check_endpoint_activation(tmpLog, tc, srcEndpoint)
    if tmpStatsrc and tmpStatdst:
        errStr = "source Endpoint and destination Endpoint activated"
        tmpLog.debug(errStr)
    else:
        # report whichever endpoint(s) failed activation, then abort
        errStr = ""
        if not tmpStatsrc:
            errStr += " source Endpoint not activated "
        if not tmpStatdst:
            errStr += " destination Endpoint not activated "
        tmpLog.error(errStr)
        sys.exit(2)
    # both endpoints activated; prepare the transfer (dst -> src, see note above)
    tdata = TransferData(tc, dstEndpoint, srcEndpoint, sync_level="checksum")
except BaseException:
    # translate any Globus SDK exception into a logged error and abort
    errStat, errMsg = globus_utils.handle_globus_exception(tmpLog)
    sys.exit(1)
0142 
# create JobSpec carrying the dummy input files for the stage-in test
jobSpec = JobSpec()
jobSpec.jobParams = {
    "scopeLog": "panda",
    "logFile": "log",
}
jobSpec.computingSite = queueName
jobSpec.PandaID = job_id
jobSpec.modificationTime = datetime.datetime.now()
realDataset = "panda.sgotest." + uuid.uuid4().hex  # unique dataset name per run
ddmEndPointIn = "BNL-OSG2_DATADISK"
# comma-separated accumulators for the jobParams fields built up in the loop below
inFiles_scope_str = ""
inFiles_str = ""
realDatasets_str = ""
realDatasetsIn_str = ""
ddmEndPointIn_str = ""
GUID_str = ""
fsize_str = ""
checksum_str = ""
scope_in_str = ""
0163 
# create up to 5 dummy input files, register them on the JobSpec and queue
# them on the Globus transfer list
for index in range(random.randint(1, 5)):
    fileSpec = FileSpec()
    assFileSpec = FileSpec()
    fileSpec.fileType = "input"
    assFileSpec.lfn = "panda.sgotest." + uuid.uuid4().hex
    fileSpec.lfn = assFileSpec.lfn
    fileSpec.scope = "panda"
    # accumulate the comma-separated jobParams fields
    inFiles_scope_str += "panda,"
    inFiles_str += fileSpec.lfn + ","
    realDatasets_str += realDataset + ","
    realDatasetsIn_str += realDataset + ","
    ddmEndPointIn_str += ddmEndPointIn + ","
    # some dummy inputs
    GUID_str += "d82e8e5e301b77489fd4da04bcdd6565,"
    fsize_str += "3084569129,"
    checksum_str += "ad:9f60d29f,"
    scope_in_str += "panda,"
    #
    assFileSpec.fileType = "input"
    assFileSpec.fsize = random.randint(10, 100)
    # Rucio-style path: <base>/<scope>/<hh>/<hh>/<lfn>, where the two 2-char
    # components come from the md5 of "scope:lfn".
    # (local renamed from "hash" to avoid shadowing the builtin)
    digest = hashlib.md5()
    digest.update(f"{fileSpec.scope}:{fileSpec.lfn}".encode("utf-8"))
    hash_hex = digest.hexdigest()
    correctedscope = "/".join(scope.split("."))
    fileSpec.path = f"{queueConfig.preparator['Globus_dstPath']}/{correctedscope}/{hash_hex[0:2]}/{hash_hex[2:4]}/{fileSpec.lfn}"
    assFileSpec.path = fileSpec.path
    fileSpec.add_associated_file(assFileSpec)
    # now create the temporary source file filled with random content
    tmpfile_path = f"{queueConfig.preparator['basePath']}/testdata/{assFileSpec.lfn}"
    if not os.path.exists(os.path.dirname(tmpfile_path)):
        tmpLog.debug(f"os.makedirs({os.path.dirname(tmpfile_path)})")
        os.makedirs(os.path.dirname(tmpfile_path))
    # context manager guarantees the file is closed even if the write fails
    with open(tmpfile_path, "w") as oFile:
        oFile.write("".join(random.choice(string.ascii_uppercase + string.digits) for _ in range(assFileSpec.fsize)))
    # location of destination file
    destfile_path = f"{queueConfig.preparator['Globus_srcPath']}/{correctedscope}/{hash_hex[0:2]}/{hash_hex[2:4]}/{fileSpec.lfn}"

    # add to Globus transfer list
    tdata.add_item(tmpfile_path, destfile_path)
    # add input file to jobSpec
    jobSpec.add_in_file(fileSpec)
    #
    tmpLog.debug(f"source file to transfer - {tmpfile_path}")
    tmpLog.debug(f"destination file to transfer - {destfile_path}")
0215 # remove final ","
0216 realDatasetsIn_str = realDatasetsIn_str[:-1]
0217 inFiles_str = inFiles_str[:-1]
0218 inFiles_scope_str = inFiles_scope_str[:-1]
0219 GUID_str = GUID_str[:-1]
0220 fsize_str = fsize_str[:-1]
0221 checksum_str = checksum_str[:-1]
0222 scope_in_str = scope_in_str[:-1]
0223 jobSpec.jobParams["realDatasets"] = realDatasets_str
0224 jobSpec.jobParams["ddmEndPointIn"] = ddmEndPointIn_str
0225 jobSpec.jobParams["inFiles"] = inFiles_str
0226 jobSpec.jobParams["GUID"] = GUID_str
0227 jobSpec.jobParams["fsize"] = fsize_str
0228 jobSpec.jobParams["checksum"] = checksum_str
0229 jobSpec.jobParams["scopeIn"] = scope_in_str
0230 jobSpec.jobParams["realDatasetsIn"] = realDatasetsIn_str
0231 msgStr = f"jobSpec.jobParams ={jobSpec.jobParams}"
0232 tmpLog.debug(msgStr)
0233 
# transfer dummy files to Remote site for input
transfer_result = tc.submit_transfer(tdata)
# check status code and message
tmpLog.debug(str(transfer_result))
if transfer_result["code"] == "Accepted":
    # succeeded
    # set transfer ID which is used for later lookup in the polling loop
    transferID = transfer_result["task_id"]
    tmpLog.debug("done")
else:
    tmpLog.error("Failed to send intial files")
    sys.exit(3)
0246 
print(f"sleep {globus_sleep_time} seconds")
time.sleep(globus_sleep_time)

# Poll until the initial transfer task reports SUCCEEDED, up to maxloop tries.
# Fix: the original incremented with "++iloop", which in Python is a double
# unary plus (a no-op), so the counter never advanced and the retry bound was
# never reached; use "iloop += 1".
maxloop = 5
iloop = 0
NotFound = True
while (iloop < maxloop) and NotFound:
    # get transfer task
    tmpStat, transferTasks = globus_utils.get_transfer_task_by_id(tmpLog, tc, transferID)
    # log a temporary error when we fail to get the task; retry below
    if not tmpStat:
        errStr = "failed to get transfer task"
        tmpLog.error(errStr)
    else:
        tmpLog.debug(f"transferTasks : {transferTasks} ")
        if transferID not in transferTasks:
            errStr = f"transfer task ID - {transferID} is missing"
            tmpLog.error(errStr)
        else:
            # succeeded in finding a transfer task by transferID
            if transferTasks[transferID]["status"] == "SUCCEEDED":
                tmpLog.debug(f"transfer task {transferID} succeeded")
                NotFound = False
            # failed
            if transferTasks[transferID]["status"] == "FAILED":
                errStr = f"transfer task {transferID} failed"
                tmpLog.error(errStr)
            # log the current status (fix: guarded inside this branch — the
            # original indexed transferTasks[transferID] even when the ID was
            # missing, which would raise KeyError)
            tmpStr = f"transfer task {transferID} status: {transferTasks[transferID]['status']}"
            tmpLog.debug(tmpStr)
    if NotFound:
        print(f"sleep {globus_sleep_time} seconds")
        time.sleep(globus_sleep_time)
        iloop += 1

if NotFound:
    errStr = f"transfer task ID - {transferID} is missing"
    tmpLog.error(errStr)
    sys.exit(1)
0288 
# dump(queueConfig)

print(f"plugin={preparatorCore.__class__.__name__}")

print("testing stagein:")
print(f"BasePath from preparator configuration: {preparatorCore.basePath} ")


# kick off the stage-in for the JobSpec via the preparator plugin
tmpStat, tmpOut = preparatorCore.trigger_preparation(jobSpec)
if tmpStat:
    print(" OK")
else:
    print(f" NG {tmpOut}")

print(f"sleep {globus_sleep_time} seconds")
time.sleep(globus_sleep_time)
0305 
0306 print("testing status check")
0307 while True:
0308     tmpStat, tmpOut = preparatorCore.check_stage_in_status(jobSpec)
0309     if tmpStat:
0310         print(" OK")
0311         break
0312     elif tmpStat == False:
0313         print(f" NG {tmpOut}")
0314         sys.exit(1)
0315     else:
0316         print(" still running. sleep 1 min")
0317         time.sleep(60)
0318 
0319 print("checking path resolution")
0320 tmpStat, tmpOut = preparatorCore.resolve_input_paths(jobSpec)
0321 if tmpStat:
0322     print(f" OK {jobSpec.jobParams['inFilePaths']}")
0323 else:
0324     print(f" NG {tmpOut}")