Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:38:58

0001 # DB API for JEDI
0002 
0003 from pandacommon.pandalogger.PandaLogger import PandaLogger
0004 
0005 from pandajedi.jediconfig import jedi_config
0006 from pandaserver.taskbuffer import TaskBuffer
0007 
0008 from . import JediDBProxyPool
0009 from .Interaction import CommandReceiveInterface
0010 
0011 logger = PandaLogger().getLogger(__name__.split(".")[-1])
0012 
0013 # use customized proxy pool
0014 TaskBuffer.DBProxyPool = JediDBProxyPool.DBProxyPool
0015 
0016 
0017 class JediTaskBuffer(TaskBuffer.TaskBuffer, CommandReceiveInterface):
0018     # constructor
0019     def __init__(self, conn, nDBConnection=1):
0020         CommandReceiveInterface.__init__(self, conn)
0021         TaskBuffer.TaskBuffer.__init__(self)
0022         TaskBuffer.TaskBuffer.init(self, jedi_config.db.dbhost, jedi_config.db.dbpasswd, nDBConnection=nDBConnection)
0023         logger.debug("__init__")
0024 
0025     # query an SQL
0026     def querySQL(self, sql, varMap, arraySize=1000):
0027         with self.proxyPool.get() as proxy:
0028             return proxy.querySQLS(sql, varMap, arraySize)[1]
0029 
0030     # get work queue map
0031     def getWorkQueueMap(self):
0032         with self.proxyPool.get() as proxy:
0033             return proxy.getWorkQueueMap()
0034 
0035     # get the list of datasets to feed contents to DB
0036     def getDatasetsToFeedContents_JEDI(self, vo=None, prodSourceLabel=None, task_id=None, force_read=False):
0037         with self.proxyPool.get() as proxy:
0038             return proxy.getDatasetsToFeedContents_JEDI(vo, prodSourceLabel, task_id, force_read)
0039 
0040     # feed files to the JEDI contents table
0041     def insertFilesForDataset_JEDI(
0042         self,
0043         datasetSpec,
0044         fileMap,
0045         datasetState,
0046         stateUpdateTime,
0047         nEventsPerFile,
0048         nEventsPerJob,
0049         maxAttempt,
0050         firstEventNumber,
0051         nMaxFiles,
0052         nMaxEvents,
0053         useScout,
0054         fileList,
0055         useFilesWithNewAttemptNr,
0056         nFilesPerJob,
0057         nEventsPerRange,
0058         nChunksForScout,
0059         includePatt,
0060         excludePatt,
0061         xmlConfig,
0062         noWaitParent,
0063         parent_tid,
0064         pid,
0065         maxFailure,
0066         useRealNumEvents,
0067         respectLB,
0068         tgtNumEventsPerJob,
0069         skipFilesUsedBy,
0070         ramCount,
0071         taskSpec,
0072         skipShortInput,
0073         inputPreStaging,
0074         order_by,
0075         maxFileRecords,
0076         skip_short_output,
0077     ):
0078         with self.proxyPool.get() as proxy:
0079             return proxy.insertFilesForDataset_JEDI(
0080                 datasetSpec,
0081                 fileMap,
0082                 datasetState,
0083                 stateUpdateTime,
0084                 nEventsPerFile,
0085                 nEventsPerJob,
0086                 maxAttempt,
0087                 firstEventNumber,
0088                 nMaxFiles,
0089                 nMaxEvents,
0090                 useScout,
0091                 fileList,
0092                 useFilesWithNewAttemptNr,
0093                 nFilesPerJob,
0094                 nEventsPerRange,
0095                 nChunksForScout,
0096                 includePatt,
0097                 excludePatt,
0098                 xmlConfig,
0099                 noWaitParent,
0100                 parent_tid,
0101                 pid,
0102                 maxFailure,
0103                 useRealNumEvents,
0104                 respectLB,
0105                 tgtNumEventsPerJob,
0106                 skipFilesUsedBy,
0107                 ramCount,
0108                 taskSpec,
0109                 skipShortInput,
0110                 inputPreStaging,
0111                 order_by,
0112                 maxFileRecords,
0113                 skip_short_output,
0114             )
0115 
0116     # get files from the JEDI contents table with jediTaskID and/or datasetID
0117     def getFilesInDatasetWithID_JEDI(self, jediTaskID=None, datasetID=None, nFiles=None, status=None):
0118         with self.proxyPool.get() as proxy:
0119             return proxy.getFilesInDatasetWithID_JEDI(jediTaskID, datasetID, nFiles, status)
0120 
0121     # insert dataset to the JEDI datasets table
0122     def insertDataset_JEDI(self, datasetSpec):
0123         with self.proxyPool.get() as proxy:
0124             return proxy.insertDataset_JEDI(datasetSpec)
0125 
0126     # update JEDI dataset
0127     def updateDataset_JEDI(self, datasetSpec, criteria, lockTask=False):
0128         with self.proxyPool.get() as proxy:
0129             return proxy.updateDataset_JEDI(datasetSpec, criteria, lockTask)
0130 
0131     # update JEDI dataset attributes
0132     def updateDatasetAttributes_JEDI(self, jediTaskID, datasetID, attributes):
0133         with self.proxyPool.get() as proxy:
0134             return proxy.updateDatasetAttributes_JEDI(jediTaskID, datasetID, attributes)
0135 
0136     # get JEDI dataset attributes
0137     def getDatasetAttributes_JEDI(self, jediTaskID, datasetID, attributes):
0138         with self.proxyPool.get() as proxy:
0139             return proxy.getDatasetAttributes_JEDI(jediTaskID, datasetID, attributes)
0140 
0141     # get JEDI dataset attributes with map
0142     def getDatasetAttributesWithMap_JEDI(self, jediTaskID, criteria, attributes):
0143         with self.proxyPool.get() as proxy:
0144             return proxy.getDatasetAttributesWithMap_JEDI(jediTaskID, criteria, attributes)
0145 
0146     # get JEDI dataset with jediTaskID and datasetID
0147     def getDatasetWithID_JEDI(self, jediTaskID, datasetID):
0148         with self.proxyPool.get() as proxy:
0149             return proxy.getDatasetWithID_JEDI(jediTaskID, datasetID)
0150 
0151     # get JEDI datasets with jediTaskID
0152     def getDatasetsWithJediTaskID_JEDI(self, jediTaskID, datasetTypes=None, getFiles=False):
0153         with self.proxyPool.get() as proxy:
0154             retStat, datasetSpecList = proxy.getDatasetsWithJediTaskID_JEDI(jediTaskID, datasetTypes=datasetTypes)
0155             if retStat is True and getFiles is True:
0156                 for datasetSpec in datasetSpecList:
0157                     # read files
0158                     retStat, fileSpecList = proxy.getFilesInDatasetWithID_JEDI(jediTaskID, datasetSpec.datasetID, None, None)
0159                     if retStat is False:
0160                         break
0161                     for fileSpec in fileSpecList:
0162                         datasetSpec.addFile(fileSpec)
0163             # return
0164             return retStat, datasetSpecList
0165 
0166     # get jediTaskIDs with dataset attributes
0167     def get_task_ids_with_dataset_attributes(self, dataset_attributes, only_active_tasks=True):
0168         with self.proxyPool.get() as proxy:
0169             return proxy.get_task_ids_with_dataset_attributes(dataset_attributes, only_active_tasks)
0170 
0171     # insert task to the JEDI tasks table
0172     def insertTask_JEDI(self, taskSpec):
0173         with self.proxyPool.get() as proxy:
0174             return proxy.insertTask_JEDI(taskSpec)
0175 
0176     # update JEDI task
0177     def updateTask_JEDI(self, taskSpec, criteria, oldStatus=None, updateDEFT=False, insertUnknown=None, setFrozenTime=True, setOldModTime=False):
0178         with self.proxyPool.get() as proxy:
0179             return proxy.updateTask_JEDI(taskSpec, criteria, oldStatus, updateDEFT, insertUnknown, setFrozenTime, setOldModTime)
0180 
0181     # update JEDI task lock
0182     def updateTaskLock_JEDI(self, jediTaskID):
0183         with self.proxyPool.get() as proxy:
0184             return proxy.updateTaskLock_JEDI(jediTaskID)
0185 
0186     # update JEDI task status by ContentsFeeder
0187     def updateTaskStatusByContFeeder_JEDI(self, jediTaskID, taskSpec=None, getTaskStatus=False, pid=None, setFrozenTime=True, useWorldCloud=False):
0188         with self.proxyPool.get() as proxy:
0189             return proxy.updateTaskStatusByContFeeder_JEDI(jediTaskID, taskSpec, getTaskStatus, pid, setFrozenTime, useWorldCloud)
0190 
0191     # get JEDI task with jediTaskID
0192     def getTaskWithID_JEDI(self, jediTaskID, fullFlag=False, lockTask=False, pid=None, lockInterval=None, clearError=False):
0193         with self.proxyPool.get() as proxy:
0194             return proxy.getTaskWithID_JEDI(jediTaskID, fullFlag, lockTask, pid, lockInterval, clearError)
0195 
0196     # get JEDI task and tasks with ID and lock it
0197     def getTaskDatasetsWithID_JEDI(self, jediTaskID, pid, lockTask=True):
0198         with self.proxyPool.get() as proxy:
0199             return proxy.getTaskDatasetsWithID_JEDI(jediTaskID, pid, lockTask)
0200 
0201     # get JEDI tasks with selection criteria
0202     def getTaskIDsWithCriteria_JEDI(self, criteria, nTasks=50):
0203         with self.proxyPool.get() as proxy:
0204             return proxy.getTaskIDsWithCriteria_JEDI(criteria, nTasks)
0205 
0206     # get JEDI tasks to be finished
0207     def getTasksToBeFinished_JEDI(self, vo, prodSourceLabel, pid, nTasks=50, target_tasks=None):
0208         with self.proxyPool.get() as proxy:
0209             return proxy.getTasksToBeFinished_JEDI(vo, prodSourceLabel, pid, nTasks, target_tasks)
0210 
0211     # get tasks to be processed
0212     def getTasksToBeProcessed_JEDI(
0213         self,
0214         pid,
0215         vo,
0216         workQueue,
0217         prodSourceLabel,
0218         cloudName,
0219         nTasks=50,
0220         nFiles=100,
0221         simTasks=None,
0222         minPriority=None,
0223         maxNumJobs=None,
0224         typicalNumFilesMap=None,
0225         fullSimulation=False,
0226         simDatasets=None,
0227         mergeUnThrottled=None,
0228         readMinFiles=False,
0229         numNewTaskWithJumbo=0,
0230         resource_name=None,
0231         ignore_lock=False,
0232         target_tasks=None,
0233     ):
0234         with self.proxyPool.get() as proxy:
0235             return proxy.getTasksToBeProcessed_JEDI(
0236                 pid,
0237                 vo,
0238                 workQueue,
0239                 prodSourceLabel,
0240                 cloudName,
0241                 nTasks,
0242                 nFiles,
0243                 simTasks=simTasks,
0244                 minPriority=minPriority,
0245                 maxNumJobs=maxNumJobs,
0246                 typicalNumFilesMap=typicalNumFilesMap,
0247                 fullSimulation=fullSimulation,
0248                 simDatasets=simDatasets,
0249                 mergeUnThrottled=mergeUnThrottled,
0250                 readMinFiles=readMinFiles,
0251                 numNewTaskWithJumbo=numNewTaskWithJumbo,
0252                 resource_name=resource_name,
0253                 ignore_lock=ignore_lock,
0254                 target_tasks=target_tasks,
0255             )
0256 
0257     # get tasks to be processed
0258     def checkWaitingTaskPrio_JEDI(self, vo, workQueue, prodSourceLabel, cloudName, resource_name=None):
0259         with self.proxyPool.get() as proxy:
0260             return proxy.getTasksToBeProcessed_JEDI(None, vo, workQueue, prodSourceLabel, cloudName, isPeeking=True, resource_name=resource_name)
0261 
0262     # get job statistics with work queue
0263     def getJobStatisticsWithWorkQueue_JEDI(self, vo, prodSourceLabel, minPriority=None):
0264         with self.proxyPool.get() as proxy:
0265             return proxy.getJobStatisticsWithWorkQueue_JEDI(vo, prodSourceLabel, minPriority)
0266 
0267     # get core statistics with VO and prodSourceLabel
0268     def get_core_statistics(self, vo, prod_source_label):
0269         with self.proxyPool.get() as proxy:
0270             return proxy.get_core_statistics(vo, prod_source_label)
0271 
0272     # get job statistics by global share
0273     def getJobStatisticsByGlobalShare(self, vo, exclude_rwq=False):
0274         with self.proxyPool.get() as proxy:
0275             return proxy.getJobStatisticsByGlobalShare(vo, exclude_rwq)
0276 
0277     # get whether a gshare rtype combination is active
0278     def get_active_gshare_rtypes(self, vo):
0279         with self.proxyPool.get() as proxy:
0280             return proxy.get_active_gshare_rtypes(vo)
0281 
0282     # get job statistics by resource type
0283     def getJobStatisticsByResourceType(self, workqueue):
0284         with self.proxyPool.get() as proxy:
0285             return proxy.getJobStatisticsByResourceType(workqueue)
0286 
0287     # get job statistics by site and resource type
0288     def getJobStatisticsByResourceTypeSite(self, workqueue):
0289         with self.proxyPool.get() as proxy:
0290             return proxy.getJobStatisticsByResourceTypeSite(workqueue)
0291 
0292     # generate output files for task
0293     def getOutputFiles_JEDI(
0294         self,
0295         jediTaskID,
0296         provenanceID,
0297         simul,
0298         instantiateTmpl=False,
0299         instantiatedSite=None,
0300         isUnMerging=False,
0301         isPrePro=False,
0302         xmlConfigJob=None,
0303         siteDsMap=None,
0304         middleName="",
0305         registerDatasets=False,
0306         parallelOutMap=None,
0307         fileIDPool=[],
0308         n_files_per_chunk=1,
0309         bulk_fetch_for_multiple_jobs=False,
0310         master_dataset_id=None,
0311     ):
0312         with self.proxyPool.get() as proxy:
0313             return proxy.getOutputFiles_JEDI(
0314                 jediTaskID,
0315                 provenanceID,
0316                 simul,
0317                 instantiateTmpl,
0318                 instantiatedSite,
0319                 isUnMerging,
0320                 isPrePro,
0321                 xmlConfigJob,
0322                 siteDsMap,
0323                 middleName,
0324                 registerDatasets,
0325                 parallelOutMap,
0326                 fileIDPool,
0327                 n_files_per_chunk,
0328                 bulk_fetch_for_multiple_jobs,
0329                 master_dataset_id,
0330             )
0331 
0332     # insert output file templates
0333     def insertOutputTemplate_JEDI(self, templates):
0334         with self.proxyPool.get() as proxy:
0335             return proxy.insertOutputTemplate_JEDI(templates)
0336 
0337     # insert JobParamsTemplate
0338     def insertJobParamsTemplate_JEDI(self, jediTaskID, templ):
0339         with self.proxyPool.get() as proxy:
0340             return proxy.insertJobParamsTemplate_JEDI(jediTaskID, templ)
0341 
0342     # insert TaskParams
0343     def insertTaskParams_JEDI(self, vo, prodSourceLabel, userName, taskName, taskParams, parent_tid=None):
0344         with self.proxyPool.get() as proxy:
0345             return proxy.insertTaskParams_JEDI(vo, prodSourceLabel, userName, taskName, taskParams, parent_tid)
0346 
0347     # reset unused files
0348     def resetUnusedFiles_JEDI(self, jediTaskID, inputChunk):
0349         with self.proxyPool.get() as proxy:
0350             return proxy.resetUnusedFiles_JEDI(jediTaskID, inputChunk)
0351 
0352     # insert TaskParams
0353     def insertUpdateTaskParams_JEDI(self, jediTaskID, vo, prodSourceLabel, updateTaskParams, insertTaskParamsList):
0354         with self.proxyPool.get() as proxy:
0355             return proxy.insertUpdateTaskParams_JEDI(jediTaskID, vo, prodSourceLabel, updateTaskParams, insertTaskParamsList)
0356 
0357     # set missing files
0358     def setMissingFiles_JEDI(self, jediTaskID, datasetID, fileIDs):
0359         with self.proxyPool.get() as proxy:
0360             return proxy.setMissingFiles_JEDI(jediTaskID, datasetID, fileIDs)
0361 
0362     # rescue picked files
0363     def rescuePickedFiles_JEDI(self, vo, prodSourceLabel, waitTime):
0364         with self.proxyPool.get() as proxy:
0365             return proxy.rescuePickedFiles_JEDI(vo, prodSourceLabel, waitTime)
0366 
0367     # rescue unlocked tasks with picked files
0368     def rescueUnLockedTasksWithPicked_JEDI(self, vo, prodSourceLabel, waitTime, pid):
0369         with self.proxyPool.get() as proxy:
0370             return proxy.rescueUnLockedTasksWithPicked_JEDI(vo, prodSourceLabel, waitTime, pid)
0371 
0372     # unlock tasks
0373     def unlockTasks_JEDI(self, vo, prodSourceLabel, waitTime, hostName=None, pgid=None):
0374         with self.proxyPool.get() as proxy:
0375             return proxy.unlockTasks_JEDI(vo, prodSourceLabel, waitTime, hostName, pgid)
0376 
0377     # get the size of input files which will be copied to the site
0378     def getMovingInputSize_JEDI(self, siteName):
0379         with self.proxyPool.get() as proxy:
0380             return proxy.getMovingInputSize_JEDI(siteName)
0381 
0382     # get typical number of input files for each workQueue+processingType
0383     def getTypicalNumInput_JEDI(self, vo, prodSourceLabel, workQueue):
0384         with self.proxyPool.get() as proxy:
0385             return proxy.getTypicalNumInput_JEDI(vo, prodSourceLabel, workQueue)
0386 
0387     # get highest prio jobs with workQueueID
0388     def getHighestPrioJobStat_JEDI(self, prodSourceLabel, cloudName, workQueue, resource_name=None):
0389         with self.proxyPool.get() as proxy:
0390             return proxy.getHighestPrioJobStat_JEDI(prodSourceLabel, cloudName, workQueue, resource_name)
0391 
0392     # get the list of tasks to refine
0393     def getTasksToRefine_JEDI(self, vo=None, prodSourceLabel=None):
0394         with self.proxyPool.get() as proxy:
0395             return proxy.getTasksToRefine_JEDI(vo, prodSourceLabel)
0396 
0397     # get task parameters with jediTaskID
0398     def getTaskParamsWithID_JEDI(self, jediTaskID):
0399         with self.proxyPool.get() as proxy:
0400             return proxy.getTaskParamsWithID_JEDI(jediTaskID)
0401 
0402     # register task/dataset/templ/param in a single transaction
0403     def registerTaskInOneShot_JEDI(
0404         self,
0405         jediTaskID,
0406         taskSpec,
0407         inMasterDatasetSpec,
0408         inSecDatasetSpecList,
0409         outDatasetSpecList,
0410         outputTemplateMap,
0411         jobParamsTemplate,
0412         taskParams,
0413         unmergeMasterDatasetSpec,
0414         unmergeDatasetSpecMap,
0415         uniqueTaskName,
0416         oldTaskStatus,
0417     ):
0418         with self.proxyPool.get() as proxy:
0419             return proxy.registerTaskInOneShot_JEDI(
0420                 jediTaskID,
0421                 taskSpec,
0422                 inMasterDatasetSpec,
0423                 inSecDatasetSpecList,
0424                 outDatasetSpecList,
0425                 outputTemplateMap,
0426                 jobParamsTemplate,
0427                 taskParams,
0428                 unmergeMasterDatasetSpec,
0429                 unmergeDatasetSpecMap,
0430                 uniqueTaskName,
0431                 oldTaskStatus,
0432             )
0433 
0434     # set tasks to be assigned
0435     def setScoutJobDataToTasks_JEDI(self, vo, prodSourceLabel):
0436         with self.proxyPool.get() as proxy:
0437             tmp_site_mapper = self.get_site_mapper()
0438             return proxy.setScoutJobDataToTasks_JEDI(vo, prodSourceLabel, tmp_site_mapper)
0439 
0440     # prepare tasks to be finished
0441     def prepareTasksToBeFinished_JEDI(self, vo, prodSourceLabel, nTasks=50, simTasks=None, pid="lock", noBroken=False):
0442         with self.proxyPool.get() as proxy:
0443             tmp_site_mapper = self.get_site_mapper()
0444             return proxy.prepareTasksToBeFinished_JEDI(vo, prodSourceLabel, nTasks, simTasks, pid, noBroken, tmp_site_mapper)
0445 
0446     # get tasks to be assigned
0447     def getTasksToAssign_JEDI(self, vo, prodSourceLabel, workQueue, resource_name):
0448         with self.proxyPool.get() as proxy:
0449             return proxy.getTasksToAssign_JEDI(vo, prodSourceLabel, workQueue, resource_name)
0450 
0451     # get tasks to check task assignment
0452     def getTasksToCheckAssignment_JEDI(self, vo, prodSourceLabel, workQueue, resource_name):
0453         with self.proxyPool.get() as proxy:
0454             return proxy.getTasksToCheckAssignment_JEDI(vo, prodSourceLabel, workQueue, resource_name)
0455 
0456     # calculate RW with a priority
0457     def calculateRWwithPrio_JEDI(self, vo, prodSourceLabel, workQueue, priority):
0458         with self.proxyPool.get() as proxy:
0459             return proxy.calculateRWwithPrio_JEDI(vo, prodSourceLabel, workQueue, priority)
0460 
0461     # calculate RW for tasks
0462     def calculateTaskRW_JEDI(self, jediTaskID):
0463         with self.proxyPool.get() as proxy:
0464             return proxy.calculateTaskRW_JEDI(jediTaskID)
0465 
0466     # calculate WORLD RW with a priority
0467     def calculateWorldRWwithPrio_JEDI(self, vo, prodSourceLabel, workQueue, priority):
0468         with self.proxyPool.get() as proxy:
0469             return proxy.calculateWorldRWwithPrio_JEDI(vo, prodSourceLabel, workQueue, priority)
0470 
0471     # calculate WORLD RW for tasks
0472     def calculateTaskWorldRW_JEDI(self, jediTaskID):
0473         with self.proxyPool.get() as proxy:
0474             return proxy.calculateTaskWorldRW_JEDI(jediTaskID)
0475 
0476     # set cloud to tasks
0477     def setCloudToTasks_JEDI(self, taskCloudMap):
0478         with self.proxyPool.get() as proxy:
0479             return proxy.setCloudToTasks_JEDI(taskCloudMap)
0480 
0481     # get the list of tasks to exec command
0482     def getTasksToExecCommand_JEDI(self, vo, prodSourceLabel):
0483         with self.proxyPool.get() as proxy:
0484             return proxy.getTasksToExecCommand_JEDI(vo, prodSourceLabel)
0485 
0486     # get the list of PandaIDs for a task
0487     def getPandaIDsWithTask_JEDI(self, jediTaskID, onlyActive):
0488         with self.proxyPool.get() as proxy:
0489             return proxy.getPandaIDsWithTask_JEDI(jediTaskID, onlyActive)
0490 
0491     # get the list of queued PandaIDs for a task
0492     def getQueuedPandaIDsWithTask_JEDI(self, jediTaskID):
0493         with self.proxyPool.get() as proxy:
0494             return proxy.getQueuedPandaIDsWithTask_JEDI(jediTaskID)
0495 
0496     # get jediTaskID/datasetID/FileID with dataset and file names
0497     def getIDsWithFileDataset_JEDI(self, datasetName, fileName, fileType):
0498         with self.proxyPool.get() as proxy:
0499             return proxy.getIDsWithFileDataset_JEDI(datasetName, fileName, fileType)
0500 
0501     # get PandaID for a file
0502     def getPandaIDWithFileID_JEDI(self, jediTaskID, datasetID, fileID):
0503         with self.proxyPool.get() as proxy:
0504             return proxy.getPandaIDWithFileID_JEDI(jediTaskID, datasetID, fileID)
0505 
0506     # get JEDI files for a job
0507     def getFilesWithPandaID_JEDI(self, pandaID):
0508         with self.proxyPool.get() as proxy:
0509             return proxy.getFilesWithPandaID_JEDI(pandaID)
0510 
0511     # update task parameters
0512     def updateTaskParams_JEDI(self, jediTaskID, taskParams):
0513         with self.proxyPool.get() as proxy:
0514             return proxy.updateTaskParams_JEDI(jediTaskID, taskParams)
0515 
0516     # reactivate pending tasks
0517     def reactivatePendingTasks_JEDI(self, vo, prodSourceLabel, timeLimit, timeoutLimit=None, minPriority=None):
0518         with self.proxyPool.get() as proxy:
0519             return proxy.reactivatePendingTasks_JEDI(vo, prodSourceLabel, timeLimit, timeoutLimit, minPriority)
0520 
0521     # restart contents update
0522     def restartTasksForContentsUpdate_JEDI(self, vo, prodSourceLabel, timeLimit=30):
0523         with self.proxyPool.get() as proxy:
0524             return proxy.restartTasksForContentsUpdate_JEDI(vo, prodSourceLabel, timeLimit=timeLimit)
0525 
0526     # kick exhausted tasks
0527     def kickExhaustedTasks_JEDI(self, vo, prodSourceLabel, timeLimit):
0528         with self.proxyPool.get() as proxy:
0529             return proxy.kickExhaustedTasks_JEDI(vo, prodSourceLabel, timeLimit)
0530 
0531     # get file spec of lib.tgz
0532     def get_previous_build_file_spec(self, jediTaskID, siteName, associatedSites):
0533         with self.proxyPool.get() as proxy:
0534             return proxy.get_previous_build_file_spec(jediTaskID, siteName, associatedSites)
0535 
0536     # get file spec of old lib.tgz
0537     def getOldBuildFileSpec_JEDI(self, jediTaskID, datasetID, fileID):
0538         with self.proxyPool.get() as proxy:
0539             return proxy.getOldBuildFileSpec_JEDI(jediTaskID, datasetID, fileID)
0540 
0541     # insert lib dataset and files
0542     def insertBuildFileSpec_JEDI(self, jobSpec, reusedDatasetID, simul):
0543         with self.proxyPool.get() as proxy:
0544             return proxy.insertBuildFileSpec_JEDI(jobSpec, reusedDatasetID, simul)
0545 
0546     # get sites used by a task
0547     def getSitesUsedByTask_JEDI(self, jediTaskID):
0548         with self.proxyPool.get() as proxy:
0549             return proxy.getSitesUsedByTask_JEDI(jediTaskID)
0550 
0551     # get random seed
0552     def getRandomSeed_JEDI(self, jediTaskID, simul, n_files=1):
0553         with self.proxyPool.get() as proxy:
0554             return proxy.getRandomSeed_JEDI(jediTaskID, simul, n_files)
0555 
0556     # get preprocess metadata
0557     def getPreprocessMetadata_JEDI(self, jediTaskID):
0558         with self.proxyPool.get() as proxy:
0559             return proxy.getPreprocessMetadata_JEDI(jediTaskID)
0560 
0561     # get log dataset for preprocessing
0562     def getPreproLog_JEDI(self, jediTaskID, simul):
0563         with self.proxyPool.get() as proxy:
0564             return proxy.getPreproLog_JEDI(jediTaskID, simul)
0565 
0566     # get jobsetID
0567     def getUserJobsetID_JEDI(self, userName):
0568         with self.proxyPool.get() as proxy:
0569             tmpJobID, tmpDummy, tmpStat = proxy.getUserParameter(userName, 1, None)
0570             # return
0571             return tmpStat, tmpJobID
0572 
0573     # retry or incrementally execute a task
0574     def retryTask_JEDI(
0575         self,
0576         jediTaskID,
0577         commStr,
0578         maxAttempt=5,
0579         retryChildTasks=True,
0580         discardEvents=False,
0581         release_unstaged=False,
0582         keep_share_priority=False,
0583         ignore_hard_exhausted=False,
0584     ):
0585         with self.proxyPool.get() as proxy:
0586             return proxy.retryTask_JEDI(
0587                 jediTaskID,
0588                 commStr,
0589                 maxAttempt,
0590                 retryChildTasks=retryChildTasks,
0591                 discardEvents=discardEvents,
0592                 release_unstaged=release_unstaged,
0593                 keep_share_priority=keep_share_priority,
0594                 ignore_hard_exhausted=ignore_hard_exhausted,
0595             )
0596 
0597     # append input datasets for incremental execution
0598     def appendDatasets_JEDI(self, jediTaskID, inMasterDatasetSpecList, inSecDatasetSpecList):
0599         with self.proxyPool.get() as proxy:
0600             return proxy.appendDatasets_JEDI(jediTaskID, inMasterDatasetSpecList, inSecDatasetSpecList)
0601 
0602     # record retry history
0603     def recordRetryHistory_JEDI(self, jediTaskID, oldNewPandaIDs, relationType):
0604         with self.proxyPool.get() as proxy:
0605             return proxy.recordRetryHistory_JEDI(jediTaskID, oldNewPandaIDs, relationType)
0606 
0607     # get JEDI tasks with a selection criteria
0608     def getTasksWithCriteria_JEDI(
0609         self,
0610         vo,
0611         prodSourceLabel,
0612         taskStatusList,
0613         taskCriteria={},
0614         datasetCriteria={},
0615         taskParamList=[],
0616         datasetParamList=[],
0617         taskLockColumn=None,
0618         taskLockInterval=60,
0619     ):
0620         with self.proxyPool.get() as proxy:
0621             return proxy.getTasksWithCriteria_JEDI(
0622                 vo, prodSourceLabel, taskStatusList, taskCriteria, datasetCriteria, taskParamList, datasetParamList, taskLockColumn, taskLockInterval
0623             )
0624 
0625     # check parent task status
0626     def checkParentTask_JEDI(self, parent_task_id, jedi_task_id=None):
0627         with self.proxyPool.get() as proxy:
0628             return proxy.checkParentTask_JEDI(parent_task_id, jedi_task_id)
0629 
0630     # get task status
0631     def getTaskStatus_JEDI(self, jediTaskID):
0632         with self.proxyPool.get() as proxy:
0633             return proxy.getTaskStatus_JEDI(jediTaskID)
0634 
0635     # get lib.tgz for waiting jobs
0636     def getLibForWaitingRunJob_JEDI(self, vo, prodSourceLabel, checkInterval):
0637         with self.proxyPool.get() as proxy:
0638             return proxy.getLibForWaitingRunJob_JEDI(vo, prodSourceLabel, checkInterval)
0639 
0640     # get tasks to get reassigned
0641     def getTasksToReassign_JEDI(self, vo=None, prodSourceLabel=None):
0642         with self.proxyPool.get() as proxy:
0643             return proxy.getTasksToReassign_JEDI(vo, prodSourceLabel)
0644 
0645     # kill child tasks
0646     def killChildTasks_JEDI(self, jediTaskID, taskStatus):
0647         with self.proxyPool.get() as proxy:
0648             return proxy.killChildTasks_JEDI(jediTaskID, taskStatus)
0649 
0650     # kick child tasks
0651     def kickChildTasks_JEDI(self, jediTaskID):
0652         with self.proxyPool.get() as proxy:
0653             return proxy.kickChildTasks_JEDI(jediTaskID)
0654 
0655     # lock task
0656     def lockTask_JEDI(self, jediTaskID, pid):
0657         with self.proxyPool.get() as proxy:
0658             return proxy.lockTask_JEDI(jediTaskID, pid)
0659 
0660     # get successful files
0661     def getSuccessfulFiles_JEDI(self, jediTaskID, datasetID):
0662         with self.proxyPool.get() as proxy:
0663             return proxy.getSuccessfulFiles_JEDI(jediTaskID, datasetID)
0664 
0665     # unlock a single task
0666     def unlockSingleTask_JEDI(self, jediTaskID, pid):
0667         with self.proxyPool.get() as proxy:
0668             return proxy.unlockSingleTask_JEDI(jediTaskID, pid)
0669 
0670     # throttle JEDI tasks
0671     def throttleTasks_JEDI(self, vo, prodSourceLabel, waitTime):
0672         with self.proxyPool.get() as proxy:
0673             return proxy.throttleTasks_JEDI(vo, prodSourceLabel, waitTime)
0674 
0675     # throttle a JEDI task
0676     def throttleTask_JEDI(self, jediTaskID, waitTime, errorDialog):
0677         with self.proxyPool.get() as proxy:
0678             return proxy.throttleTask_JEDI(jediTaskID, waitTime, errorDialog)
0679 
0680     # release throttled tasks
0681     def releaseThrottledTasks_JEDI(self, vo, prodSourceLabel):
0682         with self.proxyPool.get() as proxy:
0683             return proxy.releaseThrottledTasks_JEDI(vo, prodSourceLabel)
0684 
0685     # release a task with on-hold status
0686     def release_task_on_hold(self, jedi_task_id, target_status=None):
0687         with self.proxyPool.get() as proxy:
0688             return proxy.release_task_on_hold(jedi_task_id, target_status)
0689 
0690     # get throttled users
0691     def getThrottledUsersTasks_JEDI(self, vo, prodSourceLabel):
0692         with self.proxyPool.get() as proxy:
0693             return proxy.getThrottledUsersTasks_JEDI(vo, prodSourceLabel)
0694 
0695     # lock process
0696     def lockProcess_JEDI(self, vo, prodSourceLabel, cloud, workqueue_id, resource_name, component, pid, forceOption=False, timeLimit=5):
0697         with self.proxyPool.get() as proxy:
0698             return proxy.lockProcess_JEDI(vo, prodSourceLabel, cloud, workqueue_id, resource_name, component, pid, forceOption, timeLimit)
0699 
0700     # unlock process
0701     def unlockProcess_JEDI(self, vo, prodSourceLabel, cloud, workqueue_id, resource_name, component, pid):
0702         with self.proxyPool.get() as proxy:
0703             return proxy.unlockProcess_JEDI(vo, prodSourceLabel, cloud, workqueue_id, resource_name, component, pid)
0704 
0705     # unlock process with PID
0706     def unlockProcessWithPID_JEDI(self, vo, prodSourceLabel, workqueue_id, resource_name, pid, useBase):
0707         with self.proxyPool.get() as proxy:
0708             return proxy.unlockProcessWithPID_JEDI(vo, prodSourceLabel, workqueue_id, resource_name, pid, useBase)
0709 
0710     # check process lock
0711     def checkProcessLock_JEDI(self, vo, prodSourceLabel, cloud, workqueue_id, resource_name, component, pid, checkBase):
0712         with self.proxyPool.get() as proxy:
0713             return proxy.checkProcessLock_JEDI(vo, prodSourceLabel, cloud, workqueue_id, resource_name, component, pid, checkBase)
0714 
0715     # get JEDI tasks to be assessed
0716     def getAchievedTasks_JEDI(self, vo, prodSourceLabel, timeLimit=60, nTasks=50):
0717         with self.proxyPool.get() as proxy:
0718             return proxy.getAchievedTasks_JEDI(vo, prodSourceLabel, timeLimit, nTasks)
0719 
0720     # get tasks to take periodic action
0721     def get_tasks_for_periodic_action(self, vo, prod_source_label, time_limit=12, n_tasks=100):
0722         with self.proxyPool.get() as proxy:
0723             return proxy.get_tasks_for_periodic_action(vo, prod_source_label, time_limit, n_tasks)
0724 
0725     # get inactive sites
0726     def getInactiveSites_JEDI(self, flag, timeLimit):
0727         with self.proxyPool.get() as proxy:
0728             return proxy.getInactiveSites_JEDI(flag, timeLimit)
0729 
0730     # get total walltime
0731     def getTotalWallTime_JEDI(self, vo, prodSourceLabel, workQueue, resource_name):
0732         with self.proxyPool.get() as proxy:
0733             return proxy.getTotalWallTime_JEDI(vo, prodSourceLabel, workQueue, resource_name)
0734 
0735     # check duplication with internal merge
0736     def checkDuplication_JEDI(self, jediTaskID):
0737         with self.proxyPool.get() as proxy:
0738             return proxy.checkDuplication_JEDI(jediTaskID)
0739 
0740     # get network metrics for brokerage
0741     def getNetworkMetrics(self, dst, keyList):
0742         with self.proxyPool.get() as proxy:
0743             return proxy.getNetworkMetrics(dst, keyList)
0744 
0745     # get nuclei that have built up a long backlog
0746     def getBackloggedNuclei(self):
0747         with self.proxyPool.get() as proxy:
0748             return proxy.getBackloggedNuclei()
0749 
0750     # get network metrics for brokerage
0751     def getPandaSiteToOutputStorageSiteMapping(self):
0752         with self.proxyPool.get() as proxy:
0753             return proxy.getPandaSiteToOutputStorageSiteMapping()
0754 
0755     # get failure counts for a task
0756     def getFailureCountsForTask_JEDI(self, jediTaskID, timeWindow):
0757         with self.proxyPool.get() as proxy:
0758             return proxy.getFailureCountsForTask_JEDI(jediTaskID, timeWindow)
0759 
0760         # count the number of queued jobs per user or working group
0761 
0762     def countJobsPerTarget_JEDI(self, target, is_user):
0763         with self.proxyPool.get() as proxy:
0764             return proxy.countJobsPerTarget_JEDI(target, is_user)
0765 
0766     # get old merge job PandaIDs
0767     def getOldMergeJobPandaIDs_JEDI(self, jediTaskID, pandaID):
0768         with self.proxyPool.get() as proxy:
0769             return proxy.getOldMergeJobPandaIDs_JEDI(jediTaskID, pandaID)
0770 
0771     # get active jumbo jobs for a task
0772     def getActiveJumboJobs_JEDI(self, jediTaskID):
0773         with self.proxyPool.get() as proxy:
0774             return proxy.getActiveJumboJobs_JEDI(jediTaskID)
0775 
0776     # get jobParams of the first job
0777     def getJobParamsOfFirstJob_JEDI(self, jediTaskID):
0778         with self.proxyPool.get() as proxy:
0779             return proxy.getJobParamsOfFirstJob_JEDI(jediTaskID)
0780 
0781     # bulk fetch fileIDs
0782     def bulkFetchFileIDs_JEDI(self, jediTaskID, nIDs):
0783         with self.proxyPool.get() as proxy:
0784             return proxy.bulkFetchFileIDs_JEDI(jediTaskID, nIDs)
0785 
0786     # set del flag to events
0787     def setDelFlagToEvents_JEDI(self, jediTaskID):
0788         with self.proxyPool.get() as proxy:
0789             return proxy.setDelFlagToEvents_JEDI(jediTaskID)
0790 
0791     # set del flag to events
0792     def removeFilesIndexInconsistent_JEDI(self, jediTaskID, datasetIDs):
0793         with self.proxyPool.get() as proxy:
0794             return proxy.removeFilesIndexInconsistent_JEDI(jediTaskID, datasetIDs)
0795 
0796     # throttle jobs in pauses tasks
0797     def throttleJobsInPausedTasks_JEDI(self, vo, prodSourceLabel):
0798         with self.proxyPool.get() as proxy:
0799             return proxy.throttleJobsInPausedTasks_JEDI(vo, prodSourceLabel)
0800 
0801     # set useJumbo flag
0802     def setUseJumboFlag_JEDI(self, jediTaskID, statusStr):
0803         with self.proxyPool.get() as proxy:
0804             return proxy.setUseJumboFlag_JEDI(jediTaskID, statusStr)
0805 
0806     # get number of tasks with running jumbo jobs
0807     def getNumTasksWithRunningJumbo_JEDI(self, vo, prodSourceLabel, cloudName, workqueue):
0808         with self.proxyPool.get() as proxy:
0809             return proxy.getNumTasksWithRunningJumbo_JEDI(vo, prodSourceLabel, cloudName, workqueue)
0810 
0811     # get number of unprocessed events
0812     def getNumUnprocessedEvents_JEDI(self, vo, prodSourceLabel, criteria, neg_criteria):
0813         with self.proxyPool.get() as proxy:
0814             return proxy.getNumUnprocessedEvents_JEDI(vo, prodSourceLabel, criteria, neg_criteria)
0815 
0816     # get number of jobs for a task
0817     def getNumJobsForTask_JEDI(self, jediTaskID):
0818         with self.proxyPool.get() as proxy:
0819             return proxy.getNumJobsForTask_JEDI(jediTaskID)
0820 
0821     # get number map for standby jobs
0822     def getNumMapForStandbyJobs_JEDI(self, workqueue):
0823         with self.proxyPool.get() as proxy:
0824             return proxy.getNumMapForStandbyJobs_JEDI(workqueue)
0825 
0826     # update datasets to finish a task
0827     def updateDatasetsToFinishTask_JEDI(self, jediTaskID, lockedBy):
0828         with self.proxyPool.get() as proxy:
0829             return proxy.updateDatasetsToFinishTask_JEDI(jediTaskID, lockedBy)
0830 
0831     # get tasks with jumbo jobs
0832     def getTaskWithJumbo_JEDI(self, vo, prodSourceLabel):
0833         with self.proxyPool.get() as proxy:
0834             return proxy.getTaskWithJumbo_JEDI(vo, prodSourceLabel)
0835 
0836     # kick pending tasks with jumbo jobs
0837     def kickPendingTasksWithJumbo_JEDI(self, jediTaskID):
0838         with self.proxyPool.get() as proxy:
0839             return proxy.kickPendingTasksWithJumbo_JEDI(jediTaskID)
0840 
0841     # reset input to re-generate co-jumbo jobs
0842     def resetInputToReGenCoJumbo_JEDI(self, jediTaskID):
0843         with self.proxyPool.get() as proxy:
0844             return proxy.resetInputToReGenCoJumbo_JEDI(jediTaskID)
0845 
0846     # get averaged disk IO
0847     def getAvgDiskIO_JEDI(self):
0848         with self.proxyPool.get() as proxy:
0849             return proxy.getAvgDiskIO_JEDI()
0850 
0851     # update input files stage-in done (according to message from iDDS, called by other methods, etc.)
0852     def updateInputFilesStaged_JEDI(self, jeditaskid, scope, filenames_dict, chunk_size=500, by=None, check_scope=True):
0853         with self.proxyPool.get() as proxy:
0854             return proxy.updateInputFilesStaged_JEDI(jeditaskid, scope, filenames_dict, chunk_size, by, check_scope)
0855 
0856     # update input datasets stage-in done (according to message from iDDS, called by other methods, etc.)
0857     def updateInputDatasetsStaged_JEDI(self, jeditaskid, scope, dsnames_dict, use_commit=True, by=None):
0858         with self.proxyPool.get() as proxy:
0859             return proxy.updateInputDatasetsStaged_JEDI(jeditaskid, scope, dsnames_dict, use_commit, by)
0860 
0861     # get number of staging files
0862     def getNumStagingFiles_JEDI(self, jeditaskid):
0863         with self.proxyPool.get() as proxy:
0864             return proxy.getNumStagingFiles_JEDI(jeditaskid)
0865 
0866     # get usage breakdown by users and sites
0867     def getUsageBreakdown_JEDI(self, prod_source_label="user"):
0868         with self.proxyPool.get() as proxy:
0869             return proxy.getUsageBreakdown_JEDI(prod_source_label)
0870 
0871     # get jobs stat of each user
0872     def getUsersJobsStats_JEDI(self, prod_source_label="user"):
0873         with self.proxyPool.get() as proxy:
0874             return proxy.getUsersJobsStats_JEDI(prod_source_label)
0875 
0876     # insert HPO pseudo event according to message from idds
0877     def insertHpoEventAboutIdds_JEDI(self, jedi_task_id, event_id_list):
0878         with self.proxyPool.get() as proxy:
0879             return proxy.insertHpoEventAboutIdds_JEDI(jedi_task_id, event_id_list)
0880 
0881     # get event statistics
0882     def get_event_statistics(self, jedi_task_id):
0883         with self.proxyPool.get() as proxy:
0884             return proxy.get_event_statistics(jedi_task_id)
0885 
0886     # get site to-running rate statistics by global share
0887     def getSiteToRunRateStats(self, vo, exclude_rwq, starttime_min, starttime_max):
0888         with self.proxyPool.get() as proxy:
0889             return proxy.getSiteToRunRateStats(vo, exclude_rwq, starttime_min, starttime_max)
0890 
0891     # update cache
0892     def updateCache_JEDI(self, main_key, sub_key, data):
0893         with self.proxyPool.get() as proxy:
0894             return proxy.updateCache_JEDI(main_key, sub_key, data)
0895 
0896     # get cache
0897     def getCache_JEDI(self, main_key, sub_key):
0898         with self.proxyPool.get() as proxy:
0899             return proxy.getCache_JEDI(main_key, sub_key)
0900 
0901     # get cache
0902     def extendSandboxLifetime_JEDI(self, jedi_taskid, file_name):
0903         with self.proxyPool.get() as proxy:
0904             return proxy.extendSandboxLifetime_JEDI(jedi_taskid, file_name)
0905 
0906     # turn a task into pending status for some reason
0907     def makeTaskPending_JEDI(self, jedi_taskid, reason="unknown"):
0908         with self.proxyPool.get() as proxy:
0909             return proxy.makeTaskPending_JEDI(jedi_taskid, reason)
0910 
0911     # query tasks and turn them into pending status for some reason, sql_query should query jeditaskid
0912     def queryTasksToBePending_JEDI(self, sql_query, params_map, reason):
0913         with self.proxyPool.get() as proxy:
0914             return proxy.queryTasksToBePending_JEDI(sql_query, params_map, reason)
0915 
0916     # get IDs of all datasets of input and lib, to update data locality records
0917     def get_tasks_inputdatasets_JEDI(self, vo):
0918         with self.proxyPool.get() as proxy:
0919             return proxy.get_tasks_inputdatasets_JEDI(vo)
0920 
0921     # update dataset locality
0922     def updateDatasetLocality_JEDI(self, jedi_taskid, datasetid, rse):
0923         with self.proxyPool.get() as proxy:
0924             return proxy.updateDatasetLocality_JEDI(jedi_taskid, datasetid, rse)
0925 
0926     # delete outdated dataset locality records
0927     def deleteOutdatedDatasetLocality_JEDI(self, before_timestamp):
0928         with self.proxyPool.get() as proxy:
0929             return proxy.deleteOutdatedDatasetLocality_JEDI(before_timestamp)
0930 
0931     # query tasks and preassign them to dedicate workqueue, sql_query should query jeditaskid
0932     def queryTasksToPreassign_JEDI(self, sql_query, params_map, site, blacklist=set(), limit=1):
0933         with self.proxyPool.get() as proxy:
0934             return proxy.queryTasksToPreassign_JEDI(sql_query, params_map, site, blacklist, limit)
0935 
0936     # close and reassign N jobs of a preassigned task
0937     def reassignJobsInPreassignedTask_JEDI(self, jedi_taskid, site, n_jobs_to_close):
0938         with self.proxyPool.get() as proxy:
0939             return proxy.reassignJobsInPreassignedTask_JEDI(jedi_taskid, site, n_jobs_to_close)
0940 
0941     # undo preassigned tasks
0942     def undoPreassignedTasks_JEDI(self, jedi_taskids, task_orig_attr_map, params_map, force=False):
0943         with self.proxyPool.get() as proxy:
0944             return proxy.undoPreassignedTasks_JEDI(jedi_taskids, task_orig_attr_map, params_map, force)
0945 
0946     # set missing files according to iDDS messages
0947     def setMissingFilesAboutIdds_JEDI(self, jeditaskid, filenames_dict):
0948         with self.proxyPool.get() as proxy:
0949             return proxy.setMissingFilesAboutIdds_JEDI(jeditaskid, filenames_dict)
0950 
0951     # set missing files according to iDDS messages
0952     def load_sw_map(self):
0953         with self.proxyPool.get() as proxy:
0954             return proxy.load_sw_map()
0955 
0956     # get origin datasets
0957     def get_origin_datasets(self, jedi_task_id, dataset_name, lfns):
0958         with self.proxyPool.get() as proxy:
0959             return proxy.get_origin_datasets(jedi_task_id, dataset_name, lfns)
0960 
0961     # push message to message processors which triggers functions of agents
0962     def push_task_trigger_message(self, msg_type, jedi_task_id, data_dict=None, priority=None, task_spec=None):
0963         with self.proxyPool.get() as proxy:
0964             return proxy.push_task_trigger_message(msg_type, jedi_task_id, data_dict, priority, task_spec)
0965 
0966     # aggregate carbon footprint of a task
0967     def get_task_carbon_footprint(self, jedi_task_id, level="global"):
0968         with self.proxyPool.get() as proxy:
0969             return proxy.get_task_carbon_footprint(jedi_task_id, level)
0970 
0971     # get pending data carousel tasks and their input datasets
0972     def get_pending_dc_tasks_JEDI(self, task_type="prod", time_limit_minutes=60):
0973         with self.proxyPool.get() as proxy:
0974             return proxy.get_pending_dc_tasks_JEDI(task_type=task_type, time_limit_minutes=time_limit_minutes)
0975 
0976     # get max number of events in a file of the dataset
0977     def get_max_events_in_dataset(self, jedi_task_id, dataset_id):
0978         with self.proxyPool.get() as proxy:
0979             return proxy.get_max_events_in_dataset(jedi_task_id, dataset_id)
0980 
0981     # get task failure metrics
0982     def get_task_failure_metrics(self, jedi_task_id):
0983         with self.proxyPool.get() as proxy:
0984             return proxy.get_task_failure_metrics(jedi_task_id)