# File indexing completed on 2026-04-20 07:59:01
0001 import datetime
0002 import hashlib
0003 import logging
0004 import os
0005 import os.path
0006 import random
0007 import string
0008 import sys
0009 import threading
0010 import time
0011 import uuid
0012
0013 from globus_sdk import (
0014 NativeAppAuthClient,
0015 RefreshTokenAuthorizer,
0016 TransferClient,
0017 TransferData,
0018 )
0019
0020 from pandaharvester.harvesterbody.cacher import Cacher
0021 from pandaharvester.harvesterconfig import harvester_config
0022 from pandaharvester.harvestercore import core_utils
0023 from pandaharvester.harvestercore.communicator_pool import CommunicatorPool
0024 from pandaharvester.harvestercore.db_proxy_pool import DBProxyPool as DBProxy
0025 from pandaharvester.harvestercore.file_spec import FileSpec
0026 from pandaharvester.harvestercore.job_spec import JobSpec
0027 from pandaharvester.harvestercore.plugin_factory import PluginFactory
0028 from pandaharvester.harvestercore.queue_config_mapper import QueueConfigMapper
0029 from pandaharvester.harvestermisc import globus_utils
0030
0031
# --- test parameters -------------------------------------------------------
fileTableName = "file_table"  # harvester DB table with file records (not referenced in this chunk -- verify)
queueName = "ALCF_Theta"  # default PanDA queue; can be overridden by argv[1]
begin_job_id = 1111  # first PandaID of the test range (usage not visible here -- TODO confirm)
end_job_id = 1113  # last PandaID of the test range (usage not visible here -- TODO confirm)
globus_sleep_time = 15  # seconds; presumably a pause between Globus polls -- verify against usage


# Lock for serializing console output across threads.
conLock = threading.Lock()
0040
0041
def dump(obj):
    """Print every readable attribute of *obj* and its value, one per line."""
    for name in dir(obj):
        if not hasattr(obj, name):
            # dir() can list names that are not actually readable
            # (e.g. a property that raises); skip those.
            continue
        print(f"obj.{name} = {getattr(obj, name)}")
0046
0047
# Allow the target queue name to be overridden from the command line.
if sys.argv[1:]:
    queueName = sys.argv[1]
0050
0051
0052
0053
0054
0055
0056
# Load the configuration for the target queue and force its preparator
# plugin to the Globus bulk preparator.
queueConfigMapper = QueueConfigMapper()
queueConfig = queueConfigMapper.get_queue(queueName)
# BUG FIX: the original bound initial_queueConfig_preparator to the SAME
# dict object that is mutated just below, so the "Initial" value logged
# later was actually the modified one.  Take a shallow snapshot instead.
initial_queueConfig_preparator = dict(queueConfig.preparator)
queueConfig.preparator["module"] = "pandaharvester.harvesterpreparator.go_bulk_preparator"
queueConfig.preparator["name"] = "GlobusBulkPreparator"
modified_queueConfig_preparator = queueConfig.preparator

pluginFactory = PluginFactory()

# Instantiate the (now GlobusBulkPreparator) plugin for this queue.
preparatorCore = pluginFactory.get_plugin(queueConfig.preparator)
0067
0068
_logger = core_utils.setup_logger("further_testing_go_bulk_preparator")
tmpLog = core_utils.make_logger(_logger, method_name="further_testing_go_bulk_preparator")
tmpLog.debug("start")

# Mirror every active panda.log.* logger to stdout so the test output is
# visible on the console (db_proxy is excluded as too chatty).
# BUG FIX: loggerDict.iteritems() is Python 2 only -- this file already uses
# f-strings, so it runs on Python 3 where the method is .items().  Iterate a
# snapshot since addHandler may be called while walking the dict.
for loggerName, loggerObj in list(logging.Logger.manager.loggerDict.items()):
    # loggerDict can also contain logging.PlaceHolder entries, which have
    # no .handlers attribute -- skip anything that is not a real Logger.
    if not isinstance(loggerObj, logging.Logger):
        continue
    if loggerName.startswith("panda.log"):
        if len(loggerObj.handlers) == 0:
            continue
        if loggerName.split(".")[-1] in ["db_proxy"]:
            continue
        stdoutHandler = logging.StreamHandler(sys.stdout)
        stdoutHandler.setFormatter(loggerObj.handlers[0].formatter)
        loggerObj.addHandler(stdoutHandler)

