Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:39:06

0001 import datetime
0002 import json
0003 import re
0004 import shlex
0005 import sys
0006 import time
0007 import traceback
0008 from contextlib import contextmanager
0009 from threading import Lock
0010 
0011 from pandacommon.pandalogger.LogWrapper import LogWrapper
0012 from pandacommon.pandalogger.PandaLogger import PandaLogger
0013 from pandacommon.pandautils.PandaUtils import naive_utcnow
0014 
0015 from pandaserver.brokerage.SiteMapper import SiteMapper
0016 from pandaserver.config import panda_config
0017 from pandaserver.dataservice.closer import Closer
0018 from pandaserver.dataservice.setupper import Setupper
0019 from pandaserver.srvcore import CoreUtils
0020 from pandaserver.taskbuffer import ErrorCode, EventServiceUtils, JobUtils, ProcessGroups
0021 from pandaserver.taskbuffer.DBProxyPool import DBProxyPool
0022 
0023 _logger = PandaLogger().getLogger("TaskBuffer")
0024 
0025 
0026 class TaskBuffer:
0027     """
0028     task queue
0029 
0030     """
0031 
0032     # constructor
0033     def __init__(self):
0034         self.proxyPool = None
0035         self.lock = Lock()
0036         self.nDBConnection = None
0037 
0038         # save the requester for monitoring/logging purposes
0039         self.start_time = time.time()
0040 
0041         # site mapper
0042         self.site_mapper = None
0043         # update time for site mapper
0044         self.last_update_site_mapper = None
0045 
0046     def __repr__(self):
0047         return "TaskBuffer"
0048 
0049     # initialize
0050     def init(self, dbname, dbpass, nDBConnection=10, useTimeout=False, requester=None):
0051         # acquire lock
0052         self.lock.acquire()
0053         self.nDBConnection = nDBConnection
0054 
0055         # create Proxy Pool
0056         if self.proxyPool is None:
0057             _logger.info(f"creating DBProxyPool with n_connections={nDBConnection} on behalf of {requester}")
0058             self.start_time = time.time()
0059             self.proxyPool = DBProxyPool(dbname, dbpass, nDBConnection, useTimeout)
0060 
0061         # release lock
0062         self.lock.release()
0063 
0064     # cleanup
0065     def cleanup(self, requester=None):
0066         if self.proxyPool:
0067             try:
0068                 pool_duration = time.time() - self.start_time
0069             except TypeError:
0070                 pool_duration = -1  # duration unknown
0071 
0072             _logger.info(f"destroying DBProxyPool after n_seconds={pool_duration} on behalf of {requester}")
0073             self.proxyPool.cleanup()
0074 
0075     # transaction as a context manager
0076     # CANNOT be used with ConBridge or TaskBufferInterface which uses multiprocess.pipe
0077     @contextmanager
0078     def transaction(self, name=None, tmp_log=None):
0079         with self.proxyPool.get() as proxy:
0080             with proxy.transaction(name, tmp_log) as txn:
0081                 if txn is None:
0082                     raise RuntimeError(f"Failed to start transaction {name}")
0083                 # yield the transaction
0084                 yield txn
0085 
0086     # get number of database connections
0087     def get_num_connections(self):
0088         return self.nDBConnection
0089 
0090     # get SiteMapper
0091     def get_site_mapper(self):
0092         time_now = naive_utcnow()
0093         if self.last_update_site_mapper is None or datetime.datetime.now(datetime.timezone.utc).replace(
0094             tzinfo=None
0095         ) - self.last_update_site_mapper > datetime.timedelta(minutes=10):
0096             self.site_mapper = SiteMapper(self)
0097             self.last_update_site_mapper = time_now
0098         return self.site_mapper
0099 
0100     # check production role
0101     def checkProdRole(self, fqans):
0102         for fqan in fqans:
0103             # check production role
0104             match = re.search("/([^/]+)/Role=production", fqan)
0105             if match is not None:
0106                 return True, match.group(1)
0107         return False, None
0108 
0109     # get priority parameters for user
0110     def getPrioParameters(self, jobs, user, fqans, userDefinedWG, validWorkingGroup):
0111         priorityOffset = 0
0112         serNum = 0
0113         weight = None
0114         prio_reduction = True
0115         # get boosted users and groups
0116         boost_dict = {}
0117         # get DB proxy
0118         with self.proxyPool.get() as proxy:
0119             # check production role
0120             withProdRole, workingGroup = self.checkProdRole(fqans)
0121             if withProdRole and jobs != []:
0122                 # check dataset name
0123                 for tmpFile in jobs[-1].Files:
0124                     if tmpFile.type in ["output", "log"] and not (tmpFile.lfn.startswith("group") or tmpFile.lfn.startswith("panda.um.group")):
0125                         # reset
0126                         withProdRole, workingGroup = False, None
0127                         break
0128             # reset nJob/weight for HC
0129             if jobs != []:
0130                 if jobs[0].processingType in [
0131                     "hammercloud",
0132                     "gangarobot",
0133                     "hammercloud-fax",
0134                 ] or jobs[
0135                     0
0136                 ].processingType.startswith("gangarobot-"):
0137                     serNum = 0
0138                     weight = 0.0
0139                 elif jobs[0].processingType in ["gangarobot", "gangarobot-pft"]:
0140                     priorityOffset = 3000
0141                 elif jobs[0].processingType in ["hammercloud-fax"]:
0142                     priorityOffset = 1001
0143                 else:
0144                     # get users and groups to boost job priorities
0145                     boost_dict = proxy.get_dict_to_boost_job_prio(jobs[-1].VO)
0146                     if boost_dict:
0147                         prodUserName = CoreUtils.clean_user_id(user)
0148                         # check boost list
0149                         if userDefinedWG and validWorkingGroup:
0150                             if "group" in boost_dict and jobs[-1].workingGroup in boost_dict["group"]:
0151                                 priorityOffset = boost_dict["group"][jobs[-1].workingGroup]
0152                                 weight = 0.0
0153                                 prio_reduction = False
0154                         else:
0155                             if "user" in boost_dict and prodUserName in boost_dict["user"]:
0156                                 priorityOffset = boost_dict["user"][prodUserName]
0157                                 weight = 0.0
0158                                 prio_reduction = False
0159             # check quota
0160             if weight is None:
0161                 weight = proxy.checkQuota(user)
0162                 # get nJob
0163                 if jobs == []:
0164                     serNum = proxy.getNumberJobsUser(user, workingGroup=userDefinedWG)
0165                 elif userDefinedWG and validWorkingGroup:
0166                     # check if group privileged
0167                     isSU, isGU = proxy.isSuperUser(jobs[0].workingGroup)
0168                     if not isSU:
0169                         serNum = proxy.getNumberJobsUser(user, workingGroup=jobs[0].workingGroup)
0170                     else:
0171                         # set high priority for production role
0172                         serNum = 0
0173                         weight = 0.0
0174                         priorityOffset = 2000
0175                 else:
0176                     serNum = proxy.getNumberJobsUser(user, workingGroup=None)
0177 
0178         return (
0179             withProdRole,
0180             workingGroup,
0181             priorityOffset,
0182             serNum,
0183             weight,
0184             prio_reduction,
0185         )
0186 
0187     # store Jobs into DB
0188     def storeJobs(
0189         self,
0190         jobs,
0191         user,
0192         joinThr=False,
0193         fqans=[],
0194         hostname="",
0195         checkSpecialHandling=True,
0196         toPending=False,
0197         oldPandaIDs=None,
0198         relationType=None,
0199         userVO="atlas",
0200         esJobsetMap=None,
0201         getEsJobsetMap=False,
0202         unprocessedMap=None,
0203         bulk_job_insert=False,
0204         trust_user=False,
0205     ):
0206         try:
0207             tmpLog = LogWrapper(_logger, f"storeJobs <{CoreUtils.clean_user_id(user)} nJobs={len(jobs)}>")
0208             tmpLog.debug(f"start toPending={toPending}")
0209             # check quota for priority calculation
0210             weight = 0.0
0211             userJobID = -1
0212             userJobsetID = -1
0213             userStatus = True
0214             priorityOffset = 0
0215             userCountry = None
0216             useExpress = False
0217             nExpressJobs = 0
0218             useDebugMode = False
0219             siteMapper = self.get_site_mapper()
0220 
0221             # check ban user
0222             if not trust_user and len(jobs) > 0:
0223                 # get DB proxy
0224                 with self.proxyPool.get() as proxy:
0225                     # check user status
0226                     tmpStatus = proxy.checkBanUser(user, jobs[0].prodSourceLabel)
0227                 # return if DN is blocked
0228                 if not tmpStatus:
0229                     tmpLog.debug(f"end 1 since DN {user} is blocked")
0230                     if getEsJobsetMap:
0231                         return [], None, unprocessedMap
0232                     return []
0233 
0234             tmpLog.debug(f"checked ban user")
0235             # set parameters for user jobs
0236             if (
0237                 len(jobs) > 0
0238                 and (jobs[0].prodSourceLabel in JobUtils.analy_sources + JobUtils.list_ptest_prod_sources)
0239                 and (not jobs[0].processingType in ["merge", "unmerge"])
0240             ):
0241                 # get DB proxy
0242                 with self.proxyPool.get() as proxy:
0243                     # get JobID and status
0244                     userJobID, userJobsetID, userStatus = proxy.getUserParameter(user, jobs[0].jobDefinitionID, jobs[0].jobsetID)
0245 
0246                     # check quota for express jobs
0247                     if "express" in jobs[0].specialHandling:
0248                         expressQuota = proxy.getExpressJobs(user)
0249                         if expressQuota is not None and expressQuota["status"] and expressQuota["quota"] > 0:
0250                             nExpressJobs = expressQuota["quota"]
0251                             if nExpressJobs > 0:
0252                                 useExpress = True
0253                     # debug mode
0254                     if jobs[0].is_debug_mode() or jobs[-1].is_debug_mode():
0255                         useDebugMode = True
0256 
0257                 # extract country group
0258                 for tmpFQAN in fqans:
0259                     match = re.search("^/atlas/([^/]+)/", tmpFQAN)
0260                     if match is not None:
0261                         tmpCountry = match.group(1)
0262                         # use country code or usatlas
0263                         if len(tmpCountry) == 2:
0264                             userCountry = tmpCountry
0265                             break
0266                         # usatlas
0267                         if tmpCountry in ["usatlas"]:
0268                             userCountry = "us"
0269                             break
0270             tmpLog.debug(f"set user job parameters")
0271 
0272             # return if DN is blocked
0273             if not userStatus:
0274                 tmpLog.debug(f"end 2 since {user} DN is blocked")
0275                 if getEsJobsetMap:
0276                     return ([], None, unprocessedMap)
0277                 return []
0278             # extract VO
0279             for tmpFQAN in fqans:
0280                 match = re.search("^/([^/]+)/", tmpFQAN)
0281                 if match is not None:
0282                     userVO = match.group(1)
0283                     break
0284 
0285             # get number of jobs currently in PandaDB
0286             serNum = 0
0287             userDefinedWG = False
0288             validWorkingGroup = False
0289             usingBuild = False
0290             withProdRole = False
0291             workingGroup = None
0292             prio_reduction = True
0293             if len(jobs) > 0 and (jobs[0].prodSourceLabel in JobUtils.analy_sources) and (not jobs[0].processingType in ["merge", "unmerge"]):
0294                 # extract user's working group from FQANs
0295                 userWorkingGroupList = []
0296                 for tmpFQAN in fqans:
0297                     match = re.search("/([^/]+)/Role=production", tmpFQAN)
0298                     if match is not None:
0299                         userWorkingGroupList.append(match.group(1))
0300                 # check workingGroup
0301                 if jobs[0].workingGroup not in ["", None, "NULL"]:
0302                     userDefinedWG = True
0303                     # check with FQANs
0304                     if jobs[0].workingGroup in userWorkingGroupList:
0305                         validWorkingGroup = True
0306                 # using build for analysis
0307                 if jobs[0].prodSourceLabel == "panda":
0308                     usingBuild = True
0309                 # get priority parameters for user
0310                 (
0311                     withProdRole,
0312                     workingGroup,
0313                     priorityOffset,
0314                     serNum,
0315                     weight,
0316                     prio_reduction,
0317                 ) = self.getPrioParameters(jobs, user, fqans, userDefinedWG, validWorkingGroup)
0318                 tmpLog.debug(f"workingGroup={jobs[0].workingGroup} serNum={serNum} weight={weight} pOffset={priorityOffset} reduction={prio_reduction}")
0319             tmpLog.debug(f"got prio parameters")
0320             # get DB proxy
0321             with self.proxyPool.get() as proxy:
0322                 tmpLog.debug(f"got proxy")
0323                 # get total number of files
0324                 totalNumFiles = 0
0325                 for job in jobs:
0326                     totalNumFiles += len(job.Files)
0327                 # bulk fetch PandaIDs
0328                 new_panda_ids = proxy.bulk_fetch_panda_ids(len(jobs))
0329                 tmpLog.debug(f"got PandaIDs")
0330                 # bulk fetch fileIDs
0331                 fileIDPool = []
0332                 if totalNumFiles > 0:
0333                     fileIDPool = sorted(proxy.bulkFetchFileIDsPanda(totalNumFiles))
0334                 # loop over all jobs
0335                 ret = []
0336                 newJobs = []
0337                 if esJobsetMap is None:
0338                     esJobsetMap = {}
0339                 try:
0340                     tmpLog.debug(f"jediTaskID={jobs[0].jediTaskID} len(esJobsetMap)={len(esJobsetMap)} nJobs={len(jobs)}")
0341                 except Exception:
0342                     pass
0343                 job_ret_list = []
0344                 params_for_bulk_insert = []
0345                 special_handling_list = []
0346                 num_original_event_service_jobs = 0
0347                 for idxJob, job in enumerate(jobs):
0348                     # set PandaID
0349                     job.PandaID = new_panda_ids[idxJob]
0350                     # set JobID. keep original JobID when retry
0351                     if (
0352                         userJobID != -1
0353                         and job.prodSourceLabel in JobUtils.analy_sources
0354                         and (job.attemptNr in [0, "0", "NULL"] or (job.jobExecutionID not in [0, "0", "NULL"]) or job.lockedby == "jedi")
0355                         and (not jobs[0].processingType in ["merge", "unmerge"])
0356                     ):
0357                         job.jobDefinitionID = userJobID
0358                     # set jobsetID
0359                     if job.prodSourceLabel in JobUtils.analy_sources + JobUtils.list_ptest_prod_sources:
0360                         job.jobsetID = userJobsetID
0361                     # set relocation flag
0362                     if job.computingSite != "NULL" and job.relocationFlag != 2:
0363                         job.relocationFlag = 1
0364                     # protection agains empty jobParameters
0365                     if job.jobParameters in ["", None, "NULL"]:
0366                         job.jobParameters = " "
0367                     # set country group and nJobs (=taskID)
0368                     if job.prodSourceLabel in JobUtils.analy_sources:
0369                         if job.lockedby != "jedi":
0370                             job.countryGroup = userCountry
0371                         # set workingGroup
0372                         if not validWorkingGroup:
0373                             if withProdRole:
0374                                 # set country group if submitted with production role
0375                                 job.workingGroup = workingGroup
0376                             else:
0377                                 if userDefinedWG:
0378                                     # reset invalid working group
0379                                     job.workingGroup = None
0380                         # set nJobs (=taskID)
0381                         if usingBuild:
0382                             tmpNumBuild = 1
0383                             tmpNunRun = len(jobs) - 1
0384                         else:
0385                             tmpNumBuild = 0
0386                             tmpNunRun = len(jobs)
0387                         # encode
0388                         job.taskID = tmpNumBuild + (tmpNunRun << 1)
0389                     # set hostname
0390                     if hostname != "":
0391                         job.creationHost = hostname
0392 
0393                     # process and set the job_label
0394                     if not job.job_label or job.job_label not in (
0395                         JobUtils.PROD_PS,
0396                         JobUtils.ANALY_PS,
0397                     ):
0398                         tmpSiteSpec = siteMapper.getSite(job.computingSite)
0399                         queue_type = tmpSiteSpec.type
0400                         if queue_type == "analysis":
0401                             job.job_label = JobUtils.ANALY_PS
0402                         elif queue_type == "production":
0403                             job.job_label = JobUtils.PROD_PS
0404                         elif queue_type == "unified":
0405                             if job.prodSourceLabel in JobUtils.analy_sources:
0406                                 job.job_label = JobUtils.ANALY_PS
0407                             else:
0408                                 # set production as default if not specified and neutral prodsourcelabel
0409                                 job.job_label = JobUtils.PROD_PS
0410                         else:  # e.g. type = special
0411                             job.job_label = JobUtils.PROD_PS
0412 
0413                     # extract file info, change specialHandling for event service
0414                     origSH = job.specialHandling
0415                     (
0416                         eventServiceInfo,
0417                         job.specialHandling,
0418                         esIndex,
0419                     ) = EventServiceUtils.decodeFileInfo(job.specialHandling)
0420                     origEsJob = False
0421                     if eventServiceInfo != {}:
0422                         # set jobsetID
0423                         if esIndex in esJobsetMap:
0424                             job.jobsetID = esJobsetMap[esIndex]
0425                         else:
0426                             origEsJob = True
0427                             num_original_event_service_jobs += 1
0428                             esJobsetMap[esIndex] = None
0429                         # sort files since file order is important for positional event number
0430                         job.sortFiles()
0431                     if oldPandaIDs is not None and len(oldPandaIDs) > idxJob:
0432                         jobOldPandaIDs = oldPandaIDs[idxJob]
0433                     else:
0434                         jobOldPandaIDs = None
0435                     # check events for jumbo jobs
0436                     isOK = True
0437                     if EventServiceUtils.isJumboJob(job):
0438                         hasReadyEvents = proxy.hasReadyEvents(job.jediTaskID)
0439                         if hasReadyEvents is False:
0440                             isOK = False
0441                     # insert job to DB
0442                     if not isOK:
0443                         # skip since there is no ready event
0444                         job.PandaID = None
0445                     if not bulk_job_insert:
0446                         tmp_ret_i = proxy.insertNewJob(
0447                             job,
0448                             user,
0449                             serNum,
0450                             weight,
0451                             priorityOffset,
0452                             userVO,
0453                             toPending,
0454                             origEsJob,
0455                             eventServiceInfo,
0456                             oldPandaIDs=jobOldPandaIDs,
0457                             relationType=relationType,
0458                             fileIDPool=fileIDPool,
0459                             origSpecialHandling=origSH,
0460                             unprocessedMap=unprocessedMap,
0461                             prio_reduction=prio_reduction,
0462                         )
0463                         if unprocessedMap is not None:
0464                             tmp_ret_i, unprocessedMap = tmp_ret_i
0465                     else:
0466                         # keep parameters for late bulk execution
0467                         params_for_bulk_insert.append(
0468                             [
0469                                 [job, user, serNum, weight, priorityOffset, userVO, toPending],
0470                                 {
0471                                     "origEsJob": origEsJob,
0472                                     "eventServiceInfo": eventServiceInfo,
0473                                     "oldPandaIDs": jobOldPandaIDs,
0474                                     "relationType": relationType,
0475                                     "fileIDPool": fileIDPool,
0476                                     "origSpecialHandling": origSH,
0477                                     "unprocessedMap": unprocessedMap,
0478                                     "prio_reduction": prio_reduction,
0479                                 },
0480                                 {"esIndex": esIndex},
0481                             ]
0482                         )
0483                         special_handling_list.append(origSH)
0484                         tmp_ret_i = True
0485                     if tmp_ret_i and origEsJob and not bulk_job_insert:
0486                         # mapping of jobsetID for event service
0487                         esJobsetMap[esIndex] = job.jobsetID
0488                     job_ret_list.append([job, tmp_ret_i])
0489                     serNum += 1
0490                     try:
0491                         fileIDPool = fileIDPool[len(job.Files) :]
0492                     except Exception:
0493                         fileIDPool = []
0494                 # bulk insert
0495                 if bulk_job_insert:
0496                     # get jobset IDs for event service jobs
0497                     if num_original_event_service_jobs > 0:
0498                         new_jobset_ids = proxy.bulk_fetch_panda_ids(num_original_event_service_jobs)
0499                     else:
0500                         new_jobset_ids = []
0501                     tmp_ret, job_ret_list, es_jobset_map = proxy.bulk_insert_new_jobs(
0502                         jobs[0].jediTaskID, params_for_bulk_insert, new_jobset_ids, special_handling_list
0503                     )
0504                     if not tmp_ret:
0505                         raise RuntimeError("bulk job insert failed")
0506                     # update esJobsetMap
0507                     esJobsetMap.update(es_jobset_map)
0508 
0509                 # check returns
0510                 for job, tmp_ret_i in job_ret_list:
0511                     if not tmp_ret_i:
0512                         # reset if failed
0513                         job.PandaID = None
0514                     else:
0515                         # append
0516                         newJobs.append(job)
0517                     if job.prodSourceLabel in JobUtils.analy_sources + JobUtils.list_ptest_prod_sources:
0518                         ret.append((job.PandaID, job.jobDefinitionID, {"jobsetID": job.jobsetID}))
0519                     else:
0520                         ret.append((job.PandaID, job.jobDefinitionID, job.jobName))
0521             # set up dataset
0522             if not toPending:
0523                 if joinThr:
0524                     thr = Setupper(
0525                         self,
0526                         newJobs,
0527                     )
0528                     thr.start()
0529                     thr.join()
0530                 else:
0531                     # cannot use 'thr =' because it may trigger garbage collector
0532                     Setupper(
0533                         self,
0534                         newJobs,
0535                     ).start()
0536             # return jobIDs
0537             tmpLog.debug("end successfully")
0538             if getEsJobsetMap:
0539                 return (ret, esJobsetMap, unprocessedMap)
0540             return ret
0541         except Exception as e:
0542             tmpLog.error(f"{str(e)} {traceback.format_exc()}")
0543             errStr = "ERROR: ServerError with storeJobs"
0544             if getEsJobsetMap:
0545                 return (errStr, None, unprocessedMap)
0546             return errStr
0547 
0548     # lock jobs for reassign
0549     def lockJobsForReassign(
0550         self,
0551         tableName,
0552         timeLimit,
0553         statList,
0554         labels,
0555         processTypes,
0556         sites,
0557         clouds,
0558         useJEDI=False,
0559         onlyReassignable=False,
0560         useStateChangeTime=False,
0561         getEventService=False,
0562     ):
0563         # get DB proxy
0564         with self.proxyPool.get() as proxy:
0565             # exec
0566             res = proxy.lockJobsForReassign(
0567                 tableName,
0568                 timeLimit,
0569                 statList,
0570                 labels,
0571                 processTypes,
0572                 sites,
0573                 clouds,
0574                 useJEDI,
0575                 onlyReassignable,
0576                 useStateChangeTime,
0577                 getEventService,
0578             )
0579         return res
0580 
0581     # get a DB configuration value
0582     def getConfigValue(self, component, key, app="pandaserver", vo=None, default=None):
0583         # get DB proxy
0584         with self.proxyPool.get() as proxy:
0585             # exec
0586             res = proxy.getConfigValue(component, key, app, vo)
0587             if res is None and default is not None:
0588                 res = default
0589         return res
0590 
0591     # lock jobs for finisher
0592     def lockJobsForFinisher(self, timeNow, rownum, highPrio):
0593         # get DB proxy
0594         with self.proxyPool.get() as proxy:
0595             # exec
0596             res = proxy.lockJobsForFinisher(timeNow, rownum, highPrio)
0597         return res
0598 
0599     # lock jobs for activator
0600     def lockJobsForActivator(self, timeLimit, rownum, prio):
0601         # get DB proxy
0602         with self.proxyPool.get() as proxy:
0603             # exec
0604             res = proxy.lockJobsForActivator(timeLimit, rownum, prio)
0605         return res
0606 
0607     # update overall job information
0608     def updateJobs(
0609         self,
0610         jobs,
0611         inJobsDefined,
0612         oldJobStatusList=None,
0613         extraInfo=None,
0614         async_dataset_update=False,
0615     ):
0616         # get DB proxy
0617         with self.proxyPool.get() as proxy:
0618             # loop over all jobs
0619             returns = []
0620             ddmIDs = []
0621             ddmAttempt = 0
0622             newMover = None
0623             for idxJob, job in enumerate(jobs):
0624                 # update DB
0625                 tmpddmIDs = []
0626                 if oldJobStatusList is not None and idxJob < len(oldJobStatusList):
0627                     oldJobStatus = oldJobStatusList[idxJob]
0628                 else:
0629                     oldJobStatus = None
0630                 # check for jumbo jobs
0631                 if EventServiceUtils.isJumboJob(job):
0632                     if job.jobStatus in ["defined", "assigned", "activated"]:
0633                         pass
0634                     else:
0635                         # check if there are done events
0636                         hasDone = proxy.hasDoneEvents(job.jediTaskID, job.PandaID, job)
0637                         if hasDone:
0638                             job.jobStatus = "finished"
0639                         else:
0640                             if job.pilotErrorCode in [1144, "1144"]:
0641                                 job.jobStatus = "cancelled"
0642                             else:
0643                                 job.jobStatus = "failed"
0644                             if job.taskBufferErrorDiag in ["", "NULL", None]:
0645                                 job.taskBufferErrorDiag = f"set {job.jobStatus} since no successful events"
0646                                 job.taskBufferErrorCode = ErrorCode.EC_EventServiceNoEvent
0647                 if job.jobStatus in ["finished", "failed", "cancelled"]:
0648                     if async_dataset_update and job.jediTaskID not in [None, "NULL"]:
0649                         async_params = {
0650                             "exec_order": 0,
0651                             "PandaID": job.PandaID,
0652                             "jediTaskID": job.jediTaskID,
0653                         }
0654                     else:
0655                         async_params = None
0656                     ret, tmpddmIDs, ddmAttempt, newMover = proxy.archiveJob(job, inJobsDefined, extraInfo=extraInfo, async_params=async_params)
0657                     if async_params is not None and ret:
0658                         proxy.async_update_datasets(job.PandaID)
0659                 else:
0660                     ret = proxy.updateJob(job, inJobsDefined, oldJobStatus=oldJobStatus, extraInfo=extraInfo)
0661                 returns.append(ret)
0662                 # collect IDs for reassign
0663                 if ret:
0664                     ddmIDs += tmpddmIDs
0665         # retry mover
0666         if newMover is not None:
0667             self.storeJobs([newMover], None, joinThr=True)
0668         # reassign jobs when ddm failed
0669         if ddmIDs != []:
0670             self.reassignJobs(ddmIDs, ddmAttempt, joinThr=True)
0671 
0672         return returns
0673 
0674     # update job jobStatus only
0675     def updateJobStatus(self, jobID, jobStatus, param, updateStateChange=False, attemptNr=None):
0676         # get DB proxy
0677         with self.proxyPool.get() as proxy:
0678             # update DB and buffer
0679             ret, post_action = proxy.updateJobStatus(jobID, jobStatus, param, updateStateChange, attemptNr)
0680             # take post-action
0681             if post_action:
0682                 # get semaphore for job cloning with runonce
0683                 if post_action["action"] == "get_event":
0684                     event_ret = proxy.getEventRanges(post_action["pandaID"], post_action["jobsetID"], post_action["jediTaskID"], 1, True, False, None)
0685                     if not event_ret:
0686                         proxy.killJob(post_action["pandaID"], "job cloning", "", True)
0687                         ret = "tobekilled"
0688             # get secrets for debug mode
0689             if isinstance(ret, str) and "debug" in ret:
0690                 tmpS, secrets = proxy.get_user_secrets(panda_config.pilot_secrets)
0691                 if tmpS and secrets:
0692                     ret = {"command": ret, "secrets": secrets}
0693         return ret
0694 
0695     # update worker status by the pilot
0696     def updateWorkerPilotStatus(self, workerID, harvesterID, status, node_id):
0697         # get DB proxy
0698         with self.proxyPool.get() as proxy:
0699             # update DB and buffer
0700             ret = proxy.updateWorkerPilotStatus(workerID, harvesterID, status, node_id)
0701         return ret
0702 
0703     def update_worker_node(
0704         self,
0705         site,
0706         panda_queue,
0707         host_name,
0708         cpu_model,
0709         cpu_model_normalized,
0710         n_logical_cpus,
0711         n_sockets,
0712         cores_per_socket,
0713         threads_per_core,
0714         cpu_architecture,
0715         cpu_architecture_level,
0716         clock_speed,
0717         total_memory,
0718         total_local_disk,
0719     ):
0720         # get DB proxy
0721         with self.proxyPool.get() as proxy:
0722             # update DB and buffer
0723             ret = proxy.update_worker_node(
0724                 site,
0725                 panda_queue,
0726                 host_name,
0727                 cpu_model,
0728                 cpu_model_normalized,
0729                 n_logical_cpus,
0730                 n_sockets,
0731                 cores_per_socket,
0732                 threads_per_core,
0733                 cpu_architecture,
0734                 cpu_architecture_level,
0735                 clock_speed,
0736                 total_memory,
0737                 total_local_disk,
0738             )
0739         return ret
0740 
0741     def update_worker_node_gpu(
0742         self,
0743         site: str,
0744         host_name: str,
0745         vendor: str,
0746         model: str,
0747         count: int,
0748         vram: int,
0749         architecture: str,
0750         framework: str,
0751         framework_version: str,
0752         driver_version: str,
0753     ):
0754         # get DB proxy
0755         with self.proxyPool.get() as proxy:
0756             # update DB and buffer
0757             ret = proxy.update_worker_node_gpu(
0758                 site,
0759                 host_name,
0760                 vendor,
0761                 model,
0762                 count,
0763                 vram,
0764                 architecture,
0765                 framework,
0766                 framework_version,
0767                 driver_version,
0768             )
0769         return ret
0770 
0771     def get_architecture_level_map(self):
0772         # get DB proxy
0773         with self.proxyPool.get() as proxy:
0774             ret = proxy.get_architecture_level_map()
0775         return ret
0776 
0777     # finalize pending analysis jobs
0778     def finalizePendingJobs(self, prodUserName, jobDefinitionID, waitLock=False):
0779         # get DB proxy
0780         with self.proxyPool.get() as proxy:
0781             # update DB
0782             ret = proxy.finalizePendingJobs(prodUserName, jobDefinitionID, waitLock)
0783         return ret
0784 
0785     # retry job
0786     def retryJob(
0787         self,
0788         jobID,
0789         param,
0790         failedInActive=False,
0791         changeJobInMem=False,
0792         inMemJob=None,
0793         getNewPandaID=False,
0794         attemptNr=None,
0795         recoverableEsMerge=False,
0796     ):
0797         # get DB proxy
0798         with self.proxyPool.get() as proxy:
0799             # update DB
0800             ret = proxy.retryJob(
0801                 jobID,
0802                 param,
0803                 failedInActive,
0804                 changeJobInMem,
0805                 inMemJob,
0806                 getNewPandaID,
0807                 attemptNr,
0808                 recoverableEsMerge,
0809             )
0810         return ret
0811 
0812     # activate jobs
0813     def activateJobs(self, jobs):
0814         # get DB proxy
0815         with self.proxyPool.get() as proxy:
0816             # loop over all jobs
0817             returns = []
0818             for job in jobs:
0819                 # update DB
0820                 ret = proxy.activateJob(job)
0821                 returns.append(ret)
0822         return returns
0823 
0824     # send jobs to jobsWaiting
0825     def keepJobs(self, jobs):
0826         # get DB proxy
0827         with self.proxyPool.get() as proxy:
0828             # loop over all jobs
0829             returns = []
0830             for job in jobs:
0831                 # update DB
0832                 ret = proxy.keepJob(job)
0833                 returns.append(ret)
0834         return returns
0835 
0836     # archive jobs
0837     def archiveJobs(self, jobs, inJobsDefined, fromJobsWaiting=False):
0838         # get DB proxy
0839         with self.proxyPool.get() as proxy:
0840             # loop over all jobs
0841             returns = []
0842             for job in jobs:
0843                 # update DB
0844                 ret = proxy.archiveJob(job, inJobsDefined, fromJobsWaiting=fromJobsWaiting)
0845                 returns.append(ret[0])
0846         return returns
0847 
0848     # set debug mode
0849     def setDebugMode(self, dn, pandaID, prodManager, modeOn, workingGroup):
0850         # get DB proxy
0851         with self.proxyPool.get() as proxy:
0852             # check the number of debug jobs
0853             hitLimit = False
0854             if modeOn is True:
0855                 if prodManager:
0856                     limitNum = None
0857                 elif workingGroup is not None:
0858                     jobList = proxy.getActiveDebugJobs(workingGroup=workingGroup)
0859                     limitNum = ProcessGroups.maxDebugWgJobs
0860                 else:
0861                     jobList = proxy.getActiveDebugJobs(dn=dn)
0862                     limitNum = ProcessGroups.maxDebugJobs
0863                 if limitNum and len(jobList) >= limitNum:
0864                     # exceeded
0865                     retStr = "You already hit the limit on the maximum number of debug subjobs "
0866                     retStr += f"({limitNum} jobs). "
0867                     retStr += "Please set the debug mode off for one of the following PandaIDs : "
0868                     for tmpID in jobList:
0869                         retStr += f"{tmpID},"
0870                     retStr = retStr[:-1]
0871                     hitLimit = True
0872             if not hitLimit:
0873                 # execute
0874                 retStr = proxy.setDebugMode(dn, pandaID, prodManager, modeOn, workingGroup)
0875         return retStr
0876 
0877     # get jobs
0878     def getJobs(
0879         self,
0880         nJobs,
0881         siteName,
0882         prodSourceLabel,
0883         mem,
0884         diskSpace,
0885         node,
0886         timeout,
0887         computingElement,
0888         prodUserID,
0889         taskID,
0890         background,
0891         resourceType,
0892         harvester_id,
0893         worker_id,
0894         schedulerID,
0895         jobType,
0896         is_gu,
0897         via_topic,
0898         remaining_time,
0899     ):
0900         # get DBproxy
0901         with self.proxyPool.get() as proxy:
0902             # get waiting jobs
0903             t_before = time.time()
0904             jobs, nSent = proxy.getJobs(
0905                 nJobs,
0906                 siteName,
0907                 prodSourceLabel,
0908                 mem,
0909                 diskSpace,
0910                 node,
0911                 timeout,
0912                 computingElement,
0913                 prodUserID,
0914                 taskID,
0915                 background,
0916                 resourceType,
0917                 harvester_id,
0918                 worker_id,
0919                 schedulerID,
0920                 jobType,
0921                 is_gu,
0922                 via_topic,
0923                 remaining_time,
0924             )
0925             t_after = time.time()
0926             t_total = t_after - t_before
0927             _logger.debug(f"getJobs : took {t_total}s for {siteName} nJobs={nJobs} prodSourceLabel={prodSourceLabel}")
0928 
0929         # get secret
0930         secrets_map = {}
0931         for job in jobs:
0932             if job.use_secrets() and job.prodUserName not in secrets_map:
0933                 # get secret
0934                 with self.proxyPool.get() as proxy:
0935                     tmp_status, secret = proxy.get_user_secrets(job.prodUserName)
0936                     if not tmp_status:
0937                         secret = None
0938                 secrets_map[job.prodUserName] = secret
0939             if job.is_debug_mode():
0940                 if panda_config.pilot_secrets not in secrets_map:
0941                     # get secret
0942                     with self.proxyPool.get() as proxy:
0943                         tmp_status, secret = proxy.get_user_secrets(panda_config.pilot_secrets)
0944                         if not tmp_status:
0945                             secret = None
0946                     secrets_map[panda_config.pilot_secrets] = secret
0947 
0948         return jobs + [nSent, {}, secrets_map]
0949 
0950     # get job status
0951     def getJobStatus(
0952         self,
0953         jobIDs,
0954         fromDefined=True,
0955         fromActive=True,
0956         fromArchived=True,
0957         fromWaiting=True,
0958     ):
0959         # get DBproxy
0960         with self.proxyPool.get() as proxy:
0961             retStatus = []
0962             # peek at job
0963             for jobID in jobIDs:
0964                 res = proxy.peekJob(jobID, fromDefined, fromActive, fromArchived, fromWaiting)
0965                 if res:
0966                     retStatus.append(res.jobStatus)
0967                 else:
0968                     retStatus.append(None)
0969         return retStatus
0970 
0971     # peek at jobs
0972     def peekJobs(
0973         self,
0974         jobIDs,
0975         fromDefined=True,
0976         fromActive=True,
0977         fromArchived=True,
0978         fromWaiting=True,
0979         forAnal=False,
0980         use_json=False,
0981     ):
0982         # get proxy
0983         with self.proxyPool.get() as proxy:
0984             retJobs = []
0985             # peek at job
0986             for jobID in jobIDs:
0987                 res = proxy.peekJob(jobID, fromDefined, fromActive, fromArchived, fromWaiting, forAnal)
0988                 if res:
0989                     if use_json:
0990                         retJobs.append(res.to_dict())
0991                     else:
0992                         retJobs.append(res)
0993                 else:
0994                     retJobs.append(None)
0995         return retJobs
0996 
0997     # get PandaIDs with TaskID
0998     def getPandaIDsWithTaskID(self, jediTaskID: int, scout_only: bool = False, unsuccessful_only: bool = False) -> list[int]:
0999         """Get PanDA job IDs associated with a JEDI task.
1000 
1001         Args:
1002             jediTaskID: JEDI task ID.
1003             scout_only: When True, return only scout job IDs.
1004             unsuccessful_only: When True, return only job IDs with status failed, cancelled, or closed.
1005 
1006         Returns:
1007             list[int]: PanDA job IDs for the task.
1008         """
1009         # get DBproxy
1010         with self.proxyPool.get() as proxy:
1011             # exec
1012             retJobs = proxy.getPandaIDsWithTaskID(jediTaskID, scout_only=scout_only, unsuccessful_only=unsuccessful_only)
1013         return retJobs
1014 
1015     # get full job status
1016     def getFullJobStatus(self, jobIDs, fromDefined=True, fromActive=True, fromArchived=True, fromWaiting=True, forAnal=True, days=30, use_json=False):
1017         retJobMap = {}
1018 
1019         # peek at job
1020         for jobID in jobIDs:
1021             # get DBproxy for each job to avoid occupying connection for long time
1022             with self.proxyPool.get() as proxy:
1023                 # peek job
1024                 res = proxy.peekJob(jobID, fromDefined, fromActive, fromArchived, fromWaiting, forAnal)
1025                 retJobMap[jobID] = res
1026 
1027         # get IDs
1028         for jobID in jobIDs:
1029             if retJobMap[jobID] is None:
1030                 # get ArchiveDBproxy
1031                 with self.proxyPool.get() as proxy:
1032                     # peek job
1033                     res = proxy.peekJobLog(jobID, days)
1034                     retJobMap[jobID] = res
1035 
1036         # sort
1037         retJobs = []
1038         for jobID in jobIDs:
1039             if use_json:
1040                 if retJobMap[jobID] is None:
1041                     retJobs.append(None)
1042                 else:
1043                     retJobs.append(retJobMap[jobID].to_dict())
1044             else:
1045                 retJobs.append(retJobMap[jobID])
1046 
1047         return retJobs
1048 
1049     # get script for offline running
1050     def getScriptOfflineRunning(self, pandaID, days=None):
1051         try:
1052             # get job
1053             tmpJobs = self.getFullJobStatus([pandaID], days=days)
1054             if tmpJobs == [] or tmpJobs[0] is None:
1055                 errStr = f"ERROR: Cannot get PandaID={pandaID} in DB "
1056                 if days is None:
1057                     errStr += "for the last 30 days. You may add &days=N to the URL"
1058                 else:
1059                     errStr += f"for the last {days} days. You may change &days=N in the URL"
1060                 return errStr
1061             tmpJob = tmpJobs[0]
1062             # user job
1063             isUser = False
1064             for trf in [
1065                 "runAthena",
1066                 "runGen",
1067                 "runcontainer",
1068                 "runMerge",
1069                 "buildJob",
1070                 "buildGen",
1071             ]:
1072                 if trf in tmpJob.transformation:
1073                     isUser = True
1074                     break
1075             # check prodSourceLabel
1076             if tmpJob.prodSourceLabel == "user":
1077                 isUser = True
1078             if isUser:
1079                 tmpAtls = [tmpJob.AtlasRelease]
1080                 tmpRels = [re.sub("^AnalysisTransforms-*", "", tmpJob.homepackage)]
1081                 tmpPars = [tmpJob.jobParameters]
1082                 tmpTrfs = [tmpJob.transformation]
1083             else:
1084                 # release and trf
1085                 tmpAtls = tmpJob.AtlasRelease.split("\n")
1086                 tmpRels = tmpJob.homepackage.split("\n")
1087                 tmpPars = tmpJob.jobParameters.split("\n")
1088                 tmpTrfs = tmpJob.transformation.split("\n")
1089             if not (len(tmpRels) == len(tmpPars) == len(tmpTrfs)):
1090                 return "ERROR: The number of releases or parameters or trfs is inconsistent with others"
1091             # construct script
1092             scrStr = (
1093                 "#!/bin/bash\n\n"
1094                 "# To rerun the job interactively :\n"
1095                 "#   1) download this script\n"
1096                 "#   2) chmod +x ./<this script>\n"
1097                 "#   3) setupATLAS\n"
1098                 "#   4) ./<this script>\n\n"
1099                 "temp_file=$(mktemp)\n"
1100                 'cat << EOF > "$temp_file"\n\n'
1101                 "source ${ATLAS_LOCAL_ROOT_BASE}/user/atlasLocalSetup.sh\n"
1102                 "lsetup rucio\n\n"
1103                 "#retrieve inputs\n\n"
1104             )
1105             # collect inputs
1106             dsFileMap = {}
1107             for tmpFile in tmpJob.Files:
1108                 if tmpFile.type == "input":
1109                     if tmpFile.dataset not in dsFileMap:
1110                         dsFileMap[tmpFile.dataset] = []
1111                     if tmpFile.lfn not in dsFileMap[tmpFile.dataset]:
1112                         dsFileMap[tmpFile.dataset].append(tmpFile.scope + ":" + tmpFile.lfn)
1113             # get
1114             for tmpDS in dsFileMap:
1115                 tmpFileList = dsFileMap[tmpDS]
1116                 for tmpLFN in tmpFileList:
1117                     scrStr += f"rucio download {tmpLFN} --no-subdir\n"
1118             if isUser:
1119                 scrStr += "\n#get trf\n"
1120                 scrStr += f"wget {tmpTrfs[0]}\n"
1121                 scrStr += f"chmod +x {tmpTrfs[0].split('/')[-1]}\n"
1122             scrStr += "\n#transform commands\n\n"
1123             for tmpIdx, tmpRel in enumerate(tmpRels):
1124                 # asetup
1125                 atlRel = re.sub("Atlas-", "", tmpAtls[tmpIdx])
1126                 atlTags = re.split("[/_]", tmpRel)
1127                 if "" in atlTags:
1128                     atlTags.remove("")
1129                 if atlRel != "" and atlRel not in atlTags and (re.search("^\d+\.\d+\.\d+$", atlRel) is None or isUser):
1130                     atlTags.append(atlRel)
1131                 try:
1132                     cmtConfig = [s for s in tmpJob.cmtConfig.split("@") if s][-1]
1133                 except Exception:
1134                     cmtConfig = ""
1135                 scrStr += f"asetup --platform={tmpJob.cmtConfig.split('@')[0]} {','.join(atlTags)}\n"
1136                 # athenaMP
1137                 if tmpJob.coreCount not in ["NULL", None] and tmpJob.coreCount > 1:
1138                     scrStr += f"export ATHENA_PROC_NUMBER={tmpJob.coreCount}\n"
1139                     scrStr += f"export ATHENA_CORE_NUMBER={tmpJob.coreCount}\n"
1140                 # add double quotes for zsh
1141                 tmpParamStr = tmpPars[tmpIdx]
1142                 tmpSplitter = shlex.shlex(tmpParamStr, posix=True)
1143                 tmpSplitter.whitespace = " "
1144                 tmpSplitter.whitespace_split = True
1145                 # loop for params
1146                 for tmpItem in tmpSplitter:
1147                     tmpMatch = re.search("^(-[^=]+=)(.+)$", tmpItem)
1148                     if tmpMatch is not None:
1149                         tmpArgName = tmpMatch.group(1)
1150                         tmpArgVal = tmpMatch.group(2)
1151                         tmpArgIdx = tmpParamStr.find(tmpArgName) + len(tmpArgName)
1152                         # add "
1153                         if tmpParamStr[tmpArgIdx] != '"':
1154                             tmpParamStr = tmpParamStr.replace(tmpMatch.group(0), tmpArgName + '"' + tmpArgVal + '"')
1155                 # run trf
1156                 if isUser:
1157                     scrStr += "./"
1158                     tmpParamStr += " --debug"
1159                 scrStr += f"{tmpTrfs[tmpIdx].split('/')[-1]} {tmpParamStr}\n\n"
1160                 scrStr += "EOF\n\n" 'chmod +x "$temp_file"\n'
1161                 scrStr += 'source ${ATLAS_LOCAL_ROOT_BASE}/user/atlasLocalSetup.sh -c %s -r "$temp_file"\n' % cmtConfig
1162                 scrStr += 'rm "$temp_file"\n'
1163             return scrStr
1164         except Exception as e:
1165             _logger.error(f"getScriptOfflineRunning : {str(e)} {traceback.format_exc()}")
1166             return f"ERROR: ServerError in getScriptOfflineRunning with {str(e)}"
1167 
1168     # kill jobs
1169     def killJobs(self, ids, user, code, prodManager, wgProdRole=[], killOptions=[]):
1170         tmp_log = LogWrapper(_logger, "killJobs")
1171         tmp_log.debug(f"start for {len(ids)} IDs")
1172         # get DBproxy
1173         with self.proxyPool.get() as proxy:
1174             rets = []
1175             # kill jobs
1176             pandaIDforCloserMap = {}
1177             for id in ids:
1178                 # retry event service merge
1179                 toKill = True
1180                 if "keepUnmerged" in killOptions:
1181                     tmpJobSpec = proxy.peekJob(id, True, True, False, False, False)
1182                     if tmpJobSpec is not None:
1183                         if EventServiceUtils.isEventServiceMerge(tmpJobSpec):
1184                             # retry ES merge jobs not to discard events
1185                             proxy.retryJob(
1186                                 id,
1187                                 {},
1188                                 getNewPandaID=True,
1189                                 attemptNr=tmpJobSpec.attemptNr,
1190                                 recoverableEsMerge=True,
1191                             )
1192                         elif EventServiceUtils.isEventServiceJob(tmpJobSpec) and not EventServiceUtils.isJobCloningJob(tmpJobSpec):
1193                             # get number of started events
1194                             nEvt = proxy.getNumStartedEvents(tmpJobSpec)
1195                             # not to kill jobset if there are started events
1196                             if nEvt is not None and nEvt > 0:
1197                                 # set sub status if any
1198                                 for killOpt in killOptions:
1199                                     if killOpt.startswith("jobSubStatus"):
1200                                         tmpJobSpec.jobSubStatus = killOpt.split("=")[-1]
1201                                         break
1202                                 # trigger ppE for ES jobs to properly trigger subsequent procedures
1203                                 ret = proxy.archiveJob(
1204                                     tmpJobSpec,
1205                                     tmpJobSpec.jobStatus in ["defined", "assigned"],
1206                                 )
1207                                 toKill = False
1208                                 userInfo = {"prodSourceLabel": None}
1209                 if toKill:
1210                     ret, userInfo = proxy.killJob(id, user, code, prodManager, True, wgProdRole, killOptions)
1211                 rets.append(ret)
1212                 if ret and userInfo["prodSourceLabel"] in ["user", "managed", "test"]:
1213                     jobIDKey = (
1214                         userInfo["prodUserID"],
1215                         userInfo["jobDefinitionID"],
1216                         userInfo["jobsetID"],
1217                     )
1218                     if jobIDKey not in pandaIDforCloserMap:
1219                         pandaIDforCloserMap[jobIDKey] = id
1220         # run Closer
1221         try:
1222             if pandaIDforCloserMap != {}:
1223                 for pandaIDforCloser in pandaIDforCloserMap.values():
1224                     tmpJobs = self.peekJobs([pandaIDforCloser])
1225                     tmpJob = tmpJobs[0]
1226                     if tmpJob is not None:
1227                         tmpDestDBlocks = []
1228                         # get destDBlock
1229                         for tmpFile in tmpJob.Files:
1230                             if tmpFile.type in ["output", "log"]:
1231                                 if tmpFile.destinationDBlock not in tmpDestDBlocks:
1232                                     tmpDestDBlocks.append(tmpFile.destinationDBlock)
1233                         # run
1234                         closer_process = Closer(self, tmpDestDBlocks, tmpJob)
1235                         closer_process.run()
1236         except Exception as e:
1237             tmp_log.error(f"failed with {str(e)} {traceback.format_exc()}")
1238         tmp_log.debug("done")
1239         return rets
1240 
1241     # reassign jobs
1242     def reassignJobs(
1243         self,
1244         ids,
1245         attempt=0,
1246         joinThr=False,
1247         forPending=False,
1248         firstSubmission=True,
1249     ):
1250         tmp_log = LogWrapper(_logger, "reassignJobs")
1251         tmp_log.debug(f"start for {len(ids)} IDs")
1252         # get DB proxy
1253         with self.proxyPool.get() as proxy:
1254             jobs = []
1255             # reset jobs
1256             n_reset_waiting = 0
1257             n_reset_defined = 0
1258             for tmp_id in ids:
1259                 try:
1260                     # try to reset a job in waiting table
1261                     ret = proxy.resetJob(
1262                         tmp_id,
1263                         activeTable=False,
1264                         forPending=True,
1265                     )
1266                     if ret is not None:
1267                         jobs.append(ret)
1268                         n_reset_waiting += 1
1269                         # waiting jobs don't create sub or dis
1270                         continue
1271                     # try to reset a job in defined table
1272                     ret = proxy.resetDefinedJob(tmp_id)
1273                     if ret is not None:
1274                         jobs.append(ret)
1275                         n_reset_defined += 1
1276                         continue
1277                 except Exception as e:
1278                     tmp_log.error(f"failed with {str(e)} {traceback.format_exc()}")
1279         tmp_log.debug(f"got {len(jobs)} IDs in total: {n_reset_waiting} from Waiting, {n_reset_defined} from Defined")
1280         # trigger subsequent agent
1281         if jobs:
1282             if joinThr:
1283                 thr = Setupper(
1284                     self,
1285                     jobs,
1286                     resubmit=True,
1287                     first_submission=firstSubmission,
1288                 )
1289                 thr.start()
1290                 thr.join()
1291             else:
1292                 # cannot use 'thr =' because it may trigger a garbage collector
1293                 Setupper(
1294                     self,
1295                     jobs,
1296                     resubmit=True,
1297                     first_submission=firstSubmission,
1298                 ).start()
1299         tmp_log.debug("done")
1300 
1301         return True
1302 
1303     # update input files and return corresponding PandaIDs
1304     def updateInFilesReturnPandaIDs(self, dataset, status, fileLFN=""):
1305         # get DBproxy
1306         with self.proxyPool.get() as proxy:
1307             retList = []
1308             # query PandaID
1309             retList = proxy.updateInFilesReturnPandaIDs(dataset, status, fileLFN)
1310         return retList
1311 
1312     # update input files for jobs at certain sites and return corresponding PandaIDs
1313     def update_input_files_at_sites_and_get_panda_ids(self, filename: str, sites: list) -> list:
1314         with self.proxyPool.get() as proxy:
1315             ret = proxy.update_input_files_at_sites_and_get_panda_ids(filename, sites)
1316         return ret
1317 
1318     # update output files and return corresponding PandaIDs
1319     def updateOutFilesReturnPandaIDs(self, dataset, fileLFN=""):
1320         # get DBproxy
1321         with self.proxyPool.get() as proxy:
1322             retList = []
1323             # query PandaID
1324             retList = proxy.updateOutFilesReturnPandaIDs(dataset, fileLFN)
1325         return retList
1326 
1327     # get _dis datasets associated to _sub
1328     def getAssociatedDisDatasets(self, subDsName):
1329         # get DBproxy
1330         with self.proxyPool.get() as proxy:
1331             retList = []
1332             # query
1333             retList = proxy.getAssociatedDisDatasets(subDsName)
1334         return retList
1335 
1336     # insert sandbox file info
1337     def insertSandboxFileInfo(self, userName, hostName, fileName, fileSize, checkSum):
1338         # get DBproxy
1339         with self.proxyPool.get() as proxy:
1340             # exec
1341             ret = proxy.insertSandboxFileInfo(userName, hostName, fileName, fileSize, checkSum)
1342         return ret
1343 
1344     # get and lock sandbox files
1345     def getLockSandboxFiles(self, time_limit, n_files):
1346         # get DBproxy
1347         with self.proxyPool.get() as proxy:
1348             # exec
1349             ret = proxy.getLockSandboxFiles(time_limit, n_files)
1350         return ret
1351 
1352     # check duplicated sandbox file
1353     def checkSandboxFile(self, userName, fileSize, checkSum):
1354         # get DBproxy
1355         with self.proxyPool.get() as proxy:
1356             # exec
1357             ret = proxy.checkSandboxFile(userName, fileSize, checkSum)
1358         return ret
1359 
1360     # insert datasets
1361     def insertDatasets(self, datasets):
1362         # get DBproxy
1363         with self.proxyPool.get() as proxy:
1364             retList = []
1365             # insert
1366             for dataset in datasets:
1367                 ret = proxy.insertDataset(dataset)
1368                 retList.append(ret)
1369         return retList
1370 
1371     # get and lock dataset with a query
1372     def getLockDatasets(self, sqlQuery, varMapGet, modTimeOffset="", getVersion=False):
1373         # get DBproxy
1374         with self.proxyPool.get() as proxy:
1375             # query Dataset
1376             ret = proxy.getLockDatasets(sqlQuery, varMapGet, modTimeOffset, getVersion)
1377         return ret
1378 
1379     # query Dataset
1380     def queryDatasetWithMap(self, map):
1381         # get DBproxy
1382         with self.proxyPool.get() as proxy:
1383             # query Dataset
1384             ret = proxy.queryDatasetWithMap(map)
1385         return ret
1386 
1387     # set GUIDs
1388     def setGUIDs(self, files):
1389         # get DBproxy
1390         with self.proxyPool.get() as proxy:
1391             # set GUIDs
1392             ret = proxy.setGUIDs(files)
1393         return ret
1394 
1395     # update dataset
1396     def updateDatasets(self, datasets, withLock=False, withCriteria="", criteriaMap={}):
1397         # get DBproxy
1398         with self.proxyPool.get() as proxy:
1399             # update Dataset
1400             retList = proxy.updateDataset(datasets, withLock, withCriteria, criteriaMap)
1401         return retList
1402 
1403     # trigger cleanup of internal datasets used by a task
1404     def trigger_cleanup_internal_datasets(self, task_id: int) -> bool:
1405         with self.proxyPool.get() as proxy:
1406             ret = proxy.trigger_cleanup_internal_datasets(task_id)
1407         return ret
1408 
1409     # count the number of files with map
1410     def countFilesWithMap(self, map):
1411         # get DBproxy
1412         with self.proxyPool.get() as proxy:
1413             # query files
1414             ret = proxy.countFilesWithMap(map)
1415         return ret
1416 
1417     # get serial number for dataset
1418     def getSerialNumber(self, datasetname, definedFreshFlag=None):
1419         # get DBproxy
1420         with self.proxyPool.get() as proxy:
1421             # get serial number
1422             ret = proxy.getSerialNumber(datasetname, definedFreshFlag)
1423         return ret
1424 
1425     # add metadata
1426     def addMetadata(self, ids, metadataList, newStatusList):
1427         # get DBproxy
1428         with self.proxyPool.get() as proxy:
1429             # add metadata
1430             index = 0
1431             retList = []
1432             for id in ids:
1433                 ret = proxy.addMetadata(id, metadataList[index], newStatusList[index])
1434                 retList.append(ret)
1435                 index += 1
1436         return retList
1437 
1438     # add stdout
1439     def addStdOut(self, id, stdout):
1440         # get DBproxy
1441         with self.proxyPool.get() as proxy:
1442             # add
1443             ret = proxy.addStdOut(id, stdout)
1444         return ret
1445 
1446     # extract scope from dataset name
1447     def extractScope(self, name):
1448         # get DBproxy
1449         with self.proxyPool.get() as proxy:
1450             # get
1451             ret = proxy.extractScope(name)
1452         return ret
1453 
1454     # get job statistics
1455     def getJobStatistics(self):
1456         # get DBproxy
1457         with self.proxyPool.get() as proxy:
1458             # get serial number
1459             ret = proxy.getJobStatistics()
1460         return ret
1461 
1462     # get detailed job statistics with resource_type and prodsourcelabel
1463     def getDetailedJobStatistics(self):
1464         with self.proxyPool.get() as proxy:
1465             ret = proxy.getDetailedJobStatistics()
1466         return ret
1467 
1468     # get job statistics for ExtIF. Source type is analysis or production
1469     def getJobStatisticsForExtIF(self, sourcetype=None):
1470         # get DBproxy
1471         with self.proxyPool.get() as proxy:
1472             # get serial number
1473             ret = proxy.getJobStatisticsForExtIF(sourcetype)
1474         return ret
1475 
1476     # get job statistics for Bamboo
1477     def getJobStatisticsForBamboo(self):
1478         # get DBproxy
1479         with self.proxyPool.get() as proxy:
1480             # get serial number
1481             ret = proxy.getJobStatisticsPerProcessingType()
1482         return ret
1483 
1484     # update site data
1485     def updateSiteData(self, hostID, pilotRequests, interval=3):
1486         # get DBproxy
1487         with self.proxyPool.get() as proxy:
1488             # get serial number
1489             ret = proxy.updateSiteData(hostID, pilotRequests, interval)
1490         return ret
1491 
1492     # get current site data
1493     def getCurrentSiteData(self):
1494         # get DBproxy
1495         with self.proxyPool.get() as proxy:
1496             # get serial number
1497             ret = proxy.getCurrentSiteData()
1498         return ret
1499 
1500     # insert nRunning in site data
1501     def insertnRunningInSiteData(self):
1502         # get DBproxy
1503         with self.proxyPool.get() as proxy:
1504             # get serial number
1505             ret = proxy.insertnRunningInSiteData()
1506         return ret
1507 
1508     # get site info
1509     def getSiteInfo(self):
1510         # get DBproxy
1511         with self.proxyPool.get() as proxy:
1512             # get site info
1513             ret = proxy.getSiteInfo()
1514         return ret
1515 
1516     # get cloud list
1517     def get_cloud_list(self):
1518         # get DBproxy
1519         with self.proxyPool.get() as proxy:
1520             # get cloud list
1521             ret = proxy.get_cloud_list()
1522         return ret
1523 
1524     # get special dispatcher parameters
1525     def get_special_dispatch_params(self):
1526         # get DBproxy
1527         with self.proxyPool.get() as proxy:
1528             # exec
1529             ret = proxy.get_special_dispatch_params()
1530         return ret
1531 
1532     # get email address
1533     def getEmailAddr(self, name, withDN=False, withUpTime=False):
1534         # get DBproxy
1535         with self.proxyPool.get() as proxy:
1536             # get
1537             ret = proxy.getEmailAddr(name, withDN, withUpTime)
1538         return ret
1539 
1540     # set email address for a user
1541     def setEmailAddr(self, userName, emailAddr):
1542         # get DBproxy
1543         with self.proxyPool.get() as proxy:
1544             # set
1545             ret = proxy.setEmailAddr(userName, emailAddr)
1546         return ret
1547 
1548     # get banned users
1549     def get_ban_users(self):
1550         # get DBproxy
1551         with self.proxyPool.get() as proxy:
1552             # get
1553             ret = proxy.get_ban_users()
1554         return ret
1555 
1556     # register a token key
1557     def register_token_key(self, client_name, lifetime):
1558         # get DBproxy
1559         with self.proxyPool.get() as proxy:
1560             # register proxy key
1561             ret = proxy.register_token_key(client_name, lifetime)
1562         return ret
1563 
1564     # query an SQL return Status
1565     def querySQLS(self, sql, varMap, arraySize=1000):
1566         # get DBproxy
1567         with self.proxyPool.get() as proxy:
1568             # get
1569             ret = proxy.querySQLS(sql, varMap, arraySize)
1570         return ret
1571 
1572     # query an SQL
1573     def querySQL(self, sql, varMap, arraySize=1000):
1574         # get DBproxy
1575         with self.proxyPool.get() as proxy:
1576             # get
1577             ret = proxy.querySQLS(sql, varMap, arraySize)[1]
1578         return ret
1579 
1580     # execute an SQL return with executemany
1581     def executemanySQL(self, sql, varMaps, arraySize=1000):
1582         # get DBproxy
1583         with self.proxyPool.get() as proxy:
1584             # get
1585             ret = proxy.executemanySQL(sql, varMaps, arraySize)
1586         return ret
1587 
1588     # check quota
1589     def checkQuota(self, dn):
1590         # query an SQL return Status
1591         with self.proxyPool.get() as proxy:
1592             # get
1593             ret = proxy.checkQuota(dn)
1594         return ret
1595 
1596     # insert TaskParams
1597     def insertTaskParamsPanda(self, taskParams, user, prodRole, fqans=[], parent_tid=None, properErrorCode=False, allowActiveTask=False, decode=True):
1598         # query an SQL return Status
1599         with self.proxyPool.get() as proxy:
1600             # check user status
1601             tmpStatus = proxy.checkBanUser(user, None, True)
1602             if tmpStatus is True:
1603                 # exec
1604                 ret = proxy.insertTaskParamsPanda(taskParams, user, prodRole, fqans, parent_tid, properErrorCode, allowActiveTask, decode)
1605             elif tmpStatus == 1:
1606                 ret = False, "Failed to update DN in PandaDB"
1607             elif tmpStatus == 2:
1608                 ret = False, "Failed to insert user info to PandaDB"
1609             else:
1610                 ret = False, f"The following DN is banned: DN={user}"
1611         return ret
1612 
1613     # send command to task
1614     def sendCommandTaskPanda(
1615         self,
1616         jediTaskID,
1617         dn,
1618         prodRole,
1619         comStr,
1620         comComment=None,
1621         useCommit=True,
1622         properErrorCode=False,
1623         comQualifier=None,
1624         broadcast=False,
1625     ):
1626         # query an SQL return Status
1627         with self.proxyPool.get() as proxy:
1628             # exec
1629             ret = proxy.sendCommandTaskPanda(
1630                 jediTaskID,
1631                 dn,
1632                 prodRole,
1633                 comStr,
1634                 comComment,
1635                 useCommit,
1636                 properErrorCode,
1637                 comQualifier,
1638                 broadcast,
1639             )
1640         return ret
1641 
1642     # update unmerged datasets to trigger merging
1643     def updateUnmergedDatasets(self, job, finalStatusDS, updateCompleted=False):
1644         # get proxy
1645         with self.proxyPool.get() as proxy:
1646             # exec
1647             ret = proxy.updateUnmergedDatasets(job, finalStatusDS, updateCompleted)
1648         return ret
1649 
1650     # get active JediTasks in a time range
1651     def getJediTasksInTimeRange(self, dn, timeRangeStr, fullFlag=False, minTaskID=None, task_type="user"):
1652         # check DN
1653         if dn in ["NULL", "", "None", None]:
1654             return {}
1655         # check timeRange
1656         match = re.match("^(\d+)-(\d+)-(\d+) (\d+):(\d+):(\d+)$", timeRangeStr)
1657         if match is None:
1658             return {}
1659         timeRange = datetime.datetime(
1660             year=int(match.group(1)),
1661             month=int(match.group(2)),
1662             day=int(match.group(3)),
1663             hour=int(match.group(4)),
1664             minute=int(match.group(5)),
1665             second=int(match.group(6)),
1666         )
1667         # max range is 3 months
1668         maxRange = naive_utcnow() - datetime.timedelta(days=30)
1669         if timeRange < maxRange:
1670             timeRange = maxRange
1671         # get proxy
1672         with self.proxyPool.get() as proxy:
1673             # exec
1674             ret = proxy.getJediTasksInTimeRange(dn, timeRange, fullFlag, minTaskID, task_type)
1675         return ret
1676 
1677     # get details of JediTask
1678     def getJediTaskDetails(self, jediTaskID, fullFlag, withTaskInfo):
1679         # get proxy
1680         with self.proxyPool.get() as proxy:
1681             # exec
1682             ret = proxy.getJediTaskDetails(jediTaskID, fullFlag, withTaskInfo)
1683         return ret
1684 
1685     # get task details as a plain dict (read-only, no lock)
1686     def get_task_details_json(self, jedi_task_id, resolve_parent=False, include_resolve_status=False):
1687         with self.proxyPool.get() as proxy:
1688             return proxy.get_task_details_json(
1689                 jedi_task_id,
1690                 resolve_parent=resolve_parent,
1691                 include_resolve_status=include_resolve_status,
1692             )
1693 
    # get a list of event ranges for a PandaID
