# File indexing completed on 2026-04-20 07:59:01
0001 import datetime
0002 import hashlib
0003 import logging
0004 import os
0005 import os.path
0006 import random
0007 import string
0008 import sys
0009 import threading
0010 import time
0011 import uuid
0012
0013 from globus_sdk import (
0014 NativeAppAuthClient,
0015 RefreshTokenAuthorizer,
0016 TransferClient,
0017 TransferData,
0018 )
0019
0020 from pandaharvester.harvesterbody.cacher import Cacher
0021 from pandaharvester.harvesterconfig import harvester_config
0022 from pandaharvester.harvestercore import core_utils
0023 from pandaharvester.harvestercore.communicator_pool import CommunicatorPool
0024 from pandaharvester.harvestercore.db_proxy_pool import DBProxyPool as DBProxy
0025 from pandaharvester.harvestercore.file_spec import FileSpec
0026 from pandaharvester.harvestercore.job_spec import JobSpec
0027 from pandaharvester.harvestercore.plugin_factory import PluginFactory
0028 from pandaharvester.harvestercore.queue_config_mapper import QueueConfigMapper
0029 from pandaharvester.harvestermisc import globus_utils
0030
0031
def dump(obj):
    """Print every readable attribute of *obj* as ``obj.<name> = <value>``."""
    for name in dir(obj):
        if not hasattr(obj, name):
            continue
        print(f"obj.{name} = {getattr(obj, name)}")
0036
0037
# Command-line overrides: argv[1] = queue name, argv[2] = PanDA job id,
# argv[3] = seconds to sleep between Globus status polls.
print(len(sys.argv))
queueName = "ALCF_Theta"
job_id = 1111
globus_sleep_time = 15

argv_count = len(sys.argv)
if argv_count > 1:
    queueName = sys.argv[1]
if argv_count > 2:
    job_id = int(sys.argv[2])
if argv_count > 3:
    globus_sleep_time = int(sys.argv[3])
0049
0050
# Load the configuration of the requested queue and force its preparator
# plugin to GoPreparator so this test exercises the Globus stage-in path
# regardless of what the queue normally uses.
# NOTE(review): initial_queueConfig_preparator aliases the same dict that is
# mutated in place below, so the "Initial" and "Modified" debug lines later
# both print the modified values — confirm this is intended.
queueConfigMapper = QueueConfigMapper()
queueConfig = queueConfigMapper.get_queue(queueName)
initial_queueConfig_preparator = queueConfig.preparator
queueConfig.preparator["module"] = "pandaharvester.harvesterpreparator.go_preparator"
queueConfig.preparator["name"] = "GoPreparator"
modified_queueConfig_preparator = queueConfig.preparator

pluginFactory = PluginFactory()

# Instantiate the (GoPreparator) plugin from the modified configuration.
preparatorCore = pluginFactory.get_plugin(queueConfig.preparator)
0061
0062
# Set up this test's logger and mirror every configured "panda.log.*" logger
# to stdout (skipping db_proxy) so progress is visible on the console.
_logger = core_utils.setup_logger("stageInTest_go_preparator")
tmpLog = core_utils.make_logger(_logger, method_name="stageInTest_go_preparator")
tmpLog.debug("start")

for loggerName, loggerObj in logging.Logger.manager.loggerDict.items():

    if loggerName.startswith("panda.log"):
        if len(loggerObj.handlers) == 0:
            continue
        if loggerName.split(".")[-1] in ["db_proxy"]:
            continue
        # Reuse the existing handler's formatter so console output matches
        # the format of the configured log destination.
        stdoutHandler = logging.StreamHandler(sys.stdout)
        stdoutHandler.setFormatter(loggerObj.handlers[0].formatter)
        loggerObj.addHandler(stdoutHandler)

msgStr = f"plugin={preparatorCore.__class__.__name__}"
tmpLog.debug(msgStr)
msgStr = f"Initial queueConfig.preparator = {initial_queueConfig_preparator}"
tmpLog.debug(msgStr)
msgStr = f"Modified queueConfig.preparator = {modified_queueConfig_preparator}"
tmpLog.debug(msgStr)
0084
scope = "panda"