msgStr = f"plugin={preparatorCore.__class__.__name__}"
tmpLog.debug(msgStr)
msgStr = f"Initial queueConfig.preparator = {initial_queueConfig_preparator}"
tmpLog.debug(msgStr)
msgStr = f"Modified queueConfig.preparator = {modified_queueConfig_preparator}"
tmpLog.debug(msgStr)
0090
0091 scope = "panda"
0092
0093 proxy = DBProxy()
0094 communicator = CommunicatorPool()
0095 cacher = Cacher(communicator, single_mode=True)
0096 cacher.run()
0097
0098 tmpLog.debug(f"plugin={preparatorCore.__class__.__name__}")
0099 tmpLog.debug(f"BasePath from preparator configuration: {preparatorCore.basePath} ")
0100
0101
0102 tmpLog.debug("try to get all jobs in a preparing substate")
0103 jobSpec_list = proxy.get_jobs_in_sub_status("preparing", 2000, None, None, None, None, None, None)
0104 tmpLog.debug(f"got {len(jobSpec_list)} jobs")
0105
0106 if len(jobSpec_list) > 0:
0107 for jobSpec in jobSpec_list:
0108 tmpLog.debug(" PandaID = %d status = %s subStatus = %s lockedBy = %s" % (jobSpec.PandaID, jobSpec.status, jobSpec.subStatus, jobSpec.lockedBy))
0109
0110 groups = jobSpec.get_groups_of_input_files(skip_ready=True)
0111 tmpLog.debug(f"jobspec.get_groups_of_input_files() = : {groups}")
0112
0113 for key in groups:
0114 locked = preparatorCore.dbInterface.get_object_lock(key, lock_interval=120)
0115 if not locked:
0116 tmpLog.debug("DB Already locked by another thread")
0117
0118 unlocked = preparatorCore.dbInterface.release_object_lock(key)
0119 if unlocked:
0120 tmpLog.debug("unlocked db")
0121 else:
0122 tmpLog.debug(" Could not unlock db")
0123
0124 msgStr = f"jobSpec PandaID - {jobSpec.PandaID}"
0125 tmpLog.debug(msgStr)
0126
0127
0128
0129
0130
0131
0132
0133
0134
0135
0136
0137
0138
0139
0140
0141
0142 msgStr = f"Original dummy_transfer_id = {preparatorCore.get_dummy_transfer_id()}"
0143 tmpLog.debug(msgStr)
0144
0145 for key in groups:
0146 preparatorCore.set_dummy_transfer_id_testing(key)
0147 msgStr = f"Revised dummy_transfer_id = {preparatorCore.get_dummy_transfer_id()}"
0148 tmpLog.debug(msgStr)
0149 files = proxy.get_files_with_group_id(preparatorCore.get_dummy_transfer_id())
0150 tmpLog.debug(f"proxy.get_files_with_group_id(preparatorCore.get_dummy_transfer_id()) = {files}")
0151 files = preparatorCore.dbInterface.get_files_with_group_id(preparatorCore.get_dummy_transfer_id())
0152 tmpLog.debug(f"preparatorCore.dbInterface.get_files_with_group_id(preparatorCore.get_dummy_transfer_id()) = {files}")
0153 msgStr = "checking status for transfer and perhaps ultimately triggering the transfer"
0154 tmpLog.debug(msgStr)
0155 tmpStat, tmpOut = preparatorCore.check_stage_in_status(jobSpec)
0156 if tmpStat:
0157 msgStr = " OK"
0158 tmpLog.debug(msgStr)
0159 elif tmpStat is None:
0160 msgStr = f" Temporary failure No Good {tmpOut}"
0161 tmpLog.debug(msgStr)
0162 elif not tmpStat:
0163 msgStr = f" No Good {tmpOut}"
0164 tmpLog.debug(msgStr)