1695     def getEventRanges(self, pandaID, jobsetID, jediTaskID, nRanges, acceptJson, scattered, segment_id):
1696         # get proxy
1697         with self.proxyPool.get() as proxy:
1698             # exec
1699             ret = proxy.getEventRanges(pandaID, jobsetID, jediTaskID, nRanges, acceptJson, scattered, segment_id)
1700         return ret
1701 
    # update an event range
1703     def updateEventRange(self, eventRangeID, eventStatus, cpuCore, cpuConsumptionTime, objstoreID=None):
1704         eventDict = {}
1705         eventDict["eventRangeID"] = eventRangeID
1706         eventDict["eventStatus"] = eventStatus
1707         eventDict["cpuCore"] = cpuCore
1708         eventDict["cpuConsumptionTime"] = cpuConsumptionTime
1709         eventDict["objstoreID"] = objstoreID
1710         # get proxy
1711         with self.proxyPool.get() as proxy:
1712             # exec
1713             ret = proxy.updateEventRanges([eventDict])
1714         # extract return
1715         try:
1716             retVal = ret[0][0]
1717         except Exception:
1718             retVal = False
1719 
1720         return retVal, json.dumps(ret[1])
1721 
    # update event ranges
1723     def updateEventRanges(self, eventRanges, version=0):
1724         # decode json
1725         try:
1726             eventRanges = json.loads(eventRanges)
1727         except Exception:
1728             return json.dumps("ERROR : failed to convert eventRanges with json")
1729         # get proxy
1730         with self.proxyPool.get() as proxy:
1731             # exec
1732             ret = proxy.updateEventRanges(eventRanges, version)
1733         if version != 0:
1734             return ret
1735         return json.dumps(ret[0]), json.dumps(ret[1])
1736 
1737     # change task priority
1738     def changeTaskPriorityPanda(self, jediTaskID, newPriority):
1739         # get proxy
1740         with self.proxyPool.get() as proxy:
1741             # exec
1742             ret = proxy.changeTaskPriorityPanda(jediTaskID, newPriority)
1743         return ret
1744 
1745     # throttle user jobs
1746     def throttleUserJobs(self, prodUserName, workingGroup, get_dict=False):
1747         # get proxy
1748         with self.proxyPool.get() as proxy:
1749             # exec
1750             ret = proxy.throttleUserJobs(prodUserName, workingGroup, get_dict)
1751         return ret
1752 
1753     # unthrottle user jobs
1754     def unThrottleUserJobs(self, prodUserName, workingGroup, get_dict=False):
1755         # get proxy
1756         with self.proxyPool.get() as proxy:
1757             # exec
1758             ret = proxy.unThrottleUserJobs(prodUserName, workingGroup, get_dict)
1759         return ret
1760 
1761     # get throttled users
1762     def getThrottledUsers(self):
1763         # get proxy
1764         with self.proxyPool.get() as proxy:
1765             # exec
1766             ret = proxy.getThrottledUsers()
1767         return ret
1768 
1769     # get the list of jobdefIDs for failed jobs in a task
1770     def getJobdefIDsForFailedJob(self, jediTaskID):
1771         # get proxy
1772         with self.proxyPool.get() as proxy:
1773             # exec
1774             ret = proxy.getJobdefIDsForFailedJob(jediTaskID)
1775         return ret
1776 
1777     # change task attribute
1778     def changeTaskAttributePanda(self, jediTaskID, attrName, attrValue):
1779         # get proxy
1780         with self.proxyPool.get() as proxy:
1781             # exec
1782             ret = proxy.changeTaskAttributePanda(jediTaskID, attrName, attrValue)
1783         return ret
1784 
1785     # change split rule for task
1786     def changeTaskSplitRulePanda(self, jediTaskID, attrName, attrValue):
1787         # get proxy
1788         with self.proxyPool.get() as proxy:
1789             # exec
1790             ret = proxy.changeTaskSplitRulePanda(jediTaskID, attrName, attrValue)
1791         return ret
1792 
1793     # increase attempt number for unprocessed files
1794     def increaseAttemptNrPanda(self, jediTaskID, increasedNr):
1795         # get proxy
1796         with self.proxyPool.get() as proxy:
1797             # exec
1798             ret = proxy.increaseAttemptNrPanda(jediTaskID, increasedNr)
1799         return ret
1800 
1801     # get jediTaskID from taskName
1802     def getTaskIDwithTaskNameJEDI(self, userName, taskName):
1803         # get proxy
1804         with self.proxyPool.get() as proxy:
1805             # exec
1806             ret = proxy.getTaskIDwithTaskNameJEDI(userName, taskName)
1807         return ret
1808 
1809     # update error dialog for a jediTaskID
1810     def updateTaskErrorDialogJEDI(self, jediTaskID, msg):
1811         # get proxy
1812         with self.proxyPool.get() as proxy:
1813             # exec
1814             ret = proxy.updateTaskErrorDialogJEDI(jediTaskID, msg)
1815         return ret
1816 
1817     # update modificationtime for a jediTaskID to trigger subsequent process
1818     def updateTaskModTimeJEDI(self, jediTaskID, newStatus=None):
1819         # get proxy
1820         with self.proxyPool.get() as proxy:
1821             # exec
1822             ret = proxy.updateTaskModTimeJEDI(jediTaskID, newStatus)
1823         return ret
1824 
1825     # check input file status
1826     def checkInputFileStatusInJEDI(self, jobSpec):
1827         # get proxy
1828         with self.proxyPool.get() as proxy:
1829             # exec
1830             ret = proxy.checkInputFileStatusInJEDI(jobSpec)
1831         return ret
1832 
1833     # increase memory limit
1834     def increaseRamLimitJEDI(self, jediTaskID, jobRamCount):
1835         # get proxy
1836         with self.proxyPool.get() as proxy:
1837             # exec
1838             ret = proxy.increaseRamLimitJEDI(jediTaskID, jobRamCount)
1839         return ret
1840 
1841     # increase memory limit
1842     def increaseRamLimitJobJEDI(self, job, jobRamCount, jediTaskID):
1843         # get proxy
1844         with self.proxyPool.get() as proxy:
1845             # exec
1846             ret = proxy.increaseRamLimitJobJEDI(job, jobRamCount, jediTaskID)
1847         return ret
1848 
1849     # increase memory limit xtimes
1850     def increaseRamLimitJobJEDI_xtimes(self, job, jobRamCount, jediTaskID, attemptNr):
1851         # get proxy
1852         with self.proxyPool.get() as proxy:
1853             # exec
1854             ret = proxy.increaseRamLimitJobJEDI_xtimes(job, jobRamCount, jediTaskID, attemptNr)
1855         return ret
1856 
1857     # reduce input per job
1858     def reduce_input_per_job(self, panda_id, jedi_task_id, attempt_nr, excluded_rules, steps, dry_mode=False):
1859         # get proxy
1860         with self.proxyPool.get() as proxy:
1861             # exec
1862             ret = proxy.reduce_input_per_job(panda_id, jedi_task_id, attempt_nr, excluded_rules, steps, dry_mode)
1863         return ret
1864 
1865     # reset files in JEDI
1866     def resetFileStatusInJEDI(self, dn, prodManager, datasetName, lostFiles, recoverParent, simul=False):
1867         # get proxy
1868         with self.proxyPool.get() as proxy:
1869             # exec
1870             ret = proxy.resetFileStatusInJEDI(dn, prodManager, datasetName, lostFiles, recoverParent, simul)
1871         return ret
1872 
1873     # copy file records
1874     def copy_file_records(self, new_lfns, file_spec):
1875         # get proxy
1876         with self.proxyPool.get() as proxy:
1877             # exec
1878             ret = proxy.copy_file_records(new_lfns, file_spec)
1879         return ret
1880 
1881     # retry module: get the defined rules
1882     def getRetrialRules(self):
1883         # get proxy
1884         with self.proxyPool.get() as proxy:
1885             # exec
1886             ret = proxy.getRetrialRules()
1887         return ret
1888 
1889     # retry module action: set max number of retries
1890     def setMaxAttempt(self, jobID, jediTaskID, files, attemptNr):
1891         # get proxy
1892         with self.proxyPool.get() as proxy:
1893             # exec
1894             ret = proxy.setMaxAttempt(jobID, jediTaskID, files, attemptNr)
1895         return ret
1896 
1897     # error classification action: increase by one the max number of retries
1898     def increase_max_failure(self, job_id, task_id, files):
1899         # get proxy
1900         with self.proxyPool.get() as proxy:
1901             # exec
1902             ret = proxy.increase_max_failure(job_id, task_id, files)
1903         return ret
1904 
1905     # retry module action: set maxAttempt to the current attemptNr to avoid further retries
1906     def setNoRetry(self, jobID, jediTaskID, files):
1907         # get proxy
1908         with self.proxyPool.get() as proxy:
1909             # exec
1910             ret = proxy.setNoRetry(jobID, jediTaskID, files)
1911         return ret
1912 
1913     # retry module action: increase CPU Time
1914     def initialize_cpu_time_task(self, jobID, taskID, siteid, files, active):
1915         # get proxy
1916         with self.proxyPool.get() as proxy:
1917             # exec
1918             ret = proxy.initialize_cpu_time_task(jobID, taskID, siteid, files, active)
1919         return ret
1920 
1921     # retry module action: recalculate the Task Parameters
1922     def requestTaskParameterRecalculation(self, taskID):
1923         # get proxy
1924         with self.proxyPool.get() as proxy:
1925             # exec
1926             ret = proxy.requestTaskParameterRecalculation(taskID)
1927         return ret
1928 
1929     # add associate sub datasets for single consumer job
1930     def getDestDBlocksWithSingleConsumer(self, jediTaskID, PandaID, ngDatasets):
1931         # get proxy
1932         with self.proxyPool.get() as proxy:
1933             # exec
1934             ret = proxy.getDestDBlocksWithSingleConsumer(jediTaskID, PandaID, ngDatasets)
1935         return ret
1936 
1937     # check validity of merge job
1938     def isValidMergeJob(self, pandaID, jediTaskID):
1939         # get proxy
1940         with self.proxyPool.get() as proxy:
1941             # exec
1942             ret = proxy.isValidMergeJob(pandaID, jediTaskID)
1943         return ret
1944 
1945     # Configurator: insert network matrix data
1946     def insertNetworkMatrixData(self, data):
1947         # get proxy
1948         with self.proxyPool.get() as proxy:
1949             # exec
1950             ret = proxy.insertNetworkMatrixData(data)
1951         return ret
1952 
1953     # Configurator: delete old network matrix data
1954     def deleteOldNetworkData(self):
1955         # get proxy
1956         with self.proxyPool.get() as proxy:
1957             # exec
1958             ret = proxy.deleteOldNetworkData()
1959         return ret
1960 
1961     # get dispatch datasets per user
1962     def getDispatchDatasetsPerUser(self, vo, prodSourceLabel, onlyActive, withSize):
1963         # get proxy
1964         with self.proxyPool.get() as proxy:
1965             # exec
1966             ret = proxy.getDispatchDatasetsPerUser(vo, prodSourceLabel, onlyActive, withSize)
1967         return ret
1968 
1969     # get task parameters
1970     def getTaskParamsPanda(self, jediTaskID):
1971         # get proxy
1972         with self.proxyPool.get() as proxy:
1973             # exec
1974             ret = proxy.getTaskParamsPanda(jediTaskID)
1975         return ret
1976 
1977     # get task attributes
1978     def getTaskAttributesPanda(self, jediTaskID, attrs):
1979         # get proxy
1980         with self.proxyPool.get() as proxy:
1981             # exec
1982             ret = proxy.getTaskAttributesPanda(jediTaskID, attrs)
1983         return ret
1984 
1985     # check for cloned jobs
1986     def checkClonedJob(self, jobSpec):
1987         # get proxy
1988         with self.proxyPool.get() as proxy:
1989             # exec
1990             ret = proxy.checkClonedJob(jobSpec)
1991         return ret
1992 
1993     # get co-jumbo jobs to be finished
1994     def getCoJumboJobsToBeFinished(self, timeLimit, minPriority, maxJobs):
1995         # get proxy
1996         with self.proxyPool.get() as proxy:
1997             # exec
1998             ret = proxy.getCoJumboJobsToBeFinished(timeLimit, minPriority, maxJobs)
1999         return ret
2000 
2001     # get number of events to be processed
2002     def getNumReadyEvents(self, jediTaskID):
2003         # get proxy
2004         with self.proxyPool.get() as proxy:
2005             # exec
2006             ret = proxy.getNumReadyEvents(jediTaskID)
2007         return ret
2008 
2009     # check if task is applicable for jumbo jobs
2010     def isApplicableTaskForJumbo(self, jediTaskID):
2011         # get proxy
2012         with self.proxyPool.get() as proxy:
2013             # exec
2014             ret = proxy.isApplicableTaskForJumbo(jediTaskID)
2015         return ret
2016 
2017     # cleanup jumbo jobs
2018     def cleanupJumboJobs(self, jediTaskID=None):
2019         # get proxy
2020         with self.proxyPool.get() as proxy:
2021             # exec
2022             ret = proxy.cleanupJumboJobs(jediTaskID)
2023         return ret
2024 
2025     # convert ObjID to endpoint
2026     def convertObjIDtoEndPoint(self, srcFileName, ObjID):
2027         # get DB proxy
2028         with self.proxyPool.get() as proxy:
2029             # exec
2030             res = proxy.convertObjIDtoEndPoint(srcFileName, ObjID)
2031         return res
2032 
2033     # get task status
2034     def getTaskStatus(self, jediTaskID):
2035         # get DB proxy
2036         with self.proxyPool.get() as proxy:
2037             # exec
2038             res = proxy.getTaskStatus(jediTaskID)
2039         return res
2040 
2041     # get task status and superstatus
2042     def getTaskStatusSuperstatus(self, jediTaskID):
2043         # get DB proxy
2044         with self.proxyPool.get() as proxy:
2045             # exec
2046             res = proxy.getTaskStatusSuperstatus(jediTaskID)
2047         return res
2048 
2049     # reactivate task
2050     def reactivateTask(self, jediTaskID, keep_attempt_nr=False, trigger_job_generation=False):
2051         # get DB proxy
2052         with self.proxyPool.get() as proxy:
2053             # exec
2054             res = proxy.reactivateTask(jediTaskID, keep_attempt_nr, trigger_job_generation)
2055         return res
2056 
2057     # get event statistics
2058     def getEventStat(self, jediTaskID, PandaID):
2059         # get DB proxy
2060         with self.proxyPool.get() as proxy:
2061             # exec
2062             res = proxy.getEventStat(jediTaskID, PandaID)
2063         return res
2064 
2065     # get nested dict of gshare names implying the tree structure
2066     def get_tree_of_gshare_names(self):
2067         # get DB proxy
2068         with self.proxyPool.get() as proxy:
2069             # exec
2070             res = proxy.get_tree_of_gshare_names()
2071         return res
2072 
2073     # get the HS06 distribution for global shares
2074     def get_hs_distribution(self):
2075         # get DB proxy
2076         with self.proxyPool.get() as proxy:
2077             # exec
2078             res = proxy.get_hs_distribution()
2079         return res
2080 
2081     # reassign share
2082     def reassignShare(self, jedi_task_ids, share_dest, reassign_running):
2083         # get DB proxy
2084         with self.proxyPool.get() as proxy:
2085             # exec
2086             res = proxy.reassignShare(jedi_task_ids, share_dest, reassign_running)
2087         return res
2088 
2089     def is_valid_share(self, share_name):
2090         """
2091         Checks whether the share is a valid leave share
2092         """
2093         # get DB proxy
2094         with self.proxyPool.get() as proxy:
2095             # exec
2096             res = proxy.is_valid_share(share_name)
2097         return res
2098 
2099     def get_share_for_task(self, task):
2100         """
2101         Return the share based on a task specification
2102         """
2103         # get DB proxy
2104         with self.proxyPool.get() as proxy:
2105             # exec
2106             res = proxy.get_share_for_task(task)
2107         return res
2108 
2109     def get_share_for_job(self, job):
2110         """
2111         Return the share based on a task specification
2112         """
2113         # get DB proxy
2114         with self.proxyPool.get() as proxy:
2115             # exec
2116             res = proxy.get_share_for_job(job)
2117         return res
2118 
2119     def getTaskParamsMap(self, jediTaskID):
2120         """
2121         Return the taskParamsMap
2122         """
2123         # get DB proxy
2124         with self.proxyPool.get() as proxy:
2125             # exec
2126             res = proxy.getTaskParamsPanda(jediTaskID)
2127         return res
2128 
2129     def getCommands(self, harvester_id, n_commands):
2130         """
2131         Get n commands for a particular harvester instance
2132         """
2133         # get DB proxy
2134         with self.proxyPool.get() as proxy:
2135             # exec
2136             res = proxy.getCommands(harvester_id, n_commands)
2137         return res
2138 
2139     def ackCommands(self, command_ids):
2140         """
2141         Acknowledge a list of command IDs
2142         """
2143         # get DB proxy
2144         with self.proxyPool.get() as proxy:
2145             # exec
2146             res = proxy.ackCommands(command_ids)
2147         return res
2148 
2149     # send command to harvester or lock command
2150     def commandToHarvester(
2151         self,
2152         harvester_ID,
2153         command,
2154         ack_requested,
2155         status,
2156         lockInterval=None,
2157         comInterval=None,
2158         params=None,
2159     ):
2160         # get DB proxy
2161         with self.proxyPool.get() as proxy:
2162             # exec
2163             res = proxy.commandToHarvester(
2164                 harvester_ID,
2165                 command,
2166                 ack_requested,
2167                 status,
2168                 lockInterval,
2169                 comInterval,
2170                 params,
2171             )
2172         return res
2173 
2174     def getResourceTypes(self):
2175         """
2176         Get resource types (SCORE, MCORE, ...) and their definitions
2177         """
2178         # get DB proxy
2179         with self.proxyPool.get() as proxy:
2180             # exec
2181             res = proxy.load_resource_types(formatting="dict")
2182         return res
2183 
2184     # report stat of workers
2185     def reportWorkerStats_jobtype(self, harvesterID, siteName, paramsList):
2186         # get DB proxy
2187         with self.proxyPool.get() as proxy:
2188             # exec
2189             res = proxy.reportWorkerStats_jobtype(harvesterID, siteName, paramsList)
2190         return res
2191 
2192     # get command locks
2193     def getCommandLocksHarvester(self, harvester_ID, command, lockedBy, lockInterval, commandInterval):
2194         # get DB proxy
2195         with self.proxyPool.get() as proxy:
2196             # exec
2197             res = proxy.getCommandLocksHarvester(harvester_ID, command, lockedBy, lockInterval, commandInterval)
2198         return res
2199 
2200     # release command lock
2201     def releaseCommandLockHarvester(self, harvester_ID, command, computingSite, resourceType, lockedBy):
2202         # get DB proxy
2203         with self.proxyPool.get() as proxy:
2204             # exec
2205             res = proxy.releaseCommandLockHarvester(harvester_ID, command, computingSite, resourceType, lockedBy)
2206         return res
2207 
2208     # update workers
2209     def updateWorkers(self, harvesterID, data):
2210         """
2211         Update workers
2212         """
2213         # get DB proxy
2214         with self.proxyPool.get() as proxy:
2215             # exec
2216             res = proxy.updateWorkers(harvesterID, data)
2217         return res
2218 
    # update service metrics