# Harvester plumbing required by the preparator plugin: DB proxy,
# communicator pool, and one cacher pass in single mode (presumably to
# populate the cached values read below, e.g. "globus_secret" — verify).
proxy = DBProxy()
communicator = CommunicatorPool()
cacher = Cacher(communicator, single_mode=True)
cacher.run()

# Globus endpoint ids and paths taken from the preparator configuration.
Globus_srcPath = queueConfig.preparator["Globus_srcPath"]
srcEndpoint = queueConfig.preparator["srcEndpoint"]
basePath = queueConfig.preparator["basePath"]
Globus_dstPath = queueConfig.preparator["Globus_dstPath"]
dstEndpoint = queueConfig.preparator["dstEndpoint"]
0097
0098
# Fetch the cached Globus client id / refresh token; bail out if absent.
c_data = preparatorCore.dbInterface.get_cache("globus_secret")
client_id = None
refresh_token = None
if c_data is None or c_data.data["StatusCode"] != 0:
    tc = None
    errStr = "failed to get Globus Client ID and Refresh Token"
    tmpLog.error(errStr)
    sys.exit(1)
client_id = c_data.data["publicKey"]
refresh_token = c_data.data["privateKey"]
0112
0113
# Build the Globus transfer client from the cached credentials.
tmpStat, tc = globus_utils.create_globus_transfer_client(tmpLog, client_id, refresh_token)
if not tmpStat:
    tc = None
    errStr = "failed to create Globus Transfer Client"
    tmpLog.error(errStr)
    sys.exit(1)
try:
    # Verify both endpoints are activated before building the transfer.
    # BUG FIX: the two status variables were swapped — tmpStatsrc was read
    # from dstEndpoint and tmpStatdst from srcEndpoint — so an activation
    # failure was reported against the wrong endpoint.
    tmpStatsrc, srcStr = globus_utils.check_endpoint_activation(tmpLog, tc, srcEndpoint)
    tmpStatdst, dstStr = globus_utils.check_endpoint_activation(tmpLog, tc, dstEndpoint)
    if tmpStatsrc and tmpStatdst:
        errStr = "source Endpoint and destination Endpoint activated"
        tmpLog.debug(errStr)
    else:
        errStr = ""
        if not tmpStatsrc:
            errStr += " source Endpoint not activated "
        if not tmpStatdst:
            errStr += " destination Endpoint not activated "
        tmpLog.error(errStr)
        sys.exit(2)

    # Seed transfer runs dstEndpoint -> srcEndpoint: it copies locally
    # generated test files onto the Globus *source* endpoint so the
    # preparator has something to stage in later.  The apparent reversed
    # direction is intentional.
    tdata = TransferData(tc, dstEndpoint, srcEndpoint, sync_level="checksum")
except BaseException:
    errStat, errMsg = globus_utils.handle_globus_exception(tmpLog)
    sys.exit(1)
