# File indexing completed on 2026-04-10 08:39:06
0001 import datetime
0002 import json
0003 import re
0004 import shlex
0005 import sys
0006 import time
0007 import traceback
0008 from contextlib import contextmanager
0009 from threading import Lock
0010
0011 from pandacommon.pandalogger.LogWrapper import LogWrapper
0012 from pandacommon.pandalogger.PandaLogger import PandaLogger
0013 from pandacommon.pandautils.PandaUtils import naive_utcnow
0014
0015 from pandaserver.brokerage.SiteMapper import SiteMapper
0016 from pandaserver.config import panda_config
0017 from pandaserver.dataservice.closer import Closer
0018 from pandaserver.dataservice.setupper import Setupper
0019 from pandaserver.srvcore import CoreUtils
0020 from pandaserver.taskbuffer import ErrorCode, EventServiceUtils, JobUtils, ProcessGroups
0021 from pandaserver.taskbuffer.DBProxyPool import DBProxyPool
0022
0023 _logger = PandaLogger().getLogger("TaskBuffer")
0024
0025
class TaskBuffer:
    """
    Task queue: front end for storing and manipulating PanDA jobs, backed
    by a lazily created pool of database proxies.
    """

    def __init__(self):
        # pool of DB proxies; created on demand in init()
        self.proxyPool = None
        # guards one-shot creation of the proxy pool
        self.lock = Lock()
        # number of DB connections requested for the pool
        self.nDBConnection = None
        # creation time of the current pool, used for lifetime logging
        self.start_time = time.time()
        # cached SiteMapper instance and the time it was last refreshed
        self.site_mapper = None
        self.last_update_site_mapper = None

    def __repr__(self):
        return "TaskBuffer"
0048
0049
0050 def init(self, dbname, dbpass, nDBConnection=10, useTimeout=False, requester=None):
0051
0052 self.lock.acquire()
0053 self.nDBConnection = nDBConnection
0054
0055
0056 if self.proxyPool is None:
0057 _logger.info(f"creating DBProxyPool with n_connections={nDBConnection} on behalf of {requester}")
0058 self.start_time = time.time()
0059 self.proxyPool = DBProxyPool(dbname, dbpass, nDBConnection, useTimeout)
0060
0061
0062 self.lock.release()
0063
0064
0065 def cleanup(self, requester=None):
0066 if self.proxyPool:
0067 try:
0068 pool_duration = time.time() - self.start_time
0069 except TypeError:
0070 pool_duration = -1
0071
0072 _logger.info(f"destroying DBProxyPool after n_seconds={pool_duration} on behalf of {requester}")
0073 self.proxyPool.cleanup()
0074
0075
0076
0077 @contextmanager
0078 def transaction(self, name=None, tmp_log=None):
0079 with self.proxyPool.get() as proxy:
0080 with proxy.transaction(name, tmp_log) as txn:
0081 if txn is None:
0082 raise RuntimeError(f"Failed to start transaction {name}")
0083
0084 yield txn
0085
0086
0087 def get_num_connections(self):
0088 return self.nDBConnection
0089
0090
0091 def get_site_mapper(self):
0092 time_now = naive_utcnow()
0093 if self.last_update_site_mapper is None or datetime.datetime.now(datetime.timezone.utc).replace(
0094 tzinfo=None
0095 ) - self.last_update_site_mapper > datetime.timedelta(minutes=10):
0096 self.site_mapper = SiteMapper(self)
0097 self.last_update_site_mapper = time_now
0098 return self.site_mapper
0099
0100
0101 def checkProdRole(self, fqans):
0102 for fqan in fqans:
0103
0104 match = re.search("/([^/]+)/Role=production", fqan)
0105 if match is not None:
0106 return True, match.group(1)
0107 return False, None
0108
0109
    def getPrioParameters(self, jobs, user, fqans, userDefinedWG, validWorkingGroup):
        """Derive priority-related parameters for a batch of analysis jobs.

        :param jobs: list of JobSpec-like objects (all for the same user/task)
        :param user: user DN string
        :param fqans: list of FQAN strings used to detect a production role
        :param userDefinedWG: True when the submitter set a working group
        :param validWorkingGroup: True when that working group matched the FQANs
        :return: tuple (withProdRole, workingGroup, priorityOffset, serNum,
                 weight, prio_reduction)
        """
        priorityOffset = 0
        serNum = 0
        weight = None
        prio_reduction = True

        # optional per-group/per-user priority boosts read from the DB
        boost_dict = {}

        with self.proxyPool.get() as proxy:
            # production role from the FQANs
            withProdRole, workingGroup = self.checkProdRole(fqans)
            if withProdRole and jobs != []:
                # drop the production role unless all output/log files of the
                # last job go to a group dataset
                for tmpFile in jobs[-1].Files:
                    if tmpFile.type in ["output", "log"] and not (tmpFile.lfn.startswith("group") or tmpFile.lfn.startswith("panda.um.group")):
                        withProdRole, workingGroup = False, None
                        break

            if jobs != []:
                # special processing types get fixed parameters
                # NOTE(review): "gangarobot" and "hammercloud-fax" appear both in
                # this first membership test and in the two elif branches below,
                # which therefore look unreachable — confirm intent upstream
                if jobs[0].processingType in [
                    "hammercloud",
                    "gangarobot",
                    "hammercloud-fax",
                ] or jobs[
                    0
                ].processingType.startswith("gangarobot-"):
                    serNum = 0
                    weight = 0.0
                elif jobs[0].processingType in ["gangarobot", "gangarobot-pft"]:
                    priorityOffset = 3000
                elif jobs[0].processingType in ["hammercloud-fax"]:
                    priorityOffset = 1001
                else:
                    # per-VO boost table keyed by working group and user name
                    boost_dict = proxy.get_dict_to_boost_job_prio(jobs[-1].VO)
                    if boost_dict:
                        prodUserName = CoreUtils.clean_user_id(user)
                        # boosted jobs get a flat weight and no priority reduction
                        if userDefinedWG and validWorkingGroup:
                            if "group" in boost_dict and jobs[-1].workingGroup in boost_dict["group"]:
                                priorityOffset = boost_dict["group"][jobs[-1].workingGroup]
                                weight = 0.0
                                prio_reduction = False
                        else:
                            if "user" in boost_dict and prodUserName in boost_dict["user"]:
                                priorityOffset = boost_dict["user"][prodUserName]
                                weight = 0.0
                                prio_reduction = False

            # default weight comes from the user's quota
            if weight is None:
                weight = proxy.checkQuota(user)

            # serial number of active jobs for this user / working group
            if jobs == []:
                serNum = proxy.getNumberJobsUser(user, workingGroup=userDefinedWG)
            elif userDefinedWG and validWorkingGroup:
                # super users skip counting and get a fixed boost
                isSU, isGU = proxy.isSuperUser(jobs[0].workingGroup)
                if not isSU:
                    serNum = proxy.getNumberJobsUser(user, workingGroup=jobs[0].workingGroup)
                else:
                    serNum = 0
                    weight = 0.0
                    priorityOffset = 2000
            else:
                serNum = proxy.getNumberJobsUser(user, workingGroup=None)

        return (
            withProdRole,
            workingGroup,
            priorityOffset,
            serNum,
            weight,
            prio_reduction,
        )