2220     def updateServiceMetrics(self, harvesterID, data):
2221         """
2222         Update workers
2223         """
2224         # get DB proxy
2225         with self.proxyPool.get() as proxy:
2226             # exec
2227             res = proxy.updateServiceMetrics(harvesterID, data)
2228         return res
2229 
2230     # heartbeat for harvester
2231     def harvesterIsAlive(self, user, host, harvesterID, data):
2232         """
2233         update harvester instance information
2234         """
2235         # get DB proxy
2236         with self.proxyPool.get() as proxy:
2237             # exec
2238             res = proxy.harvesterIsAlive(user, host, harvesterID, data)
2239         return res
2240 
2241     def storePilotLog(self, panda_id, pilot_log):
2242         """
2243         Store the pilot log in the pandalog table
2244         """
2245         # get DB proxy
2246         with self.proxyPool.get() as proxy:
2247             # exec
2248             res = proxy.storePilotLog(panda_id, pilot_log)
2249         return res
2250 
2251     # read the resource types from the DB
2252     def load_resource_types(self):
2253         # get DBproxy
2254         with self.proxyPool.get() as proxy:
2255             # exec
2256             ret_val = proxy.load_resource_types()
2257         return ret_val
2258 
2259     # get the resource of a task
2260     def get_resource_type_task(self, task_spec):
2261         # get DBproxy
2262         with self.proxyPool.get() as proxy:
2263             # exec
2264             ret_val = proxy.get_resource_type_task(task_spec)
2265         return ret_val
2266 
2267     def reset_resource_type_task(self, jedi_task_id, use_commit=True):
2268         # get DBproxy
2269         with self.proxyPool.get() as proxy:
2270             # exec
2271             ret_val = proxy.reset_resource_type_task(jedi_task_id, use_commit)
2272         return ret_val
2273 
    # get the resource type of a job
