Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-20 07:59:01

0001 import datetime
0002 import hashlib
0003 import logging
0004 import os
0005 import os.path
0006 import random
0007 import string
0008 import sys
0009 import threading
0010 import time
0011 import uuid
0012 
0013 from globus_sdk import (
0014     NativeAppAuthClient,
0015     RefreshTokenAuthorizer,
0016     TransferClient,
0017     TransferData,
0018 )
0019 
0020 from pandaharvester.harvesterbody.cacher import Cacher
0021 from pandaharvester.harvesterconfig import harvester_config
0022 from pandaharvester.harvestercore import core_utils
0023 from pandaharvester.harvestercore.communicator_pool import CommunicatorPool
0024 from pandaharvester.harvestercore.db_proxy_pool import DBProxyPool as DBProxy
0025 from pandaharvester.harvestercore.file_spec import FileSpec
0026 from pandaharvester.harvestercore.job_spec import JobSpec
0027 from pandaharvester.harvestercore.plugin_factory import PluginFactory
0028 from pandaharvester.harvestercore.queue_config_mapper import QueueConfigMapper
0029 from pandaharvester.harvestermisc import globus_utils
0030 
# initial variables
fileTableName = "file_table"  # harvester DB table holding the FileSpec rows
queueName = "ALCF_Theta"  # default PanDA queue name; can be overridden by argv[1]
job_id = 0  # 0 means "process every job"; a specific PandaID can be given via argv[2]
end_job_id = 1113  # upper job-id bound (only set via the commented-out argv[3] parsing)
globus_sleep_time = 15  # seconds to wait between Globus operations (overridable via the commented-out argv[4])

# connection lock
# NOTE(review): declared but not used anywhere in the visible code - presumably
# intended to serialize access to a shared connection; confirm before removing.
conLock = threading.Lock()
0040 
0041 
def dump(obj):
    """Debug helper: print every attribute of *obj*, one per line.

    Each attribute is printed as ``obj.<name> = <value>``; names that fail
    the ``hasattr`` check are skipped.
    """
    for name in dir(obj):
        if not hasattr(obj, name):
            continue
        print(f"obj.{name} = {getattr(obj, name)}")
0046 
0047 
# Optional command-line overrides: argv[1] = queue name, argv[2] = single PandaID.
if len(sys.argv) > 1:
    queueName = sys.argv[1]
if len(sys.argv) > 2:
    job_id = int(sys.argv[2])
# if len(sys.argv) > 3:
#   end_job_id = int(sys.argv[3])
# if len(sys.argv) > 4:
#   globus_sleep_time = int(sys.argv[4])
0056 
# Load the queue configuration and force the preparator plugin to the
# Globus bulk preparator, keeping a copy of the original setting for the
# debug printout further below.
queueConfigMapper = QueueConfigMapper()
queueConfig = queueConfigMapper.get_queue(queueName)
initial_queueConfig_preparator = queueConfig.preparator
queueConfig.preparator["module"] = "pandaharvester.harvesterpreparator.go_bulk_preparator"
queueConfig.preparator["name"] = "GlobusBulkPreparator"
modified_queueConfig_preparator = queueConfig.preparator

pluginFactory = PluginFactory()
# get stage-out plugin
# NOTE(review): comment says stage-out but this instantiates the preparator
# (stage-in) plugin - confirm intent.
preparatorCore = pluginFactory.get_plugin(queueConfig.preparator)
0067 
# logger
# Create the harvester logger/child logger used for all debug output below.
_logger = core_utils.setup_logger("further_testing_go_bulk_preparator")
tmpLog = core_utils.make_logger(_logger, method_name="further_testing_go_bulk_preparator")
tmpLog.debug("start")
0072 
# Mirror every configured panda.log.* logger to stdout so this interactive
# test script shows the harvester log output on the console.
# Fixed: dict.iteritems() is Python 2 only and raises AttributeError on
# Python 3 (which this file requires, given its f-strings) -> use items().
for loggerName, loggerObj in logging.Logger.manager.loggerDict.items():
    # print "loggerName - {}".format(loggerName)
    # loggerDict may also contain logging.PlaceHolder entries, which have no
    # 'handlers' attribute - skip them to avoid an AttributeError.
    if not isinstance(loggerObj, logging.Logger):
        continue
    if loggerName.startswith("panda.log"):
        if len(loggerObj.handlers) == 0:
            continue
        # db_proxy is too chatty for the console
        if loggerName.split(".")[-1] in ["db_proxy"]:
            continue
        stdoutHandler = logging.StreamHandler(sys.stdout)
        stdoutHandler.setFormatter(loggerObj.handlers[0].formatter)
        loggerObj.addHandler(stdoutHandler)
0083 
# Show which preparator plugin was loaded and how the queue configuration
# was rewritten, for comparison against the expected Globus bulk preparator.
tmpLog.debug(f"plugin={preparatorCore.__class__.__name__}")
tmpLog.debug(f"Initial queueConfig.preparator = {initial_queueConfig_preparator}")
tmpLog.debug(f"Modified queueConfig.preparator = {modified_queueConfig_preparator}")
0090 
scope = "panda"

# Bring up the harvester services this test needs: DB access, the
# communicator pool, and a single-shot cacher run to populate cached data.
proxy = DBProxy()
communicator = CommunicatorPool()
cacher = Cacher(communicator, single_mode=True)
cacher.run()