0186
0187
    def storeJobs(
        self,
        jobs,
        user,
        joinThr=False,
        fqans=[],
        hostname="",
        checkSpecialHandling=True,
        toPending=False,
        oldPandaIDs=None,
        relationType=None,
        userVO="atlas",
        esJobsetMap=None,
        getEsJobsetMap=False,
        unprocessedMap=None,
        bulk_job_insert=False,
        trust_user=False,
    ):
        """Store a batch of jobs in the database for one user.

        Assigns PandaIDs/jobsetIDs, applies priority and working-group rules,
        inserts the jobs (one by one or in bulk) and optionally launches the
        Setupper for non-pending jobs.

        :param jobs: list of JobSpec-like objects
        :param user: user DN (None for internal callers)
        :param joinThr: wait for the Setupper thread to finish
        :param fqans: user FQANs (NOTE(review): mutable default argument)
        :param hostname: submission host recorded as creationHost
        :param checkSpecialHandling: unused in this body — TODO confirm callers
        :param toPending: insert jobs in pending state and skip the Setupper
        :param oldPandaIDs: per-job lists of predecessor PandaIDs
        :param relationType: relation to record against oldPandaIDs
        :param userVO: fallback VO, overridden by the first FQAN
        :param esJobsetMap: event-service index -> jobsetID mapping carried
               across calls
        :param getEsJobsetMap: also return the ES jobset map and unprocessedMap
        :param unprocessedMap: opaque map threaded through insertNewJob
        :param bulk_job_insert: insert all jobs with one bulk DB call
        :param trust_user: skip the banned-user check
        :return: list of (PandaID, jobDefinitionID, extra) tuples, or an error
                 string; a 3-tuple including esJobsetMap/unprocessedMap when
                 getEsJobsetMap is True
        """
        try:
            tmpLog = LogWrapper(_logger, f"storeJobs <{CoreUtils.clean_user_id(user)} nJobs={len(jobs)}>")
            tmpLog.debug(f"start toPending={toPending}")

            # defaults for user-level parameters
            weight = 0.0
            userJobID = -1
            userJobsetID = -1
            userStatus = True
            priorityOffset = 0
            userCountry = None
            useExpress = False
            nExpressJobs = 0
            useDebugMode = False
            siteMapper = self.get_site_mapper()

            # reject jobs from banned users unless the caller is trusted
            if not trust_user and len(jobs) > 0:
                with self.proxyPool.get() as proxy:
                    tmpStatus = proxy.checkBanUser(user, jobs[0].prodSourceLabel)
                if not tmpStatus:
                    tmpLog.debug(f"end 1 since DN {user} is blocked")
                    if getEsJobsetMap:
                        return [], None, unprocessedMap
                    return []

            tmpLog.debug(f"checked ban user")

            # fetch user parameters (jobID/jobsetID counters, express quota,
            # debug mode) for analysis-like, non-merge submissions
            if (
                len(jobs) > 0
                and (jobs[0].prodSourceLabel in JobUtils.analy_sources + JobUtils.list_ptest_prod_sources)
                and (not jobs[0].processingType in ["merge", "unmerge"])
            ):
                with self.proxyPool.get() as proxy:
                    userJobID, userJobsetID, userStatus = proxy.getUserParameter(user, jobs[0].jobDefinitionID, jobs[0].jobsetID)

                    # express-queue quota
                    if "express" in jobs[0].specialHandling:
                        expressQuota = proxy.getExpressJobs(user)
                        if expressQuota is not None and expressQuota["status"] and expressQuota["quota"] > 0:
                            nExpressJobs = expressQuota["quota"]
                            if nExpressJobs > 0:
                                useExpress = True

                    if jobs[0].is_debug_mode() or jobs[-1].is_debug_mode():
                        useDebugMode = True

                # country group from the FQANs (two-letter groups, or usatlas)
                for tmpFQAN in fqans:
                    match = re.search("^/atlas/([^/]+)/", tmpFQAN)
                    if match is not None:
                        tmpCountry = match.group(1)
                        if len(tmpCountry) == 2:
                            userCountry = tmpCountry
                            break
                        if tmpCountry in ["usatlas"]:
                            userCountry = "us"
                            break
                tmpLog.debug(f"set user job parameters")

            # blocked user
            if not userStatus:
                tmpLog.debug(f"end 2 since {user} DN is blocked")
                if getEsJobsetMap:
                    return ([], None, unprocessedMap)
                return []

            # VO from the first FQAN
            for tmpFQAN in fqans:
                match = re.search("^/([^/]+)/", tmpFQAN)
                if match is not None:
                    userVO = match.group(1)
                    break

            # priority parameters for analysis submissions
            serNum = 0
            userDefinedWG = False
            validWorkingGroup = False
            usingBuild = False
            withProdRole = False
            workingGroup = None
            prio_reduction = True
            if len(jobs) > 0 and (jobs[0].prodSourceLabel in JobUtils.analy_sources) and (not jobs[0].processingType in ["merge", "unmerge"]):
                # working groups the user holds a production role for
                userWorkingGroupList = []
                for tmpFQAN in fqans:
                    match = re.search("/([^/]+)/Role=production", tmpFQAN)
                    if match is not None:
                        userWorkingGroupList.append(match.group(1))

                if jobs[0].workingGroup not in ["", None, "NULL"]:
                    userDefinedWG = True
                    # the requested working group is valid only when backed by a role
                    if jobs[0].workingGroup in userWorkingGroupList:
                        validWorkingGroup = True

                # a "panda" source label means the first job is a build job
                if jobs[0].prodSourceLabel == "panda":
                    usingBuild = True

                (
                    withProdRole,
                    workingGroup,
                    priorityOffset,
                    serNum,
                    weight,
                    prio_reduction,
                ) = self.getPrioParameters(jobs, user, fqans, userDefinedWG, validWorkingGroup)
                tmpLog.debug(f"workingGroup={jobs[0].workingGroup} serNum={serNum} weight={weight} pOffset={priorityOffset} reduction={prio_reduction}")
            tmpLog.debug(f"got prio parameters")

            with self.proxyPool.get() as proxy:
                tmpLog.debug(f"got proxy")

                # reserve PandaIDs and file IDs up front
                totalNumFiles = 0
                for job in jobs:
                    totalNumFiles += len(job.Files)

                new_panda_ids = proxy.bulk_fetch_panda_ids(len(jobs))
                tmpLog.debug(f"got PandaIDs")

                fileIDPool = []
                if totalNumFiles > 0:
                    fileIDPool = sorted(proxy.bulkFetchFileIDsPanda(totalNumFiles))

                ret = []
                newJobs = []
                if esJobsetMap is None:
                    esJobsetMap = {}
                try:
                    tmpLog.debug(f"jediTaskID={jobs[0].jediTaskID} len(esJobsetMap)={len(esJobsetMap)} nJobs={len(jobs)}")
                except Exception:
                    pass
                job_ret_list = []
                params_for_bulk_insert = []
                special_handling_list = []
                num_original_event_service_jobs = 0
                for idxJob, job in enumerate(jobs):
                    job.PandaID = new_panda_ids[idxJob]

                    # reuse the user's jobDefinitionID for fresh analysis jobs
                    if (
                        userJobID != -1
                        and job.prodSourceLabel in JobUtils.analy_sources
                        and (job.attemptNr in [0, "0", "NULL"] or (job.jobExecutionID not in [0, "0", "NULL"]) or job.lockedby == "jedi")
                        and (not jobs[0].processingType in ["merge", "unmerge"])
                    ):
                        job.jobDefinitionID = userJobID

                    # jobsetID from user parameters
                    if job.prodSourceLabel in JobUtils.analy_sources + JobUtils.list_ptest_prod_sources:
                        job.jobsetID = userJobsetID

                    # pin the job to its site unless relocation is forbidden (2)
                    if job.computingSite != "NULL" and job.relocationFlag != 2:
                        job.relocationFlag = 1

                    # guard against empty job parameters
                    if job.jobParameters in ["", None, "NULL"]:
                        job.jobParameters = " "

                    if job.prodSourceLabel in JobUtils.analy_sources:
                        if job.lockedby != "jedi":
                            job.countryGroup = userCountry

                        # working-group sanitation
                        if not validWorkingGroup:
                            if withProdRole:
                                # group derived from the production role
                                job.workingGroup = workingGroup
                            else:
                                if userDefinedWG:
                                    # reset an invalid user-chosen group
                                    job.workingGroup = None

                        # encode nBuild/nRun into taskID for analysis jobs
                        if usingBuild:
                            tmpNumBuild = 1
                            tmpNunRun = len(jobs) - 1
                        else:
                            tmpNumBuild = 0
                            tmpNunRun = len(jobs)
                        job.taskID = tmpNumBuild + (tmpNunRun << 1)

                    if hostname != "":
                        job.creationHost = hostname

                    # derive job_label from the queue type when missing/invalid
                    if not job.job_label or job.job_label not in (
                        JobUtils.PROD_PS,
                        JobUtils.ANALY_PS,
                    ):
                        tmpSiteSpec = siteMapper.getSite(job.computingSite)
                        queue_type = tmpSiteSpec.type
                        if queue_type == "analysis":
                            job.job_label = JobUtils.ANALY_PS
                        elif queue_type == "production":
                            job.job_label = JobUtils.PROD_PS
                        elif queue_type == "unified":
                            if job.prodSourceLabel in JobUtils.analy_sources:
                                job.job_label = JobUtils.ANALY_PS
                            else:
                                job.job_label = JobUtils.PROD_PS
                        else:
                            job.job_label = JobUtils.PROD_PS

                    # event-service info encoded in specialHandling
                    origSH = job.specialHandling
                    (
                        eventServiceInfo,
                        job.specialHandling,
                        esIndex,
                    ) = EventServiceUtils.decodeFileInfo(job.specialHandling)
                    origEsJob = False
                    if eventServiceInfo != {}:
                        # jobs sharing an ES index share one jobsetID; the first
                        # ("original") job of a jobset gets it assigned on insert
                        if esIndex in esJobsetMap:
                            job.jobsetID = esJobsetMap[esIndex]
                        else:
                            origEsJob = True
                            num_original_event_service_jobs += 1
                            esJobsetMap[esIndex] = None

                    job.sortFiles()
                    if oldPandaIDs is not None and len(oldPandaIDs) > idxJob:
                        jobOldPandaIDs = oldPandaIDs[idxJob]
                    else:
                        jobOldPandaIDs = None

                    # jumbo jobs need ready events
                    isOK = True
                    if EventServiceUtils.isJumboJob(job):
                        hasReadyEvents = proxy.hasReadyEvents(job.jediTaskID)
                        if hasReadyEvents is False:
                            isOK = False

                    if not isOK:
                        # no ready events: invalidate the reserved PandaID
                        job.PandaID = None
                    if not bulk_job_insert:
                        tmp_ret_i = proxy.insertNewJob(
                            job,
                            user,
                            serNum,
                            weight,
                            priorityOffset,
                            userVO,
                            toPending,
                            origEsJob,
                            eventServiceInfo,
                            oldPandaIDs=jobOldPandaIDs,
                            relationType=relationType,
                            fileIDPool=fileIDPool,
                            origSpecialHandling=origSH,
                            unprocessedMap=unprocessedMap,
                            prio_reduction=prio_reduction,
                        )
                        if unprocessedMap is not None:
                            tmp_ret_i, unprocessedMap = tmp_ret_i
                    else:
                        # defer insertion: remember args for the bulk call
                        params_for_bulk_insert.append(
                            [
                                [job, user, serNum, weight, priorityOffset, userVO, toPending],
                                {
                                    "origEsJob": origEsJob,
                                    "eventServiceInfo": eventServiceInfo,
                                    "oldPandaIDs": jobOldPandaIDs,
                                    "relationType": relationType,
                                    "fileIDPool": fileIDPool,
                                    "origSpecialHandling": origSH,
                                    "unprocessedMap": unprocessedMap,
                                    "prio_reduction": prio_reduction,
                                },
                                {"esIndex": esIndex},
                            ]
                        )
                        special_handling_list.append(origSH)
                        tmp_ret_i = True
                    if tmp_ret_i and origEsJob and not bulk_job_insert:
                        # record the jobsetID assigned to the original ES job
                        esJobsetMap[esIndex] = job.jobsetID
                    job_ret_list.append([job, tmp_ret_i])
                    serNum += 1
                    # consume the file IDs used by this job
                    try:
                        fileIDPool = fileIDPool[len(job.Files) :]
                    except Exception:
                        fileIDPool = []

                # single bulk insert of all jobs
                if bulk_job_insert:
                    if num_original_event_service_jobs > 0:
                        new_jobset_ids = proxy.bulk_fetch_panda_ids(num_original_event_service_jobs)
                    else:
                        new_jobset_ids = []
                    tmp_ret, job_ret_list, es_jobset_map = proxy.bulk_insert_new_jobs(
                        jobs[0].jediTaskID, params_for_bulk_insert, new_jobset_ids, special_handling_list
                    )
                    if not tmp_ret:
                        raise RuntimeError("bulk job insert failed")
                    esJobsetMap.update(es_jobset_map)

            # build the response and collect successfully inserted jobs
            for job, tmp_ret_i in job_ret_list:
                if not tmp_ret_i:
                    job.PandaID = None
                else:
                    newJobs.append(job)
                if job.prodSourceLabel in JobUtils.analy_sources + JobUtils.list_ptest_prod_sources:
                    ret.append((job.PandaID, job.jobDefinitionID, {"jobsetID": job.jobsetID}))
                else:
                    ret.append((job.PandaID, job.jobDefinitionID, job.jobName))

            # launch the Setupper for non-pending jobs
            if not toPending:
                if joinThr:
                    thr = Setupper(
                        self,
                        newJobs,
                    )
                    thr.start()
                    thr.join()
                else:
                    # fire and forget
                    Setupper(
                        self,
                        newJobs,
                    ).start()

            tmpLog.debug("end successfully")
            if getEsJobsetMap:
                return (ret, esJobsetMap, unprocessedMap)
            return ret
        except Exception as e:
            tmpLog.error(f"{str(e)} {traceback.format_exc()}")
            errStr = "ERROR: ServerError with storeJobs"
            if getEsJobsetMap:
                return (errStr, None, unprocessedMap)
            return errStr
0547
0548
0549 def lockJobsForReassign(
0550 self,
0551 tableName,
0552 timeLimit,
0553 statList,
0554 labels,
0555 processTypes,
0556 sites,
0557 clouds,
0558 useJEDI=False,
0559 onlyReassignable=False,
0560 useStateChangeTime=False,
0561 getEventService=False,
0562 ):
0563
0564 with self.proxyPool.get() as proxy:
0565
0566 res = proxy.lockJobsForReassign(
0567 tableName,
0568 timeLimit,
0569 statList,
0570 labels,
0571 processTypes,
0572 sites,
0573 clouds,
0574 useJEDI,
0575 onlyReassignable,
0576 useStateChangeTime,
0577 getEventService,
0578 )
0579 return res
0580
0581
0582 def getConfigValue(self, component, key, app="pandaserver", vo=None, default=None):
0583
0584 with self.proxyPool.get() as proxy:
0585
0586 res = proxy.getConfigValue(component, key, app, vo)
0587 if res is None and default is not None:
0588 res = default
0589 return res
0590
0591
0592 def lockJobsForFinisher(self, timeNow, rownum, highPrio):
0593
0594 with self.proxyPool.get() as proxy:
0595
0596 res = proxy.lockJobsForFinisher(timeNow, rownum, highPrio)
0597 return res
0598
0599
0600 def lockJobsForActivator(self, timeLimit, rownum, prio):
0601
0602 with self.proxyPool.get() as proxy:
0603
0604 res = proxy.lockJobsForActivator(timeLimit, rownum, prio)
0605 return res
0606
0607
    def updateJobs(
        self,
        jobs,
        inJobsDefined,
        oldJobStatusList=None,
        extraInfo=None,
        async_dataset_update=False,
    ):
        """Update (or archive) a list of jobs after a status change.

        Finished/failed/cancelled jobs are archived; others are updated in
        place. Jumbo jobs without successful events are forced to a terminal
        status first.

        :param jobs: list of JobSpec-like objects
        :param inJobsDefined: True when the jobs live in the jobsDefined table
        :param oldJobStatusList: optional per-job previous statuses
        :param extraInfo: forwarded to archiveJob/updateJob
        :param async_dataset_update: archive with asynchronous dataset update
        :return: list of per-job return values from the proxy
        """
        with self.proxyPool.get() as proxy:
            returns = []
            ddmIDs = []
            ddmAttempt = 0
            newMover = None
            for idxJob, job in enumerate(jobs):
                tmpddmIDs = []
                if oldJobStatusList is not None and idxJob < len(oldJobStatusList):
                    oldJobStatus = oldJobStatusList[idxJob]
                else:
                    oldJobStatus = None

                # jumbo jobs: decide the terminal status from event progress
                if EventServiceUtils.isJumboJob(job):
                    if job.jobStatus in ["defined", "assigned", "activated"]:
                        pass
                    else:
                        hasDone = proxy.hasDoneEvents(job.jediTaskID, job.PandaID, job)
                        if hasDone:
                            job.jobStatus = "finished"
                        else:
                            # pilot error 1144 means the job was killed
                            if job.pilotErrorCode in [1144, "1144"]:
                                job.jobStatus = "cancelled"
                            else:
                                job.jobStatus = "failed"
                            if job.taskBufferErrorDiag in ["", "NULL", None]:
                                job.taskBufferErrorDiag = f"set {job.jobStatus} since no successful events"
                                job.taskBufferErrorCode = ErrorCode.EC_EventServiceNoEvent

                if job.jobStatus in ["finished", "failed", "cancelled"]:
                    # archive terminal jobs, optionally with async dataset update
                    if async_dataset_update and job.jediTaskID not in [None, "NULL"]:
                        async_params = {
                            "exec_order": 0,
                            "PandaID": job.PandaID,
                            "jediTaskID": job.jediTaskID,
                        }
                    else:
                        async_params = None
                    ret, tmpddmIDs, ddmAttempt, newMover = proxy.archiveJob(job, inJobsDefined, extraInfo=extraInfo, async_params=async_params)
                    if async_params is not None and ret:
                        proxy.async_update_datasets(job.PandaID)
                else:
                    ret = proxy.updateJob(job, inJobsDefined, oldJobStatus=oldJobStatus, extraInfo=extraInfo)
                returns.append(ret)

                # collect DDM follow-up jobs from successful archives
                if ret:
                    ddmIDs += tmpddmIDs

        # submit a new mover job produced during archiving, if any
        if newMover is not None:
            self.storeJobs([newMover], None, joinThr=True)

        # reassign jobs waiting on DDM
        if ddmIDs != []:
            self.reassignJobs(ddmIDs, ddmAttempt, joinThr=True)

        return returns