2275     def get_resource_type_job(self, job_spec):
2276         # get DBproxy
2277         with self.proxyPool.get() as proxy:
2278             # exec
2279             ret_val = proxy.get_resource_type_job(job_spec)
2280         return ret_val
2281 
2282     # check Job status
2283     def checkJobStatus(self, pandaIDs):
2284         try:
2285             pandaIDs = pandaIDs.split(",")
2286         except Exception:
2287             pandaIDs = []
2288         # get DBproxy
2289         with self.proxyPool.get() as proxy:
2290             # exec
2291             retList = []
2292             for pandaID in pandaIDs:
2293                 ret = proxy.checkJobStatus(pandaID)
2294                 retList.append(ret)
2295         return retList
2296 
2297     # get stat of workers
2298     def getWorkerStats(self):
2299         # get DBproxy
2300         with self.proxyPool.get() as proxy:
2301             # exec
2302             ret = proxy.getWorkerStats()
2303         return ret
2304 
2305     # get unified pilot streaming queues
2306     def ups_get_queues(self):
2307         # get DBproxy
2308         with self.proxyPool.get() as proxy:
2309             # exec
2310             ret = proxy.ups_get_queues()
2311         return ret
2312 
2313     # load harvester worker stats
2314     def ups_load_worker_stats(self):
2315         # get DBproxy
2316         with self.proxyPool.get() as proxy:
2317             # exec
2318             ret = proxy.ups_load_worker_stats()
2319         return ret
2320 
    # get the average memory of workers (delegates to proxy.get_average_memory_workers)