tmpLog.debug(f"plugin={preparatorCore.__class__.__name__}")
tmpLog.debug(f"BasePath from preparator configuration: {preparatorCore.basePath} ")
0100 
# get all jobs in table in a preparing substate
# tmpLog.debug('try to get all jobs in a preparing substate')
# jobSpec_list = proxy.get_jobs_in_sub_status('preparing',2000,None,None,None,None,None,None)
# get all jobs
# Either fetch the single job named on the command line, or every job in the DB.
if job_id > 0:
    tmpLog.debug(f"try to get job ID - {job_id}")
    jobSpec_list = [proxy.get_job(job_id)]
else:
    tmpLog.debug("try to get all jobs")
    jobSpec_list = proxy.get_jobs()

tmpLog.debug(f"got {len(jobSpec_list)} jobs")
0113 
0114 
# loop over all found jobs, dumping their input-file bookkeeping and
# exercising the preparator's lock/status machinery.
if len(jobSpec_list) > 0:
    for jobSpec in jobSpec_list:
        # if user entered a job id check for it
        if job_id > 0:
            if jobSpec.PandaID != job_id:
                continue
            tmpLog.debug(" PandaID = %d status = %s subStatus = %s lockedBy = %s" % (jobSpec.PandaID, jobSpec.status, jobSpec.subStatus, jobSpec.lockedBy))
        # get the transfer groups
        groups = jobSpec.get_groups_of_input_files(skip_ready=True)
        tmpLog.debug(f"jobspec.get_groups_of_input_files(skip_ready=True) = : {groups}")
        groups = jobSpec.get_groups_of_input_files()
        tmpLog.debug(f"jobspec.get_groups_of_input_files() = : {groups}")

        # get the number of input files
        tmpLog.debug(f"Number of input files - {len(jobSpec.inFiles)}")

        # loop over the groups and get the number of files per group
        for group in groups:
            tmpLog.debug(f"file group id - {group} number of input files - {len(jobSpec.get_input_file_specs(group))}")

        inFiles = jobSpec.get_input_file_attributes(skip_ready=True)
        tmpLog.debug(f"number of input files from get_input_file_attributes - {len(inFiles)}")

        # Fixed: dict.keys() returns a view object in Python 3, which has no
        # append() - materialize a real list so the append loop below works.
        lfns = list(inFiles.keys())
        tmpLog.debug(f"number of input files from inFiles.keys() - {len(lfns)}")

        tmpLog.debug(f"{lfns}")

        # deliberately double the list by appending the keys a second time
        for inLFN in inFiles.keys():
            lfns.append(inLFN)
        tmpLog.debug(f"number of input files from append inFiles.keys() - {len(lfns)}")

        # NOTE(review): early exit left in from a debugging session - everything
        # below this line is unreachable as written.
        sys.exit(0)

        # loop over groups keys to see if db is locked
        for key in groups:
            locked = preparatorCore.dbInterface.get_object_lock(key, lock_interval=120)
            if not locked:
                tmpLog.debug("DB Already locked by another thread")
            # now unlock db
            unlocked = preparatorCore.dbInterface.release_object_lock(key)
            if unlocked:
                tmpLog.debug("unlocked db")
            else:
                tmpLog.debug(" Could not unlock db")
        # print out jobSpec PandID
        msgStr = f"jobSpec PandaID - {jobSpec.PandaID}"
        tmpLog.debug(msgStr)
        # msgStr = "testing trigger_preparation"
        # tmpLog.debug(msgStr)
        # tmpStat, tmpOut = preparatorCore.trigger_preparation(jobSpec)
        # if tmpStat:
        #   msgStr = " OK "
        #   tmpLog.debug(msgStr)
        # elif tmpStat == None:
        #   msgStr = " Temporary failure NG {0}".format(tmpOut)
        #   tmpLog.debug(msgStr)
        # elif not tmpStat:
        #   msgStr = " No Good {0}".format(tmpOut)
        #   tmpLog.debug(msgStr)
        #   sys.exit(1)

        # check status to actually trigger transfer
        # get the files with the group_id and print out
        msgStr = f"Original dummy_transfer_id = {preparatorCore.get_dummy_transfer_id()}"
        tmpLog.debug(msgStr)
        # modify dummy_transfer_id from groups of input files
        for key in groups:
            preparatorCore.set_dummy_transfer_id_testing(key)
            msgStr = f"Revised dummy_transfer_id = {preparatorCore.get_dummy_transfer_id()}"
            tmpLog.debug(msgStr)
            files = proxy.get_files_with_group_id(preparatorCore.get_dummy_transfer_id())
            tmpLog.debug(f"proxy.get_files_with_group_id(preparatorCore.get_dummy_transfer_id()) = {files}")
            files = preparatorCore.dbInterface.get_files_with_group_id(preparatorCore.get_dummy_transfer_id())
            tmpLog.debug(f"preparatorCore.dbInterface.get_files_with_group_id(preparatorCore.get_dummy_transfer_id()) = {files}")
            msgStr = "checking status for transfer and perhaps ultimately triggering the transfer"
            tmpLog.debug(msgStr)
            tmpStat, tmpOut = preparatorCore.check_stage_in_status(jobSpec)
            if tmpStat:
                msgStr = " OK"
                tmpLog.debug(msgStr)
            elif tmpStat is None:
                msgStr = f" Temporary failure No Good {tmpOut}"
                tmpLog.debug(msgStr)
            elif not tmpStat:
                msgStr = f" No Good {tmpOut}"
                tmpLog.debug(msgStr)