import datetime
import json
import random
import re
import time

from pandacommon.pandalogger.LogWrapper import LogWrapper
from pandacommon.pandautils.PandaUtils import get_sql_IN_bind_variables, naive_utcnow

from pandaserver.config import panda_config
from pandaserver.srvcore import CoreUtils, srv_msg_utils
from pandaserver.taskbuffer import EventServiceUtils, JobUtils
from pandaserver.taskbuffer.db_proxy_mods.base_module import BaseModule
from pandaserver.taskbuffer.FileSpec import FileSpec
from pandaserver.taskbuffer.JobSpec import JobSpec


class JobStandaloneModule(BaseModule):
    # constructor
    def __init__(self, log_stream: LogWrapper):
        super().__init__(log_stream)

    def activateJob(self, job):
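        """Move a job from jobsDefined4 to jobsActive4 once all of its input files are ready or cached; otherwise update the job in jobsDefined4. Returns True on success."""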
        comment = " /* DBProxy.activateJob */"
        if job is None:
            tmp_id = None
        else:
            tmp_id = job.PandaID
        tmp_log = self.create_tagged_logger(comment, f"PandaID={tmp_id}")
        updatedFlag = False
        if job is None:
            tmp_log.debug("skip job=None")
            return True
        tmp_log.debug("start")
        sql0 = "SELECT row_ID FROM ATLAS_PANDA.filesTable4 WHERE PandaID=:PandaID AND type=:type AND NOT status IN (:status1,:status2) "
        sql1 = "DELETE FROM ATLAS_PANDA.jobsDefined4 "
        sql1 += "WHERE PandaID=:PandaID AND (jobStatus=:oldJobStatus1 OR jobStatus=:oldJobStatus2) AND commandToPilot IS NULL"
        sql2 = f"INSERT INTO ATLAS_PANDA.jobsActive4 ({JobSpec.columnNames()}) "
        sql2 += JobSpec.bindValuesExpression()

        job.modificationTime = naive_utcnow()
        # set stateChangeTime for jobs coming from the defined state
        if job.jobStatus in ["defined"]:
            job.stateChangeTime = job.modificationTime
        nTry = 3
        to_push = False
        for iTry in range(nTry):
            try:
                # check that all input files are ready
                allOK = True
                for file in job.Files:
                    if file.type == "input" and file.status not in ["ready", "cached"]:
                        allOK = False
                        break
                # begin transaction
                self.conn.begin()
                # check file status in the DB as well
                varMap = {}
                varMap[":type"] = "input"
                varMap[":status1"] = "ready"
                varMap[":status2"] = "cached"
                varMap[":PandaID"] = job.PandaID
                self.cur.arraysize = 100
                self.cur.execute(sql0 + comment, varMap)
                res = self.cur.fetchall()
                if len(res) == 0 or allOK:
                    # change status
                    job.jobStatus = "activated"
                    # delete the job from jobsDefined4
                    varMap = {}
                    varMap[":PandaID"] = job.PandaID
                    varMap[":oldJobStatus1"] = "assigned"
                    varMap[":oldJobStatus2"] = "defined"
                    self.cur.execute(sql1 + comment, varMap)
                    n = self.cur.rowcount
                    if n == 0:
                        # already killed or activated
                        tmp_log.debug("Job not found to activate")
                    else:
                        # insert the job into jobsActive4
                        self.cur.execute(sql2 + comment, job.valuesMap())
                        # update files
                        for file in job.Files:
                            sqlF = f"UPDATE ATLAS_PANDA.filesTable4 SET {file.bindUpdateChangesExpression()} WHERE row_ID=:row_ID"
                            varMap = file.valuesMap(onlyChanged=True)
                            if varMap != {}:
                                varMap[":row_ID"] = file.row_ID
                                tmp_log.debug(sqlF + comment + str(varMap))
                                self.cur.execute(sqlF + comment, varMap)
                        # update job parameters
                        sqlJob = "UPDATE ATLAS_PANDA.jobParamsTable SET jobParameters=:param WHERE PandaID=:PandaID"
                        varMap = {}
                        varMap[":PandaID"] = job.PandaID
                        varMap[":param"] = job.jobParameters
                        self.cur.execute(sqlJob + comment, varMap)
                        updatedFlag = True
                        to_push = job.is_push_job()
                else:
                    # update the job in jobsDefined4 instead of activating it
                    sqlJ = (
                        f"UPDATE ATLAS_PANDA.jobsDefined4 SET {job.bindUpdateChangesExpression()} "
                    ) + "WHERE PandaID=:PandaID AND (jobStatus=:oldJobStatus1 OR jobStatus=:oldJobStatus2)"
                    varMap = job.valuesMap(onlyChanged=True)
                    varMap[":PandaID"] = job.PandaID
                    varMap[":oldJobStatus1"] = "assigned"
                    varMap[":oldJobStatus2"] = "defined"
                    tmp_log.debug(sqlJ + comment + str(varMap))
                    self.cur.execute(sqlJ + comment, varMap)
                    n = self.cur.rowcount
                    if n == 0:
                        # already killed or activated
                        tmp_log.debug("Job not found to update")
                    else:
                        # update files
                        for file in job.Files:
                            sqlF = f"UPDATE ATLAS_PANDA.filesTable4 SET {file.bindUpdateChangesExpression()} WHERE row_ID=:row_ID"
                            varMap = file.valuesMap(onlyChanged=True)
                            if varMap != {}:
                                varMap[":row_ID"] = file.row_ID
                                tmp_log.debug(sqlF + comment + str(varMap))
                                self.cur.execute(sqlF + comment, varMap)
                        # update job parameters
                        sqlJob = "UPDATE ATLAS_PANDA.jobParamsTable SET jobParameters=:param WHERE PandaID=:PandaID"
                        varMap = {}
                        varMap[":PandaID"] = job.PandaID
                        varMap[":param"] = job.jobParameters
                        self.cur.execute(sqlJob + comment, varMap)
                        updatedFlag = True
                # commit
                if not self._commit():
                    raise RuntimeError("Commit error")
                # record status change
                try:
                    if updatedFlag:
                        self.recordStatusChange(job.PandaID, job.jobStatus, jobInfo=job)
                except Exception:
                    tmp_log.error("recordStatusChange failed")
                self.push_job_status_message(job, job.PandaID, job.jobStatus)
                # push the job to the pilot via the message queue if needed
                if to_push:
                    mb_proxy_queue = self.get_mb_proxy("panda_pilot_queue")
                    mb_proxy_topic = self.get_mb_proxy("panda_pilot_topic")
                    if mb_proxy_queue and mb_proxy_topic:
                        tmp_log.debug("push job")
                        srv_msg_utils.send_job_message(mb_proxy_queue, mb_proxy_topic, job.jediTaskID, job.PandaID)
                    else:
                        tmp_log.debug("message queue/topic not configured")
                tmp_log.debug("done")
                return True
            except Exception:
                # roll back
                self._rollback()
                if iTry + 1 < nTry:
                    tmp_log.debug(f"retry: {iTry}")
                    time.sleep(random.randint(10, 20))
                    continue
                self.dump_error_message(tmp_log)
                return False

    def keepJob(self, job):
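        """Set a job in jobsDefined4 to waiting status and update its files and parameters. Returns True on success, False after exhausting retries."""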
        comment = " /* DBProxy.keepJob */"
        tmp_log = self.create_tagged_logger(comment, f"PandaID={job.PandaID}")
        tmp_log.debug("start")
        # set status to waiting
        job.jobStatus = "waiting"
        sql1 = f"UPDATE ATLAS_PANDA.jobsDefined4 SET {job.bindUpdateChangesExpression()} "
        sql1 += "WHERE PandaID=:PandaID AND (jobStatus=:oldJobStatus1 OR jobStatus=:oldJobStatus2) AND commandToPilot IS NULL"

        job.modificationTime = naive_utcnow()
        job.stateChangeTime = job.modificationTime
        updatedFlag = False
        nTry = 3
        for iTry in range(nTry):
            try:
                # begin transaction
                self.conn.begin()
                # update job
                varMap = job.valuesMap(onlyChanged=True)
                varMap[":PandaID"] = job.PandaID
                varMap[":oldJobStatus1"] = "assigned"
                varMap[":oldJobStatus2"] = "defined"
                self.cur.execute(sql1 + comment, varMap)
                n = self.cur.rowcount
                if n == 0:
                    # already killed
                    tmp_log.debug("Not found")
                else:
                    # update files
                    for file in job.Files:
                        sqlF = f"UPDATE ATLAS_PANDA.filesTable4 SET {file.bindUpdateChangesExpression()} WHERE row_ID=:row_ID"
                        varMap = file.valuesMap(onlyChanged=True)
                        if varMap != {}:
                            varMap[":row_ID"] = file.row_ID
                            self.cur.execute(sqlF + comment, varMap)
                    # update job parameters
                    sqlJob = "UPDATE ATLAS_PANDA.jobParamsTable SET jobParameters=:param WHERE PandaID=:PandaID"
                    varMap = {}
                    varMap[":PandaID"] = job.PandaID
                    varMap[":param"] = job.jobParameters
                    self.cur.execute(sqlJob + comment, varMap)
                    updatedFlag = True
                # commit
                if not self._commit():
                    raise RuntimeError("Commit error")
                # record status change
                try:
                    if updatedFlag:
                        self.recordStatusChange(job.PandaID, job.jobStatus, jobInfo=job)
                        self.push_job_status_message(job, job.PandaID, job.jobStatus)
                except Exception:
                    tmp_log.error("recordStatusChange failed in keepJob")
                return True
            except Exception:
                # roll back
                self._rollback()
                if iTry + 1 < nTry:
                    tmp_log.debug(f"retry: {iTry}")
                    time.sleep(random.randint(10, 20))
                    continue
                self.dump_error_message(tmp_log)
                return False

    def resetJob(
        self,
        pandaID,
        activeTable=True,
        keepSite=False,
        getOldSubs=False,
        forPending=True,
    ):
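        """Move a waiting/activated (or pending, when forPending is set) job from jobsActive4 back to jobsDefined4. Returns the JobSpec, optionally with the list of old _sub datasets when getOldSubs is set, or None if the job cannot be reset."""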
        comment = " /* DBProxy.resetJob */"
        tmp_log = self.create_tagged_logger(comment, f"PandaID={pandaID}")
        tmp_log.debug(f"activeTable={activeTable}")
        # make sql
        table = "ATLAS_PANDA.jobsActive4"
        sql1 = f"SELECT {JobSpec.columnNames()} FROM {table} "
        sql1 += "WHERE PandaID=:PandaID"
        sql2 = f"DELETE FROM {table} "
        sql2 += "WHERE PandaID=:PandaID AND (jobStatus=:oldJobStatus1 OR jobStatus=:oldJobStatus2)"
        sql3 = f"INSERT INTO ATLAS_PANDA.jobsDefined4 ({JobSpec.columnNames()}) "
        sql3 += JobSpec.bindValuesExpression()
        try:
            # begin transaction
            self.conn.begin()
            # select
            varMap = {}
            varMap[":PandaID"] = pandaID
            self.cur.arraysize = 10
            self.cur.execute(sql1 + comment, varMap)
            res = self.cur.fetchone()
            # not found
            if res is None:
                # commit
                if not self._commit():
                    raise RuntimeError("Commit error")
                # return
                return None
            # instantiate the job
            job = JobSpec()
            job.pack(res)
            # do nothing unless the job is waiting, activated, or pending (when forPending is set)
            if job.jobStatus != "waiting" and job.jobStatus != "activated" and (forPending and job.jobStatus != "pending"):
                # commit
                if not self._commit():
                    raise RuntimeError("Commit error")
                # return
                return None
            # do nothing for analysis jobs unless they are pending
            if job.prodSourceLabel in ["user", "panda"] and not forPending and job.jobStatus != "pending":
                # commit
                if not self._commit():
                    raise RuntimeError("Commit error")
                # return
                return None
            # delete the job from the active table
            varMap = {}
            varMap[":PandaID"] = pandaID
            if not forPending:
                varMap[":oldJobStatus1"] = "waiting"
            else:
                varMap[":oldJobStatus1"] = "pending"
            varMap[":oldJobStatus2"] = "activated"
            self.cur.execute(sql2 + comment, varMap)
            retD = self.cur.rowcount
            # delete failed
            tmp_log.debug(f"retD = {retD}")
            if retD != 1:
                # commit
                if not self._commit():
                    raise RuntimeError("Commit error")
                return None
            # delete from jobsDefined4 just in case
            varMap = {}
            varMap[":PandaID"] = pandaID
            sqlD = "DELETE FROM ATLAS_PANDA.jobsDefined4 WHERE PandaID=:PandaID"
            self.cur.execute(sqlD + comment, varMap)
            # increase priority
            if job.jobStatus == "activated" and job.currentPriority < 100:
                job.currentPriority = 100
            # reset the job status
            job.jobStatus = "defined"
            if job.prodSourceLabel not in ["user", "panda"]:
                job.dispatchDBlock = None
                # erase the old site assignment
                if (not keepSite) and job.relocationFlag not in [1, 2]:
                    job.computingSite = None
                job.computingElement = None
            # host and time information
            job.modificationHost = self.hostname
            job.modificationTime = naive_utcnow()
            job.stateChangeTime = job.modificationTime
            # reset the error diagnostics
            job.brokerageErrorDiag = None
            job.brokerageErrorCode = None
            # insert into jobsDefined4
            self.cur.execute(sql3 + comment, job.valuesMap())
            # job parameters
            sqlJobP = "SELECT jobParameters FROM ATLAS_PANDA.jobParamsTable WHERE PandaID=:PandaID"
            self.cur.execute(sqlJobP + comment, varMap)
            for (clobJobP,) in self.cur:
                try:
                    job.jobParameters = clobJobP.read()
                except AttributeError:
                    job.jobParameters = str(clobJobP)
                break
            # read files
            oldSubList = []
            sqlFile = f"SELECT {FileSpec.columnNames()} FROM ATLAS_PANDA.filesTable4 "
            sqlFile += "WHERE PandaID=:PandaID"
            self.cur.arraysize = 10000
            self.cur.execute(sqlFile + comment, varMap)
            resFs = self.cur.fetchall()
            for resF in resFs:
                file = FileSpec()
                file.pack(resF)
                # reset the GUID of missing files
                if file.status == "missing":
                    file.GUID = None
                # collect old sub datasets
                if job.prodSourceLabel in ["managed", "test"] and file.type in ["output", "log"] and re.search(r"_sub\d+$", file.destinationDBlock) is not None:
                    if file.destinationDBlock not in oldSubList:
                        oldSubList.append(file.destinationDBlock)
                # reset status, destinationDBlock, and dispatchDBlock
                if job.lockedby != "jedi":
                    file.status = "unknown"
                if job.prodSourceLabel not in ["user", "panda"]:
                    file.dispatchDBlock = None
                file.destinationDBlock = re.sub(r"_sub\d+$", "", file.destinationDBlock)
                # add the file
                job.addFile(file)
                # update the file in the DB
                sqlF = f"UPDATE ATLAS_PANDA.filesTable4 SET {file.bindUpdateChangesExpression()} WHERE row_ID=:row_ID"
                varMap = file.valuesMap(onlyChanged=True)
                if varMap != {}:
                    varMap[":row_ID"] = file.row_ID
                    tmp_log.debug(sqlF + comment + str(varMap))
                    self.cur.execute(sqlF + comment, varMap)
            # commit
            if not self._commit():
                raise RuntimeError("Commit error")
            # record status change
            try:
                self.recordStatusChange(job.PandaID, job.jobStatus, jobInfo=job)
            except Exception:
                tmp_log.error("recordStatusChange failed in resetJob")
            self.push_job_status_message(job, job.PandaID, job.jobStatus)
            tmp_log.debug(f"done with {job is not None}")
            if getOldSubs:
                return job, oldSubList
            return job
        except Exception:
            # roll back
            self._rollback()
            # error report
            self.dump_error_message(tmp_log)
            return None

    def resetDefinedJob(self, pandaID):
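        """Reset an assigned/defined/pending job in jobsDefined4 back to the defined status. Returns the JobSpec, or None if the job was not updated or on error."""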
        comment = " /* DBProxy.resetDefinedJob */"
        tmp_log = self.create_tagged_logger(comment, f"PandaID={pandaID}")
        tmp_log.debug("start")
        sql1 = "UPDATE ATLAS_PANDA.jobsDefined4 SET "
        sql1 += "jobStatus=:newJobStatus,"
        sql1 += "modificationTime=CURRENT_DATE,"
        sql1 += "modificationHost=:modificationHost"
        sql1 += " WHERE PandaID=:PandaID AND jobStatus IN (:oldJobStatus1,:oldJobStatus2,:oldJobStatus3) "
        sql2 = f"SELECT {JobSpec.columnNames()} FROM ATLAS_PANDA.jobsDefined4 "
        sql2 += "WHERE PandaID=:PandaID"
        try:
            # begin transaction
            self.conn.begin()
            # update
            varMap = {}
            varMap[":PandaID"] = pandaID
            varMap[":newJobStatus"] = "defined"
            varMap[":oldJobStatus1"] = "assigned"
            varMap[":oldJobStatus2"] = "defined"
            varMap[":oldJobStatus3"] = "pending"
            varMap[":modificationHost"] = self.hostname
            self.cur.execute(sql1 + comment, varMap)
            retU = self.cur.rowcount
            # not updated
            updatedFlag = True
            job = None
            if retU == 0:
                tmp_log.debug("Not found for UPDATE")
                updatedFlag = False
            else:
                # select
                varMap = {}
                varMap[":PandaID"] = pandaID
                self.cur.arraysize = 10
                self.cur.execute(sql2 + comment, varMap)
                res = self.cur.fetchone()
                # not found
                if res is None:
                    raise RuntimeError("Not found for SELECT")
                # instantiate the job
                job = JobSpec()
                job.pack(res)
                # job parameters
                sqlJobP = "SELECT jobParameters FROM ATLAS_PANDA.jobParamsTable WHERE PandaID=:PandaID"
                self.cur.execute(sqlJobP + comment, varMap)
                for (clobJobP,) in self.cur:
                    try:
                        job.jobParameters = clobJobP.read()
                    except AttributeError:
                        job.jobParameters = str(clobJobP)
                    break
                # read files
                sqlFile = f"SELECT {FileSpec.columnNames()} FROM ATLAS_PANDA.filesTable4 "
                sqlFile += "WHERE PandaID=:PandaID"
                self.cur.arraysize = 10000
                self.cur.execute(sqlFile + comment, varMap)
                resFs = self.cur.fetchall()
                for resF in resFs:
                    file = FileSpec()
                    file.pack(resF)
                    job.addFile(file)
            # commit
            if not self._commit():
                raise RuntimeError("Commit error")
            # record status change
            try:
                if updatedFlag:
                    self.recordStatusChange(job.PandaID, job.jobStatus, jobInfo=job)
                    self.push_job_status_message(job, job.PandaID, job.jobStatus)
            except Exception:
                tmp_log.error("recordStatusChange failed in resetDefinedJob")
            tmp_log.debug(f"done with {job is not None}")
            return job
        except Exception:
            self.dump_error_message(tmp_log)
            # roll back
            self._rollback()
            return None

    def peekJob(self, pandaID, fromDefined, fromActive, fromArchived, fromWaiting, forAnal=False):
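        """Fetch a JobSpec with its files, parameters, and (for archived or analysis jobs) metadata from the selected job tables. Returns None if not found; on persistent DB errors, returns None for analysis callers and a minimal 'unknown' JobSpec otherwise."""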
        comment = " /* DBProxy.peekJob */"
        tmp_log = self.create_tagged_logger(comment, f"PandaID={pandaID}")
        # return None for NULL PandaID
        if pandaID in ["NULL", "", "None", None]:
            return None
        # only integer PandaIDs
        try:
            _ = int(pandaID)
        except Exception:
            tmp_log.debug(f"return None for {pandaID}:non-integer")
            return None
        sql1_0 = "SELECT %s FROM %s "
        sql1_1 = "WHERE PandaID=:PandaID"
        nTry = 3
        for iTry in range(nTry):
            try:
                tables = []
                if fromDefined or fromWaiting:
                    tables.append("ATLAS_PANDA.jobsDefined4")
                if fromActive:
                    tables.append("ATLAS_PANDA.jobsActive4")
                if fromArchived:
                    tables.append("ATLAS_PANDA.jobsArchived4")
                if fromDefined:
                    # look into jobsDefined4 again at the end, for jobs that have just been reset
                    tables.append("ATLAS_PANDA.jobsDefined4")
                # select
                varMap = {}
                varMap[":PandaID"] = pandaID
                for table in tables:
                    # start transaction
                    self.conn.begin()
                    # select
                    sql = sql1_0 % (JobSpec.columnNames(), table) + sql1_1
                    self.cur.arraysize = 10
                    self.cur.execute(sql + comment, varMap)
                    res = self.cur.fetchall()
                    # commit
                    if not self._commit():
                        raise RuntimeError("Commit error")
                    if len(res) != 0:
                        # instantiate the job
                        job = JobSpec()
                        job.pack(res[0])
                        # start transaction
                        self.conn.begin()
                        # get files
                        sqlFile = f"SELECT {FileSpec.columnNames()} FROM ATLAS_PANDA.filesTable4 "
                        sqlFile += "WHERE PandaID=:PandaID"
                        self.cur.arraysize = 10000
                        self.cur.execute(sqlFile + comment, varMap)
                        resFs = self.cur.fetchall()
                        # metadata
                        resMeta = None
                        if table == "ATLAS_PANDA.jobsArchived4" or forAnal:
                            # read metadata only for archived or analysis jobs
                            sqlMeta = "SELECT metaData FROM ATLAS_PANDA.metaTable WHERE PandaID=:PandaID"
                            self.cur.execute(sqlMeta + comment, varMap)
                            for (clobMeta,) in self.cur:
                                if clobMeta is not None:
                                    try:
                                        resMeta = clobMeta.read()
                                    except AttributeError:
                                        resMeta = str(clobMeta)
                                break
                        # job parameters
                        job.jobParameters = None
                        sqlJobP = "SELECT jobParameters FROM ATLAS_PANDA.jobParamsTable WHERE PandaID=:PandaID"
                        varMap = {}
                        varMap[":PandaID"] = job.PandaID
                        self.cur.execute(sqlJobP + comment, varMap)
                        for (clobJobP,) in self.cur:
                            if clobJobP is not None:
                                try:
                                    job.jobParameters = clobJobP.read()
                                except AttributeError:
                                    job.jobParameters = str(clobJobP)
                            break
                        # commit
                        if not self._commit():
                            raise RuntimeError("Commit error")
                        # set files
                        for resF in resFs:
                            file = FileSpec()
                            file.pack(resF)
                            job.addFile(file)
                        # set metadata
                        job.metadata = resMeta
                        return job
                tmp_log.debug("not found")
                return None
            except Exception:
                # roll back
                self._rollback()
                if iTry + 1 < nTry:
                    tmp_log.debug(f"retry: {iTry}")
                    time.sleep(random.randint(10, 20))
                    continue
                self.dump_error_message(tmp_log)
                # return None for analysis
                if forAnal:
                    return None
                # return an 'unknown' job for production
                job = JobSpec()
                job.PandaID = pandaID
                job.jobStatus = "unknown"
                return job

    def getExpressJobs(self, dn):
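        """Collect a user's express jobs and their execution-time usage, and evaluate the remaining express quota. Returns a dict with status/quota/output/usage/jobs, or None on error."""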
        comment = " /* DBProxy.getExpressJobs */"
        tmp_log = self.create_tagged_logger(comment, f"DN={dn}")
        tmp_log.debug("start")
        sqlX = "SELECT specialHandling,COUNT(*) FROM %s "
        sqlX += "WHERE prodUserName=:prodUserName AND prodSourceLabel=:prodSourceLabel1 "
        sqlX += "AND specialHandling IS NOT NULL "
        sqlXJob = "SELECT PandaID,jobStatus,prodSourceLabel,modificationTime,jobDefinitionID,jobsetID,startTime,endTime FROM %s "
        sqlXJob += "WHERE prodUserName=:prodUserName AND prodSourceLabel=:prodSourceLabel1 "
        sqlXJob += "AND specialHandling IS NOT NULL AND specialHandling=:specialHandling "
        sqlQ = sqlX
        sqlQ += "GROUP BY specialHandling "
        sqlQJob = sqlXJob
        sqlA = sqlX
        sqlA += "AND modificationTime>:modificationTime GROUP BY specialHandling "
        sqlAJob = sqlXJob
        sqlAJob += "AND modificationTime>:modificationTime "
        try:
            # get compact DN
            compactDN = CoreUtils.clean_user_id(dn)
            if compactDN in ["", "NULL", None]:
                compactDN = dn
            expressStr = "express"
            activeExpressU = []
            timeUsageU = datetime.timedelta(0)
            executionTimeU = datetime.timedelta(hours=1)
            jobCreditU = 3
            timeCreditU = executionTimeU * jobCreditU
            timeNow = naive_utcnow()
            timeLimit = timeNow - datetime.timedelta(hours=6)
            # loop over tables
            for table in [
                "ATLAS_PANDA.jobsDefined4",
                "ATLAS_PANDA.jobsActive4",
                "ATLAS_PANDA.jobsArchived4",
            ]:
                varMap = {}
                varMap[":prodUserName"] = compactDN
                varMap[":prodSourceLabel1"] = "user"
                if table == "ATLAS_PANDA.jobsArchived4":
                    varMap[":modificationTime"] = timeLimit
                    sql = sqlA % table
                    sqlJob = sqlAJob % table
                else:
                    sql = sqlQ % table
                    sqlJob = sqlQJob % table
                # start transaction
                self.conn.begin()
                # get the number of jobs per specialHandling
                self.cur.arraysize = 10
                tmp_log.debug(sql + comment + str(varMap))
                self.cur.execute(sql + comment, varMap)
                res = self.cur.fetchall()
                tmp_log.debug(f"{str(res)}")
                for specialHandling, countJobs in res:
                    if specialHandling is None:
                        continue
                    # look for express jobs
                    if expressStr in specialHandling:
                        varMap[":specialHandling"] = specialHandling
                        self.cur.arraysize = 1000
                        self.cur.execute(sqlJob + comment, varMap)
                        resJobs = self.cur.fetchall()
                        tmp_log.debug(f"{str(resJobs)}")
                        for (
                            tmp_PandaID,
                            tmp_jobStatus,
                            tmp_prodSourceLabel,
                            tmp_modificationTime,
                            tmp_jobDefinitionID,
                            tmp_jobsetID,
                            tmp_startTime,
                            tmp_endTime,
                        ) in resJobs:
                            # collect active jobs
                            if tmp_jobStatus not in [
                                "finished",
                                "failed",
                                "cancelled",
                                "closed",
                            ]:
                                activeExpressU.append((tmp_PandaID, tmp_jobsetID, tmp_jobDefinitionID))
                            # get time usage
                            if tmp_jobStatus not in ["defined", "activated"]:
                                # consider only jobs that actually started
                                if tmp_startTime is not None:
                                    # running job
                                    if tmp_endTime is None:
                                        # count the time inside the 6h window
                                        if timeLimit > tmp_startTime:
                                            timeDelta = timeNow - timeLimit
                                        else:
                                            timeDelta = timeNow - tmp_startTime
                                    else:
                                        # finished job
                                        if timeLimit > tmp_startTime:
                                            timeDelta = tmp_endTime - timeLimit
                                        else:
                                            timeDelta = tmp_endTime - tmp_startTime
                                    # add
                                    if timeDelta > datetime.timedelta(0):
                                        timeUsageU += timeDelta
                # commit
                if not self._commit():
                    raise RuntimeError("Commit error")
            # check the quota
            rRet = True
            rRetStr = ""
            rQuota = 0
            if len(activeExpressU) >= jobCreditU:
                rRetStr += f"The number of queued runXYZ exceeds the limit = {jobCreditU}. "
                rRet = False
            if timeUsageU >= timeCreditU:
                rRetStr += f"The total execution time for runXYZ exceeds the limit = {timeCreditU.seconds / 60} min. "
                rRet = False
            # calculate the remaining quota
            if rRet:
                tmpQuota = jobCreditU - len(activeExpressU) - timeUsageU.seconds / executionTimeU.seconds
                if tmpQuota < 0:
                    rRetStr += "Quota for runXYZ exceeded. "
                    rRet = False
                else:
                    rQuota = tmpQuota
            # return
            retVal = {
                "status": rRet,
                "quota": rQuota,
                "output": rRetStr,
                "usage": timeUsageU,
                "jobs": activeExpressU,
            }
            tmp_log.debug(f"{str(retVal)}")
            return retVal
        except Exception:
            # roll back
            self._rollback()
            self.dump_error_message(tmp_log)
            return None

    def getActiveDebugJobs(self, dn=None, workingGroup=None, prodRole=False):
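        """Return the sorted list of PandaIDs of active jobs with 'debug' in specialHandling for a user or working group, or None on error."""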
        comment = " /* DBProxy.getActiveDebugJobs */"
        tmp_log = self.create_tagged_logger(comment, f"DN={dn}")
        tmp_log.debug(f"wg={workingGroup} prodRole={prodRole}")
        varMap = {}
        sqlX = "SELECT PandaID,jobStatus,specialHandling FROM %s "
        sqlX += "WHERE "
        if prodRole:
            pass
        elif workingGroup is not None:
            sqlX += "UPPER(workingGroup) IN (:wg1,:wg2) AND "
            varMap[":wg1"] = f"AP_{workingGroup.upper()}"
            varMap[":wg2"] = f"GP_{workingGroup.upper()}"
        else:
            sqlX += "prodUserName=:prodUserName AND "
            # get compact DN
            compactDN = CoreUtils.clean_user_id(dn)
            if compactDN in ["", "NULL", None]:
                compactDN = dn
            varMap[":prodUserName"] = compactDN
        sqlX += "specialHandling IS NOT NULL "
        try:
            debugStr = "debug"
            activeDebugJobs = []
            # loop over tables
            for table in ["ATLAS_PANDA.jobsDefined4", "ATLAS_PANDA.jobsActive4"]:
                sql = sqlX % table
                # start transaction
                self.conn.begin()
                # get jobs with specialHandling
                self.cur.arraysize = 100000
                self.cur.execute(sql + comment, varMap)
                res = self.cur.fetchall()
                # commit
                if not self._commit():
                    raise RuntimeError("Commit error")
                # loop over all jobs
                for pandaID, jobStatus, specialHandling in res:
                    if specialHandling is None:
                        continue
                    # only active jobs
                    if jobStatus not in [
                        "defined",
                        "activated",
                        "running",
                        "sent",
                        "starting",
                    ]:
                        continue
                    # look for debug jobs
                    if debugStr in specialHandling and pandaID not in activeDebugJobs:
                        activeDebugJobs.append(pandaID)
            # return
            activeDebugJobs.sort()
            tmp_log.debug(f"{str(activeDebugJobs)}")
            return activeDebugJobs
        except Exception:
            # roll back
            self._rollback()
            self.dump_error_message(tmp_log)
            return None

    def setDebugMode(self, dn, pandaID, prodManager, modeOn, workingGroup):
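        """Turn debug mode on or off for a job by editing its specialHandling field, with ownership and permission checks. Returns a message string, or None on error."""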
        comment = " /* DBProxy.setDebugMode */"
        tmp_log = self.create_tagged_logger(comment, f"PandaID={pandaID}")
        tmp_log.debug(f"dn={dn} prod={prodManager} wg={workingGroup} mode={modeOn}")
        sqlX = "SELECT prodUserName,jobStatus,specialHandling,workingGroup FROM %s "
        sqlX += "WHERE PandaID=:PandaID "
        sqlU = "UPDATE %s SET specialHandling=:specialHandling "
        sqlU += "WHERE PandaID=:PandaID "
        try:
            # get compact DN
            compactDN = CoreUtils.clean_user_id(dn)
            if compactDN in ["", "NULL", None]:
                compactDN = dn
            debugStr = "debug"
            retStr = ""
            retCode = False
            # loop over tables
            for table in ["ATLAS_PANDA.jobsDefined4", "ATLAS_PANDA.jobsActive4"]:
                varMap = {}
                varMap[":PandaID"] = pandaID
                sql = sqlX % table
                # start transaction
                self.conn.begin()
                # select
                self.cur.arraysize = 10
                self.cur.execute(sql + comment, varMap)
                res = self.cur.fetchone()
                # not found
                if res is None:
                    retStr = f"PandaID={pandaID} not found in active DB"
                    # commit
                    if not self._commit():
                        raise RuntimeError("Commit error")
                    continue
                prodUserName, jobStatus, specialHandling, wGroup = res
                # only active states can be changed
                changeableState = [
                    "defined",
                    "activated",
                    "running",
                    "sent",
                    "starting",
                    "assigned",
                ]
                if jobStatus not in changeableState:
                    retStr = f"Cannot set debugMode since the job status is {jobStatus}, which is not one of {str(changeableState)}"
                    # commit
                    if not self._commit():
                        raise RuntimeError("Commit error")
                    break
                # extract the working group
                try:
                    wGroup = wGroup.split("_")[-1]
                    wGroup = wGroup.lower()
                except Exception:
                    pass
                # check the permission
                notOwner = False
                if not prodManager:
                    if workingGroup is not None:
                        if workingGroup.lower() != wGroup:
                            retStr = f"Permission denied. Not the production manager for workingGroup={wGroup}"
                            notOwner = True
                    else:
                        if prodUserName != compactDN:
                            retStr = "Permission denied. Not the owner or production manager"
                            notOwner = True
                if notOwner:
                    # commit
                    if not self._commit():
                        raise RuntimeError("Commit error")
                    break
                # set specialHandling
                updateSH = True
                if specialHandling in [None, ""]:
                    if modeOn:
                        # set debug mode
                        specialHandling = debugStr
                    else:
                        # already off
                        updateSH = False
                elif debugStr in specialHandling:
                    if modeOn:
                        # already on
                        updateSH = False
                    else:
                        # turn debug mode off
                        specialHandling = re.sub(debugStr, "", specialHandling)
                        specialHandling = re.sub(",,", ",", specialHandling)
                        specialHandling = re.sub("^,", "", specialHandling)
                        specialHandling = re.sub(",$", "", specialHandling)
                else:
                    if modeOn:
                        # set debug mode
                        specialHandling = debugStr
                    else:
                        # already off
                        updateSH = False

                # no update is needed
                if not updateSH:
                    retStr = "Already set accordingly"
                    # commit
                    if not self._commit():
                        raise RuntimeError("Commit error")
                    break
                # update
                varMap = {}
                varMap[":PandaID"] = pandaID
                varMap[":specialHandling"] = specialHandling
                self.cur.execute((sqlU + comment) % table, varMap)
                retD = self.cur.rowcount
                # commit
                if not self._commit():
                    raise RuntimeError("Commit error")
                if retD == 0:
                    retStr = "Failed to update DB"
                else:
                    retStr = "Succeeded"
                    break
            # return
            tmp_log.debug(f"{retStr}")
            return retStr
        except Exception:
            # roll back
            self._rollback()
            self.dump_error_message(tmp_log)
            return None

    def lockJobsForReassign(
        self,
        tableName,
        timeLimit,
        statList,
        labels,
        processTypes,
        sites,
        clouds,
        useJEDI=False,
        onlyReassignable=False,
        useStateChangeTime=False,
        getEventService=False,
    ):
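        """Select jobs matching the given filters and lock them by bumping modificationTime (or stateChangeTime). Returns (True, list of selected rows) or (False, []) on error."""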
        comment = " /* DBProxy.lockJobsForReassign */"
        tmp_log = self.create_tagged_logger(comment)
        tmp_log.debug(f"{tableName} {timeLimit} {statList} {labels} {processTypes} {sites} {clouds} {useJEDI}")
        try:
            # make sql
            if not useJEDI:
                sql = f"SELECT PandaID FROM {tableName} "
            elif getEventService:
                sql = f"SELECT PandaID,lockedby,eventService,attemptNr,computingSite FROM {tableName} "
            else:
                sql = f"SELECT PandaID,lockedby FROM {tableName} "
            if not useStateChangeTime:
                sql += "WHERE modificationTime<:modificationTime "
            else:
                sql += "WHERE stateChangeTime<:modificationTime "
            varMap = {}
            varMap[":modificationTime"] = timeLimit
            if statList != []:
                stat_var_names_str, stat_var_map = get_sql_IN_bind_variables(statList, prefix=":stat")
                sql += f"AND jobStatus IN ({stat_var_names_str}) "
                varMap.update(stat_var_map)
            if labels != []:
                label_var_names_str, label_var_map = get_sql_IN_bind_variables(labels, prefix=":label")
                sql += f"AND prodSourceLabel IN ({label_var_names_str}) "
                varMap.update(label_var_map)
            if processTypes != []:
                ptype_var_names_str, ptype_var_map = get_sql_IN_bind_variables(processTypes, prefix=":processType")
                sql += f"AND processingType IN ({ptype_var_names_str}) "
                varMap.update(ptype_var_map)
            if sites != []:
                site_var_names_str, site_var_map = get_sql_IN_bind_variables(sites, prefix=":site")
                sql += f"AND computingSite IN ({site_var_names_str}) "
                varMap.update(site_var_map)
            if clouds != []:
                cloud_var_names_str, cloud_var_map = get_sql_IN_bind_variables(clouds, prefix=":cloud")
                sql += f"AND cloud IN ({cloud_var_names_str}) "
                varMap.update(cloud_var_map)
            if onlyReassignable:
                sql += "AND (relocationFlag IS NULL OR relocationFlag<>:relocationFlag) "
                varMap[":relocationFlag"] = 2
            # sql for locking
            if not useStateChangeTime:
                sqlLock = f"UPDATE {tableName} SET modificationTime=CURRENT_DATE WHERE PandaID=:PandaID"
            else:
                sqlLock = f"UPDATE {tableName} SET stateChangeTime=CURRENT_DATE WHERE PandaID=:PandaID"
            # start transaction
            self.conn.begin()
            # select
            self.cur.arraysize = 1000000
            tmp_log.debug(sql + comment + str(varMap))
            self.cur.execute(sql + comment, varMap)
            resList = self.cur.fetchall()
            tmp_log.debug(f"found {len(resList)}")
            retList = []
            # lock
            for tmpItem in resList:
                tmpID = tmpItem[0]
                varLock = {":PandaID": tmpID}
                self.cur.execute(sqlLock + comment, varLock)
                retList.append(tmpItem)
            # commit
            if not self._commit():
                raise RuntimeError("Commit error")
            # sort
            retList.sort()
            tmp_log.debug(f"return {len(retList)}")
            return True, retList
        except Exception:
            # roll back
            self._rollback()
            self.dump_error_message(tmp_log)
            # return empty
            return False, []

    def lockJobsForFinisher(self, timeNow, rownum, highPrio):
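        """Lock transferring production jobs that have not been modified recently, to be processed by the finisher. Returns (True, list of PandaIDs) or (False, []) on error."""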
        comment = " /* DBProxy.lockJobsForFinisher */"
        tmp_log = self.create_tagged_logger(comment)
        tmp_log.debug(f"{timeNow} {rownum} {highPrio}")
        try:
            varMap = {}
            varMap[":jobStatus"] = "transferring"
            varMap[":currentPriority"] = 800
            varMap[":pLabel1"] = "managed"
            varMap[":pLabel2"] = "test"
            varMap[":esJumbo"] = EventServiceUtils.jumboJobFlagNumber
            # make sql
            sql = "SELECT PandaID FROM ATLAS_PANDA.jobsActive4 "
            sql += "WHERE jobStatus=:jobStatus AND modificationTime<:modificationTime AND prodSourceLabel IN (:pLabel1,:pLabel2) "
            sql += "AND (eventService IS NULL OR eventService<>:esJumbo) "
            if highPrio:
                varMap[":modificationTime"] = timeNow - datetime.timedelta(hours=1)
                sql += f"AND currentPriority>=:currentPriority AND rownum<={rownum} "
            else:
                sql += f"AND currentPriority<:currentPriority AND rownum<={rownum} "
                varMap[":modificationTime"] = timeNow - datetime.timedelta(hours=2)
            sql += "FOR UPDATE "
            # sql for locking
            sqlLock = "UPDATE ATLAS_PANDA.jobsActive4 SET modificationTime=CURRENT_DATE WHERE PandaID=:PandaID"
            # start transaction
            self.conn.begin()
            # select
            self.cur.arraysize = 1000
            self.cur.execute(sql + comment, varMap)
            resList = self.cur.fetchall()
            retList = []
            # lock
            for (tmpID,) in resList:
                varLock = {":PandaID": tmpID}
                self.cur.execute(sqlLock + comment, varLock)
                retList.append(tmpID)
            # commit
            if not self._commit():
                raise RuntimeError("Commit error")
            # sort
            retList.sort()
            tmp_log.debug(f"{len(retList)}")
            return True, retList
        except Exception:
            # roll back
            self._rollback()
            self.dump_error_message(tmp_log)
            # return empty
            return False, []

    def lockJobsForActivator(self, timeLimit, rownum, prio):
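        """Lock assigned jobs in jobsDefined4 for the activator by updating prodDBUpdateTime. Returns (True, list of PandaIDs) or (False, []) on error."""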
        comment = " /* DBProxy.lockJobsForActivator */"
        tmp_log = self.create_tagged_logger(comment)
        tmp_log.debug("start")
        try:
            varMap = {}
            varMap[":jobStatus"] = "assigned"
            if prio > 0:
                varMap[":currentPriority"] = prio
            varMap[":timeLimit"] = timeLimit
            # make sql
            sql = "SELECT PandaID FROM ATLAS_PANDA.jobsDefined4 "
            sql += "WHERE jobStatus=:jobStatus AND (prodDBUpdateTime IS NULL OR prodDBUpdateTime<:timeLimit) "
            if prio > 0:
                sql += "AND currentPriority>=:currentPriority "
            sql += f"AND rownum<={rownum} "
            sql += "FOR UPDATE "
            # sql for locking
            sqlLock = "UPDATE ATLAS_PANDA.jobsDefined4 SET prodDBUpdateTime=CURRENT_DATE WHERE PandaID=:PandaID"
            # start transaction
            self.conn.begin()
            # select
            self.cur.arraysize = 1000
            self.cur.execute(sql + comment, varMap)
            resList = self.cur.fetchall()
            retList = []
            # lock
            for (tmpID,) in resList:
                varLock = {":PandaID": tmpID}
                self.cur.execute(sqlLock + comment, varLock)
                retList.append(tmpID)
            # commit
            if not self._commit():
                raise RuntimeError("Commit error")
            # sort
            retList.sort()
            tmp_log.debug(f"locked {len(retList)} jobs")
            return True, retList
        except Exception:
            # roll back
            self._rollback()
            self.dump_error_message(tmp_log)
            # return empty
            return False, []

    def addMetadata(self, pandaID, metadata, newStatus):
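        """Insert metadata for an active job into metaTable, unless the job is in a final state or already has metadata. Returns True on success."""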
        comment = " /* DBProxy.addMetaData */"
        tmp_log = self.create_tagged_logger(comment, f"PandaID={pandaID}")
        tmp_log.debug("start")
        # discard metadata for failed jobs
        if newStatus == "failed":
            tmp_log.debug("skip")
            return True
        sqlJ = "SELECT jobStatus FROM ATLAS_PANDA.jobsActive4 WHERE PandaID=:PandaID "
        sqlJ += "UNION "
        sqlJ += "SELECT jobStatus FROM ATLAS_PANDA.jobsArchived4 WHERE PandaID=:PandaID "
        sql0 = "SELECT PandaID FROM ATLAS_PANDA.metaTable WHERE PandaID=:PandaID"
        sql1 = "INSERT INTO ATLAS_PANDA.metaTable (PandaID,metaData) VALUES (:PandaID,:metaData)"
        nTry = 1
        regStart = naive_utcnow()
        for iTry in range(nTry):
            try:
                # begin transaction
                self.conn.begin()
                self.cur.arraysize = 10
                # check the job status
                varMap = {}
                varMap[":PandaID"] = pandaID
                self.cur.execute(sqlJ + comment, varMap)
                resJ = self.cur.fetchone()
                if resJ is not None:
                    (jobStatus,) = resJ
                else:
                    jobStatus = "unknown"
                # skip jobs not found in the active/archived tables
                if jobStatus in ["unknown"]:
                    tmp_log.debug(f"skip jobStatus={jobStatus}")
                    if not self._commit():
                        raise RuntimeError("Commit error")
                    return False
                # skip jobs in final states
                if jobStatus in ["cancelled", "closed", "finished", "failed"]:
                    tmp_log.debug(f"skip jobStatus={jobStatus}")
                    if not self._commit():
                        raise RuntimeError("Commit error")
                    return True
                # check for a duplicated row
                varMap = {}
                varMap[":PandaID"] = pandaID
                self.cur.arraysize = 10
                self.cur.execute(sql0 + comment, varMap)
                res = self.cur.fetchone()
                # already exists
                if res is not None:
                    tmp_log.debug(f"skip duplicated during jobStatus={jobStatus}")
                    if not self._commit():
                        raise RuntimeError("Commit error")
                    return True
                # truncate oversized metadata
                if metadata is not None:
                    origSize = len(metadata)
                else:
                    origSize = 0
                maxSize = 1024 * 1024
                if newStatus in ["failed"] and origSize > maxSize:
                    metadata = metadata[:maxSize]
                # insert
                varMap = {}
                varMap[":PandaID"] = pandaID
                varMap[":metaData"] = metadata
                self.cur.execute(sql1 + comment, varMap)
                # commit
                if not self._commit():
                    raise RuntimeError("Commit error")
                regTime = naive_utcnow() - regStart
                msgStr = f"done in jobStatus={jobStatus}->{newStatus} took {regTime.seconds} sec"
                if metadata is not None:
                    msgStr += f" for {len(metadata)} (orig {origSize}) bytes"
                tmp_log.debug(msgStr)
                return True
            except Exception:
                # roll back
                self._rollback()
                if iTry + 1 < nTry:
                    tmp_log.debug(f"retry: {iTry}")
                    time.sleep(random.randint(10, 20))
                    continue
                self.dump_error_message(tmp_log)
                return False

    def addStdOut(self, pandaID, stdOut):
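        """Insert or update the stdout snapshot of an active job in jobsDebug. Returns True on success."""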
        comment = " /* DBProxy.addStdOut */"
        tmp_log = self.create_tagged_logger(comment, f"PandaID={pandaID}")
        tmp_log.debug("start")
        sqlJ = "SELECT PandaID FROM ATLAS_PANDA.jobsActive4 WHERE PandaID=:PandaID FOR UPDATE "
        sqlC = "SELECT PandaID FROM ATLAS_PANDA.jobsDebug WHERE PandaID=:PandaID "
        sqlI = "INSERT INTO ATLAS_PANDA.jobsDebug (PandaID,stdOut) VALUES (:PandaID,:stdOut) "
        sqlU = "UPDATE ATLAS_PANDA.jobsDebug SET stdOut=:stdOut WHERE PandaID=:PandaID "
        try:
            # begin transaction
            self.conn.begin()
            # select
            varMap = {}
            varMap[":PandaID"] = pandaID
            self.cur.arraysize = 10
            # check the job table
            self.cur.execute(sqlJ + comment, varMap)
            res = self.cur.fetchone()
            if res is None:
                tmp_log.debug(f"addStdOut : {pandaID} non active")
            else:
                # check the debug table
                self.cur.execute(sqlC + comment, varMap)
                res = self.cur.fetchone()
                # update or insert
                if res is not None:
                    sql = sqlU
                else:
                    sql = sqlI
                varMap = {}
                varMap[":PandaID"] = pandaID
                varMap[":stdOut"] = stdOut
                self.cur.execute(sql + comment, varMap)
            # commit
            if not self._commit():
                raise RuntimeError("Commit error")
            return True
        except Exception:
            # roll back
            self._rollback()
            self.dump_error_message(tmp_log)
            return False

    def getJobStatistics(self):
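        """Return per-site, per-status job counts from jobsActive4 (via its materialized view) and jobsDefined4, with a few key states always present."""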
        comment = " /* DBProxy.getJobStatistics */"
        tmp_log = self.create_tagged_logger(comment)
        tmp_log.debug("start")
        # tables to query
        jobs_active_4_table = f"{panda_config.schemaPANDA}.jobsActive4"
        jobs_defined_4_table = f"{panda_config.schemaPANDA}.jobsDefined4"
        tables = [jobs_active_4_table, jobs_defined_4_table]
        # states to fill in with zero counts, and states to skip
        included_states = ["assigned", "activated", "running"]
        excluded_states = ["merging"]
        # sql template for the jobs tables
        sql_template = "SELECT computingSite, jobStatus, COUNT(*) FROM {table_name} GROUP BY computingSite, jobStatus"
        # sql template for the materialized view
        sql_mv_template = sql_template.replace("COUNT(*)", "SUM(num_of_jobs)")
        sql_mv_template = sql_mv_template.replace("SELECT ", "SELECT /*+ RESULT_CACHE */ ")
        ret = {}
        max_retries = 3

        for retry in range(max_retries):
            try:
                for table in tables:
                    # start transaction
                    self.conn.begin()
                    var_map = {}
                    self.cur.arraysize = 10000
                    # use the materialized view for the active table
                    if table == jobs_active_4_table:
                        table_name = f"{panda_config.schemaPANDA}.MV_JOBSACTIVE4_STATS"
                        sql = (sql_mv_template + comment).format(table_name=table_name)
                    else:
                        table_name = table
                        sql = (sql_template + comment).format(table_name=table_name)
                    tmp_log.debug(f"Will execute: {sql} {str(var_map)}")

                    self.cur.execute(sql, var_map)
                    res = self.cur.fetchall()
                    if not self._commit():
                        raise RuntimeError("Commit error")
                    # create the map
                    for computing_site, job_status, n_jobs in res:
                        if job_status in excluded_states:
                            continue
                        ret.setdefault(computing_site, {}).setdefault(job_status, 0)
                        ret[computing_site][job_status] += n_jobs
                # fill in missing states with 0
                for site in ret:
                    for state in included_states:
                        ret[site].setdefault(state, 0)

                tmp_log.debug("done")
                return ret

            except Exception:
                self._rollback()
                if retry + 1 < max_retries:
                    tmp_log.debug(f"retry: {retry}")
                    time.sleep(2)
                else:
                    self.dump_error_message(tmp_log)
                    return {}

    def getDetailedJobStatistics(self):
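        """Return job counts keyed by site, resource type, prodSourceLabel, and status from jobsActive4 (via its materialized view) and jobsDefined4."""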
        comment = " /* DBProxy.getDetailedJobStatistics */"
        tmp_log = self.create_tagged_logger(comment)
        tmp_log.debug("start")
        # tables to query
        jobs_active_4_table = f"{panda_config.schemaPANDA}.jobsActive4"
        jobs_defined_4_table = f"{panda_config.schemaPANDA}.jobsDefined4"
        tables = [jobs_active_4_table, jobs_defined_4_table]
        # states to fill in with zero counts, and states to skip
        included_states = ["assigned", "activated", "running"]
        excluded_states = ["merging"]
        # sql template for the jobs tables
        sql_template = (
            "SELECT computingSite, resource_type, prodSourceLabel, jobStatus, COUNT(*) FROM {table_name} "
            "GROUP BY computingSite, resource_type, prodSourceLabel, jobStatus"
        )
        # sql template for the materialized view
        sql_mv_template = sql_template.replace("COUNT(*)", "SUM(num_of_jobs)")
        sql_mv_template = sql_mv_template.replace("SELECT ", "SELECT /*+ RESULT_CACHE */ ")
        ret = {}
        max_retries = 3

        for retry in range(max_retries):
            try:
                for table in tables:
                    # start transaction
                    self.conn.begin()
                    var_map = {}
                    self.cur.arraysize = 10000
                    # use the materialized view for the active table
                    if table == jobs_active_4_table:
                        table_name = f"{panda_config.schemaPANDA}.MV_JOBSACTIVE4_STATS"
                        sql = (sql_mv_template + comment).format(table_name=table_name)
                    else:
                        table_name = table
                        sql = (sql_template + comment).format(table_name=table_name)
                    tmp_log.debug(f"Will execute: {sql} {str(var_map)}")

                    self.cur.execute(sql, var_map)
                    res = self.cur.fetchall()
                    if not self._commit():
                        raise RuntimeError("Commit error")
                    # create the map
                    for computing_site, resource_type, prod_source_label, job_status, n_jobs in res:
                        if job_status in excluded_states:
                            continue
                        ret.setdefault(computing_site, {}).setdefault(resource_type, {}).setdefault(prod_source_label, {}).setdefault(job_status, 0)
                        ret[computing_site][resource_type][prod_source_label][job_status] += n_jobs
                # fill in missing states with 0
                for site in ret:
                    for resource_type in ret[site]:
                        for prod_source_label in ret[site][resource_type]:
                            for state in included_states:
                                ret[site][resource_type][prod_source_label].setdefault(state, 0)

                tmp_log.debug("done")
                return ret

            except Exception:
                self._rollback()
                if retry + 1 < max_retries:
                    tmp_log.debug(f"retry: {retry}")
                    time.sleep(2)
                else:
                    self.dump_error_message(tmp_log)
                    return {}

    def getJobStatisticsPerSiteResource(self, time_window):
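        """Return job counts keyed by site, resource type, and status, including archived jobs modified within the given time window (in minutes; default 12 hours)."""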
        comment = " /* DBProxy.getJobStatisticsPerSiteResource */"
        tmp_log = self.create_tagged_logger(comment)
        tmp_log.debug("start")
        # tables to query
        tables = ["ATLAS_PANDA.jobsActive4", "ATLAS_PANDA.jobsDefined4", "ATLAS_PANDA.jobsArchived4"]
        # basic sql for the live tables
        sql = "SELECT computingSite, jobStatus, resource_type, COUNT(*) FROM %s GROUP BY computingSite, jobStatus, resource_type "
        # sql for the archived table, restricted by modificationTime
        sql_archive = (
            "SELECT /*+ INDEX_RS_ASC(tab (MODIFICATIONTIME PRODSOURCELABEL)) */ computingSite, jobStatus, resource_type, COUNT(*) "
            "FROM ATLAS_PANDA.jobsArchived4 tab WHERE modificationTime > :modificationTime "
            "GROUP BY computingSite, jobStatus, resource_type "
        )
        # sql for the materialized view of the active table
        sql_mv = re.sub(r"COUNT\(\*\)", "SUM(njobs)", sql)
        sql_mv = re.sub("SELECT ", "SELECT /*+ RESULT_CACHE */ ", sql_mv)

        ret = dict()
        try:
            # time floor for the archived table
            if time_window is None:
                time_floor = naive_utcnow() - datetime.timedelta(hours=12)
            else:
                time_floor = naive_utcnow() - datetime.timedelta(minutes=int(time_window))

            for table in tables:
                # start transaction
                self.conn.begin()
                self.cur.arraysize = 10000
                # select the right sql per table
                var_map = {}
                if table == "ATLAS_PANDA.jobsArchived4":
                    var_map[":modificationTime"] = time_floor
                    sql_tmp = sql_archive + comment
                elif table == "ATLAS_PANDA.jobsActive4":
                    sql_tmp = (sql_mv + comment) % "ATLAS_PANDA.JOBS_SHARE_STATS"
                else:
                    sql_tmp = (sql + comment) % table

                self.cur.execute(sql_tmp, var_map)
                res = self.cur.fetchall()
                # commit
                if not self._commit():
                    raise RuntimeError("Commit error")
                # create the map
                for computing_site, job_status, resource_type, n_jobs in res:
                    ret.setdefault(computing_site, dict()).setdefault(resource_type, dict()).setdefault(job_status, 0)
                    ret[computing_site][resource_type][job_status] += n_jobs

            # fill in missing states with 0
            included_states = ["assigned", "activated", "running", "finished", "failed"]
            for computing_site in ret:
                for resource_type in ret[computing_site]:
                    for job_status in included_states:
                        ret[computing_site][resource_type].setdefault(job_status, 0)

            tmp_log.debug("done")
            return ret
        except Exception:
            # roll back
            self._rollback()
            self.dump_error_message(tmp_log)
            return dict()

    def getNumberJobsUser(self, dn, workingGroup=None):
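        """Count the non-failed, non-merging user jobs of a DN (optionally restricted to a workingGroup) in jobsActive4 and jobsDefined4."""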
        comment = " /* DBProxy.getNumberJobsUser */"
        tmp_log = self.create_tagged_logger(comment, f"DN={dn}")
        tmp_log.debug(f"workingGroup={workingGroup}")
        # get compact DN
        compact_dn = CoreUtils.clean_user_id(dn)
        if compact_dn in ["", "NULL", None]:
            compact_dn = dn
        # make sql
        if workingGroup is not None:
            sql_count_jobs = "SELECT COUNT(*) FROM %s WHERE prodUserName=:prodUserName AND prodSourceLabel=:prodSourceLabel AND workingGroup=:workingGroup "
        else:
            sql_count_jobs = "SELECT COUNT(*) FROM %s WHERE prodUserName=:prodUserName AND prodSourceLabel=:prodSourceLabel AND workingGroup IS NULL "
        sql_count_jobs += "AND NOT jobStatus IN (:failed,:merging) "

        n_try = 1
        n_jobs = 0
        for i_try in range(n_try):
            try:
                for table in ("ATLAS_PANDA.jobsActive4", "ATLAS_PANDA.jobsDefined4"):
                    # start transaction
                    self.conn.begin()
                    # select
                    var_map = {":prodUserName": compact_dn, ":prodSourceLabel": "user", ":failed": "failed", ":merging": "merging"}
                    if workingGroup is not None:
                        var_map[":workingGroup"] = workingGroup
                    self.cur.arraysize = 10
                    self.cur.execute((sql_count_jobs + comment) % table, var_map)
                    rows = self.cur.fetchall()
                    # commit
                    if not self._commit():
                        raise RuntimeError("Commit error")
                    if len(rows) != 0:
                        n_jobs += rows[0][0]
                # return
                tmp_log.debug(f"{n_jobs}")
                return n_jobs
            except Exception:
                # roll back
                self._rollback()
                if i_try + 1 < n_try:
                    time.sleep(2)
                    continue
                self.dump_error_message(tmp_log)
                return 0

    def getJobStatisticsForExtIF(self, source_type=None):
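        """Return per-cloud, per-status job counts for external interfaces, for analysis or production source labels, including recently modified archived jobs."""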
        comment = " /* DBProxy.getJobStatisticsForExtIF */"
        tmp_log = self.create_tagged_logger(comment)
        tmp_log.debug(f"start source_type={source_type}")
        # time floor for the archived table
        time_floor = naive_utcnow() - datetime.timedelta(hours=12)
        # analysis
        if source_type == "analysis":
            sql = "SELECT jobStatus, COUNT(*), cloud FROM %s WHERE prodSourceLabel IN (:prodSourceLabel1, :prodSourceLabel2) GROUP BY jobStatus, cloud"
            sql_archived = (
                "SELECT /* use_json_type */ /*+ INDEX_RS_ASC(tab (MODIFICATIONTIME PRODSOURCELABEL)) */ "
                "jobStatus, COUNT(*), tabS.data.cloud "
                "FROM %s tab, ATLAS_PANDA.schedconfig_json tabS "
                "WHERE prodSourceLabel IN (:prodSourceLabel1, :prodSourceLabel2) "
                "AND tab.computingSite = tabS.panda_queue "
                "AND modificationTime>:modificationTime GROUP BY tab.jobStatus,tabS.data.cloud"
            )
        # production
        else:
            prod_source_label_string = ":prodSourceLabel1, " + ", ".join(f":prodSourceLabel_{label}" for label in JobUtils.list_ptest_prod_sources)
            sql = (
                "SELECT /* use_json_type */ tab.jobStatus, COUNT(*), tabS.data.cloud "
                "FROM %s tab, ATLAS_PANDA.schedconfig_json tabS "
                f"WHERE prodSourceLabel IN ({prod_source_label_string}) "
                "AND tab.computingSite = tabS.panda_queue "
                "GROUP BY tab.jobStatus, tabS.data.cloud"
            )
            sql_archived = (
                "SELECT /* use_json_type */ /*+ INDEX_RS_ASC(tab (MODIFICATIONTIME PRODSOURCELABEL)) */ "
                "jobStatus, COUNT(*), tabS.data.cloud "
                "FROM %s tab, ATLAS_PANDA.schedconfig_json tabS "
                f"WHERE prodSourceLabel IN ({prod_source_label_string}) "
                "AND tab.computingSite = tabS.panda_queue "
                "AND modificationTime>:modificationTime GROUP BY tab.jobStatus,tabS.data.cloud"
            )
        # sql for the materialized view of the active table
        sql_active_mv = re.sub(r"COUNT\(\*\)", "SUM(num_of_jobs)", sql)
        sql_active_mv = re.sub("SELECT ", "SELECT /*+ RESULT_CACHE */ ", sql_active_mv)

        ret = {}
        # tables to query
        tables = ["ATLAS_PANDA.jobsActive4", "ATLAS_PANDA.jobsArchived4", "ATLAS_PANDA.jobsDefined4"]
        try:
            for table in tables:
                # start transaction
                self.conn.begin()
                # set the bind variables
                var_map = {}
                if source_type == "analysis":
                    var_map[":prodSourceLabel1"] = "user"
                    var_map[":prodSourceLabel2"] = "panda"
                else:
                    var_map[":prodSourceLabel1"] = "managed"
                    for tmp_label in JobUtils.list_ptest_prod_sources:
                        tmp_key = f":prodSourceLabel_{tmp_label}"
                        var_map[tmp_key] = tmp_label
                # select
                if table != "ATLAS_PANDA.jobsArchived4":
                    self.cur.arraysize = 10000
                    # use the materialized view for the active table
                    if table == "ATLAS_PANDA.jobsActive4":
                        self.cur.execute(
                            (sql_active_mv + comment) % "ATLAS_PANDA.MV_JOBSACTIVE4_STATS",
                            var_map,
                        )
                    else:
                        self.cur.execute((sql + comment) % table, var_map)
                else:
                    var_map[":modificationTime"] = time_floor
                    self.cur.arraysize = 10000
                    self.cur.execute((sql_archived + comment) % table, var_map)
                res = self.cur.fetchall()
                # commit
                if not self._commit():
                    raise RuntimeError("Commit error")
                # create the map
                for job_status, count, cloud in res:
                    ret.setdefault(cloud, dict())
                    ret[cloud].setdefault(job_status, 0)
                    ret[cloud][job_status] += count
            # return
            tmp_log.debug("done")
            return ret
        except Exception:
            # roll back
            self._rollback()
            self.dump_error_message(tmp_log)
            return {}

    def getJobStatisticsPerProcessingType(self):
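        """Return job counts keyed by cloud, processingType, and status for production source labels, including recently modified archived jobs."""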
        comment = " /* DBProxy.getJobStatisticsPerProcessingType */"
        tmp_log = self.create_tagged_logger(comment)
        tmp_log.debug("start")
        # time floor for the archived table
        time_floor = naive_utcnow() - datetime.timedelta(hours=12)
        # tables to query
        tables = ["ATLAS_PANDA.jobsActive4", "ATLAS_PANDA.jobsArchived4", "ATLAS_PANDA.jobsDefined4"]
        # bind variable names for the ptest production source labels
        prod_source_labels = ", ".join([f":prodSourceLabel_{label}" for label in JobUtils.list_ptest_prod_sources])
        # sql for the live tables
        sql_active = (
            "SELECT /* use_json_type */ jobStatus, COUNT(*), tabS.data.cloud, processingType "
            "FROM %s tab, ATLAS_PANDA.schedconfig_json tabS "
            f"WHERE prodSourceLabel IN (:prodSourceLabelManaged, {prod_source_labels}) "
            "AND computingSite=tabS.panda_queue "
            "GROUP BY jobStatus, tabS.data.cloud, processingType"
        )
        # sql for the archived table, restricted by modificationTime
        sql_archived = (
            "SELECT /* use_json_type */ /*+ INDEX_RS_ASC(tab (MODIFICATIONTIME PRODSOURCELABEL)) */ "
            "jobStatus, COUNT(*), tabS.data.cloud, processingType "
            "FROM %s tab, ATLAS_PANDA.schedconfig_json tabS "
            f"WHERE prodSourceLabel IN (:prodSourceLabelManaged, {prod_source_labels}) "
            "AND modificationTime > :modificationTime "
            "AND computingSite = tabS.panda_queue "
            "GROUP BY jobStatus, tabS.data.cloud, processingType"
        )
        # sql for the materialized view of the active table
        sql_active_mv = re.sub(r"COUNT\(\*\)", "SUM(num_of_jobs)", sql_active)
        sql_active_mv = re.sub("SELECT ", "SELECT /*+ RESULT_CACHE */ ", sql_active_mv)

        ret = {}
        try:
            for table in tables:
                # start transaction
                self.conn.begin()
                # set the bind variables
                self.cur.arraysize = 10000
                var_map = {":prodSourceLabelManaged": "managed"}
                var_map.update({f":prodSourceLabel_{label}": label for label in JobUtils.list_ptest_prod_sources})
                # select
                if table == "ATLAS_PANDA.jobsArchived4":
                    var_map[":modificationTime"] = time_floor
                    self.cur.execute((sql_archived + comment) % table, var_map)
                elif table == "ATLAS_PANDA.jobsActive4":
                    # use the materialized view for the active table
                    self.cur.execute(
                        (sql_active_mv + comment) % "ATLAS_PANDA.MV_JOBSACTIVE4_STATS",
                        var_map,
                    )
                else:
                    self.cur.execute((sql_active + comment) % table, var_map)

                results = self.cur.fetchall()
                # commit
                if not self._commit():
                    raise RuntimeError("Commit error")
                # create the map
                for row in results:
                    job_status, count, cloud, processing_type = row
                    ret.setdefault(cloud, {}).setdefault(processing_type, {}).setdefault(job_status, 0)
                    ret[cloud][processing_type][job_status] += count

            tmp_log.debug("done")
            return ret
        except Exception:
            # roll back
            self._rollback()
            self.dump_error_message(tmp_log)
            return {}

    def peekJobLog(self, pandaID, days=None):
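        """Fetch a JobSpec with files, metadata, and parameters from the PANDAARCH archive tables, looking back the given number of days (default 30). Returns None if not found."""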
        comment = " /* DBProxy.peekJobLog */"
        tmp_log = self.create_tagged_logger(comment, f"PandaID={pandaID}")
        tmp_log.debug(f"days={days}")
        # return None for NULL PandaID
        if pandaID in ["NULL", "", "None", None]:
            return None
        sql_select = "SELECT %s FROM %s "
        sql_where = "WHERE PandaID=:PandaID AND modificationTime>(CURRENT_DATE-:days) "
        # default time window
        var_map = {":PandaID": pandaID}
        if days is None:
            days = 30
        var_map[":days"] = days
        n_try = 1

        for i_try in range(n_try):
            try:
                # select from the archive table
                tables = [f"{panda_config.schemaPANDAARCH}.jobsArchived"]

                for table in tables:
                    # start transaction
                    self.conn.begin()
                    # select
                    sql = sql_select % (JobSpec.columnNames(), table) + sql_where
                    self.cur.arraysize = 10
                    self.cur.execute(sql + comment, var_map)
                    rows = self.cur.fetchall()
                    # commit
                    if not self._commit():
                        raise RuntimeError("Commit error")
                    if len(rows) != 0:
                        # instantiate the job
                        job = JobSpec()
                        job.pack(rows[0])
                        # start transaction
                        self.conn.begin()
                        # get files
                        file_table_name = re.sub("jobsArchived", "filesTable_ARCH", table)
                        sql_get_files = (
                            f"SELECT /*+ INDEX(tab FILES_ARCH_PANDAID_IDX)*/ {FileSpec.columnNames()} "
                            f"FROM {file_table_name} tab "
                            "WHERE PandaID=:PandaID AND modificationTime>(CURRENT_DATE-:days)"
                        )
                        self.cur.arraysize = 10000
                        self.cur.execute(sql_get_files + comment, var_map)
                        file_rows = self.cur.fetchall()
                        # metadata
                        var_map = {}
                        var_map[":PandaID"] = job.PandaID
                        job.metadata = None
                        meta_table_name = re.sub("jobsArchived", "metaTable_ARCH", table)
                        sql_get_meta = f"SELECT metaData FROM {meta_table_name} WHERE PandaID=:PandaID"
                        self.cur.execute(sql_get_meta + comment, var_map)
                        for (clob_meta,) in self.cur:
                            if clob_meta is not None:
                                try:
                                    job.metadata = clob_meta.read()
                                except AttributeError:
                                    job.metadata = str(clob_meta)
                            break
                        # job parameters
                        job.jobParameters = None
                        job_param_table_name = re.sub("jobsArchived", "jobParamsTable_ARCH", table)
                        sql_get_job_params = f"SELECT jobParameters FROM {job_param_table_name} WHERE PandaID=:PandaID"
                        var_map = {}
                        var_map[":PandaID"] = job.PandaID
                        self.cur.execute(sql_get_job_params + comment, var_map)
                        for (clob_job_params,) in self.cur:
                            if clob_job_params is not None:
                                try:
                                    job.jobParameters = clob_job_params.read()
                                except AttributeError:
                                    job.jobParameters = str(clob_job_params)
                            break
                        # commit
                        if not self._commit():
                            raise RuntimeError("Commit error")
                        # set files
                        for file_row in file_rows:
                            file = FileSpec()
                            file.pack(file_row)
                            # strip redundant white space from checksums
                            try:
                                file.md5sum = file.md5sum.strip()
                            except Exception:
                                pass
                            try:
                                file.checksum = file.checksum.strip()
                            except Exception:
                                pass
                            job.addFile(file)
                        return job
                tmp_log.debug("not found")
                return None
            except Exception:
                # roll back
                self._rollback()
                if i_try + 1 < n_try:
                    tmp_log.error(f"retry {i_try}")
                    time.sleep(random.randint(10, 20))
                    continue
                self.dump_error_message(tmp_log)
                return None

    def throttleUserJobs(self, prodUserName, workingGroup, get_dict):
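        """Set a user's long activated jobs (maxCpuCount above 6 hours) to throttled, task by task. Returns the total count, or a per-task dict when get_dict is set, or None on error."""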
        comment = " /* DBProxy.throttleUserJobs */"
        tmp_log = self.create_tagged_logger(comment, f"user={prodUserName} group={workingGroup}")
        tmp_log.debug("start")
        try:
            # sql to get the tasks with long activated jobs
            sql_get_tasks = (
                "SELECT /*+ INDEX_RS_ASC(tab JOBSACTIVE4_PRODUSERNAMEST_IDX) */ DISTINCT jediTaskID "
                "FROM ATLAS_PANDA.jobsActive4 tab "
                "WHERE prodSourceLabel=:prodSourceLabel AND prodUserName=:prodUserName "
                "AND jobStatus=:oldJobStatus AND relocationFlag=:oldRelFlag "
                "AND maxCpuCount>:maxTime "
            )
            if workingGroup is not None:
                sql_get_tasks += "AND workingGroup=:workingGroup "
            else:
                sql_get_tasks += "AND workingGroup IS NULL "
            # sql to get the jobs of a task
            sql_get_jobs = (
                "SELECT "
                "PandaID, jediTaskID, cloud, computingSite, prodSourceLabel "
                "FROM ATLAS_PANDA.jobsActive4 "
                "WHERE jediTaskID=:jediTaskID "
                "AND jobStatus=:oldJobStatus AND relocationFlag=:oldRelFlag "
                "AND maxCpuCount>:maxTime "
            )
            # sql to throttle the jobs of a task
            sql_update_jobs = (
                f"UPDATE {panda_config.schemaPANDA}.jobsActive4 SET jobStatus=:newJobStatus,relocationFlag=:newRelFlag "
                "WHERE jediTaskID=:jediTaskID AND jobStatus=:oldJobStatus AND maxCpuCount>:maxTime"
            )
            # start transaction
            self.conn.begin()
            # get the tasks
            self.cur.arraysize = 10
            var_map = {":prodSourceLabel": "user", ":oldRelFlag": 1, ":prodUserName": prodUserName, ":oldJobStatus": "activated", ":maxTime": 6 * 60 * 60}
            if workingGroup is not None:
                var_map[":workingGroup"] = workingGroup

            self.cur.execute(sql_get_tasks + comment, var_map)
            task_rows = self.cur.fetchall()
            # commit
            if not self._commit():
                raise RuntimeError("Commit error")
            # loop over the tasks in random order
            task_ids = [task_id for task_id, in task_rows]
            random.shuffle(task_ids)
            total_updated = 0
            updated_per_task = {}
            for task_id in task_ids:
                tmp_log.debug(f"reset jediTaskID={task_id}")
                # start transaction
                self.conn.begin()
                # get the jobs of the task
                var_map = {":jediTaskID": task_id, ":oldRelFlag": 1, ":oldJobStatus": "activated", ":maxTime": 6 * 60 * 60}
                self.cur.execute(sql_get_jobs + comment, var_map)
                job_rows = self.cur.fetchall()
                job_info_map = {
                    panda_id: {
                        "computingSite": computing_site,
                        "cloud": cloud,
                        "prodSourceLabel": prod_source_label,
                    }
                    for panda_id, _task_id, cloud, computing_site, prod_source_label in job_rows
                }
                # throttle the jobs
                var_map = {":jediTaskID": task_id, ":newRelFlag": 3, ":newJobStatus": "throttled", ":oldJobStatus": "activated", ":maxTime": 6 * 60 * 60}
                self.cur.execute(sql_update_jobs + comment, var_map)
                n_updated = self.cur.rowcount
                tmp_log.debug(f"reset {n_updated} jobs")
                if n_updated > 0:
                    total_updated += n_updated
                    updated_per_task[task_id] = n_updated
                    # record the status changes
                    for panda_id, job_info in job_info_map.items():
                        self.recordStatusChange(
                            panda_id,
                            var_map[":newJobStatus"],
                            infoMap=job_info,
                            useCommit=False,
                        )
                # commit
                if not self._commit():
                    raise RuntimeError("Commit error")
            if get_dict:
                tmp_log.debug(f"done with {updated_per_task}")
                return updated_per_task
            tmp_log.debug(f"done with {total_updated}")
            return total_updated
        except Exception:
            # roll back
            self._rollback()
            self.dump_error_message(tmp_log)
            return None
1894
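# Usage sketch (illustrative; "proxy" stands for a connected DBProxy instance
# and the argument values are hypothetical): throttleUserJobs moves a user's
# activated long jobs (maxCpuCount > 6 hours) to jobStatus=throttled with
# relocationFlag=3, committing task by task. With get_dict=True it returns
# {jediTaskID: n_updated}; otherwise the total count; None on DB error.
#
#     per_task = proxy.throttleUserJobs("some.user", None, get_dict=True)
#     total = proxy.throttleUserJobs("some.user", "GROUP", get_dict=False)
#
# unThrottleUserJobs below applies the exact inverse transition
# (throttled/relocationFlag=3 back to activated/relocationFlag=1).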
1895
1896 def unThrottleUserJobs(self, prodUserName, workingGroup, get_dict):
1897 comment = " /* DBProxy.unThrottleUserJobs */"
1898 tmp_log = self.create_tagged_logger(comment, f"user={prodUserName} group={workingGroup}")
1899 tmp_log.debug("start")
1900 try:
1901
1902 sql_get_tasks = (
1903 "SELECT /*+ INDEX_RS_ASC(tab JOBSACTIVE4_PRODUSERNAMEST_IDX) */ DISTINCT jediTaskID "
1904 "FROM ATLAS_PANDA.jobsActive4 tab "
1905 "WHERE prodSourceLabel=:prodSourceLabel AND prodUserName=:prodUserName "
1906 "AND jobStatus=:oldJobStatus AND relocationFlag=:oldRelFlag "
1907 )
1908
1909 if workingGroup is not None:
1910 sql_get_tasks += "AND workingGroup=:workingGroup "
1911 else:
1912 sql_get_tasks += "AND workingGroup IS NULL "
1913
1914
1915 sql_get_jobs = (
1916 "SELECT "
1917 "PandaID, jediTaskID, cloud, computingSite, prodSourceLabel "
1918 "FROM ATLAS_PANDA.jobsActive4 "
1919 "WHERE jediTaskID=:jediTaskID "
1920 "AND jobStatus=:oldJobStatus AND relocationFlag=:oldRelFlag "
1921 )
1922
1923
1924 sql_update_jobs = (
1925 f"UPDATE {panda_config.schemaPANDA}.jobsActive4 SET jobStatus=:newJobStatus,relocationFlag=:newRelFlag "
1926 "WHERE jediTaskID=:jediTaskID AND jobStatus=:oldJobStatus "
1927 )
1928
1929
1930 self.conn.begin()
1931
1932 self.cur.arraysize = 10
1933 var_map = {":prodSourceLabel": "user", ":oldRelFlag": 3, ":prodUserName": prodUserName, ":oldJobStatus": "throttled"}
1934 if workingGroup is not None:
1935 var_map[":workingGroup"] = workingGroup
1936
1937 self.cur.execute(sql_get_tasks + comment, var_map)
1938 task_rows = self.cur.fetchall()
1939
1940 if not self._commit():
1941 raise RuntimeError("Commit error")
1942
1943 task_ids = [task_id for task_id, in task_rows]
1944 random.shuffle(task_ids)
1945 total_updated = 0
1946 updated_per_task = {}
1947 for task_id in task_ids:
1948 tmp_log.debug(f"reset jediTaskID={task_id}")
1949
1950 self.conn.begin()
1951
1952 var_map = {":jediTaskID": task_id, ":oldRelFlag": 3, ":oldJobStatus": "throttled"}
1953 self.cur.execute(sql_get_jobs + comment, var_map)
1954 job_rows = self.cur.fetchall()
1955 job_info_map = {
1956 panda_id: {
1957 "computingSite": computing_site,
1958 "cloud": cloud,
1959 "prodSourceLabel": prod_source_label,
1960 }
1961 for panda_id, _task_id, cloud, computing_site, prod_source_label in job_rows
1962 }
1963
1964 var_map = {":jediTaskID": task_id, ":newRelFlag": 1, ":newJobStatus": "activated", ":oldJobStatus": "throttled"}
1965 self.cur.execute(sql_update_jobs + comment, var_map)
1966 n_updated = self.cur.rowcount
1967 tmp_log.debug(f"reset {n_updated} jobs")
1968 if n_updated > 0:
1969 total_updated += n_updated
1970 updated_per_task[task_id] = n_updated
1971 for panda_id, job_info in job_info_map.items():
1972 self.recordStatusChange(
1973 panda_id,
1974 var_map[":newJobStatus"],
1975 infoMap=job_info,
1976 useCommit=False,
1977 )
1978
1979 if not self._commit():
1980 raise RuntimeError("Commit error")
1981 if get_dict:
1982 tmp_log.debug(f"done with {updated_per_task}")
1983 return updated_per_task
1984 tmp_log.debug(f"done with {total_updated}")
1985 return total_updated
1986 except Exception:
1987
1988 self._rollback()
1989
1990 self.dump_error_message(tmp_log)
1991 return None
1992
1993
1994 def getJobdefIDsForFailedJob(self, jediTaskID):
1995 comment = " /* DBProxy.getJobdefIDsForFailedJob */"
1996 tmp_log = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID}")
1997         tmp_log.debug("start")
1998 try:
1999
2000 self.conn.begin()
2001
2002 sqlGF = "SELECT distinct jobDefinitionID FROM ATLAS_PANDA.jobsActive4 "
2003 sqlGF += "WHERE jediTaskID=:jediTaskID AND jobStatus=:jobStatus "
2004 sqlGF += "AND attemptNr<maxAttempt "
2005 varMap = {}
2006 varMap[":jediTaskID"] = jediTaskID
2007 varMap[":jobStatus"] = "failed"
2008 self.cur.execute(sqlGF + comment, varMap)
2009 resGF = self.cur.fetchall()
2010 retList = []
2011 for (jobDefinitionID,) in resGF:
2012 retList.append(jobDefinitionID)
2013
2014 if not self._commit():
2015 raise RuntimeError("Commit error")
2016             tmp_log.debug(str(retList))
2017 return retList
2018 except Exception:
2019
2020 self._rollback()
2021
2022 self.dump_error_message(tmp_log)
2023 return []
2024
2025
2026 def isValidMergeJob(self, pandaID, jediTaskID):
2027 comment = " /* DBProxy.isValidMergeJob */"
2028 tmp_log = self.create_tagged_logger(comment, f"PandaID={pandaID} jediTaskID={jediTaskID}")
2029 tmp_log.debug("start")
2030 try:
2031 retVal = True
2032 retMsg = ""
2033
2034 sqlJ = "SELECT jobStatus FROM ATLAS_PANDA.jobsDefined4 WHERE PandaID=:PandaID "
2035 sqlJ += "UNION "
2036 sqlJ += "SELECT jobStatus FROM ATLAS_PANDA.jobsActive4 WHERE PandaID=:PandaID "
2037
2038 sqlF = "SELECT datasetID,fileID FROM ATLAS_PANDA.filesTable4 "
2039 sqlF += "WHERE PandaID=:PandaID AND type IN (:type1,:type2) "
2040
2041 sqlP = "SELECT outPandaID FROM ATLAS_PANDA.JEDI_Dataset_Contents "
2042 sqlP += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID=:fileID AND type<>:type1"
2043
2044 sqlC = "SELECT jobStatus FROM ATLAS_PANDA.jobsActive4 WHERE PandaID=:PandaID "
2045
2046 self.conn.begin()
2047
2048 varMap = {}
2049 varMap[":PandaID"] = pandaID
2050 self.cur.execute(sqlJ + comment, varMap)
2051 resJ = self.cur.fetchone()
2052 if resJ is None:
2053 tmp_log.debug("merge job not found")
2054 else:
2055
2056 varMap = {}
2057 varMap[":PandaID"] = pandaID
2058 varMap[":type1"] = "input"
2059 varMap[":type2"] = "pseudo_input"
2060 self.cur.execute(sqlF + comment, varMap)
2061 resF = self.cur.fetchall()
2063 fileIDsMap = {}
2064 for datasetID, fileID in resF:
2065 if datasetID not in fileIDsMap:
2066 fileIDsMap[datasetID] = set()
2067 fileIDsMap[datasetID].add(fileID)
2068
2069 pandaIDs = set()
2070 for datasetID in fileIDsMap:
2071 fileIDs = fileIDsMap[datasetID]
2072 for fileID in fileIDs:
2073 varMap = {}
2074 varMap[":jediTaskID"] = jediTaskID
2075 varMap[":datasetID"] = datasetID
2076 varMap[":fileID"] = fileID
2077 varMap[":type1"] = "lib"
2078 self.cur.execute(sqlP + comment, varMap)
2079 resP = self.cur.fetchone()
2080 if resP is not None and resP[0] is not None:
2081 pandaIDs.add(resP[0])
2082
2083 if len(pandaIDs) > 0:
2084 break
2085
2086 for tmpPandaID in pandaIDs:
2087 varMap = {}
2088 varMap[":PandaID"] = tmpPandaID
2089 self.cur.execute(sqlC + comment, varMap)
2090 resC = self.cur.fetchone()
2091 if resC is None:
2092
2093 tmp_log.debug(f"pre-merge job {tmpPandaID} not found")
2094 retVal = False
2095 retMsg = tmpPandaID
2096 break
2097 elif resC[0] != "merging":
2098
2099 tmp_log.debug("pre-merge job in {0} != merging".format(tmpPandaID, resC[0]))
2100 retVal = False
2101 retMsg = tmpPandaID
2102 break
2103
2104 if not self._commit():
2105 raise RuntimeError("Commit error")
2106 tmp_log.debug(f"ret={retVal}")
2107 return retVal, retMsg
2108 except Exception:
2109
2110 self._rollback()
2111
2112 self.dump_error_message(tmp_log)
2113 return None, ""
2114
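# Usage sketch (illustrative; "proxy" and both IDs are hypothetical):
# isValidMergeJob returns (True, "") when every pre-merge job feeding the
# merge job is still in the merging state, (False, <pre-merge PandaID>) when
# one of them has left that state or vanished, and (None, "") on a DB error,
# so callers should distinguish all three outcomes.
#
#     ok, bad_id = proxy.isValidMergeJob(4567, 123)
#     if ok is None:
#         pass  # database error: retry later
#     elif not ok:
#         pass  # pre-merge job bad_id is no longer merging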
2115
2116 def checkJobStatus(self, pandaID):
2117 comment = " /* DBProxy.checkJobStatus */"
2118 tmp_log = self.create_tagged_logger(comment, f"PandaID={pandaID}")
2119 tmp_log.debug("start")
2120 retVal = {"command": None, "status": None}
2121 try:
2122 sqlC = (
2123 "SELECT jobStatus,commandToPilot FROM ATLAS_PANDA.jobsActive4 "
2124 "WHERE PandaID=:pandaID "
2125 "UNION "
2126 "SELECT /*+ INDEX_RS_ASC(JOBSARCHIVED4 PART_JOBSARCHIVED4_PK) */ "
2127 "jobStatus,commandToPilot FROM ATLAS_PANDA.jobsArchived4 "
2128 "WHERE PandaID=:pandaID AND modificationTime>:timeLimit "
2129 )
2130 varMap = dict()
2131 varMap[":pandaID"] = int(pandaID)
2132 varMap[":timeLimit"] = naive_utcnow() - datetime.timedelta(hours=1)
2133
2134 self.conn.begin()
2135
2136 self.cur.arraysize = 10
2137 self.cur.execute(sqlC + comment, varMap)
2138 res = self.cur.fetchone()
2139 if res is not None:
2140 retVal["status"], retVal["command"] = res
2141 else:
2142 retVal["status"], retVal["command"] = "unknown", "tobekilled"
2143
2144 if not self._commit():
2145 raise RuntimeError("Commit error")
2146 tmp_log.debug(f"done with {str(retVal)}")
2147 return retVal
2148 except Exception:
2149
2150 self._rollback()
2151 self.dump_error_message(tmp_log)
2152 return retVal
2153
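# Usage sketch (illustrative): checkJobStatus consults jobsActive4 and, within
# the last hour, jobsArchived4. A job found in neither is reported as
# {"status": "unknown", "command": "tobekilled"}, i.e. an unknown job is told
# to terminate itself.
#
#     ret = proxy.checkJobStatus(4567)  # "proxy" is a hypothetical DBProxy
#     if ret["command"] == "tobekilled":
#         pass  # stop the payload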
2154
2155 def getActiveJobAttributes(self, pandaID, attrs):
2156 comment = " /* DBProxy.getActiveJobAttributes */"
2157 tmp_log = self.create_tagged_logger(comment, f"PandaID={pandaID}")
2158 tmp_log.debug("start")
2159 try:
2160 sqlS = f"SELECT {','.join(attrs)} FROM ATLAS_PANDA.jobsActive4 "
2161 sqlS += "WHERE PandaID=:PandaID "
2162
2163 self.conn.begin()
2164 varMap = {}
2165 varMap[":PandaID"] = pandaID
2166 self.cur.execute(sqlS + comment, varMap)
2167 res = self.cur.fetchone()
2168 if res is not None:
2169 retMap = dict()
2170 for idx, attr in enumerate(attrs):
2171 retMap[attr] = res[idx]
2172 else:
2173 retMap = None
2174
2175 if not self._commit():
2176 raise RuntimeError("Commit error")
2177 tmp_log.debug(f"got {str(retMap)}")
2178 return retMap
2179 except Exception:
2180
2181 self._rollback()
2182
2183 self.dump_error_message(tmp_log)
2184 return None
2185
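# Usage sketch (illustrative): attrs is interpolated directly into the SELECT
# list, so only trusted column names may be passed. The result is an
# {attribute: value} dict, or None when the job is not in jobsActive4.
#
#     attrs = proxy.getActiveJobAttributes(4567, ["jobStatus", "computingSite"])
#     if attrs is not None:
#         site = attrs["computingSite"]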
2186
2187 def getUserJobMetadata(self, jediTaskID):
2188 comment = " /* DBProxy.getUserJobMetadata */"
2189 tmp_log = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID}")
2190 tmp_log.debug("start")
2191 try:
2192
2193 sqlC = "SELECT j.PandaID,m.metaData FROM {0} j,{1} m "
2194 sqlC += "WHERE j.jediTaskID=:jediTaskID AND j.jobStatus=:jobStatus AND m.PandaID=j.PandaID AND j.prodSourceLabel=:label "
2195 retMap = dict()
2196 for a, m in [
2197 ("ATLAS_PANDA.jobsArchived4", "ATLAS_PANDA.metaTable"),
2198 ("ATLAS_PANDAARCH.jobsArchived", "ATLAS_PANDAARCH.metaTable_ARCH"),
2199 ]:
2200 sql = sqlC.format(a, m)
2201 varMap = {}
2202 varMap[":jediTaskID"] = jediTaskID
2203 varMap[":label"] = "user"
2204 varMap[":jobStatus"] = "finished"
2205
2206 self.conn.begin()
2207 self.cur.execute(sql + comment, varMap)
2208 resCs = self.cur.fetchall()
2209 for pandaID, clobMeta in resCs:
2210 try:
2211 metadata = clobMeta.read()
2212 except AttributeError:
2213 metadata = str(clobMeta)
2214 try:
2215 retMap[pandaID] = json.loads(metadata)
2216 except Exception:
2217 pass
2218
2219 if not self._commit():
2220 raise RuntimeError("Commit error")
2221 tmp_log.debug(f"got {len(retMap)} data blocks")
2222 return retMap
2223 except Exception:
2224
2225 self._rollback()
2226
2227 self.dump_error_message(tmp_log)
2228 return {}
2229
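# Usage sketch (illustrative): metadata of finished user jobs is gathered from
# both the live and the ARCH table pairs, and only entries that parse as JSON
# are kept, so each value is already a deserialized object rather than a raw
# CLOB string.
#
#     meta_map = proxy.getUserJobMetadata(123)  # hypothetical jediTaskID
#     for panda_id, meta in meta_map.items():
#         pass  # meta is the json.loads() result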
2230
2231 def insertJobOutputReport(self, panda_id, prod_source_label, job_status, attempt_nr, data):
2232 comment = " /* DBProxy.insertJobOutputReport */"
2233 tmp_log = self.create_tagged_logger(comment, f"PandaID={panda_id} attemptNr={attempt_nr}")
2234 tmp_log.debug("start")
2235
2236 sqlI = (
2237 "INSERT INTO {0}.Job_Output_Report "
2238 "(PandaID, prodSourceLabel, jobStatus, attemptNr, data, timeStamp) "
2239 "VALUES(:PandaID, :prodSourceLabel, :jobStatus, :attemptNr, :data, :timeStamp) "
2240 ).format(panda_config.schemaPANDA)
2241 try:
2242 retVal = False
2243
2244 self.conn.begin()
2245
2246 varMap = {}
2247 varMap[":PandaID"] = panda_id
2248 varMap[":prodSourceLabel"] = prod_source_label
2249 varMap[":jobStatus"] = job_status
2250 varMap[":attemptNr"] = attempt_nr
2251 varMap[":data"] = data
2252 varMap[":timeStamp"] = naive_utcnow()
2253 self.cur.execute(sqlI + comment, varMap)
2254 tmp_log.debug("successfully inserted")
2255 retVal = True
2256
2257 if not self._commit():
2258 raise RuntimeError("Commit error")
2259 tmp_log.debug("done")
2260 return retVal
2261 except Exception:
2262
2263 self._rollback()
2264
2265 self.dump_error_message(tmp_log)
2266 return retVal
2267
2268
2269 def updateJobOutputReport(self, panda_id, attempt_nr, data):
2270 comment = " /* DBProxy.updateJobOutputReport */"
2271 tmp_log = self.create_tagged_logger(comment, f"PandaID={panda_id} attemptNr={attempt_nr}")
2272 tmp_log.debug("start")
2273
2274 try:
2275 retVal = False
2276
2277 sqlU = f"UPDATE {panda_config.schemaPANDA}.Job_Output_Report SET data=:data, timeStamp=:timeStamp WHERE PandaID=:PandaID AND attemptNr=:attemptNr "
2278
2279 self.conn.begin()
2280
2281 varMap = {}
2282 varMap[":PandaID"] = panda_id
2283 varMap[":attemptNr"] = attempt_nr
2284 varMap[":data"] = data
2285 varMap[":timeStamp"] = naive_utcnow()
2286 self.cur.execute(sqlU + comment, varMap)
2287 nRow = self.cur.rowcount
2288 if nRow == 1:
2289 tmp_log.debug("successfully updated")
2290 retVal = True
2291 elif nRow == 0:
2292 tmp_log.debug("entry not found, not updated")
2293 else:
2294                 tmp_log.warning(f"updated an unexpected number of rows: {nRow}")
2295
2296 if not self._commit():
2297 raise RuntimeError("Commit error")
2298 tmp_log.debug("done")
2299 return retVal
2300 except Exception:
2301
2302 self._rollback()
2303
2304 self.dump_error_message(tmp_log)
2305 return retVal
2306
2307
2308 def deleteJobOutputReport(self, panda_id, attempt_nr):
2309 comment = " /* DBProxy.deleteJobOutputReport */"
2310 tmp_log = self.create_tagged_logger(comment, f"PandaID={panda_id} attemptNr={attempt_nr}")
2311 tmp_log.debug("start")
2312
2313 sqlD = f"DELETE FROM {panda_config.schemaPANDA}.Job_Output_Report WHERE PandaID=:PandaID AND attemptNr=:attemptNr "
2314 try:
2315 retVal = False
2316
2317 self.conn.begin()
2318
2319 varMap = {}
2320 varMap[":PandaID"] = panda_id
2321 varMap[":attemptNr"] = attempt_nr
2322 self.cur.execute(sqlD + comment, varMap)
2323 tmp_log.debug("successfully deleted")
2324 retVal = True
2325
2326 if not self._commit():
2327 raise RuntimeError("Commit error")
2328 tmp_log.debug("done")
2329 return retVal
2330 except Exception:
2331
2332 self._rollback()
2333
2334 self.dump_error_message(tmp_log)
2335 return retVal
2336
2337
2338 def getJobOutputReport(self, panda_id, attempt_nr):
2339 comment = " /* DBProxy.getJobOutputReport */"
2340 tmp_log = self.create_tagged_logger(comment, f"PandaID={panda_id} attemptNr={attempt_nr}")
2341 tmp_log.debug("start")
2342
2343 try:
2344 retVal = {}
2345
2346 sqlGR = (
2347 "SELECT PandaID,prodSourceLabel,jobStatus,attemptNr,data,timeStamp,lockedBy,lockedTime "
2348 "FROM {0}.Job_Output_Report "
2349 "WHERE PandaID=:PandaID AND attemptNr=:attemptNr "
2350 ).format(panda_config.schemaPANDA)
2351
2352 self.conn.begin()
2353
2354 varMap = {}
2355 varMap[":PandaID"] = panda_id
2356 varMap[":attemptNr"] = attempt_nr
2357 self.cur.execute(sqlGR + comment, varMap)
2358 resGR = self.cur.fetchall()
2359 if not resGR:
2360 tmp_log.debug("record does not exist, skipped")
2361 for (
2362 PandaID,
2363 prodSourceLabel,
2364 jobStatus,
2365 attemptNr,
2366 data,
2367 timeStamp,
2368 lockedBy,
2369 lockedTime,
2370 ) in resGR:
2371
2372 retVal = {
2373 "PandaID": PandaID,
2374 "jobStatus": jobStatus,
2375 "attemptNr": attemptNr,
2376 "timeStamp": timeStamp,
2377 "data": data,
2378 "lockedBy": lockedBy,
2379 "lockedTime": lockedTime,
2380 }
2381 tmp_log.debug("got record")
2382 break
2383
2384 if not self._commit():
2385 raise RuntimeError("Commit error")
2386 tmp_log.debug("done")
2387 return retVal
2388 except Exception:
2389
2390 self._rollback()
2391
2392 self.dump_error_message(tmp_log)
2393 return retVal
2394
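# Lifecycle sketch for the four Job_Output_Report helpers above (illustrative;
# the argument values are hypothetical): a report is inserted once per job
# attempt, may be rewritten while it is being processed, and is removed once
# consumed.
#
#     proxy.insertJobOutputReport(4567, "user", "finished", 0, json_data)
#     proxy.updateJobOutputReport(4567, 0, new_json_data)  # False if no row
#     report = proxy.getJobOutputReport(4567, 0)           # {} if missing
#     proxy.deleteJobOutputReport(4567, 0)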
2395
2396 def lockJobOutputReport(self, panda_id, attempt_nr, pid, time_limit, take_over_from=None):
2397 comment = " /* DBProxy.lockJobOutputReport */"
2398 tmp_log = self.create_tagged_logger(comment, f"PandaID={panda_id} attemptNr={attempt_nr}")
2399 tmp_log.debug("start")
2400
2401 try:
2402             retVal = False
2403
2404 sqlGL = (
2405 "SELECT PandaID,attemptNr "
2406 "FROM {0}.Job_Output_Report "
2407 "WHERE PandaID=:PandaID AND attemptNr=:attemptNr "
2408 "AND (lockedBy IS NULL OR lockedBy=:lockedBy OR lockedTime<:lockedTime) "
2409 "FOR UPDATE NOWAIT "
2410 ).format(panda_config.schemaPANDA)
2411
2412 sqlUL = (
2413 "UPDATE {0}.Job_Output_Report " "SET lockedBy=:lockedBy, lockedTime=:lockedTime " "WHERE PandaID=:PandaID AND attemptNr=:attemptNr "
2414 ).format(panda_config.schemaPANDA)
2415
2416 self.conn.begin()
2417
2418 varMap = {}
2419 varMap[":PandaID"] = panda_id
2420 varMap[":attemptNr"] = attempt_nr
2421 if take_over_from is None:
2422 varMap[":lockedBy"] = pid
2423 else:
2424 varMap[":lockedBy"] = take_over_from
2425 varMap[":lockedTime"] = naive_utcnow() - datetime.timedelta(minutes=time_limit)
2426 utc_now = naive_utcnow()
2427 try:
2428 self.cur.execute(sqlGL + comment, varMap)
2429 resGL = self.cur.fetchall()
2430 if not resGL:
2431 tmp_log.debug("record already locked by other thread, skipped")
2432 except Exception:
2433 resGL = None
2434 tmp_log.debug("record skipped due to NOWAIT")
2435 if resGL:
2436 for panda_id, attempt_nr in resGL:
2437
2438 varMap = {}
2439 varMap[":PandaID"] = panda_id
2440 varMap[":attemptNr"] = attempt_nr
2441 varMap[":lockedBy"] = pid
2442 varMap[":lockedTime"] = utc_now
2443 self.cur.execute(sqlUL + comment, varMap)
2444 if take_over_from is None:
2445 tmp_log.debug(f"successfully locked record by {pid}")
2446 else:
2447 tmp_log.debug(f"successfully took over locked record from {take_over_from} by {pid}")
2448 retVal = True
2449 break
2450
2451 if not self._commit():
2452 raise RuntimeError("Commit error")
2453 tmp_log.debug("done")
2454 return retVal
2455 except Exception:
2456
2457 self._rollback()
2458
2459 self.dump_error_message(tmp_log)
2460 return retVal
2461
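# Locking sketch (illustrative; "proxy", my_pid and dead_pid are hypothetical):
# lockJobOutputReport selects FOR UPDATE NOWAIT, so a row currently held by a
# live peer is skipped rather than blocked on. A lock is granted when the row
# is unlocked, already ours, or its lock is older than time_limit minutes;
# take_over_from lets a worker steal the locks of a named dead process.
#
#     if proxy.lockJobOutputReport(4567, 0, my_pid, time_limit=15):
#         pass  # process, then proxy.unlockJobOutputReport(4567, 0, my_pid, 5)
#     taken = proxy.lockJobOutputReport(4567, 0, my_pid, 15, take_over_from=dead_pid)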
2462
2463 def unlockJobOutputReport(self, panda_id, attempt_nr, pid, lock_offset):
2464 comment = " /* DBProxy.unlockJobOutputReport */"
2465 tmp_log = self.create_tagged_logger(comment, f"PandaID={panda_id} attemptNr={attempt_nr}")
2466 tmp_log.debug("start")
2467
2468 try:
2469             retVal = False
2470
2471 sqlGL = (
2472 "SELECT PandaID,attemptNr "
2473 "FROM {0}.Job_Output_Report "
2474 "WHERE PandaID=:PandaID AND attemptNr=:attemptNr "
2475 "AND lockedBy=:lockedBy "
2476 "FOR UPDATE"
2477 ).format(panda_config.schemaPANDA)
2478
2479 sqlUL = f"UPDATE {panda_config.schemaPANDA}.Job_Output_Report SET lockedTime=:lockedTime WHERE PandaID=:PandaID AND attemptNr=:attemptNr "
2480
2481 self.conn.begin()
2482
2483 varMap = {}
2484 varMap[":PandaID"] = panda_id
2485 varMap[":attemptNr"] = attempt_nr
2486 varMap[":lockedBy"] = pid
2487 self.cur.execute(sqlGL + comment, varMap)
2488 resGL = self.cur.fetchall()
2489 if not resGL:
2490 tmp_log.debug("record not locked by this thread, skipped")
2491 else:
2492 for panda_id, attempt_nr in resGL:
2493
2494 varMap = {}
2495 varMap[":PandaID"] = panda_id
2496 varMap[":attemptNr"] = attempt_nr
2497 varMap[":lockedTime"] = naive_utcnow() - datetime.timedelta(minutes=lock_offset)
2498 self.cur.execute(sqlUL + comment, varMap)
2499 tmp_log.debug("successfully unlocked record")
2500 retVal = True
2501 break
2502
2503 if not self._commit():
2504 raise RuntimeError("Commit error")
2505 tmp_log.debug("done")
2506 return retVal
2507 except Exception:
2508
2509 self._rollback()
2510
2511 self.dump_error_message(tmp_log)
2512 return retVal
2513
2514
2515 def listJobOutputReport(self, only_unlocked, time_limit, limit, grace_period, labels, anti_labels):
2516 comment = " /* DBProxy.listJobOutputReport */"
2517 tmp_log = self.create_tagged_logger(comment)
2518 tmp_log.debug(f"start label={str(labels)} limit={limit} anti_label={str(anti_labels)}")
2519 try:
2520 retVal = None
2521 if only_unlocked:
2522
2523 varMap = {}
2524 varMap[":limit"] = limit * 10
2525 varMap[":lockedTime"] = naive_utcnow() - datetime.timedelta(minutes=time_limit)
2526 varMap[":timeStamp"] = naive_utcnow() - datetime.timedelta(seconds=grace_period)
2527
2528 sqlGR = (
2529 "SELECT * "
2530 "FROM ( "
2531 "SELECT PandaID,jobStatus,attemptNr,timeStamp "
2532 "FROM {0}.Job_Output_Report "
2533 "WHERE (lockedBy IS NULL OR lockedTime<:lockedTime) "
2534 "AND timeStamp<:timeStamp ".format(panda_config.schemaPANDA)
2535 )
2536 if labels is not None:
2537 label_var_names_str, label_var_map = get_sql_IN_bind_variables(labels, prefix=":l_", value_as_suffix=True)
2538 sqlGR += f"AND prodSourceLabel IN ({label_var_names_str}) "
2539 varMap.update(label_var_map)
2540 if anti_labels is not None:
2541 anti_label_var_names_str, anti_label_var_map = get_sql_IN_bind_variables(anti_labels, prefix=":al_", value_as_suffix=True)
2542 sqlGR += f"AND prodSourceLabel NOT IN ({anti_label_var_names_str}) "
2543 varMap.update(anti_label_var_map)
2544 sqlGR += "ORDER BY timeStamp " ") " "WHERE rownum<=:limit "
2545
2546 self.conn.begin()
2547
2548 self.cur.execute(sqlGR + comment, varMap)
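#               over-fetched 10x via :limit above: keep the oldest tenth in
#               order and shuffle the rest before cutting back to limit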
2549 separator = limit // 10
2550 retVal = self.cur.fetchall()
2551
2552 ret_head = retVal[:separator]
2553 ret_tail = retVal[separator:]
2554 random.shuffle(ret_tail)
2555 retVal = ret_head + ret_tail
2556 retVal = retVal[:limit]
2557 tmp_log.debug(f"listed {len(retVal)} unlocked records")
2558 else:
2559
2560 sqlS = (
2561 "SELECT * "
2562 "FROM ( "
2563 "SELECT PandaID,jobStatus,attemptNr,timeStamp "
2564 "FROM {0}.Job_Output_Report "
2565 "ORDER BY timeStamp "
2566 ") "
2567 "WHERE rownum<=:limit "
2568 ).format(panda_config.schemaPANDA)
2569
2570 self.conn.begin()
2571 varMap = {}
2572 varMap[":limit"] = limit
2573
2574 self.cur.execute(sqlS + comment, varMap)
2575 retVal = self.cur.fetchall()
2576 tmp_log.debug(f"listed {len(retVal)} records")
2577
2578 if not self._commit():
2579 raise RuntimeError("Commit error")
2580 tmp_log.debug("done")
2581 return retVal
2582 except Exception:
2583
2584 self._rollback()
2585
2586 self.dump_error_message(tmp_log)
2587 return retVal
2588
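# Selection sketch (illustrative): the only_unlocked branch over-fetches the
# limit*10 oldest eligible records, keeps the oldest tenth in timestamp order,
# shuffles the remainder, and cuts back to limit. The oldest entries therefore
# always progress while concurrent pollers are spread across the queue.
#
#     rows = proxy.listJobOutputReport(True, 15, 100, 120, ["user"], None)
#     for panda_id, job_status, attempt_nr, time_stamp in rows or []:
#         pass  # rows may be None if the query failed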
2589
2590 def send_command_to_job(self, panda_id, com):
2591 comment = " /* DBProxy.send_command_to_job */"
2592 tmp_log = self.create_tagged_logger(comment, f"PandaID={panda_id}")
2593 tmp_log.debug("start")
2594 retVal = None
2595 try:
2596
2597 new_com = JobSpec.truncateStringAttr("commandToPilot", com)
2598 if len(new_com) != len(com):
2599 retVal = (
2600 False,
2601 f"command string too long. must be less than {len(new_com)} chars",
2602 )
2603 else:
2604 sqlR = "SELECT commandToPilot FROM ATLAS_PANDA.{} WHERE PandaID=:PandaID FOR UPDATE "
2605 sqlU = "UPDATE ATLAS_PANDA.{} SET commandToPilot=:commandToPilot " "WHERE PandaID=:PandaID "
2606 for table in ["jobsDefined4", "jobsActive4"]:
2607
2608 self.conn.begin()
2609
2610 varMap = {}
2611 varMap[":PandaID"] = panda_id
2612 self.cur.execute(sqlR.format(table) + comment, varMap)
2613 data = self.cur.fetchone()
2614 if data is not None:
2615 (commandToPilot,) = data
2616 if commandToPilot == "tobekilled":
2617 retVal = (False, "job is being killed")
2618 else:
2619 varMap = {}
2620 varMap[":PandaID"] = panda_id
2621 varMap[":commandToPilot"] = com
2622 self.cur.execute(sqlU.format(table) + comment, varMap)
2623 nRow = self.cur.rowcount
2624 if nRow:
2625 retVal = (True, "command received")
2626
2627 if not self._commit():
2628 raise RuntimeError("Commit error")
2629 if retVal is not None:
2630 break
2631 if retVal is None:
2632 retVal = (False, f"no active job with PandaID={panda_id}")
2633 tmp_log.debug(f"done with {str(retVal)}")
2634 return retVal
2635 except Exception:
2636
2637 self._rollback()
2638
2639 self.dump_error_message(tmp_log)
2640 return False, "database error"
2641
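# Usage sketch (illustrative; the command string is hypothetical): the command
# is length-checked against the commandToPilot column, a job already marked
# tobekilled is left untouched, and jobsDefined4 then jobsActive4 are tried in
# turn.
#
#     ok, msg = proxy.send_command_to_job(4567, "some_command")
#     if not ok:
#         pass  # msg explains: too long / being killed / no active job / DB error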
2642 def get_distinct_resource_types_per_site(self, jedi_task_id: int, threshold: float = 20.0) -> dict[str, set[str]]:
2643 """Get distinct resource types per computingSite from jobsActive4 and jobsDefined4 for a given task,
2644 ignoring resource types whose share of total jobs at that site is below the threshold percentage.
2645
2646 :param jedi_task_id: the jediTaskID to filter jobs by
2647 :param threshold: minimum percentage (0.0~100.0) of site jobs required to include a resource type (0.0 = no filtering)
2648 :return: dict mapping computingSite to a set of resource_type strings, or empty dict on error
2649 """
2650 comment = " /* DBProxy.get_distinct_resource_types_per_site */"
2651 tmp_log = self.create_tagged_logger(comment)
2652 tmp_log.debug("start")
2653 try:
2654
2655 self.conn.begin()
2656 var_map = {":jediTaskID": jedi_task_id}
2657 sql = (
2658 "SELECT computingSite, resource_type, COUNT(*) "
2659 "FROM ATLAS_PANDA.jobsActive4 "
2660 "WHERE jediTaskID=:jediTaskID AND resource_type IS NOT NULL "
2661 "GROUP BY computingSite, resource_type "
2662 "UNION ALL "
2663 "SELECT computingSite, resource_type, COUNT(*) "
2664 "FROM ATLAS_PANDA.jobsDefined4 "
2665 "WHERE jediTaskID=:jediTaskID AND resource_type IS NOT NULL "
2666 "GROUP BY computingSite, resource_type"
2667 )
2668 self.cur.execute(sql + comment, var_map)
2669 res = self.cur.fetchall()
2670
2671 count_map: dict[tuple[str, str], int] = {}
2672 site_totals: dict[str, int] = {}
2673 for computing_site, resource_type, cnt in res:
2674 count_map[(computing_site, resource_type)] = count_map.get((computing_site, resource_type), 0) + cnt
2675 site_totals[computing_site] = site_totals.get(computing_site, 0) + cnt
2676
2677 ret: dict[str, set[str]] = {}
2678 for (computing_site, resource_type), cnt in count_map.items():
2679 if cnt * 100.0 / site_totals[computing_site] >= threshold:
2680 ret.setdefault(computing_site, set()).add(resource_type)
2681 if not self._commit():
2682 raise RuntimeError("Commit error")
2683 tmp_log.debug(f"done, {len(ret)} sites found")
2684 return ret
2685 except Exception:
2686 self._rollback()
2687 self.dump_error_message(tmp_log)
2688 return {}
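# Worked example for the threshold filter (illustrative numbers and resource
# type names): with threshold=20.0, a site that ran 8 SCORE and 1 MCORE job
# keeps SCORE (8*100/9 ~ 88.9% >= 20) and drops MCORE (1*100/9 ~ 11.1% < 20).
#
#     per_site = proxy.get_distinct_resource_types_per_site(123, threshold=20.0)
#     # e.g. {"SITE_A": {"SCORE"}, "SITE_B": {"SCORE", "MCORE"}}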