0142
0143
# Build a fake job with a log file; its input files are generated below.
jobSpec = JobSpec()
jobSpec.jobParams = {
    "scopeLog": "panda",
    "logFile": "log",
}
jobSpec.computingSite = queueName
jobSpec.PandaID = job_id
jobSpec.modificationTime = datetime.datetime.now()
realDataset = "panda.sgotest." + uuid.uuid4().hex
ddmEndPointIn = "BNL-OSG2_DATADISK"
# Comma-separated accumulators for the job parameters, one entry appended
# per generated input file in the loop below.
inFiles_scope_str = ""
inFiles_str = ""
realDatasets_str = ""
realDatasetsIn_str = ""
ddmEndPointIn_str = ""
GUID_str = ""
fsize_str = ""
checksum_str = ""
scope_in_str = ""
0163
0164
# Generate between 1 and 5 random input files.  For each one: register it on
# the job spec, write a random local payload, and queue a local -> Globus
# source copy on the seed transfer so the preparator can stage it in later.
for index in range(random.randint(1, 5)):
    fileSpec = FileSpec()
    assFileSpec = FileSpec()
    fileSpec.fileType = "input"
    assFileSpec.lfn = "panda.sgotest." + uuid.uuid4().hex
    fileSpec.lfn = assFileSpec.lfn
    fileSpec.scope = "panda"
    inFiles_scope_str += "panda,"
    inFiles_str += fileSpec.lfn + ","
    realDatasets_str += realDataset + ","
    realDatasetsIn_str += realDataset + ","
    ddmEndPointIn_str += ddmEndPointIn + ","

    # Fixed dummy metadata, identical for every generated file.
    GUID_str += "d82e8e5e301b77489fd4da04bcdd6565,"
    fsize_str += "3084569129,"
    checksum_str += "ad:9f60d29f,"
    scope_in_str += "panda,"

    assFileSpec.fileType = "input"
    assFileSpec.fsize = random.randint(10, 100)

    # Deterministic <scope>/<md5[0:2]>/<md5[2:4]>/<lfn> layout.
    # FIX: renamed `hash` -> `md5_hash` (was shadowing the builtin).
    md5_hash = hashlib.md5()
    md5_hash.update(f"{fileSpec.scope}:{fileSpec.lfn}".encode("utf-8"))
    hash_hex = md5_hash.hexdigest()
    correctedscope = "/".join(scope.split("."))
    fileSpec.path = f"{queueConfig.preparator['Globus_dstPath']}/{correctedscope}/{hash_hex[0:2]}/{hash_hex[2:4]}/{fileSpec.lfn}"
    assFileSpec.path = fileSpec.path
    fileSpec.add_associated_file(assFileSpec)

    # Write assFileSpec.fsize random characters into the local staging area.
    tmpfile_path = f"{queueConfig.preparator['basePath']}/testdata/{assFileSpec.lfn}"
    tmpfile_dir = os.path.dirname(tmpfile_path)
    if not os.path.exists(tmpfile_dir):
        tmpLog.debug(f"os.makedirs({tmpfile_dir})")
        # exist_ok avoids a race if the directory appears between the
        # exists() check and makedirs().
        os.makedirs(tmpfile_dir, exist_ok=True)
    # FIX: use a context manager so the file handle is always closed.
    with open(tmpfile_path, "w") as oFile:
        oFile.write("".join(random.choice(string.ascii_uppercase + string.digits) for _ in range(assFileSpec.fsize)))

    destfile_path = f"{queueConfig.preparator['Globus_srcPath']}/{correctedscope}/{hash_hex[0:2]}/{hash_hex[2:4]}/{fileSpec.lfn}"

    # Queue the copy on the seed transfer and attach the file to the job.
    tdata.add_item(tmpfile_path, destfile_path)
    jobSpec.add_in_file(fileSpec)

    tmpLog.debug(f"source file to transfer - {tmpfile_path}")
    tmpLog.debug(f"destination file to transfer - {destfile_path}")
0213
0214
0215
# Strip the trailing comma from each accumulator before storing it in the
# job parameters.  BUG FIX: realDatasets_str and ddmEndPointIn_str were
# previously stored with their trailing comma left in (they were the only
# two accumulators never trimmed), which yields an empty element when the
# consumer splits on ",".
realDatasets_str = realDatasets_str[:-1]
realDatasetsIn_str = realDatasetsIn_str[:-1]
inFiles_str = inFiles_str[:-1]
inFiles_scope_str = inFiles_scope_str[:-1]
ddmEndPointIn_str = ddmEndPointIn_str[:-1]
GUID_str = GUID_str[:-1]
fsize_str = fsize_str[:-1]
checksum_str = checksum_str[:-1]
scope_in_str = scope_in_str[:-1]
jobSpec.jobParams["realDatasets"] = realDatasets_str
jobSpec.jobParams["ddmEndPointIn"] = ddmEndPointIn_str
jobSpec.jobParams["inFiles"] = inFiles_str
jobSpec.jobParams["GUID"] = GUID_str
jobSpec.jobParams["fsize"] = fsize_str
jobSpec.jobParams["checksum"] = checksum_str
jobSpec.jobParams["scopeIn"] = scope_in_str
jobSpec.jobParams["realDatasetsIn"] = realDatasetsIn_str
msgStr = f"jobSpec.jobParams ={jobSpec.jobParams}"
tmpLog.debug(msgStr)
0233
0234
# Submit the seed transfer; abort the test if Globus does not accept it.
transfer_result = tc.submit_transfer(tdata)