0673
0674
    def updateJobStatus(self, jobID, jobStatus, param, updateStateChange=False, attemptNr=None):
        """Update one job's status from a pilot heartbeat.

        :param jobID: PandaID
        :param jobStatus: new status reported by the pilot
        :param param: heartbeat attribute dict
        :param updateStateChange: also update the stateChangeTime column
        :param attemptNr: attempt number for consistency checking
        :return: the proxy's return value; "tobekilled" when a job-cloning
                 consumer lost the event race; a dict with pilot secrets when
                 the command string contains "debug"
        """
        with self.proxyPool.get() as proxy:
            ret, post_action = proxy.updateJobStatus(jobID, jobStatus, param, updateStateChange, attemptNr)

            if post_action:
                # job cloning: try to acquire an event range; kill the consumer
                # that failed to get one
                if post_action["action"] == "get_event":
                    event_ret = proxy.getEventRanges(post_action["pandaID"], post_action["jobsetID"], post_action["jediTaskID"], 1, True, False, None)
                    if not event_ret:
                        proxy.killJob(post_action["pandaID"], "job cloning", "", True)
                        ret = "tobekilled"

            # attach pilot secrets to debug-mode commands
            if isinstance(ret, str) and "debug" in ret:
                tmpS, secrets = proxy.get_user_secrets(panda_config.pilot_secrets)
                if tmpS and secrets:
                    ret = {"command": ret, "secrets": secrets}
            return ret
0694
0695
0696 def updateWorkerPilotStatus(self, workerID, harvesterID, status, node_id):
0697
0698 with self.proxyPool.get() as proxy:
0699
0700 ret = proxy.updateWorkerPilotStatus(workerID, harvesterID, status, node_id)
0701 return ret
0702
0703 def update_worker_node(
0704 self,
0705 site,
0706 panda_queue,
0707 host_name,
0708 cpu_model,
0709 cpu_model_normalized,
0710 n_logical_cpus,
0711 n_sockets,
0712 cores_per_socket,
0713 threads_per_core,
0714 cpu_architecture,
0715 cpu_architecture_level,
0716 clock_speed,
0717 total_memory,
0718 total_local_disk,
0719 ):
0720
0721 with self.proxyPool.get() as proxy:
0722
0723 ret = proxy.update_worker_node(
0724 site,
0725 panda_queue,
0726 host_name,
0727 cpu_model,
0728 cpu_model_normalized,
0729 n_logical_cpus,
0730 n_sockets,
0731 cores_per_socket,
0732 threads_per_core,
0733 cpu_architecture,
0734 cpu_architecture_level,
0735 clock_speed,
0736 total_memory,
0737 total_local_disk,
0738 )
0739 return ret
0740
0741 def update_worker_node_gpu(
0742 self,
0743 site: str,
0744 host_name: str,
0745 vendor: str,
0746 model: str,
0747 count: int,
0748 vram: int,
0749 architecture: str,
0750 framework: str,
0751 framework_version: str,
0752 driver_version: str,
0753 ):
0754
0755 with self.proxyPool.get() as proxy:
0756
0757 ret = proxy.update_worker_node_gpu(
0758 site,
0759 host_name,
0760 vendor,
0761 model,
0762 count,
0763 vram,
0764 architecture,
0765 framework,
0766 framework_version,
0767 driver_version,
0768 )
0769 return ret
0770
0771 def get_architecture_level_map(self):
0772
0773 with self.proxyPool.get() as proxy:
0774 ret = proxy.get_architecture_level_map()
0775 return ret
0776
0777
0778 def finalizePendingJobs(self, prodUserName, jobDefinitionID, waitLock=False):
0779
0780 with self.proxyPool.get() as proxy:
0781
0782 ret = proxy.finalizePendingJobs(prodUserName, jobDefinitionID, waitLock)
0783 return ret
0784
0785
0786 def retryJob(
0787 self,
0788 jobID,
0789 param,
0790 failedInActive=False,
0791 changeJobInMem=False,
0792 inMemJob=None,
0793 getNewPandaID=False,
0794 attemptNr=None,
0795 recoverableEsMerge=False,
0796 ):
0797
0798 with self.proxyPool.get() as proxy:
0799
0800 ret = proxy.retryJob(
0801 jobID,
0802 param,
0803 failedInActive,
0804 changeJobInMem,
0805 inMemJob,
0806 getNewPandaID,
0807 attemptNr,
0808 recoverableEsMerge,
0809 )
0810 return ret
0811
0812
0813 def activateJobs(self, jobs):
0814
0815 with self.proxyPool.get() as proxy:
0816
0817 returns = []
0818 for job in jobs:
0819
0820 ret = proxy.activateJob(job)
0821 returns.append(ret)
0822 return returns
0823
0824
0825 def keepJobs(self, jobs):
0826
0827 with self.proxyPool.get() as proxy:
0828
0829 returns = []
0830 for job in jobs:
0831
0832 ret = proxy.keepJob(job)
0833 returns.append(ret)
0834 return returns
0835
0836
0837 def archiveJobs(self, jobs, inJobsDefined, fromJobsWaiting=False):
0838
0839 with self.proxyPool.get() as proxy:
0840
0841 returns = []
0842 for job in jobs:
0843
0844 ret = proxy.archiveJob(job, inJobsDefined, fromJobsWaiting=fromJobsWaiting)
0845 returns.append(ret[0])
0846 return returns
0847
0848
0849 def setDebugMode(self, dn, pandaID, prodManager, modeOn, workingGroup):
0850
0851 with self.proxyPool.get() as proxy:
0852
0853 hitLimit = False
0854 if modeOn is True:
0855 if prodManager:
0856 limitNum = None
0857 elif workingGroup is not None:
0858 jobList = proxy.getActiveDebugJobs(workingGroup=workingGroup)
0859 limitNum = ProcessGroups.maxDebugWgJobs
0860 else:
0861 jobList = proxy.getActiveDebugJobs(dn=dn)
0862 limitNum = ProcessGroups.maxDebugJobs
0863 if limitNum and len(jobList) >= limitNum:
0864
0865 retStr = "You already hit the limit on the maximum number of debug subjobs "
0866 retStr += f"({limitNum} jobs). "
0867 retStr += "Please set the debug mode off for one of the following PandaIDs : "
0868 for tmpID in jobList:
0869 retStr += f"{tmpID},"
0870 retStr = retStr[:-1]
0871 hitLimit = True
0872 if not hitLimit:
0873
0874 retStr = proxy.setDebugMode(dn, pandaID, prodManager, modeOn, workingGroup)
0875 return retStr
0876
0877
0878 def getJobs(
0879 self,
0880 nJobs,
0881 siteName,
0882 prodSourceLabel,
0883 mem,
0884 diskSpace,
0885 node,
0886 timeout,
0887 computingElement,
0888 prodUserID,
0889 taskID,
0890 background,
0891 resourceType,
0892 harvester_id,
0893 worker_id,
0894 schedulerID,
0895 jobType,
0896 is_gu,
0897 via_topic,
0898 remaining_time,
0899 ):
0900
0901 with self.proxyPool.get() as proxy:
0902
0903 t_before = time.time()
0904 jobs, nSent = proxy.getJobs(
0905 nJobs,
0906 siteName,
0907 prodSourceLabel,
0908 mem,
0909 diskSpace,
0910 node,
0911 timeout,
0912 computingElement,
0913 prodUserID,
0914 taskID,
0915 background,
0916 resourceType,
0917 harvester_id,
0918 worker_id,
0919 schedulerID,
0920 jobType,
0921 is_gu,
0922 via_topic,
0923 remaining_time,
0924 )
0925 t_after = time.time()
0926 t_total = t_after - t_before
0927 _logger.debug(f"getJobs : took {t_total}s for {siteName} nJobs={nJobs} prodSourceLabel={prodSourceLabel}")
0928
0929
0930 secrets_map = {}
0931 for job in jobs:
0932 if job.use_secrets() and job.prodUserName not in secrets_map:
0933
0934 with self.proxyPool.get() as proxy:
0935 tmp_status, secret = proxy.get_user_secrets(job.prodUserName)
0936 if not tmp_status:
0937 secret = None
0938 secrets_map[job.prodUserName] = secret
0939 if job.is_debug_mode():
0940 if panda_config.pilot_secrets not in secrets_map:
0941
0942 with self.proxyPool.get() as proxy:
0943 tmp_status, secret = proxy.get_user_secrets(panda_config.pilot_secrets)
0944 if not tmp_status:
0945 secret = None
0946 secrets_map[panda_config.pilot_secrets] = secret
0947
0948 return jobs + [nSent, {}, secrets_map]
0949
0950
0951 def getJobStatus(
0952 self,
0953 jobIDs,
0954 fromDefined=True,
0955 fromActive=True,
0956 fromArchived=True,
0957 fromWaiting=True,
0958 ):
0959
0960 with self.proxyPool.get() as proxy:
0961 retStatus = []
0962
0963 for jobID in jobIDs:
0964 res = proxy.peekJob(jobID, fromDefined, fromActive, fromArchived, fromWaiting)
0965 if res:
0966 retStatus.append(res.jobStatus)
0967 else:
0968 retStatus.append(None)
0969 return retStatus
0970
0971
0972 def peekJobs(
0973 self,
0974 jobIDs,
0975 fromDefined=True,
0976 fromActive=True,
0977 fromArchived=True,
0978 fromWaiting=True,
0979 forAnal=False,
0980 use_json=False,
0981 ):
0982
0983 with self.proxyPool.get() as proxy:
0984 retJobs = []
0985
0986 for jobID in jobIDs:
0987 res = proxy.peekJob(jobID, fromDefined, fromActive, fromArchived, fromWaiting, forAnal)
0988 if res:
0989 if use_json:
0990 retJobs.append(res.to_dict())
0991 else:
0992 retJobs.append(res)
0993 else:
0994 retJobs.append(None)
0995 return retJobs
0996
0997
0998 def getPandaIDsWithTaskID(self, jediTaskID: int, scout_only: bool = False, unsuccessful_only: bool = False) -> list[int]:
0999 """Get PanDA job IDs associated with a JEDI task.
1000
1001 Args:
1002 jediTaskID: JEDI task ID.
1003 scout_only: When True, return only scout job IDs.
1004 unsuccessful_only: When True, return only job IDs with status failed, cancelled, or closed.
1005
1006 Returns:
1007 list[int]: PanDA job IDs for the task.
1008 """
1009
1010 with self.proxyPool.get() as proxy:
1011
1012 retJobs = proxy.getPandaIDsWithTaskID(jediTaskID, scout_only=scout_only, unsuccessful_only=unsuccessful_only)
1013 return retJobs
1014
1015
1016 def getFullJobStatus(self, jobIDs, fromDefined=True, fromActive=True, fromArchived=True, fromWaiting=True, forAnal=True, days=30, use_json=False):
1017 retJobMap = {}
1018
1019
1020 for jobID in jobIDs:
1021
1022 with self.proxyPool.get() as proxy:
1023
1024 res = proxy.peekJob(jobID, fromDefined, fromActive, fromArchived, fromWaiting, forAnal)
1025 retJobMap[jobID] = res
1026
1027
1028 for jobID in jobIDs:
1029 if retJobMap[jobID] is None:
1030
1031 with self.proxyPool.get() as proxy:
1032
1033 res = proxy.peekJobLog(jobID, days)
1034 retJobMap[jobID] = res
1035
1036
1037 retJobs = []
1038 for jobID in jobIDs:
1039 if use_json:
1040 if retJobMap[jobID] is None:
1041 retJobs.append(None)
1042 else:
1043 retJobs.append(retJobMap[jobID].to_dict())
1044 else:
1045 retJobs.append(retJobMap[jobID])
1046
1047 return retJobs
1048
1049
    def getScriptOfflineRunning(self, pandaID, days=None):
        """Generate a bash script to re-run a job interactively offline.

        The script downloads the job's input files with rucio, sets up the
        release with asetup and re-executes the transformation(s).

        :param pandaID: PandaID of the job to reproduce
        :param days: look-back window for archived jobs (defaults to 30 in the DB)
        :return: the script text, or an "ERROR: ..." string on failure
        """
        try:
            # fetch the full job spec (live tables, then archive)
            tmpJobs = self.getFullJobStatus([pandaID], days=days)
            if tmpJobs == [] or tmpJobs[0] is None:
                errStr = f"ERROR: Cannot get PandaID={pandaID} in DB "
                if days is None:
                    errStr += "for the last 30 days. You may add &days=N to the URL"
                else:
                    errStr += f"for the last {days} days. You may change &days=N in the URL"
                return errStr
            tmpJob = tmpJobs[0]

            # decide whether this is an analysis (user) job from the trf name
            isUser = False
            for trf in [
                "runAthena",
                "runGen",
                "runcontainer",
                "runMerge",
                "buildJob",
                "buildGen",
            ]:
                if trf in tmpJob.transformation:
                    isUser = True
                    break

            if tmpJob.prodSourceLabel == "user":
                isUser = True
            if isUser:
                # analysis jobs carry a single release/params/trf set
                tmpAtls = [tmpJob.AtlasRelease]
                tmpRels = [re.sub("^AnalysisTransforms-*", "", tmpJob.homepackage)]
                tmpPars = [tmpJob.jobParameters]
                tmpTrfs = [tmpJob.transformation]
            else:
                # production jobs may chain several steps separated by newlines
                tmpAtls = tmpJob.AtlasRelease.split("\n")
                tmpRels = tmpJob.homepackage.split("\n")
                tmpPars = tmpJob.jobParameters.split("\n")
                tmpTrfs = tmpJob.transformation.split("\n")
            if not (len(tmpRels) == len(tmpPars) == len(tmpTrfs)):
                return "ERROR: The number of releases or parameters or trfs is inconsistent with others"

            # script preamble: the actual payload is written to a temp file
            # and sourced inside the requested platform via atlasLocalSetup
            scrStr = (
                "#!/bin/bash\n\n"
                "# To rerun the job interactively :\n"
                "# 1) download this script\n"
                "# 2) chmod +x ./<this script>\n"
                "# 3) setupATLAS\n"
                "# 4) ./<this script>\n\n"
                "temp_file=$(mktemp)\n"
                'cat << EOF > "$temp_file"\n\n'
                "source ${ATLAS_LOCAL_ROOT_BASE}/user/atlasLocalSetup.sh\n"
                "lsetup rucio\n\n"
                "#retrieve inputs\n\n"
            )

            # collect input LFNs per dataset, skipping duplicates
            dsFileMap = {}
            for tmpFile in tmpJob.Files:
                if tmpFile.type == "input":
                    if tmpFile.dataset not in dsFileMap:
                        dsFileMap[tmpFile.dataset] = []
                    if tmpFile.lfn not in dsFileMap[tmpFile.dataset]:
                        dsFileMap[tmpFile.dataset].append(tmpFile.scope + ":" + tmpFile.lfn)

            for tmpDS in dsFileMap:
                tmpFileList = dsFileMap[tmpDS]
                for tmpLFN in tmpFileList:
                    scrStr += f"rucio download {tmpLFN} --no-subdir\n"
            if isUser:
                # analysis trfs are fetched over HTTP and made executable
                scrStr += "\n#get trf\n"
                scrStr += f"wget {tmpTrfs[0]}\n"
                scrStr += f"chmod +x {tmpTrfs[0].split('/')[-1]}\n"
            scrStr += "\n#transform commands\n\n"
            for tmpIdx, tmpRel in enumerate(tmpRels):
                # build the asetup tag list from homepackage + AtlasRelease
                atlRel = re.sub("Atlas-", "", tmpAtls[tmpIdx])
                atlTags = re.split("[/_]", tmpRel)
                if "" in atlTags:
                    atlTags.remove("")
                if atlRel != "" and atlRel not in atlTags and (re.search("^\d+\.\d+\.\d+$", atlRel) is None or isUser):
                    atlTags.append(atlRel)
                # cmtConfig may be "<platform>@<container>"; take the last part
                # NOTE(review): cmtConfig is referenced after this loop; an
                # empty tmpRels would leave it undefined — confirm upstream
                try:
                    cmtConfig = [s for s in tmpJob.cmtConfig.split("@") if s][-1]
                except Exception:
                    cmtConfig = ""
                scrStr += f"asetup --platform={tmpJob.cmtConfig.split('@')[0]} {','.join(atlTags)}\n"

                # multicore settings
                if tmpJob.coreCount not in ["NULL", None] and tmpJob.coreCount > 1:
                    scrStr += f"export ATHENA_PROC_NUMBER={tmpJob.coreCount}\n"
                    scrStr += f"export ATHENA_CORE_NUMBER={tmpJob.coreCount}\n"

                # re-quote -x=y arguments so the shell passes them intact
                tmpParamStr = tmpPars[tmpIdx]
                tmpSplitter = shlex.shlex(tmpParamStr, posix=True)
                tmpSplitter.whitespace = " "
                tmpSplitter.whitespace_split = True

                for tmpItem in tmpSplitter:
                    tmpMatch = re.search("^(-[^=]+=)(.+)$", tmpItem)
                    if tmpMatch is not None:
                        tmpArgName = tmpMatch.group(1)
                        tmpArgVal = tmpMatch.group(2)
                        tmpArgIdx = tmpParamStr.find(tmpArgName) + len(tmpArgName)
                        # only add quotes when the value is not already quoted
                        if tmpParamStr[tmpArgIdx] != '"':
                            tmpParamStr = tmpParamStr.replace(tmpMatch.group(0), tmpArgName + '"' + tmpArgVal + '"')

                if isUser:
                    scrStr += "./"
                    tmpParamStr += " --debug"
                scrStr += f"{tmpTrfs[tmpIdx].split('/')[-1]} {tmpParamStr}\n\n"
            scrStr += "EOF\n\n" 'chmod +x "$temp_file"\n'
            scrStr += 'source ${ATLAS_LOCAL_ROOT_BASE}/user/atlasLocalSetup.sh -c %s -r "$temp_file"\n' % cmtConfig
            scrStr += 'rm "$temp_file"\n'
            return scrStr
        except Exception as e:
            _logger.error(f"getScriptOfflineRunning : {str(e)} {traceback.format_exc()}")
            return f"ERROR: ServerError in getScriptOfflineRunning with {str(e)}"