2322     def get_average_memory_workers(self, queue, harvester_id, target):
2323         # get DBproxy
2324         with self.proxyPool.get() as proxy:
2325             # exec
2326             ret = proxy.get_average_memory_workers(queue, harvester_id, target)
2327         return ret
2328 
2329     # get the distribution of new workers to submit
2330     def ups_new_worker_distribution(self, queue, worker_stats):
2331         # get DBproxy
2332         with self.proxyPool.get() as proxy:
2333             # exec
2334             ret = proxy.ups_new_worker_distribution(queue, worker_stats)
2335         return ret
2336 
2337     # check event availability
2338     def checkEventsAvailability(self, pandaID, jobsetID, jediTaskID):
2339         # get DBproxy
2340         with self.proxyPool.get() as proxy:
2341             # exec
2342             ret = proxy.checkEventsAvailability(pandaID, jobsetID, jediTaskID)
2343         return ret
2344 
    # get LFNs for jumbo job
2346     def getLFNsForJumbo(self, jediTaskID):
2347         # get DBproxy
2348         with self.proxyPool.get() as proxy:
2349             # exec
2350             ret = proxy.getLFNsForJumbo(jediTaskID)
2351         return ret
2352 
2353     # get active job attribute
2354     def getActiveJobAttributes(self, pandaID, attrs):
2355         # get DBproxy
2356         with self.proxyPool.get() as proxy:
2357             # exec
2358             ret = proxy.getActiveJobAttributes(pandaID, attrs)
2359         return ret
2360 
2361     # get original consumers
2362     def getOriginalConsumers(self, jediTaskID, jobsetID, pandaID):
2363         # get DBproxy
2364         with self.proxyPool.get() as proxy:
2365             # exec
2366             ret = proxy.getOriginalConsumers(jediTaskID, jobsetID, pandaID)
2367         return ret
2368 
2369     # add harvester dialog messages
2370     def addHarvesterDialogs(self, harvesterID, dialogs):
2371         # get DBproxy
2372         with self.proxyPool.get() as proxy:
2373             # exec
2374             ret = proxy.addHarvesterDialogs(harvesterID, dialogs)
2375         return ret
2376 
2377     # get job statistics per site and resource
2378     def getJobStatisticsPerSiteResource(self, timeWindow=None):
2379         # get DBproxy
2380         with self.proxyPool.get() as proxy:
2381             # exec
2382             ret = proxy.getJobStatisticsPerSiteResource(timeWindow)
2383         return ret
2384 
2385     # get job statistics per site, source label, and resource type
2386     def get_job_statistics_per_site_label_resource(self, time_window=None):
2387         # get DBproxy
2388         with self.proxyPool.get() as proxy:
2389             # exec
2390             ret = proxy.get_job_statistics_per_site_label_resource(time_window)
2391         return ret
2392 
2393     # set num slots for workload provisioning
2394     def setNumSlotsForWP(self, pandaQueueName, numSlots, gshare, resourceType, validPeriod):
2395         # get DBproxy
2396         with self.proxyPool.get() as proxy:
2397             # exec
2398             ret = proxy.setNumSlotsForWP(pandaQueueName, numSlots, gshare, resourceType, validPeriod)
2399         return ret
2400 
2401     # enable jumbo jobs
2402     def enableJumboJobs(self, jediTaskID, totalJumboJobs, nJumboPerSite):
2403         # get DBproxy
2404         with self.proxyPool.get() as proxy:
2405             # exec
2406             ret = proxy.enableJumboJobs(jediTaskID, totalJumboJobs, nJumboPerSite)
2407         return ret
2408 
2409     # enable event service
2410     def enableEventService(self, jediTaskID):
2411         # get DBproxy
2412         with self.proxyPool.get() as proxy:
2413             # exec
2414             ret = proxy.enableEventService(jediTaskID)
2415         return ret
2416 
2417     # get JEDI file attributes
2418     def getJediFileAttributes(self, PandaID, jediTaskID, datasetID, fileID, attrs):
2419         # get DBproxy
2420         with self.proxyPool.get() as proxy:
2421             # exec
2422             ret = proxy.getJediFileAttributes(PandaID, jediTaskID, datasetID, fileID, attrs)
2423         return ret
2424 
2425     # check if super user
2426     def isSuperUser(self, userName):
2427         # get DBproxy
2428         with self.proxyPool.get() as proxy:
2429             # exec
2430             ret = proxy.isSuperUser(userName)
2431         return ret
2432 
2433     # get workers for a job
2434     def getWorkersForJob(self, PandaID):
2435         # get DBproxy
2436         with self.proxyPool.get() as proxy:
2437             # exec
2438             ret = proxy.getWorkersForJob(PandaID)
2439         return ret
2440 
2441     # get user job metadata
2442     def getUserJobMetadata(self, jediTaskID):
2443         # get DBproxy
2444         with self.proxyPool.get() as proxy:
2445             # exec
2446             ret = proxy.getUserJobMetadata(jediTaskID)
2447         return ret
2448 
2449     # get jumbo job datasets
2450     def getJumboJobDatasets(self, n_days, grace_period):
2451         # get DBproxy
2452         with self.proxyPool.get() as proxy:
2453             # exec
2454             ret = proxy.getJumboJobDatasets(n_days, grace_period)
2455         return ret
2456 
2457     # get global shares status
2458     def getGShareStatus(self):
2459         # get DBproxy
2460         with self.proxyPool.get() as proxy:
2461             # exec
2462             ret = proxy.getGShareStatus()
2463         return ret
2464 
2465     # get output datasets
2466     def getOutputDatasetsJEDI(self, panda_id):
2467         # get DBproxy
2468         with self.proxyPool.get() as proxy:
2469             # exec
2470             ret = proxy.getOutputDatasetsJEDI(panda_id)
2471         return ret
2472 
    # update/insert JSON queue information into the schedconfig replica