tmpLog.debug(str(transfer_result))
if transfer_result["code"] == "Accepted":
    # Remember the task id so the polling loop below can track it.
    transferID = transfer_result["task_id"]
    tmpLog.debug("done")
else:
    # BUG FIX: corrected typo in the error message ("intial" -> "initial").
    tmpLog.error("Failed to send initial files")
    sys.exit(3)

print(f"sleep {globus_sleep_time} seconds")
time.sleep(globus_sleep_time)
0249
0250
# Poll the Globus task until it succeeds, for at most maxloop attempts.
maxloop = 5
iloop = 0
NotFound = True
while (iloop < maxloop) and NotFound:
    tmpStat, transferTasks = globus_utils.get_transfer_task_by_id(tmpLog, tc, transferID)
    if not tmpStat:
        errStr = "failed to get transfer task"
        tmpLog.error(errStr)
    else:
        tmpLog.debug(f"transferTasks : {transferTasks} ")
        if transferID not in transferTasks:
            errStr = f"transfer task ID - {transferID} is missing"
            tmpLog.error(errStr)
        else:
            if transferTasks[transferID]["status"] == "SUCCEEDED":
                tmpLog.debug(f"transfer task {transferID} succeeded")
                NotFound = False
            if transferTasks[transferID]["status"] == "FAILED":
                errStr = f"transfer task {transferID} failed"
                tmpLog.error(errStr)
            tmpStr = f"transfer task {transferID} status: {transferTasks[transferID]['status']}"
            tmpLog.debug(tmpStr)
    if NotFound:
        print(f"sleep {globus_sleep_time} seconds")
        time.sleep(globus_sleep_time)
    # BUG FIX: was `++iloop`, which in Python is a double unary plus and a
    # no-op — the counter never advanced, so the loop could spin forever on
    # a transfer that never succeeded.
    iloop += 1

# Give up after maxloop polls without a SUCCEEDED status.
if NotFound:
    errStr = f"transfer task ID - {transferID} is missing"
    tmpLog.error(errStr)
    sys.exit(1)
0288
0289
0290
# Now exercise the GoPreparator plugin itself against the fake job.
print(f"plugin={preparatorCore.__class__.__name__}")

print("testing stagein:")
print(f"BasePath from preparator configuration: {preparatorCore.basePath} ")

# Kick off the stage-in transfer for the fake job's input files.
tmpStat, tmpOut = preparatorCore.trigger_preparation(jobSpec)
if tmpStat:
    print(" OK")
else:
    print(f" NG {tmpOut}")

print(f"sleep {globus_sleep_time} seconds")
time.sleep(globus_sleep_time)
0305
print("testing status check")
# Poll the preparator until the stage-in reaches a terminal state.  The
# branches imply a three-state return: truthy = done, False = failed,
# anything else (presumably None) = still transferring — confirm against
# the plugin's check_stage_in_status contract.
while True:
    tmpStat, tmpOut = preparatorCore.check_stage_in_status(jobSpec)
    if tmpStat:
        print(" OK")
        break
    elif tmpStat is False:  # FIX: identity test per PEP 8 (was `== False`)
        print(f" NG {tmpOut}")
        sys.exit(1)
    else:
        print(" still running. sleep 1 min")
        time.sleep(60)
0318
# Finally, resolve the local paths of the staged-in files on the job spec.
print("checking path resolution")
tmpStat, tmpOut = preparatorCore.resolve_input_paths(jobSpec)
outcome = f" OK {jobSpec.jobParams['inFilePaths']}" if tmpStat else f" NG {tmpOut}"
print(outcome)