1167
1168
    def killJobs(self, ids, user, code, prodManager, wgProdRole=[], killOptions=[]):
        """Kill a list of jobs and close their output datasets.

        Event-service jobs with started events may be archived instead of
        killed when "keepUnmerged" is requested, so their events survive.

        :param ids: PandaIDs to kill
        :param user: requester DN
        :param code: kill code forwarded to the proxy
        :param prodManager: True when the requester has production rights
        :param wgProdRole: working groups the requester holds a production
               role for (NOTE(review): mutable default argument)
        :param killOptions: list of option strings, e.g. "keepUnmerged",
               "jobSubStatus=..." (NOTE(review): mutable default argument)
        :return: list of per-job kill results
        """
        tmp_log = LogWrapper(_logger, "killJobs")
        tmp_log.debug(f"start for {len(ids)} IDs")

        with self.proxyPool.get() as proxy:
            rets = []
            # (prodUserID, jobDefinitionID, jobsetID) -> one representative PandaID
            pandaIDforCloserMap = {}
            for id in ids:
                toKill = True
                if "keepUnmerged" in killOptions:
                    tmpJobSpec = proxy.peekJob(id, True, True, False, False, False)
                    if tmpJobSpec is not None:
                        if EventServiceUtils.isEventServiceMerge(tmpJobSpec):
                            # ES merge: retry with a new PandaID instead of killing
                            proxy.retryJob(
                                id,
                                {},
                                getNewPandaID=True,
                                attemptNr=tmpJobSpec.attemptNr,
                                recoverableEsMerge=True,
                            )
                        elif EventServiceUtils.isEventServiceJob(tmpJobSpec) and not EventServiceUtils.isJobCloningJob(tmpJobSpec):
                            # plain ES job: archive instead of kill if events started
                            nEvt = proxy.getNumStartedEvents(tmpJobSpec)

                            if nEvt is not None and nEvt > 0:
                                # optional jobSubStatus override from the options
                                for killOpt in killOptions:
                                    if killOpt.startswith("jobSubStatus"):
                                        tmpJobSpec.jobSubStatus = killOpt.split("=")[-1]
                                        break

                                ret = proxy.archiveJob(
                                    tmpJobSpec,
                                    tmpJobSpec.jobStatus in ["defined", "assigned"],
                                )
                                toKill = False
                                userInfo = {"prodSourceLabel": None}
                if toKill:
                    ret, userInfo = proxy.killJob(id, user, code, prodManager, True, wgProdRole, killOptions)
                rets.append(ret)
                # remember one killed job per jobset so its datasets get closed
                if ret and userInfo["prodSourceLabel"] in ["user", "managed", "test"]:
                    jobIDKey = (
                        userInfo["prodUserID"],
                        userInfo["jobDefinitionID"],
                        userInfo["jobsetID"],
                    )
                    if jobIDKey not in pandaIDforCloserMap:
                        pandaIDforCloserMap[jobIDKey] = id

        # close the output/log datasets of the affected jobsets (best effort)
        try:
            if pandaIDforCloserMap != {}:
                for pandaIDforCloser in pandaIDforCloserMap.values():
                    tmpJobs = self.peekJobs([pandaIDforCloser])
                    tmpJob = tmpJobs[0]
                    if tmpJob is not None:
                        tmpDestDBlocks = []
                        # collect unique destination datasets of output/log files
                        for tmpFile in tmpJob.Files:
                            if tmpFile.type in ["output", "log"]:
                                if tmpFile.destinationDBlock not in tmpDestDBlocks:
                                    tmpDestDBlocks.append(tmpFile.destinationDBlock)

                        closer_process = Closer(self, tmpDestDBlocks, tmpJob)
                        closer_process.run()
        except Exception as e:
            tmp_log.error(f"failed with {str(e)} {traceback.format_exc()}")
        tmp_log.debug("done")
        return rets
1240
1241
    def reassignJobs(
        self,
        ids,
        attempt=0,
        joinThr=False,
        forPending=False,
        firstSubmission=True,
    ):
        """Reassign jobs: reset them in the DB and re-run brokerage via Setupper.

        Each ID is first reset in the active table (resetJob with
        forPending=True); if that fails, resetDefinedJob is tried. All
        successfully reset job specs are handed to a Setupper thread,
        optionally joined when joinThr is True.

        NOTE(review): the `attempt` and `forPending` parameters are not used
        in this body (resetJob is always called with forPending=True) —
        confirm whether they are kept only for interface compatibility.

        Always returns True.
        """
        tmp_log = LogWrapper(_logger, "reassignJobs")
        tmp_log.debug(f"start for {len(ids)} IDs")

        with self.proxyPool.get() as proxy:
            jobs = []

            n_reset_waiting = 0
            n_reset_defined = 0
            for tmp_id in ids:
                try:
                    # first try to reset in the active table
                    ret = proxy.resetJob(
                        tmp_id,
                        activeTable=False,
                        forPending=True,
                    )
                    if ret is not None:
                        jobs.append(ret)
                        n_reset_waiting += 1
                        # reset succeeded; no need to try the defined table
                        continue

                    # fall back to resetting a defined-table job
                    ret = proxy.resetDefinedJob(tmp_id)
                    if ret is not None:
                        jobs.append(ret)
                        n_reset_defined += 1
                        continue
                except Exception as e:
                    tmp_log.error(f"failed with {str(e)} {traceback.format_exc()}")
        tmp_log.debug(f"got {len(jobs)} IDs in total: {n_reset_waiting} from Waiting, {n_reset_defined} from Defined")

        if jobs:
            if joinThr:
                thr = Setupper(
                    self,
                    jobs,
                    resubmit=True,
                    first_submission=firstSubmission,
                )
                thr.start()
                thr.join()
            else:
                # fire-and-forget setup thread
                Setupper(
                    self,
                    jobs,
                    resubmit=True,
                    first_submission=firstSubmission,
                ).start()
        tmp_log.debug("done")

        return True
