Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-20 07:59:01

0001 import datetime
0002 import hashlib
0003 import logging
0004 import os
0005 import os.path
0006 import random
0007 import string
0008 import sys
0009 import threading
0010 import time
0011 import uuid
0012 
0013 from globus_sdk import (
0014     NativeAppAuthClient,
0015     RefreshTokenAuthorizer,
0016     TransferClient,
0017     TransferData,
0018 )
0019 
0020 from pandaharvester.harvesterbody.cacher import Cacher
0021 from pandaharvester.harvesterconfig import harvester_config
0022 from pandaharvester.harvestercore import core_utils
0023 from pandaharvester.harvestercore.communicator_pool import CommunicatorPool
0024 from pandaharvester.harvestercore.db_proxy_pool import DBProxyPool as DBProxy
0025 from pandaharvester.harvestercore.file_spec import FileSpec
0026 from pandaharvester.harvestercore.job_spec import JobSpec
0027 from pandaharvester.harvestercore.plugin_factory import PluginFactory
0028 from pandaharvester.harvestercore.queue_config_mapper import QueueConfigMapper
0029 from pandaharvester.harvestermisc import globus_utils
0030 
# initial variables
fileTableName = "file_table"  # harvester file table name — not referenced in the visible code
queueName = "ALCF_Theta"  # default PanDA queue; may be overridden by argv[1] below
begin_job_id = 1111  # NOTE(review): job-id range and sleep time appear unused here — kept for parity with sibling test scripts
end_job_id = 1113
globus_sleep_time = 15

# connection lock (not acquired anywhere in the visible code)
conLock = threading.Lock()
0040 
0041 
def dump(obj):
    """Print every attribute of *obj* as ``obj.<name> = <value>``.

    Uses EAFP instead of the hasattr()+getattr() double lookup: attributes
    listed by dir() whose access raises AttributeError (e.g. broken
    properties) are skipped, exactly as hasattr() would have filtered them.
    """
    for attr in dir(obj):
        try:
            value = getattr(obj, attr)
        except AttributeError:
            # hasattr() would have returned False for this attribute
            continue
        print(f"obj.{attr} = {value}")
0046 
0047 
# allow the target queue name to be overridden from the command line
if len(sys.argv) > 1:
    queueName = sys.argv[1]
# further positional overrides, currently disabled:
# if len(sys.argv) > 2:
#   begin_job_id = int(sys.argv[2])
# if len(sys.argv) > 3:
#   end_job_id = int(sys.argv[3])
# if len(sys.argv) > 4:
#   globus_sleep_time = int(sys.argv[4])
0056 
# load the configuration for the chosen queue and force it to use the
# Globus bulk preparator plugin
queueConfigMapper = QueueConfigMapper()
queueConfig = queueConfigMapper.get_queue(queueName)
# NOTE(review): this binds a reference, not a copy — after the two
# assignments below, "initial" and "modified" refer to the same mapping
# and will log identically; take a copy here if a true snapshot is wanted
initial_queueConfig_preparator = queueConfig.preparator
queueConfig.preparator["module"] = "pandaharvester.harvesterpreparator.go_bulk_preparator"
queueConfig.preparator["name"] = "GlobusBulkPreparator"
modified_queueConfig_preparator = queueConfig.preparator

pluginFactory = PluginFactory()
# get the stage-in (preparator) plugin instance for this queue
preparatorCore = pluginFactory.get_plugin(queueConfig.preparator)
0067 
# logger: module-level logger plus a method-tagged wrapper for debug output
_logger = core_utils.setup_logger("further_testing_go_bulk_preparator")
tmpLog = core_utils.make_logger(_logger, method_name="further_testing_go_bulk_preparator")
tmpLog.debug("start")
0072 
# Mirror the harvester's own "panda.log.*" loggers to stdout so this test
# script shows their output interactively.  dict.iteritems() is Python 2
# only and crashes under Python 3 — use items().  The loggerDict can also
# contain logging.PlaceHolder entries, which have no .handlers attribute,
# so only real Logger instances are touched.
for loggerName, loggerObj in logging.Logger.manager.loggerDict.items():
    if not loggerName.startswith("panda.log"):
        continue
    if not isinstance(loggerObj, logging.Logger):
        # PlaceHolder node, not a configured logger
        continue
    if len(loggerObj.handlers) == 0:
        continue
    # skip the very chatty db_proxy logger
    if loggerName.split(".")[-1] in ["db_proxy"]:
        continue
    stdoutHandler = logging.StreamHandler(sys.stdout)
    # reuse the logger's existing formatter so mirrored lines look identical
    stdoutHandler.setFormatter(loggerObj.handlers[0].formatter)
    loggerObj.addHandler(stdoutHandler)
