# File indexing completed on 2026-04-20 07:59:01
0001 import datetime
0002 import hashlib
0003 import logging
0004 import os
0005 import os.path
0006 import random
0007 import string
0008 import sys
0009 import threading
0010 import time
0011 import uuid
0012
0013 from globus_sdk import (
0014 NativeAppAuthClient,
0015 RefreshTokenAuthorizer,
0016 TransferClient,
0017 TransferData,
0018 )
0019
0020 from pandaharvester.harvesterbody.cacher import Cacher
0021 from pandaharvester.harvesterconfig import harvester_config
0022 from pandaharvester.harvestercore import core_utils
0023 from pandaharvester.harvestercore.communicator_pool import CommunicatorPool
0024 from pandaharvester.harvestercore.db_proxy_pool import DBProxyPool as DBProxy
0025 from pandaharvester.harvestercore.file_spec import FileSpec
0026 from pandaharvester.harvestercore.job_spec import JobSpec
0027 from pandaharvester.harvestercore.plugin_factory import PluginFactory
0028 from pandaharvester.harvestercore.queue_config_mapper import QueueConfigMapper
0029 from pandaharvester.harvestermisc import globus_utils
0030
0031
# Test parameters (module-level defaults; queueName and job_id can be
# overridden from the command line, parsed further below).
fileTableName = "file_table"  # harvester DB table name -- not referenced in the visible portion
queueName = "ALCF_Theta"  # default PanDA queue to test against (argv[1] overrides)
job_id = 0  # 0 means "process all jobs"; a positive value selects a single PandaID (argv[2] overrides)
end_job_id = 1113  # upper job-id bound -- not referenced in the visible portion
globus_sleep_time = 15  # seconds between Globus polls -- not referenced in the visible portion


# Lock for serializing concurrent access; not used in the visible portion of this script.
conLock = threading.Lock()
0040
0041
def dump(obj):
    """Print every attribute of *obj* as an 'obj.<name> = <value>' line."""
    for name in dir(obj):
        if not hasattr(obj, name):
            # dir() may list names whose lookup raises; skip those.
            continue
        print(f"obj.{name} = {getattr(obj, name)}")
0046
0047
# Optional command-line overrides: argv[1] -> queue name, argv[2] -> job id.
argv = sys.argv
if len(argv) > 1:
    queueName = argv[1]
if len(argv) > 2:
    job_id = int(argv[2])
0052
0053
0054
0055
0056
# Load the queue configuration and force the Globus bulk preparator plugin.
queueConfigMapper = QueueConfigMapper()
queueConfig = queueConfigMapper.get_queue(queueName)

# BUG FIX: snapshot the preparator settings BEFORE mutating them. The previous
# code kept a reference (not a copy) to the same dict, so the "Initial" and
# "Modified" values logged later were always identical.
initial_queueConfig_preparator = dict(queueConfig.preparator)
queueConfig.preparator["module"] = "pandaharvester.harvesterpreparator.go_bulk_preparator"
queueConfig.preparator["name"] = "GlobusBulkPreparator"
modified_queueConfig_preparator = queueConfig.preparator

pluginFactory = PluginFactory()

# Instantiate the preparator plugin described by the (modified) configuration.
preparatorCore = pluginFactory.get_plugin(queueConfig.preparator)
0067
0068
# Dedicated logger for this test script (harvester's standard logging helpers).
_logger = core_utils.setup_logger("further_testing_go_bulk_preparator")
tmpLog = core_utils.make_logger(_logger, method_name="further_testing_go_bulk_preparator")
tmpLog.debug("start")
0072
# Mirror the output of every configured panda.log.* logger (except db_proxy)
# to stdout so the test's progress is visible on the console.
# BUG FIX: dict.iteritems() is Python 2 only and raises AttributeError under
# Python 3 (this file already uses f-strings, so it targets Python 3);
# .items() is the equivalent. Also skip logging.PlaceHolder entries, which
# appear in loggerDict for intermediate names and have no .handlers.
for loggerName, loggerObj in list(logging.Logger.manager.loggerDict.items()):
    if not loggerName.startswith("panda.log"):
        continue
    if not isinstance(loggerObj, logging.Logger):
        continue
    if len(loggerObj.handlers) == 0:
        continue
    if loggerName.split(".")[-1] in ["db_proxy"]:
        continue
    stdoutHandler = logging.StreamHandler(sys.stdout)
    # Reuse the logger's existing formatter so mirrored lines look the same.
    stdoutHandler.setFormatter(loggerObj.handlers[0].formatter)
    loggerObj.addHandler(stdoutHandler)
0083
# Report the loaded plugin class and the before/after preparator configs.
for msgStr in (
    f"plugin={preparatorCore.__class__.__name__}",
    f"Initial queueConfig.preparator = {initial_queueConfig_preparator}",
    f"Modified queueConfig.preparator = {modified_queueConfig_preparator}",
):
    tmpLog.debug(msgStr)
0090
scope = "panda"  # file scope label -- not referenced in the visible portion of this script