1302
1303
1304 def updateInFilesReturnPandaIDs(self, dataset, status, fileLFN=""):
1305
1306 with self.proxyPool.get() as proxy:
1307 retList = []
1308
1309 retList = proxy.updateInFilesReturnPandaIDs(dataset, status, fileLFN)
1310 return retList
1311
1312
1313 def update_input_files_at_sites_and_get_panda_ids(self, filename: str, sites: list) -> list:
1314 with self.proxyPool.get() as proxy:
1315 ret = proxy.update_input_files_at_sites_and_get_panda_ids(filename, sites)
1316 return ret
1317
1318
1319 def updateOutFilesReturnPandaIDs(self, dataset, fileLFN=""):
1320
1321 with self.proxyPool.get() as proxy:
1322 retList = []
1323
1324 retList = proxy.updateOutFilesReturnPandaIDs(dataset, fileLFN)
1325 return retList
1326
1327
1328 def getAssociatedDisDatasets(self, subDsName):
1329
1330 with self.proxyPool.get() as proxy:
1331 retList = []
1332
1333 retList = proxy.getAssociatedDisDatasets(subDsName)
1334 return retList
1335
1336
1337 def insertSandboxFileInfo(self, userName, hostName, fileName, fileSize, checkSum):
1338
1339 with self.proxyPool.get() as proxy:
1340
1341 ret = proxy.insertSandboxFileInfo(userName, hostName, fileName, fileSize, checkSum)
1342 return ret
1343
1344
1345 def getLockSandboxFiles(self, time_limit, n_files):
1346
1347 with self.proxyPool.get() as proxy:
1348
1349 ret = proxy.getLockSandboxFiles(time_limit, n_files)
1350 return ret
1351
1352
1353 def checkSandboxFile(self, userName, fileSize, checkSum):
1354
1355 with self.proxyPool.get() as proxy:
1356
1357 ret = proxy.checkSandboxFile(userName, fileSize, checkSum)
1358 return ret
1359
1360
1361 def insertDatasets(self, datasets):
1362
1363 with self.proxyPool.get() as proxy:
1364 retList = []
1365
1366 for dataset in datasets:
1367 ret = proxy.insertDataset(dataset)
1368 retList.append(ret)
1369 return retList
1370
1371
1372 def getLockDatasets(self, sqlQuery, varMapGet, modTimeOffset="", getVersion=False):
1373
1374 with self.proxyPool.get() as proxy:
1375
1376 ret = proxy.getLockDatasets(sqlQuery, varMapGet, modTimeOffset, getVersion)
1377 return ret
1378
1379
1380 def queryDatasetWithMap(self, map):
1381
1382 with self.proxyPool.get() as proxy:
1383
1384 ret = proxy.queryDatasetWithMap(map)
1385 return ret
1386
1387
1388 def setGUIDs(self, files):
1389
1390 with self.proxyPool.get() as proxy:
1391
1392 ret = proxy.setGUIDs(files)
1393 return ret
1394
1395
1396 def updateDatasets(self, datasets, withLock=False, withCriteria="", criteriaMap={}):
1397
1398 with self.proxyPool.get() as proxy:
1399
1400 retList = proxy.updateDataset(datasets, withLock, withCriteria, criteriaMap)
1401 return retList
1402
1403
1404 def trigger_cleanup_internal_datasets(self, task_id: int) -> bool:
1405 with self.proxyPool.get() as proxy:
1406 ret = proxy.trigger_cleanup_internal_datasets(task_id)
1407 return ret
1408
1409
1410 def countFilesWithMap(self, map):
1411
1412 with self.proxyPool.get() as proxy:
1413
1414 ret = proxy.countFilesWithMap(map)
1415 return ret
1416
1417
1418 def getSerialNumber(self, datasetname, definedFreshFlag=None):
1419
1420 with self.proxyPool.get() as proxy:
1421
1422 ret = proxy.getSerialNumber(datasetname, definedFreshFlag)
1423 return ret
1424
1425
1426 def addMetadata(self, ids, metadataList, newStatusList):
1427
1428 with self.proxyPool.get() as proxy:
1429
1430 index = 0
1431 retList = []
1432 for id in ids:
1433 ret = proxy.addMetadata(id, metadataList[index], newStatusList[index])
1434 retList.append(ret)
1435 index += 1
1436 return retList
1437
1438
1439 def addStdOut(self, id, stdout):
1440
1441 with self.proxyPool.get() as proxy:
1442
1443 ret = proxy.addStdOut(id, stdout)
1444 return ret
1445
1446
1447 def extractScope(self, name):
1448
1449 with self.proxyPool.get() as proxy:
1450
1451 ret = proxy.extractScope(name)
1452 return ret
1453
1454
1455 def getJobStatistics(self):
1456
1457 with self.proxyPool.get() as proxy:
1458
1459 ret = proxy.getJobStatistics()
1460 return ret
1461
1462
1463 def getDetailedJobStatistics(self):
1464 with self.proxyPool.get() as proxy:
1465 ret = proxy.getDetailedJobStatistics()
1466 return ret
1467
1468
1469 def getJobStatisticsForExtIF(self, sourcetype=None):
1470
1471 with self.proxyPool.get() as proxy:
1472
1473 ret = proxy.getJobStatisticsForExtIF(sourcetype)
1474 return ret
1475
1476
1477 def getJobStatisticsForBamboo(self):
1478
1479 with self.proxyPool.get() as proxy:
1480
1481 ret = proxy.getJobStatisticsPerProcessingType()
1482 return ret
1483
1484
1485 def updateSiteData(self, hostID, pilotRequests, interval=3):
1486
1487 with self.proxyPool.get() as proxy:
1488
1489 ret = proxy.updateSiteData(hostID, pilotRequests, interval)
1490 return ret
1491
1492
1493 def getCurrentSiteData(self):
1494
1495 with self.proxyPool.get() as proxy:
1496
1497 ret = proxy.getCurrentSiteData()
1498 return ret
1499
1500
1501 def insertnRunningInSiteData(self):
1502
1503 with self.proxyPool.get() as proxy:
1504
1505 ret = proxy.insertnRunningInSiteData()
1506 return ret
1507
1508
1509 def getSiteInfo(self):
1510
1511 with self.proxyPool.get() as proxy:
1512
1513 ret = proxy.getSiteInfo()
1514 return ret
1515
1516
1517 def get_cloud_list(self):
1518
1519 with self.proxyPool.get() as proxy:
1520
1521 ret = proxy.get_cloud_list()
1522 return ret
1523
1524
1525 def get_special_dispatch_params(self):
1526
1527 with self.proxyPool.get() as proxy:
1528
1529 ret = proxy.get_special_dispatch_params()
1530 return ret
1531
1532
1533 def getEmailAddr(self, name, withDN=False, withUpTime=False):
1534
1535 with self.proxyPool.get() as proxy:
1536
1537 ret = proxy.getEmailAddr(name, withDN, withUpTime)
1538 return ret
1539
1540
1541 def setEmailAddr(self, userName, emailAddr):
1542
1543 with self.proxyPool.get() as proxy:
1544
1545 ret = proxy.setEmailAddr(userName, emailAddr)
1546 return ret
1547
1548
1549 def get_ban_users(self):
1550
1551 with self.proxyPool.get() as proxy:
1552
1553 ret = proxy.get_ban_users()
1554 return ret
1555
1556
1557 def register_token_key(self, client_name, lifetime):
1558
1559 with self.proxyPool.get() as proxy:
1560
1561 ret = proxy.register_token_key(client_name, lifetime)
1562 return ret
1563
1564
1565 def querySQLS(self, sql, varMap, arraySize=1000):
1566
1567 with self.proxyPool.get() as proxy:
1568
1569 ret = proxy.querySQLS(sql, varMap, arraySize)
1570 return ret
1571
1572
1573 def querySQL(self, sql, varMap, arraySize=1000):
1574
1575 with self.proxyPool.get() as proxy:
1576
1577 ret = proxy.querySQLS(sql, varMap, arraySize)[1]
1578 return ret
1579
1580
1581 def executemanySQL(self, sql, varMaps, arraySize=1000):
1582
1583 with self.proxyPool.get() as proxy:
1584
1585 ret = proxy.executemanySQL(sql, varMaps, arraySize)
1586 return ret
1587
1588
1589 def checkQuota(self, dn):
1590
1591 with self.proxyPool.get() as proxy:
1592
1593 ret = proxy.checkQuota(dn)
1594 return ret
1595
1596
1597 def insertTaskParamsPanda(self, taskParams, user, prodRole, fqans=[], parent_tid=None, properErrorCode=False, allowActiveTask=False, decode=True):
1598
1599 with self.proxyPool.get() as proxy:
1600
1601 tmpStatus = proxy.checkBanUser(user, None, True)
1602 if tmpStatus is True:
1603
1604 ret = proxy.insertTaskParamsPanda(taskParams, user, prodRole, fqans, parent_tid, properErrorCode, allowActiveTask, decode)
1605 elif tmpStatus == 1:
1606 ret = False, "Failed to update DN in PandaDB"
1607 elif tmpStatus == 2:
1608 ret = False, "Failed to insert user info to PandaDB"
1609 else:
1610 ret = False, f"The following DN is banned: DN={user}"
1611 return ret
1612
1613
1614 def sendCommandTaskPanda(
1615 self,
1616 jediTaskID,
1617 dn,
1618 prodRole,
1619 comStr,
1620 comComment=None,
1621 useCommit=True,
1622 properErrorCode=False,
1623 comQualifier=None,
1624 broadcast=False,
1625 ):
1626
1627 with self.proxyPool.get() as proxy:
1628
1629 ret = proxy.sendCommandTaskPanda(
1630 jediTaskID,
1631 dn,
1632 prodRole,
1633 comStr,
1634 comComment,
1635 useCommit,
1636 properErrorCode,
1637 comQualifier,
1638 broadcast,
1639 )
1640 return ret
1641
1642
1643 def updateUnmergedDatasets(self, job, finalStatusDS, updateCompleted=False):
1644
1645 with self.proxyPool.get() as proxy:
1646
1647 ret = proxy.updateUnmergedDatasets(job, finalStatusDS, updateCompleted)
1648 return ret
1649
1650
1651 def getJediTasksInTimeRange(self, dn, timeRangeStr, fullFlag=False, minTaskID=None, task_type="user"):
1652
1653 if dn in ["NULL", "", "None", None]:
1654 return {}
1655
1656 match = re.match("^(\d+)-(\d+)-(\d+) (\d+):(\d+):(\d+)$", timeRangeStr)
1657 if match is None:
1658 return {}
1659 timeRange = datetime.datetime(
1660 year=int(match.group(1)),
1661 month=int(match.group(2)),
1662 day=int(match.group(3)),
1663 hour=int(match.group(4)),
1664 minute=int(match.group(5)),
1665 second=int(match.group(6)),
1666 )
1667
1668 maxRange = naive_utcnow() - datetime.timedelta(days=30)
1669 if timeRange < maxRange:
1670 timeRange = maxRange
1671
1672 with self.proxyPool.get() as proxy:
1673
1674 ret = proxy.getJediTasksInTimeRange(dn, timeRange, fullFlag, minTaskID, task_type)
1675 return ret
1676
1677
1678 def getJediTaskDetails(self, jediTaskID, fullFlag, withTaskInfo):
1679
1680 with self.proxyPool.get() as proxy:
1681
1682 ret = proxy.getJediTaskDetails(jediTaskID, fullFlag, withTaskInfo)
1683 return ret
1684
1685
1686 def get_task_details_json(self, jedi_task_id, resolve_parent=False, include_resolve_status=False):
1687 with self.proxyPool.get() as proxy:
1688 return proxy.get_task_details_json(
1689 jedi_task_id,
1690 resolve_parent=resolve_parent,
1691 include_resolve_status=include_resolve_status,
1692 )
1693
1694
1695 def getEventRanges(self, pandaID, jobsetID, jediTaskID, nRanges, acceptJson, scattered, segment_id):
1696
1697 with self.proxyPool.get() as proxy:
1698
1699 ret = proxy.getEventRanges(pandaID, jobsetID, jediTaskID, nRanges, acceptJson, scattered, segment_id)
1700 return ret
1701
1702
1703 def updateEventRange(self, eventRangeID, eventStatus, cpuCore, cpuConsumptionTime, objstoreID=None):
1704 eventDict = {}
1705 eventDict["eventRangeID"] = eventRangeID
1706 eventDict["eventStatus"] = eventStatus
1707 eventDict["cpuCore"] = cpuCore
1708 eventDict["cpuConsumptionTime"] = cpuConsumptionTime
1709 eventDict["objstoreID"] = objstoreID
1710
1711 with self.proxyPool.get() as proxy:
1712
1713 ret = proxy.updateEventRanges([eventDict])
1714
1715 try:
1716 retVal = ret[0][0]
1717 except Exception:
1718 retVal = False
1719
1720 return retVal, json.dumps(ret[1])
1721
1722
1723 def updateEventRanges(self, eventRanges, version=0):
1724
1725 try:
1726 eventRanges = json.loads(eventRanges)
1727 except Exception:
1728 return json.dumps("ERROR : failed to convert eventRanges with json")
1729
1730 with self.proxyPool.get() as proxy:
1731
1732 ret = proxy.updateEventRanges(eventRanges, version)
1733 if version != 0:
1734 return ret
1735 return json.dumps(ret[0]), json.dumps(ret[1])
1736
1737
1738 def changeTaskPriorityPanda(self, jediTaskID, newPriority):
1739
1740 with self.proxyPool.get() as proxy:
1741
1742 ret = proxy.changeTaskPriorityPanda(jediTaskID, newPriority)
1743 return ret
1744
1745
1746 def throttleUserJobs(self, prodUserName, workingGroup, get_dict=False):
1747
1748 with self.proxyPool.get() as proxy:
1749
1750 ret = proxy.throttleUserJobs(prodUserName, workingGroup, get_dict)
1751 return ret
1752
1753
1754 def unThrottleUserJobs(self, prodUserName, workingGroup, get_dict=False):
1755
1756 with self.proxyPool.get() as proxy:
1757
1758 ret = proxy.unThrottleUserJobs(prodUserName, workingGroup, get_dict)
1759 return ret
1760
1761
1762 def getThrottledUsers(self):
1763
1764 with self.proxyPool.get() as proxy:
1765
1766 ret = proxy.getThrottledUsers()
1767 return ret
1768
1769
1770 def getJobdefIDsForFailedJob(self, jediTaskID):
1771
1772 with self.proxyPool.get() as proxy:
1773
1774 ret = proxy.getJobdefIDsForFailedJob(jediTaskID)
1775 return ret
1776
1777
1778 def changeTaskAttributePanda(self, jediTaskID, attrName, attrValue):
1779
1780 with self.proxyPool.get() as proxy:
1781
1782 ret = proxy.changeTaskAttributePanda(jediTaskID, attrName, attrValue)
1783 return ret
1784
1785
1786 def changeTaskSplitRulePanda(self, jediTaskID, attrName, attrValue):
1787
1788 with self.proxyPool.get() as proxy:
1789
1790 ret = proxy.changeTaskSplitRulePanda(jediTaskID, attrName, attrValue)
1791 return ret
1792
1793
1794 def increaseAttemptNrPanda(self, jediTaskID, increasedNr):
1795
1796 with self.proxyPool.get() as proxy:
1797
1798 ret = proxy.increaseAttemptNrPanda(jediTaskID, increasedNr)
1799 return ret
1800
1801
1802 def getTaskIDwithTaskNameJEDI(self, userName, taskName):
1803
1804 with self.proxyPool.get() as proxy:
1805
1806 ret = proxy.getTaskIDwithTaskNameJEDI(userName, taskName)
1807 return ret
1808
1809
1810 def updateTaskErrorDialogJEDI(self, jediTaskID, msg):
1811
1812 with self.proxyPool.get() as proxy:
1813
1814 ret = proxy.updateTaskErrorDialogJEDI(jediTaskID, msg)
1815 return ret
1816
1817
1818 def updateTaskModTimeJEDI(self, jediTaskID, newStatus=None):
1819
1820 with self.proxyPool.get() as proxy:
1821
1822 ret = proxy.updateTaskModTimeJEDI(jediTaskID, newStatus)
1823 return ret
1824
1825
1826 def checkInputFileStatusInJEDI(self, jobSpec):
1827
1828 with self.proxyPool.get() as proxy:
1829
1830 ret = proxy.checkInputFileStatusInJEDI(jobSpec)
1831 return ret
1832
1833
1834 def increaseRamLimitJEDI(self, jediTaskID, jobRamCount):
1835
1836 with self.proxyPool.get() as proxy:
1837
1838 ret = proxy.increaseRamLimitJEDI(jediTaskID, jobRamCount)
1839 return ret
1840
1841
1842 def increaseRamLimitJobJEDI(self, job, jobRamCount, jediTaskID):
1843
1844 with self.proxyPool.get() as proxy:
1845
1846 ret = proxy.increaseRamLimitJobJEDI(job, jobRamCount, jediTaskID)
1847 return ret
1848
1849
1850 def increaseRamLimitJobJEDI_xtimes(self, job, jobRamCount, jediTaskID, attemptNr):
1851
1852 with self.proxyPool.get() as proxy:
1853
1854 ret = proxy.increaseRamLimitJobJEDI_xtimes(job, jobRamCount, jediTaskID, attemptNr)
1855 return ret
1856
1857
1858 def reduce_input_per_job(self, panda_id, jedi_task_id, attempt_nr, excluded_rules, steps, dry_mode=False):
1859
1860 with self.proxyPool.get() as proxy:
1861
1862 ret = proxy.reduce_input_per_job(panda_id, jedi_task_id, attempt_nr, excluded_rules, steps, dry_mode)
1863 return ret
1864
1865
1866 def resetFileStatusInJEDI(self, dn, prodManager, datasetName, lostFiles, recoverParent, simul=False):
1867
1868 with self.proxyPool.get() as proxy:
1869
1870 ret = proxy.resetFileStatusInJEDI(dn, prodManager, datasetName, lostFiles, recoverParent, simul)
1871 return ret
1872
1873
1874 def copy_file_records(self, new_lfns, file_spec):
1875
1876 with self.proxyPool.get() as proxy:
1877
1878 ret = proxy.copy_file_records(new_lfns, file_spec)
1879 return ret
1880
1881
1882 def getRetrialRules(self):
1883
1884 with self.proxyPool.get() as proxy:
1885
1886 ret = proxy.getRetrialRules()
1887 return ret
1888
1889
1890 def setMaxAttempt(self, jobID, jediTaskID, files, attemptNr):
1891
1892 with self.proxyPool.get() as proxy:
1893
1894 ret = proxy.setMaxAttempt(jobID, jediTaskID, files, attemptNr)
1895 return ret
1896
1897
1898 def increase_max_failure(self, job_id, task_id, files):
1899
1900 with self.proxyPool.get() as proxy:
1901
1902 ret = proxy.increase_max_failure(job_id, task_id, files)
1903 return ret
1904
1905
1906 def setNoRetry(self, jobID, jediTaskID, files):
1907
1908 with self.proxyPool.get() as proxy:
1909
1910 ret = proxy.setNoRetry(jobID, jediTaskID, files)
1911 return ret
1912
1913
1914 def initialize_cpu_time_task(self, jobID, taskID, siteid, files, active):
1915
1916 with self.proxyPool.get() as proxy:
1917
1918 ret = proxy.initialize_cpu_time_task(jobID, taskID, siteid, files, active)
1919 return ret
1920
1921
1922 def requestTaskParameterRecalculation(self, taskID):
1923
1924 with self.proxyPool.get() as proxy:
1925
1926 ret = proxy.requestTaskParameterRecalculation(taskID)
1927 return ret
1928
1929
1930 def getDestDBlocksWithSingleConsumer(self, jediTaskID, PandaID, ngDatasets):
1931
1932 with self.proxyPool.get() as proxy:
1933
1934 ret = proxy.getDestDBlocksWithSingleConsumer(jediTaskID, PandaID, ngDatasets)
1935 return ret
1936
1937
1938 def isValidMergeJob(self, pandaID, jediTaskID):
1939
1940 with self.proxyPool.get() as proxy:
1941
1942 ret = proxy.isValidMergeJob(pandaID, jediTaskID)
1943 return ret
1944
1945
1946 def insertNetworkMatrixData(self, data):
1947
1948 with self.proxyPool.get() as proxy:
1949
1950 ret = proxy.insertNetworkMatrixData(data)
1951 return ret
1952
1953
1954 def deleteOldNetworkData(self):
1955
1956 with self.proxyPool.get() as proxy:
1957
1958 ret = proxy.deleteOldNetworkData()
1959 return ret
1960
1961
1962 def getDispatchDatasetsPerUser(self, vo, prodSourceLabel, onlyActive, withSize):
1963
1964 with self.proxyPool.get() as proxy:
1965
1966 ret = proxy.getDispatchDatasetsPerUser(vo, prodSourceLabel, onlyActive, withSize)
1967 return ret
1968
1969
1970 def getTaskParamsPanda(self, jediTaskID):
1971
1972 with self.proxyPool.get() as proxy:
1973
1974 ret = proxy.getTaskParamsPanda(jediTaskID)
1975 return ret
1976
1977
1978 def getTaskAttributesPanda(self, jediTaskID, attrs):
1979
1980 with self.proxyPool.get() as proxy:
1981
1982 ret = proxy.getTaskAttributesPanda(jediTaskID, attrs)
1983 return ret
1984
1985
1986 def checkClonedJob(self, jobSpec):
1987
1988 with self.proxyPool.get() as proxy:
1989
1990 ret = proxy.checkClonedJob(jobSpec)
1991 return ret
1992
1993
1994 def getCoJumboJobsToBeFinished(self, timeLimit, minPriority, maxJobs):
1995
1996 with self.proxyPool.get() as proxy:
1997
1998 ret = proxy.getCoJumboJobsToBeFinished(timeLimit, minPriority, maxJobs)
1999 return ret
2000
2001
2002 def getNumReadyEvents(self, jediTaskID):
2003
2004 with self.proxyPool.get() as proxy:
2005
2006 ret = proxy.getNumReadyEvents(jediTaskID)
2007 return ret
2008
2009
2010 def isApplicableTaskForJumbo(self, jediTaskID):
2011
2012 with self.proxyPool.get() as proxy:
2013
2014 ret = proxy.isApplicableTaskForJumbo(jediTaskID)
2015 return ret
2016
2017
2018 def cleanupJumboJobs(self, jediTaskID=None):
2019
2020 with self.proxyPool.get() as proxy:
2021
2022 ret = proxy.cleanupJumboJobs(jediTaskID)
2023 return ret
2024
2025
2026 def convertObjIDtoEndPoint(self, srcFileName, ObjID):
2027
2028 with self.proxyPool.get() as proxy:
2029
2030 res = proxy.convertObjIDtoEndPoint(srcFileName, ObjID)
2031 return res
2032
2033
2034 def getTaskStatus(self, jediTaskID):
2035
2036 with self.proxyPool.get() as proxy:
2037
2038 res = proxy.getTaskStatus(jediTaskID)
2039 return res
2040
2041
2042 def getTaskStatusSuperstatus(self, jediTaskID):
2043
2044 with self.proxyPool.get() as proxy:
2045
2046 res = proxy.getTaskStatusSuperstatus(jediTaskID)
2047 return res
2048
2049
2050 def reactivateTask(self, jediTaskID, keep_attempt_nr=False, trigger_job_generation=False):
2051
2052 with self.proxyPool.get() as proxy:
2053
2054 res = proxy.reactivateTask(jediTaskID, keep_attempt_nr, trigger_job_generation)
2055 return res
2056
2057
2058 def getEventStat(self, jediTaskID, PandaID):
2059
2060 with self.proxyPool.get() as proxy:
2061
2062 res = proxy.getEventStat(jediTaskID, PandaID)
2063 return res
2064
2065
2066 def get_tree_of_gshare_names(self):
2067
2068 with self.proxyPool.get() as proxy:
2069
2070 res = proxy.get_tree_of_gshare_names()
2071 return res
2072
2073
2074 def get_hs_distribution(self):
2075
2076 with self.proxyPool.get() as proxy:
2077
2078 res = proxy.get_hs_distribution()
2079 return res
2080
2081
2082 def reassignShare(self, jedi_task_ids, share_dest, reassign_running):
2083
2084 with self.proxyPool.get() as proxy:
2085
2086 res = proxy.reassignShare(jedi_task_ids, share_dest, reassign_running)
2087 return res
2088
2089 def is_valid_share(self, share_name):
2090 """
2091 Checks whether the share is a valid leave share
2092 """
2093
2094 with self.proxyPool.get() as proxy:
2095
2096 res = proxy.is_valid_share(share_name)
2097 return res
2098
2099 def get_share_for_task(self, task):
2100 """
2101 Return the share based on a task specification
2102 """
2103
2104 with self.proxyPool.get() as proxy:
2105
2106 res = proxy.get_share_for_task(task)
2107 return res
2108
2109 def get_share_for_job(self, job):
2110 """
2111 Return the share based on a task specification
2112 """
2113
2114 with self.proxyPool.get() as proxy:
2115
2116 res = proxy.get_share_for_job(job)
2117 return res
2118
2119 def getTaskParamsMap(self, jediTaskID):
2120 """
2121 Return the taskParamsMap
2122 """
2123
2124 with self.proxyPool.get() as proxy:
2125
2126 res = proxy.getTaskParamsPanda(jediTaskID)
2127 return res
2128
2129 def getCommands(self, harvester_id, n_commands):
2130 """
2131 Get n commands for a particular harvester instance
2132 """
2133
2134 with self.proxyPool.get() as proxy:
2135
2136 res = proxy.getCommands(harvester_id, n_commands)
2137 return res
2138
2139 def ackCommands(self, command_ids):
2140 """
2141 Acknowledge a list of command IDs
2142 """
2143
2144 with self.proxyPool.get() as proxy:
2145
2146 res = proxy.ackCommands(command_ids)
2147 return res
2148
2149
2150 def commandToHarvester(
2151 self,
2152 harvester_ID,
2153 command,
2154 ack_requested,
2155 status,
2156 lockInterval=None,
2157 comInterval=None,
2158 params=None,
2159 ):
2160
2161 with self.proxyPool.get() as proxy:
2162
2163 res = proxy.commandToHarvester(
2164 harvester_ID,
2165 command,
2166 ack_requested,
2167 status,
2168 lockInterval,
2169 comInterval,
2170 params,
2171 )
2172 return res
2173
2174 def getResourceTypes(self):
2175 """
2176 Get resource types (SCORE, MCORE, ...) and their definitions
2177 """
2178
2179 with self.proxyPool.get() as proxy:
2180
2181 res = proxy.load_resource_types(formatting="dict")
2182 return res
2183
2184
2185 def reportWorkerStats_jobtype(self, harvesterID, siteName, paramsList):
2186
2187 with self.proxyPool.get() as proxy:
2188
2189 res = proxy.reportWorkerStats_jobtype(harvesterID, siteName, paramsList)
2190 return res
2191
2192
2193 def getCommandLocksHarvester(self, harvester_ID, command, lockedBy, lockInterval, commandInterval):
2194
2195 with self.proxyPool.get() as proxy:
2196
2197 res = proxy.getCommandLocksHarvester(harvester_ID, command, lockedBy, lockInterval, commandInterval)
2198 return res
2199
2200
2201 def releaseCommandLockHarvester(self, harvester_ID, command, computingSite, resourceType, lockedBy):
2202
2203 with self.proxyPool.get() as proxy:
2204
2205 res = proxy.releaseCommandLockHarvester(harvester_ID, command, computingSite, resourceType, lockedBy)
2206 return res
2207
2208
2209 def updateWorkers(self, harvesterID, data):
2210 """
2211 Update workers
2212 """
2213
2214 with self.proxyPool.get() as proxy:
2215
2216 res = proxy.updateWorkers(harvesterID, data)
2217 return res
2218
2219
2220 def updateServiceMetrics(self, harvesterID, data):
2221 """
2222 Update workers
2223 """
2224
2225 with self.proxyPool.get() as proxy:
2226
2227 res = proxy.updateServiceMetrics(harvesterID, data)
2228 return res
2229
2230
2231 def harvesterIsAlive(self, user, host, harvesterID, data):
2232 """
2233 update harvester instance information
2234 """
2235
2236 with self.proxyPool.get() as proxy:
2237
2238 res = proxy.harvesterIsAlive(user, host, harvesterID, data)
2239 return res
2240
2241 def storePilotLog(self, panda_id, pilot_log):
2242 """
2243 Store the pilot log in the pandalog table
2244 """
2245
2246 with self.proxyPool.get() as proxy:
2247
2248 res = proxy.storePilotLog(panda_id, pilot_log)
2249 return res
2250
2251
2252 def load_resource_types(self):
2253
2254 with self.proxyPool.get() as proxy:
2255
2256 ret_val = proxy.load_resource_types()
2257 return ret_val
2258
2259
2260 def get_resource_type_task(self, task_spec):
2261
2262 with self.proxyPool.get() as proxy:
2263
2264 ret_val = proxy.get_resource_type_task(task_spec)
2265 return ret_val
2266
2267 def reset_resource_type_task(self, jedi_task_id, use_commit=True):
2268
2269 with self.proxyPool.get() as proxy:
2270
2271 ret_val = proxy.reset_resource_type_task(jedi_task_id, use_commit)
2272 return ret_val
2273
2274
2275 def get_resource_type_job(self, job_spec):
2276
2277 with self.proxyPool.get() as proxy:
2278
2279 ret_val = proxy.get_resource_type_job(job_spec)
2280 return ret_val
2281
2282
2283 def checkJobStatus(self, pandaIDs):
2284 try:
2285 pandaIDs = pandaIDs.split(",")
2286 except Exception:
2287 pandaIDs = []
2288
2289 with self.proxyPool.get() as proxy:
2290
2291 retList = []
2292 for pandaID in pandaIDs:
2293 ret = proxy.checkJobStatus(pandaID)
2294 retList.append(ret)
2295 return retList
2296
2297
2298 def getWorkerStats(self):
2299
2300 with self.proxyPool.get() as proxy:
2301
2302 ret = proxy.getWorkerStats()
2303 return ret
2304
2305
2306 def ups_get_queues(self):
2307
2308 with self.proxyPool.get() as proxy:
2309
2310 ret = proxy.ups_get_queues()
2311 return ret
2312
2313
2314 def ups_load_worker_stats(self):
2315
2316 with self.proxyPool.get() as proxy:
2317
2318 ret = proxy.ups_load_worker_stats()
2319 return ret
2320
2321
2322 def get_average_memory_workers(self, queue, harvester_id, target):
2323
2324 with self.proxyPool.get() as proxy:
2325
2326 ret = proxy.get_average_memory_workers(queue, harvester_id, target)
2327 return ret
2328
2329
2330 def ups_new_worker_distribution(self, queue, worker_stats):
2331
2332 with self.proxyPool.get() as proxy:
2333
2334 ret = proxy.ups_new_worker_distribution(queue, worker_stats)
2335 return ret
2336
2337
2338 def checkEventsAvailability(self, pandaID, jobsetID, jediTaskID):
2339
2340 with self.proxyPool.get() as proxy:
2341
2342 ret = proxy.checkEventsAvailability(pandaID, jobsetID, jediTaskID)
2343 return ret
2344
2345
2346 def getLFNsForJumbo(self, jediTaskID):
2347
2348 with self.proxyPool.get() as proxy:
2349
2350 ret = proxy.getLFNsForJumbo(jediTaskID)
2351 return ret
2352
2353
2354 def getActiveJobAttributes(self, pandaID, attrs):
2355
2356 with self.proxyPool.get() as proxy:
2357
2358 ret = proxy.getActiveJobAttributes(pandaID, attrs)
2359 return ret
2360
2361
2362 def getOriginalConsumers(self, jediTaskID, jobsetID, pandaID):
2363
2364 with self.proxyPool.get() as proxy:
2365
2366 ret = proxy.getOriginalConsumers(jediTaskID, jobsetID, pandaID)
2367 return ret
2368
2369
2370 def addHarvesterDialogs(self, harvesterID, dialogs):
2371
2372 with self.proxyPool.get() as proxy:
2373
2374 ret = proxy.addHarvesterDialogs(harvesterID, dialogs)
2375 return ret
2376
2377
2378 def getJobStatisticsPerSiteResource(self, timeWindow=None):
2379
2380 with self.proxyPool.get() as proxy:
2381
2382 ret = proxy.getJobStatisticsPerSiteResource(timeWindow)
2383 return ret
2384
2385
2386 def get_job_statistics_per_site_label_resource(self, time_window=None):
2387
2388 with self.proxyPool.get() as proxy:
2389
2390 ret = proxy.get_job_statistics_per_site_label_resource(time_window)
2391 return ret
2392
2393
2394 def setNumSlotsForWP(self, pandaQueueName, numSlots, gshare, resourceType, validPeriod):
2395
2396 with self.proxyPool.get() as proxy:
2397
2398 ret = proxy.setNumSlotsForWP(pandaQueueName, numSlots, gshare, resourceType, validPeriod)
2399 return ret
2400
2401
2402 def enableJumboJobs(self, jediTaskID, totalJumboJobs, nJumboPerSite):
2403
2404 with self.proxyPool.get() as proxy:
2405
2406 ret = proxy.enableJumboJobs(jediTaskID, totalJumboJobs, nJumboPerSite)
2407 return ret
2408
2409
2410 def enableEventService(self, jediTaskID):
2411
2412 with self.proxyPool.get() as proxy:
2413
2414 ret = proxy.enableEventService(jediTaskID)
2415 return ret
2416
2417
2418 def getJediFileAttributes(self, PandaID, jediTaskID, datasetID, fileID, attrs):
2419
2420 with self.proxyPool.get() as proxy:
2421
2422 ret = proxy.getJediFileAttributes(PandaID, jediTaskID, datasetID, fileID, attrs)
2423 return ret
2424
2425
2426 def isSuperUser(self, userName):
2427
2428 with self.proxyPool.get() as proxy:
2429
2430 ret = proxy.isSuperUser(userName)
2431 return ret
2432
2433
2434 def getWorkersForJob(self, PandaID):
2435
2436 with self.proxyPool.get() as proxy:
2437
2438 ret = proxy.getWorkersForJob(PandaID)
2439 return ret
2440
2441
2442 def getUserJobMetadata(self, jediTaskID):
2443
2444 with self.proxyPool.get() as proxy:
2445
2446 ret = proxy.getUserJobMetadata(jediTaskID)
2447 return ret
2448
2449
2450 def getJumboJobDatasets(self, n_days, grace_period):
2451
2452 with self.proxyPool.get() as proxy:
2453
2454 ret = proxy.getJumboJobDatasets(n_days, grace_period)
2455 return ret
2456
2457
2458 def getGShareStatus(self):
2459
2460 with self.proxyPool.get() as proxy:
2461
2462 ret = proxy.getGShareStatus()
2463 return ret
2464
2465
2466 def getOutputDatasetsJEDI(self, panda_id):
2467
2468 with self.proxyPool.get() as proxy:
2469
2470 ret = proxy.getOutputDatasetsJEDI(panda_id)
2471 return ret
2472
2473
2474 def upsertQueuesInJSONSchedconfig(self, schedconfig_dump):
2475
2476 with self.proxyPool.get() as proxy:
2477
2478 ret = proxy.upsertQueuesInJSONSchedconfig(schedconfig_dump)
2479 return ret
2480
2481
2482 def loadSWTags(self, sw_tags):
2483
2484 with self.proxyPool.get() as proxy:
2485
2486 ret = proxy.loadSWTags(sw_tags)
2487 return ret
2488
2489
2490 def sweepPQ(self, panda_queue_des, status_list_des, ce_list_des, submission_host_list_des):
2491
2492 with self.proxyPool.get() as proxy:
2493
2494 ret = proxy.sweepPQ(panda_queue_des, status_list_des, ce_list_des, submission_host_list_des)
2495 return ret
2496
2497
2498 def lockProcess_PANDA(self, component, pid, time_limit=5, force=False):
2499 with self.proxyPool.get() as proxy:
2500 ret = proxy.lockProcess_PANDA(component, pid, time_limit, force)
2501 return ret
2502
2503
2504 def unlockProcess_PANDA(self, component, pid):
2505 with self.proxyPool.get() as proxy:
2506 ret = proxy.unlockProcess_PANDA(component, pid)
2507 return ret
2508
2509
2510 def checkProcessLock_PANDA(self, component, pid, time_limit, check_base=False):
2511 with self.proxyPool.get() as proxy:
2512 ret = proxy.checkProcessLock_PANDA(component, pid, time_limit, check_base)
2513 return ret
2514
2515
2516 def insertJobOutputReport(self, panda_id, prod_source_label, job_status, attempt_nr, data):
2517 with self.proxyPool.get() as proxy:
2518 ret = proxy.insertJobOutputReport(panda_id, prod_source_label, job_status, attempt_nr, data)
2519 return ret
2520
2521
2522 def deleteJobOutputReport(self, panda_id, attempt_nr):
2523 with self.proxyPool.get() as proxy:
2524 ret = proxy.deleteJobOutputReport(panda_id, attempt_nr)
2525 return ret
2526
2527
2528 def updateJobOutputReport(self, panda_id, attempt_nr, data):
2529 with self.proxyPool.get() as proxy:
2530 ret = proxy.updateJobOutputReport(panda_id, attempt_nr, data)
2531 return ret
2532
2533
2534 def getJobOutputReport(self, panda_id, attempt_nr):
2535 with self.proxyPool.get() as proxy:
2536 ret = proxy.getJobOutputReport(panda_id, attempt_nr)
2537 return ret
2538
2539
2540 def lockJobOutputReport(self, panda_id, attempt_nr, pid, time_limit, take_over_from=None):
2541 with self.proxyPool.get() as proxy:
2542 ret = proxy.lockJobOutputReport(panda_id, attempt_nr, pid, time_limit, take_over_from)
2543 return ret
2544
2545
2546 def unlockJobOutputReport(self, panda_id, attempt_nr, pid, lock_offset):
2547 with self.proxyPool.get() as proxy:
2548 ret = proxy.unlockJobOutputReport(panda_id, attempt_nr, pid, lock_offset)
2549 return ret
2550
2551
2552 def listJobOutputReport(
2553 self,
2554 only_unlocked=False,
2555 time_limit=5,
2556 limit=999999,
2557 grace_period=3,
2558 labels=None,
2559 anti_labels=None,
2560 ):
2561 with self.proxyPool.get() as proxy:
2562 ret = proxy.listJobOutputReport(only_unlocked, time_limit, limit, grace_period, labels, anti_labels)
2563 return ret
2564
2565
2566 def update_problematic_resource_info(self, user_name, jedi_task_id, resource, problem_type):
2567 with self.proxyPool.get() as proxy:
2568 ret = proxy.update_problematic_resource_info(user_name, jedi_task_id, resource, problem_type)
2569 return ret
2570
2571
2572 def send_command_to_job(self, panda_id, com):
2573 with self.proxyPool.get() as proxy:
2574 ret = proxy.send_command_to_job(panda_id, com)
2575 return ret
2576
2577
2578 def get_workers_to_synchronize(self):
2579 with self.proxyPool.get() as proxy:
2580 ret = proxy.get_workers_to_synchronize()
2581 return ret
2582
2583
2584 def set_user_secret(self, owner, key, value):
2585 with self.proxyPool.get() as proxy:
2586 ret = proxy.set_user_secret(owner, key, value)
2587 return ret
2588
2589
2590 def get_user_secrets(self, owner, keys=None, get_json=False):
2591 with self.proxyPool.get() as proxy:
2592 ret = proxy.get_user_secrets(owner, keys, get_json)
2593 return ret
2594
2595 def configurator_write_sites(self, sites_list):
2596 with self.proxyPool.get() as proxy:
2597 ret = proxy.configurator_write_sites(sites_list)
2598 return ret
2599
2600 def configurator_write_panda_sites(self, panda_site_list):
2601 with self.proxyPool.get() as proxy:
2602 ret = proxy.configurator_write_panda_sites(panda_site_list)
2603 return ret
2604
2605 def configurator_write_ddm_endpoints(self, ddm_endpoint_list):
2606 with self.proxyPool.get() as proxy:
2607 ret = proxy.configurator_write_ddm_endpoints(ddm_endpoint_list)
2608 return ret
2609
2610 def configurator_write_panda_ddm_relations(self, relation_list):
2611 with self.proxyPool.get() as proxy:
2612 ret = proxy.configurator_write_panda_ddm_relations(relation_list)
2613 return ret
2614
2615 def configurator_read_sites(self):
2616 with self.proxyPool.get() as proxy:
2617 ret = proxy.configurator_read_sites()
2618 return ret
2619
2620 def configurator_read_panda_sites(self):
2621 with self.proxyPool.get() as proxy:
2622 ret = proxy.configurator_read_panda_sites()
2623 return ret
2624
2625 def configurator_read_ddm_endpoints(self):
2626 with self.proxyPool.get() as proxy:
2627 ret = proxy.configurator_read_ddm_endpoints()
2628 return ret
2629
2630 def configurator_read_cric_sites(self):
2631 with self.proxyPool.get() as proxy:
2632 ret = proxy.configurator_read_cric_sites()
2633 return ret
2634
2635 def configurator_read_cric_panda_sites(self):
2636 with self.proxyPool.get() as proxy:
2637 ret = proxy.configurator_read_cric_panda_sites()
2638 return ret
2639
2640 def configurator_delete_sites(self, sites_to_delete):
2641 with self.proxyPool.get() as proxy:
2642 ret = proxy.configurator_delete_sites(sites_to_delete)
2643 return ret
2644
2645 def configurator_delete_panda_sites(self, panda_sites_to_delete):
2646 with self.proxyPool.get() as proxy:
2647 ret = proxy.configurator_delete_panda_sites(panda_sites_to_delete)
2648 return ret
2649
2650 def configurator_delete_ddm_endpoints(self, ddm_endpoints_to_delete):
2651 with self.proxyPool.get() as proxy:
2652 ret = proxy.configurator_delete_ddm_endpoints(ddm_endpoints_to_delete)
2653 return ret
2654
2655 def carbon_write_region_emissions(self, emissions):
2656 with self.proxyPool.get() as proxy:
2657 ret = proxy.carbon_write_region_emissions(emissions)
2658 return ret
2659
2660 def carbon_aggregate_emissions(self):
2661 with self.proxyPool.get() as proxy:
2662 ret = proxy.carbon_aggregate_emissions()
2663 return ret
2664
2665 def get_files_in_datasets(self, task_id, dataset_types):
2666 with self.proxyPool.get() as proxy:
2667 ret = proxy.get_files_in_datasets(task_id, dataset_types)
2668 return ret
2669
2670 def get_max_worker_id(self, harvester_id):
2671 with self.proxyPool.get() as proxy:
2672 ret = proxy.get_max_worker_id(harvester_id)
2673 return ret
2674
2675 def get_events_status(self, ids):
2676 with self.proxyPool.get() as proxy:
2677 ret = proxy.get_events_status(ids)
2678 return ret
2679
2680 def async_update_datasets(self, panda_id):
2681 with self.proxyPool.get() as proxy:
2682 ret = proxy.async_update_datasets(panda_id)
2683 return ret
2684
2685 def set_workload_metrics(self, jedi_task_id, panda_id, metrics):
2686 with self.proxyPool.get() as proxy:
2687 ret = proxy.set_workload_metrics(jedi_task_id, panda_id, metrics, True)
2688 return ret
2689
2690 def get_workload_metrics(self, jedi_task_id, panda_id):
2691 with self.proxyPool.get() as proxy:
2692 ret = proxy.get_workload_metrics(jedi_task_id, panda_id)
2693 return ret
2694
2695 def get_jobs_metrics_in_task(self, jedi_task_id: int):
2696 with self.proxyPool.get() as proxy:
2697 ret = proxy.get_jobs_metrics_in_task(jedi_task_id)
2698 return ret
2699
2700 def enable_job_cloning(self, jedi_task_id, mode, multiplicity, num_sites):
2701 with self.proxyPool.get() as proxy:
2702 ret = proxy.enable_job_cloning(jedi_task_id, mode, multiplicity, num_sites)
2703 return ret
2704
2705 def disable_job_cloning(self, jedi_task_id):
2706 with self.proxyPool.get() as proxy:
2707 ret = proxy.disable_job_cloning(jedi_task_id)
2708 return ret
2709
2710
2711 def get_num_jobs_with_status_by_nucleus(self, vo, job_status):
2712 with self.proxyPool.get() as proxy:
2713 return proxy.get_num_jobs_with_status_by_nucleus(vo, job_status)
2714
2715
2716
2717
2718 def getTaskWithID_JEDI(self, jediTaskID, fullFlag=False, lockTask=False, pid=None, lockInterval=None, clearError=False):
2719 with self.proxyPool.get() as proxy:
2720 return proxy.getTaskWithID_JEDI(jediTaskID, fullFlag, lockTask, pid, lockInterval, clearError)
2721
2722
2723 def updateInputFilesStaged_JEDI(self, jeditaskid, scope, filenames_dict, chunk_size=500, by=None, check_scope=True):
2724 with self.proxyPool.get() as proxy:
2725 return proxy.updateInputFilesStaged_JEDI(jeditaskid, scope, filenames_dict, chunk_size, by, check_scope)
2726
2727
2728
2729
2730 def get_data_carousel_request_id_by_dataset_JEDI(self, dataset):
2731 with self.proxyPool.get() as proxy:
2732 return proxy.get_data_carousel_request_id_by_dataset_JEDI(dataset)
2733
2734
2735 def insert_data_carousel_requests_JEDI(self, task_id, dc_req_specs):
2736 with self.proxyPool.get() as proxy:
2737 return proxy.insert_data_carousel_requests_JEDI(task_id, dc_req_specs)
2738
2739
2740 def update_data_carousel_request_JEDI(self, dc_req_spec):
2741 with self.proxyPool.get() as proxy:
2742 return proxy.update_data_carousel_request_JEDI(dc_req_spec)
2743
2744
2745 def insert_data_carousel_relations_JEDI(self, task_id, request_ids):
2746 with self.proxyPool.get() as proxy:
2747 return proxy.insert_data_carousel_relations_JEDI(task_id, request_ids)
2748
2749
2750 def get_data_carousel_queued_requests_JEDI(self):
2751 with self.proxyPool.get() as proxy:
2752 return proxy.get_data_carousel_queued_requests_JEDI()
2753
2754
2755 def get_data_carousel_requests_by_task_status_JEDI(self, status_filter_list=None, status_exclusion_list=None):
2756 with self.proxyPool.get() as proxy:
2757 return proxy.get_data_carousel_requests_by_task_status_JEDI(status_filter_list=status_filter_list, status_exclusion_list=status_exclusion_list)
2758
2759
2760 def get_related_tasks_of_data_carousel_request_JEDI(self, request_id, status_filter_list=None, status_exclusion_list=None):
2761 with self.proxyPool.get() as proxy:
2762 return proxy.get_related_tasks_of_data_carousel_request_JEDI(
2763 request_id, status_filter_list=status_filter_list, status_exclusion_list=status_exclusion_list
2764 )
2765
2766
2767 def get_data_carousel_staging_requests_JEDI(self):
2768 with self.proxyPool.get() as proxy:
2769 return proxy.get_data_carousel_staging_requests_JEDI()
2770
2771
2772 def delete_data_carousel_requests_JEDI(self, request_id_list):
2773 with self.proxyPool.get() as proxy:
2774 return proxy.delete_data_carousel_requests_JEDI(request_id_list)
2775
2776
2777 def clean_up_data_carousel_requests_JEDI(self, time_limit_days=30):
2778 with self.proxyPool.get() as proxy:
2779 return proxy.clean_up_data_carousel_requests_JEDI(time_limit_days)
2780
2781
2782 def cancel_data_carousel_request_JEDI(self, request_id):
2783 with self.proxyPool.get() as proxy:
2784 return proxy.cancel_data_carousel_request_JEDI(request_id)
2785
2786
2787 def retire_data_carousel_request_JEDI(self, request_id):
2788 with self.proxyPool.get() as proxy:
2789 return proxy.retire_data_carousel_request_JEDI(request_id)
2790
2791
2792 def resubmit_data_carousel_request_JEDI(self, request_id, exclude_prev_dst=False):
2793 with self.proxyPool.get() as proxy:
2794 return proxy.resubmit_data_carousel_request_JEDI(request_id, exclude_prev_dst)
2795
2796
2797
2798 def get_workflow(self, workflow_id):
2799 with self.proxyPool.get() as proxy:
2800 return proxy.get_workflow(workflow_id)
2801
2802 def get_workflow_step(self, step_id):
2803 with self.proxyPool.get() as proxy:
2804 return proxy.get_workflow_step(step_id)
2805
2806 def get_workflow_data(self, data_id):
2807 with self.proxyPool.get() as proxy:
2808 return proxy.get_workflow_data(data_id)
2809
2810 def get_workflow_data_by_name(self, name, workflow_id=None):
2811 with self.proxyPool.get() as proxy:
2812 return proxy.get_workflow_data_by_name(name, workflow_id)
2813
2814 def get_steps_of_workflow(self, workflow_id, status_filter_list=None, status_exclusion_list=None):
2815 with self.proxyPool.get() as proxy:
2816 return proxy.get_steps_of_workflow(workflow_id, status_filter_list, status_exclusion_list)
2817
2818 def get_data_of_workflow(self, workflow_id, status_filter_list=None, status_exclusion_list=None, type_filter_list=None):
2819 with self.proxyPool.get() as proxy:
2820 return proxy.get_data_of_workflow(workflow_id, status_filter_list, status_exclusion_list, type_filter_list)
2821
2822 def query_workflows(self, status_filter_list=None, status_exclusion_list=None, check_interval_sec=300):
2823 with self.proxyPool.get() as proxy:
2824 return proxy.query_workflows(status_filter_list, status_exclusion_list, check_interval_sec)
2825
2826 def lock_workflow(self, workflow_id, locked_by, lock_expiration_sec=120):
2827 with self.proxyPool.get() as proxy:
2828 return proxy.lock_workflow(workflow_id, locked_by, lock_expiration_sec)
2829
2830 def unlock_workflow(self, workflow_id, locked_by):
2831 with self.proxyPool.get() as proxy:
2832 return proxy.unlock_workflow(workflow_id, locked_by)
2833
2834 def lock_workflow_step(self, step_id, locked_by, lock_expiration_sec=120):
2835 with self.proxyPool.get() as proxy:
2836 return proxy.lock_workflow_step(step_id, locked_by, lock_expiration_sec)
2837
2838 def unlock_workflow_step(self, step_id, locked_by):
2839 with self.proxyPool.get() as proxy:
2840 return proxy.unlock_workflow_step(step_id, locked_by)
2841
2842 def lock_workflow_data(self, data_id, locked_by, lock_expiration_sec=120):
2843 with self.proxyPool.get() as proxy:
2844 return proxy.lock_workflow_data(data_id, locked_by, lock_expiration_sec)
2845
2846 def unlock_workflow_data(self, data_id, locked_by):
2847 with self.proxyPool.get() as proxy:
2848 return proxy.unlock_workflow_data(data_id, locked_by)
2849
2850 def insert_workflow(self, workflow_spec):
2851 with self.proxyPool.get() as proxy:
2852 return proxy.insert_workflow(workflow_spec)
2853
2854 def insert_workflow_step(self, wf_step_spec):
2855 with self.proxyPool.get() as proxy:
2856 return proxy.insert_workflow_step(wf_step_spec)
2857
2858 def insert_workflow_data(self, wf_data_spec):
2859 with self.proxyPool.get() as proxy:
2860 return proxy.insert_workflow_data(wf_data_spec)
2861
2862 def update_workflow(self, workflow_spec):
2863 with self.proxyPool.get() as proxy:
2864 return proxy.update_workflow(workflow_spec)
2865
2866 def update_workflow_step(self, wf_step_spec):
2867 with self.proxyPool.get() as proxy:
2868 return proxy.update_workflow_step(wf_step_spec)
2869
2870 def update_workflow_data(self, wf_data_spec):
2871 with self.proxyPool.get() as proxy:
2872 return proxy.update_workflow_data(wf_data_spec)
2873
2874 def upsert_workflow_entities(self, workflow_id, actions_dict=None, workflow_spec=None, step_specs=None, data_specs=None):
2875 with self.proxyPool.get() as proxy:
2876 return proxy.upsert_workflow_entities(workflow_id, actions_dict, workflow_spec, step_specs, data_specs)
2877
2878 def get_distinct_resource_types_per_site(self, jedi_task_id, threshold=20.0):
2879 with self.proxyPool.get() as proxy:
2880 ret = proxy.get_distinct_resource_types_per_site(jedi_task_id, threshold)
2881 return ret
2882
2883
2884
2885
2886
# Module-level shared instance; importers use this object rather than
# constructing their own TaskBuffer (the pool is created later via init()).
taskBuffer = TaskBuffer()