0083 
# Report the loaded plugin class and the before/after preparator config.
# (The two config lines may print identically, since both names can refer
# to the same underlying mapping — see the assignments above.)
for msgStr in (
    f"plugin={preparatorCore.__class__.__name__}",
    f"Initial queueConfig.preparator = {initial_queueConfig_preparator}",
    f"Modified queueConfig.preparator = {modified_queueConfig_preparator}",
):
    tmpLog.debug(msgStr)
0090 
scope = "panda"  # NOTE(review): looks like a rucio-style file scope — not referenced in the visible code

# database access plus a communicator pool for talking to the PanDA server
proxy = DBProxy()
communicator = CommunicatorPool()
# run the cacher once in single mode — presumably to populate cached
# server info before the preparator is exercised; confirm against Cacher
cacher = Cacher(communicator, single_mode=True)
cacher.run()
0097 
tmpLog.debug(f"plugin={preparatorCore.__class__.__name__}")
tmpLog.debug(f"BasePath from preparator configuration: {preparatorCore.basePath} ")

# get all jobs in table in a preparing substate
# (2000 presumably caps the number of jobs returned and the trailing Nones
# are unused optional filters — confirm against DBProxy.get_jobs_in_sub_status)
tmpLog.debug("try to get all jobs in a preparing substate")
jobSpec_list = proxy.get_jobs_in_sub_status("preparing", 2000, None, None, None, None, None, None)
tmpLog.debug(f"got {len(jobSpec_list)} jobs")
0105 # loop over all found jobs
# Iterating an empty list is a no-op, so the former "if len(jobSpec_list) > 0"
# guard around this loop was redundant and has been dropped.
for jobSpec in jobSpec_list:
    tmpLog.debug(" PandaID = %d status = %s subStatus = %s lockedBy = %s" % (jobSpec.PandaID, jobSpec.status, jobSpec.subStatus, jobSpec.lockedBy))
    # get the transfer groups for input files that are not yet ready
    groups = jobSpec.get_groups_of_input_files(skip_ready=True)
    tmpLog.debug(f"jobspec.get_groups_of_input_files() = : {groups}")
    # exercise the object-lock API on each group key: try to lock, then unlock
    for key in groups:
        locked = preparatorCore.dbInterface.get_object_lock(key, lock_interval=120)
        if not locked:
            tmpLog.debug("DB Already locked by another thread")
        # now unlock db
        unlocked = preparatorCore.dbInterface.release_object_lock(key)
        if unlocked:
            tmpLog.debug("unlocked db")
        else:
            tmpLog.debug(" Could not unlock db")
    # print out jobSpec PandaID
    msgStr = f"jobSpec PandaID - {jobSpec.PandaID}"
    tmpLog.debug(msgStr)
    # NOTE(review): a commented-out trigger_preparation(jobSpec) experiment
    # was removed here; check_stage_in_status() below is the path this
    # script actually exercises.

    # check status to actually trigger transfer:
    # swap each group key in as the dummy transfer id, then query the files
    # registered under it via both the direct DB proxy and the preparator's
    # dbInterface (the two lookups should agree)
    msgStr = f"Original dummy_transfer_id = {preparatorCore.get_dummy_transfer_id()}"
    tmpLog.debug(msgStr)
    for key in groups:
        preparatorCore.set_dummy_transfer_id_testing(key)
        msgStr = f"Revised dummy_transfer_id = {preparatorCore.get_dummy_transfer_id()}"
        tmpLog.debug(msgStr)
        files = proxy.get_files_with_group_id(preparatorCore.get_dummy_transfer_id())
        tmpLog.debug(f"proxy.get_files_with_group_id(preparatorCore.get_dummy_transfer_id()) = {files}")
        files = preparatorCore.dbInterface.get_files_with_group_id(preparatorCore.get_dummy_transfer_id())
        tmpLog.debug(f"preparatorCore.dbInterface.get_files_with_group_id(preparatorCore.get_dummy_transfer_id()) = {files}")
        msgStr = "checking status for transfer and perhaps ultimately triggering the transfer"
        tmpLog.debug(msgStr)
        tmpStat, tmpOut = preparatorCore.check_stage_in_status(jobSpec)
        if tmpStat:
            msgStr = " OK"
            tmpLog.debug(msgStr)
        elif tmpStat is None:
            msgStr = f" Temporary failure No Good {tmpOut}"
            tmpLog.debug(msgStr)
        else:
            # tmpStat is falsy and not None here, so a plain else replaces
            # the redundant "elif not tmpStat" of the original
            msgStr = f" No Good {tmpOut}"
            tmpLog.debug(msgStr)