2474     def upsertQueuesInJSONSchedconfig(self, schedconfig_dump):
2475         # get DBproxy
2476         with self.proxyPool.get() as proxy:
2477             # exec
2478             ret = proxy.upsertQueuesInJSONSchedconfig(schedconfig_dump)
2479         return ret
2480 
2481     # update/insert SW tag information
2482     def loadSWTags(self, sw_tags):
2483         # get DBproxy
2484         with self.proxyPool.get() as proxy:
2485             # exec
2486             ret = proxy.loadSWTags(sw_tags)
2487         return ret
2488 
2489     # generate a harvester command to clean up the workers of a site
2490     def sweepPQ(self, panda_queue_des, status_list_des, ce_list_des, submission_host_list_des):
2491         # get DBproxy
2492         with self.proxyPool.get() as proxy:
2493             # exec
2494             ret = proxy.sweepPQ(panda_queue_des, status_list_des, ce_list_des, submission_host_list_des)
2495         return ret
2496 
2497     # lock process
2498     def lockProcess_PANDA(self, component, pid, time_limit=5, force=False):
2499         with self.proxyPool.get() as proxy:
2500             ret = proxy.lockProcess_PANDA(component, pid, time_limit, force)
2501         return ret
2502 
2503     # unlock process
2504     def unlockProcess_PANDA(self, component, pid):
2505         with self.proxyPool.get() as proxy:
2506             ret = proxy.unlockProcess_PANDA(component, pid)
2507         return ret
2508 
2509     # check process lock
2510     def checkProcessLock_PANDA(self, component, pid, time_limit, check_base=False):
2511         with self.proxyPool.get() as proxy:
2512             ret = proxy.checkProcessLock_PANDA(component, pid, time_limit, check_base)
2513         return ret
2514 
2515     # insert job output report
2516     def insertJobOutputReport(self, panda_id, prod_source_label, job_status, attempt_nr, data):
2517         with self.proxyPool.get() as proxy:
2518             ret = proxy.insertJobOutputReport(panda_id, prod_source_label, job_status, attempt_nr, data)
2519         return ret
2520 
    # delete job output report