# Database access and PanDA-server communication objects.
proxy = DBProxy()
communicator = CommunicatorPool()
# Run the cacher once in single-shot mode to populate cached server data
# before the preparator is exercised.
cacher = Cacher(communicator, single_mode=True)
cacher.run()

tmpLog.debug(f"plugin={preparatorCore.__class__.__name__}")
tmpLog.debug(f"BasePath from preparator configuration: {preparatorCore.basePath} ")
0100
0101
0102
0103
0104
# Fetch either the single requested job or every job known to the DB proxy.
if job_id <= 0:
    tmpLog.debug("try to get all jobs")
    jobSpec_list = proxy.get_jobs()
else:
    tmpLog.debug(f"try to get job ID - {job_id}")
    jobSpec_list = [proxy.get_job(job_id)]

tmpLog.debug(f"got {len(jobSpec_list)} jobs")
0113
0114
0115
# Exercise the preparator plugin against each fetched job.
if len(jobSpec_list) > 0:
    for jobSpec in jobSpec_list:
        # If a specific job id was requested, skip every other job.
        if job_id > 0:
            if jobSpec.PandaID != job_id:
                continue
        # Converted from %-formatting to an f-string for consistency with
        # the rest of the file; the message text is unchanged.
        tmpLog.debug(f" PandaID = {jobSpec.PandaID} status = {jobSpec.status} subStatus = {jobSpec.subStatus} lockedBy = {jobSpec.lockedBy}")

        # Inspect the job's input-file groups with and without skip_ready.
        groups = jobSpec.get_groups_of_input_files(skip_ready=True)
        tmpLog.debug(f"jobspec.get_groups_of_input_files(skip_ready=True) = : {groups}")
        groups = jobSpec.get_groups_of_input_files()
        tmpLog.debug(f"jobspec.get_groups_of_input_files() = : {groups}")

        tmpLog.debug(f"Number of input files - {len(jobSpec.inFiles)}")

        for group in groups:
            tmpLog.debug(f"file group id - {group} number of input files - {len(jobSpec.get_input_file_specs(group))}")

        inFiles = jobSpec.get_input_file_attributes(skip_ready=True)
        tmpLog.debug(f"number of input files from get_input_file_attributes - {len(inFiles)}")

        # BUG FIX: in Python 3 dict.keys() returns a view object that has no
        # append(); materialize it as a list so the append loop below works.
        lfns = list(inFiles.keys())
        tmpLog.debug(f"number of input files from inFiles.keys() - {len(lfns)}")

        tmpLog.debug(f"{lfns}")

        # Deliberately appends the same LFNs again to exercise list growth.
        for inLFN in inFiles.keys():
            lfns.append(inLFN)
        tmpLog.debug(f"number of input files from append inFiles.keys() - {len(lfns)}")

        # NOTE(review): this exits on the first processed job, so everything
        # below this point is currently unreachable dead code. It is kept
        # as-is in case the early exit is removed while debugging.
        sys.exit(0)

        # Exercise the object-lock API for each file group.
        for key in groups:
            locked = preparatorCore.dbInterface.get_object_lock(key, lock_interval=120)
            if not locked:
                tmpLog.debug("DB Already locked by another thread")

            unlocked = preparatorCore.dbInterface.release_object_lock(key)
            if unlocked:
                tmpLog.debug("unlocked db")
            else:
                tmpLog.debug(" Could not unlock db")

        msgStr = f"jobSpec PandaID - {jobSpec.PandaID}"
        tmpLog.debug(msgStr)

        msgStr = f"Original dummy_transfer_id = {preparatorCore.get_dummy_transfer_id()}"
        tmpLog.debug(msgStr)

        # For each group: set a testing transfer id, list the files attached
        # to it, then check (and possibly trigger) the stage-in transfer.
        for key in groups:
            preparatorCore.set_dummy_transfer_id_testing(key)
            msgStr = f"Revised dummy_transfer_id = {preparatorCore.get_dummy_transfer_id()}"
            tmpLog.debug(msgStr)
            files = proxy.get_files_with_group_id(preparatorCore.get_dummy_transfer_id())
            tmpLog.debug(f"proxy.get_files_with_group_id(preparatorCore.get_dummy_transfer_id()) = {files}")
            files = preparatorCore.dbInterface.get_files_with_group_id(preparatorCore.get_dummy_transfer_id())
            tmpLog.debug(f"preparatorCore.dbInterface.get_files_with_group_id(preparatorCore.get_dummy_transfer_id()) = {files}")
            msgStr = "checking status for transfer and perhaps ultimately triggering the transfer"
            tmpLog.debug(msgStr)
            # tmpStat: True = done, None = temporary failure, False = fatal.
            tmpStat, tmpOut = preparatorCore.check_stage_in_status(jobSpec)
            if tmpStat:
                msgStr = " OK"
                tmpLog.debug(msgStr)
            elif tmpStat is None:
                msgStr = f" Temporary failure No Good {tmpOut}"
                tmpLog.debug(msgStr)
            elif not tmpStat:
                msgStr = f" No Good {tmpOut}"
                tmpLog.debug(msgStr)