2522     def deleteJobOutputReport(self, panda_id, attempt_nr):
2523         with self.proxyPool.get() as proxy:
2524             ret = proxy.deleteJobOutputReport(panda_id, attempt_nr)
2525         return ret
2526 
2527     # update data of job output report
2528     def updateJobOutputReport(self, panda_id, attempt_nr, data):
2529         with self.proxyPool.get() as proxy:
2530             ret = proxy.updateJobOutputReport(panda_id, attempt_nr, data)
2531         return ret
2532 
2533     # get job output report
2534     def getJobOutputReport(self, panda_id, attempt_nr):
2535         with self.proxyPool.get() as proxy:
2536             ret = proxy.getJobOutputReport(panda_id, attempt_nr)
2537         return ret
2538 
2539     # lock job output report
2540     def lockJobOutputReport(self, panda_id, attempt_nr, pid, time_limit, take_over_from=None):
2541         with self.proxyPool.get() as proxy:
2542             ret = proxy.lockJobOutputReport(panda_id, attempt_nr, pid, time_limit, take_over_from)
2543         return ret
2544 
2545     # unlock job output report
2546     def unlockJobOutputReport(self, panda_id, attempt_nr, pid, lock_offset):
2547         with self.proxyPool.get() as proxy:
2548             ret = proxy.unlockJobOutputReport(panda_id, attempt_nr, pid, lock_offset)
2549         return ret
2550 
2551     # list pandaID and attemptNr of job output report
2552     def listJobOutputReport(
2553         self,
2554         only_unlocked=False,
2555         time_limit=5,
2556         limit=999999,
2557         grace_period=3,
2558         labels=None,
2559         anti_labels=None,
2560     ):
2561         with self.proxyPool.get() as proxy:
2562             ret = proxy.listJobOutputReport(only_unlocked, time_limit, limit, grace_period, labels, anti_labels)
2563         return ret
2564 
2565     # update problematic resource info for user
2566     def update_problematic_resource_info(self, user_name, jedi_task_id, resource, problem_type):
2567         with self.proxyPool.get() as proxy:
2568             ret = proxy.update_problematic_resource_info(user_name, jedi_task_id, resource, problem_type)
2569         return ret
2570 
2571     # send command to a job
2572     def send_command_to_job(self, panda_id, com):
2573         with self.proxyPool.get() as proxy:
2574             ret = proxy.send_command_to_job(panda_id, com)
2575         return ret
2576 
2577     # get workers with stale states and update them with pilot information
2578     def get_workers_to_synchronize(self):
2579         with self.proxyPool.get() as proxy:
2580             ret = proxy.get_workers_to_synchronize()
2581         return ret
2582 
2583     # set user secret
2584     def set_user_secret(self, owner, key, value):
2585         with self.proxyPool.get() as proxy:
2586             ret = proxy.set_user_secret(owner, key, value)
2587         return ret
2588 
2589     # get user secrets
2590     def get_user_secrets(self, owner, keys=None, get_json=False):
2591         with self.proxyPool.get() as proxy:
2592             ret = proxy.get_user_secrets(owner, keys, get_json)
2593         return ret
2594 
2595     def configurator_write_sites(self, sites_list):
2596         with self.proxyPool.get() as proxy:
2597             ret = proxy.configurator_write_sites(sites_list)
2598         return ret
2599 
2600     def configurator_write_panda_sites(self, panda_site_list):
2601         with self.proxyPool.get() as proxy:
2602             ret = proxy.configurator_write_panda_sites(panda_site_list)
2603         return ret
2604 
2605     def configurator_write_ddm_endpoints(self, ddm_endpoint_list):
2606         with self.proxyPool.get() as proxy:
2607             ret = proxy.configurator_write_ddm_endpoints(ddm_endpoint_list)
2608         return ret
2609 
2610     def configurator_write_panda_ddm_relations(self, relation_list):
2611         with self.proxyPool.get() as proxy:
2612             ret = proxy.configurator_write_panda_ddm_relations(relation_list)
2613         return ret
2614 
2615     def configurator_read_sites(self):
2616         with self.proxyPool.get() as proxy:
2617             ret = proxy.configurator_read_sites()
2618         return ret
2619 
2620     def configurator_read_panda_sites(self):
2621         with self.proxyPool.get() as proxy:
2622             ret = proxy.configurator_read_panda_sites()
2623         return ret
2624 
2625     def configurator_read_ddm_endpoints(self):
2626         with self.proxyPool.get() as proxy:
2627             ret = proxy.configurator_read_ddm_endpoints()
2628         return ret
2629 
2630     def configurator_read_cric_sites(self):
2631         with self.proxyPool.get() as proxy:
2632             ret = proxy.configurator_read_cric_sites()
2633         return ret
2634 
2635     def configurator_read_cric_panda_sites(self):
2636         with self.proxyPool.get() as proxy:
2637             ret = proxy.configurator_read_cric_panda_sites()
2638         return ret
2639 
2640     def configurator_delete_sites(self, sites_to_delete):
2641         with self.proxyPool.get() as proxy:
2642             ret = proxy.configurator_delete_sites(sites_to_delete)
2643         return ret
2644 
2645     def configurator_delete_panda_sites(self, panda_sites_to_delete):
2646         with self.proxyPool.get() as proxy:
2647             ret = proxy.configurator_delete_panda_sites(panda_sites_to_delete)
2648         return ret
2649 
2650     def configurator_delete_ddm_endpoints(self, ddm_endpoints_to_delete):
2651         with self.proxyPool.get() as proxy:
2652             ret = proxy.configurator_delete_ddm_endpoints(ddm_endpoints_to_delete)
2653         return ret
2654 
2655     def carbon_write_region_emissions(self, emissions):
2656         with self.proxyPool.get() as proxy:
2657             ret = proxy.carbon_write_region_emissions(emissions)
2658         return ret
2659 
2660     def carbon_aggregate_emissions(self):
2661         with self.proxyPool.get() as proxy:
2662             ret = proxy.carbon_aggregate_emissions()
2663         return ret
2664 
2665     def get_files_in_datasets(self, task_id, dataset_types):
2666         with self.proxyPool.get() as proxy:
2667             ret = proxy.get_files_in_datasets(task_id, dataset_types)
2668         return ret
2669 
2670     def get_max_worker_id(self, harvester_id):
2671         with self.proxyPool.get() as proxy:
2672             ret = proxy.get_max_worker_id(harvester_id)
2673         return ret
2674 
2675     def get_events_status(self, ids):
2676         with self.proxyPool.get() as proxy:
2677             ret = proxy.get_events_status(ids)
2678         return ret
2679 
2680     def async_update_datasets(self, panda_id):
2681         with self.proxyPool.get() as proxy:
2682             ret = proxy.async_update_datasets(panda_id)
2683         return ret
2684 
2685     def set_workload_metrics(self, jedi_task_id, panda_id, metrics):
2686         with self.proxyPool.get() as proxy:
2687             ret = proxy.set_workload_metrics(jedi_task_id, panda_id, metrics, True)
2688         return ret
2689 
2690     def get_workload_metrics(self, jedi_task_id, panda_id):
2691         with self.proxyPool.get() as proxy:
2692             ret = proxy.get_workload_metrics(jedi_task_id, panda_id)
2693         return ret
2694 
2695     def get_jobs_metrics_in_task(self, jedi_task_id: int):
2696         with self.proxyPool.get() as proxy:
2697             ret = proxy.get_jobs_metrics_in_task(jedi_task_id)
2698         return ret
2699 
2700     def enable_job_cloning(self, jedi_task_id, mode, multiplicity, num_sites):
2701         with self.proxyPool.get() as proxy:
2702             ret = proxy.enable_job_cloning(jedi_task_id, mode, multiplicity, num_sites)
2703         return ret
2704 
2705     def disable_job_cloning(self, jedi_task_id):
2706         with self.proxyPool.get() as proxy:
2707             ret = proxy.disable_job_cloning(jedi_task_id)
2708         return ret
2709 
2710     # gets statistics on the number of jobs with a specific status for each nucleus at each site
2711     def get_num_jobs_with_status_by_nucleus(self, vo, job_status):
2712         with self.proxyPool.get() as proxy:
2713             return proxy.get_num_jobs_with_status_by_nucleus(vo, job_status)
2714 
2715     # ==== JEDI taskbuffer functions ===========================
2716 
2717     # get JEDI task with jediTaskID
2718     def getTaskWithID_JEDI(self, jediTaskID, fullFlag=False, lockTask=False, pid=None, lockInterval=None, clearError=False):
2719         with self.proxyPool.get() as proxy:
2720             return proxy.getTaskWithID_JEDI(jediTaskID, fullFlag, lockTask, pid, lockInterval, clearError)
2721 
2722     # update input files stage-in done (according to message from iDDS, called by other methods, etc.)
2723     def updateInputFilesStaged_JEDI(self, jeditaskid, scope, filenames_dict, chunk_size=500, by=None, check_scope=True):
2724         with self.proxyPool.get() as proxy:
2725             return proxy.updateInputFilesStaged_JEDI(jeditaskid, scope, filenames_dict, chunk_size, by, check_scope)
2726 
2727     # ==== Data Carousel functions =============================
2728 
2729     # query data carousel request ID by dataset
2730     def get_data_carousel_request_id_by_dataset_JEDI(self, dataset):
2731         with self.proxyPool.get() as proxy:
2732             return proxy.get_data_carousel_request_id_by_dataset_JEDI(dataset)
2733 
2734     # insert data carousel requests
2735     def insert_data_carousel_requests_JEDI(self, task_id, dc_req_specs):
2736         with self.proxyPool.get() as proxy:
2737             return proxy.insert_data_carousel_requests_JEDI(task_id, dc_req_specs)
2738 
2739     # update a data carousel request
2740     def update_data_carousel_request_JEDI(self, dc_req_spec):
2741         with self.proxyPool.get() as proxy:
2742             return proxy.update_data_carousel_request_JEDI(dc_req_spec)
2743 
2744     # insert data carousel relations
2745     def insert_data_carousel_relations_JEDI(self, task_id, request_ids):
2746         with self.proxyPool.get() as proxy:
2747             return proxy.insert_data_carousel_relations_JEDI(task_id, request_ids)
2748 
2749     # get data carousel queued requests and info of their related tasks
2750     def get_data_carousel_queued_requests_JEDI(self):
2751         with self.proxyPool.get() as proxy:
2752             return proxy.get_data_carousel_queued_requests_JEDI()
2753 
2754     # get data carousel requests of tasks by task status
2755     def get_data_carousel_requests_by_task_status_JEDI(self, status_filter_list=None, status_exclusion_list=None):
2756         with self.proxyPool.get() as proxy:
2757             return proxy.get_data_carousel_requests_by_task_status_JEDI(status_filter_list=status_filter_list, status_exclusion_list=status_exclusion_list)
2758 
2759     # get related tasks and their info of a data carousel request
2760     def get_related_tasks_of_data_carousel_request_JEDI(self, request_id, status_filter_list=None, status_exclusion_list=None):
2761         with self.proxyPool.get() as proxy:
2762             return proxy.get_related_tasks_of_data_carousel_request_JEDI(
2763                 request_id, status_filter_list=status_filter_list, status_exclusion_list=status_exclusion_list
2764             )
2765 
2766     # get data carousel staging requests
2767     def get_data_carousel_staging_requests_JEDI(self):
2768         with self.proxyPool.get() as proxy:
2769             return proxy.get_data_carousel_staging_requests_JEDI()
2770 
2771     # delete data carousel requests
2772     def delete_data_carousel_requests_JEDI(self, request_id_list):
2773         with self.proxyPool.get() as proxy:
2774             return proxy.delete_data_carousel_requests_JEDI(request_id_list)
2775 
2776     # clean up data carousel requests
2777     def clean_up_data_carousel_requests_JEDI(self, time_limit_days=30):
2778         with self.proxyPool.get() as proxy:
2779             return proxy.clean_up_data_carousel_requests_JEDI(time_limit_days)
2780 
2781     # cancel a data carousel request
2782     def cancel_data_carousel_request_JEDI(self, request_id):
2783         with self.proxyPool.get() as proxy:
2784             return proxy.cancel_data_carousel_request_JEDI(request_id)
2785 
2786     # retire a data carousel request
2787     def retire_data_carousel_request_JEDI(self, request_id):
2788         with self.proxyPool.get() as proxy:
2789             return proxy.retire_data_carousel_request_JEDI(request_id)
2790 
2791     # resubmit a data carousel request
2792     def resubmit_data_carousel_request_JEDI(self, request_id, exclude_prev_dst=False):
2793         with self.proxyPool.get() as proxy:
2794             return proxy.resubmit_data_carousel_request_JEDI(request_id, exclude_prev_dst)
2795 
    # ==== Workflow functions ==================================
2797 
2798     def get_workflow(self, workflow_id):
2799         with self.proxyPool.get() as proxy:
2800             return proxy.get_workflow(workflow_id)
2801 
2802     def get_workflow_step(self, step_id):
2803         with self.proxyPool.get() as proxy:
2804             return proxy.get_workflow_step(step_id)
2805 
2806     def get_workflow_data(self, data_id):
2807         with self.proxyPool.get() as proxy:
2808             return proxy.get_workflow_data(data_id)
2809 
2810     def get_workflow_data_by_name(self, name, workflow_id=None):
2811         with self.proxyPool.get() as proxy:
2812             return proxy.get_workflow_data_by_name(name, workflow_id)
2813 
2814     def get_steps_of_workflow(self, workflow_id, status_filter_list=None, status_exclusion_list=None):
2815         with self.proxyPool.get() as proxy:
2816             return proxy.get_steps_of_workflow(workflow_id, status_filter_list, status_exclusion_list)
2817 
2818     def get_data_of_workflow(self, workflow_id, status_filter_list=None, status_exclusion_list=None, type_filter_list=None):
2819         with self.proxyPool.get() as proxy:
2820             return proxy.get_data_of_workflow(workflow_id, status_filter_list, status_exclusion_list, type_filter_list)
2821 
2822     def query_workflows(self, status_filter_list=None, status_exclusion_list=None, check_interval_sec=300):
2823         with self.proxyPool.get() as proxy:
2824             return proxy.query_workflows(status_filter_list, status_exclusion_list, check_interval_sec)
2825 
2826     def lock_workflow(self, workflow_id, locked_by, lock_expiration_sec=120):
2827         with self.proxyPool.get() as proxy:
2828             return proxy.lock_workflow(workflow_id, locked_by, lock_expiration_sec)
2829 
2830     def unlock_workflow(self, workflow_id, locked_by):
2831         with self.proxyPool.get() as proxy:
2832             return proxy.unlock_workflow(workflow_id, locked_by)
2833 
2834     def lock_workflow_step(self, step_id, locked_by, lock_expiration_sec=120):
2835         with self.proxyPool.get() as proxy:
2836             return proxy.lock_workflow_step(step_id, locked_by, lock_expiration_sec)
2837 
2838     def unlock_workflow_step(self, step_id, locked_by):
2839         with self.proxyPool.get() as proxy:
2840             return proxy.unlock_workflow_step(step_id, locked_by)
2841 
2842     def lock_workflow_data(self, data_id, locked_by, lock_expiration_sec=120):
2843         with self.proxyPool.get() as proxy:
2844             return proxy.lock_workflow_data(data_id, locked_by, lock_expiration_sec)
2845 
2846     def unlock_workflow_data(self, data_id, locked_by):
2847         with self.proxyPool.get() as proxy:
2848             return proxy.unlock_workflow_data(data_id, locked_by)
2849 
2850     def insert_workflow(self, workflow_spec):
2851         with self.proxyPool.get() as proxy:
2852             return proxy.insert_workflow(workflow_spec)
2853 
2854     def insert_workflow_step(self, wf_step_spec):
2855         with self.proxyPool.get() as proxy:
2856             return proxy.insert_workflow_step(wf_step_spec)
2857 
2858     def insert_workflow_data(self, wf_data_spec):
2859         with self.proxyPool.get() as proxy:
2860             return proxy.insert_workflow_data(wf_data_spec)
2861 
2862     def update_workflow(self, workflow_spec):
2863         with self.proxyPool.get() as proxy:
2864             return proxy.update_workflow(workflow_spec)
2865 
2866     def update_workflow_step(self, wf_step_spec):
2867         with self.proxyPool.get() as proxy:
2868             return proxy.update_workflow_step(wf_step_spec)
2869 
2870     def update_workflow_data(self, wf_data_spec):
2871         with self.proxyPool.get() as proxy:
2872             return proxy.update_workflow_data(wf_data_spec)
2873 
2874     def upsert_workflow_entities(self, workflow_id, actions_dict=None, workflow_spec=None, step_specs=None, data_specs=None):
2875         with self.proxyPool.get() as proxy:
2876             return proxy.upsert_workflow_entities(workflow_id, actions_dict, workflow_spec, step_specs, data_specs)
2877 
2878     def get_distinct_resource_types_per_site(self, jedi_task_id, threshold=20.0):
2879         with self.proxyPool.get() as proxy:
2880             ret = proxy.get_distinct_resource_types_per_site(jedi_task_id, threshold)
2881         return ret
2882 
2883     # ==========================================================
2884 
2885 
# Singleton: module-level TaskBuffer instance, constructed when this module is
# imported. The constructor only sets attributes (proxyPool starts as None);
# init() creates the DB proxy pool.
taskBuffer = TaskBuffer()