Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:39:03

0001 import datetime
0002 import json
0003 import random
0004 import re
0005 import time
0006 
0007 from pandacommon.pandalogger.LogWrapper import LogWrapper
0008 from pandacommon.pandautils.PandaUtils import get_sql_IN_bind_variables, naive_utcnow
0009 
0010 from pandaserver.config import panda_config
0011 from pandaserver.srvcore import CoreUtils, srv_msg_utils
0012 from pandaserver.taskbuffer import EventServiceUtils, JobUtils
0013 from pandaserver.taskbuffer.db_proxy_mods.base_module import BaseModule
0014 from pandaserver.taskbuffer.FileSpec import FileSpec
0015 from pandaserver.taskbuffer.JobSpec import JobSpec
0016 
0017 
0018 # Module class to define miscellaneous job-related methods that are independent of another module's methods
0019 class JobStandaloneModule(BaseModule):
0020     # constructor
    def __init__(self, log_stream: LogWrapper):
        """Initialise the module by handing *log_stream* to the ``BaseModule`` constructor."""
        super().__init__(log_stream)
0023 
0024     # activate job. move job from jobsDefined to jobsActive
0025     def activateJob(self, job):
0026         comment = " /* DBProxy.activateJob */"
0027         if job is None:
0028             tmp_id = None
0029         else:
0030             tmp_id = job.PandaID
0031         tmp_log = self.create_tagged_logger(comment, f"PandaID={tmp_id}")
0032         updatedFlag = False
0033         if job is None:
0034             tmp_log.debug("skip job=None")
0035             return True
0036         tmp_log.debug("start")
0037         sql0 = "SELECT row_ID FROM ATLAS_PANDA.filesTable4 WHERE PandaID=:PandaID AND type=:type AND NOT status IN (:status1,:status2) "
0038         sql1 = "DELETE FROM ATLAS_PANDA.jobsDefined4 "
0039         sql1 += "WHERE PandaID=:PandaID AND (jobStatus=:oldJobStatus1 OR jobStatus=:oldJobStatus2) AND commandToPilot IS NULL"
0040         sql2 = f"INSERT INTO ATLAS_PANDA.jobsActive4 ({JobSpec.columnNames()}) "
0041         sql2 += JobSpec.bindValuesExpression()
0042         # host and time information
0043         job.modificationTime = naive_utcnow()
0044         # set stateChangeTime for defined->activated but not for assigned->activated
0045         if job.jobStatus in ["defined"]:
0046             job.stateChangeTime = job.modificationTime
0047         nTry = 3
0048         to_push = False
0049         for iTry in range(nTry):
0050             try:
0051                 # check if all files are ready
0052                 allOK = True
0053                 for file in job.Files:
0054                     if file.type == "input" and file.status not in ["ready", "cached"]:
0055                         allOK = False
0056                         break
0057                 # begin transaction
0058                 self.conn.begin()
0059                 # check all inputs are ready
0060                 varMap = {}
0061                 varMap[":type"] = "input"
0062                 varMap[":status1"] = "ready"
0063                 varMap[":status2"] = "cached"
0064                 varMap[":PandaID"] = job.PandaID
0065                 self.cur.arraysize = 100
0066                 self.cur.execute(sql0 + comment, varMap)
0067                 res = self.cur.fetchall()
0068                 if len(res) == 0 or allOK:
0069                     # check resource share
0070                     job.jobStatus = "activated"
0071 
0072                     # delete
0073                     varMap = {}
0074                     varMap[":PandaID"] = job.PandaID
0075                     varMap[":oldJobStatus1"] = "assigned"
0076                     varMap[":oldJobStatus2"] = "defined"
0077                     self.cur.execute(sql1 + comment, varMap)
0078                     n = self.cur.rowcount
0079                     if n == 0:
0080                         # already killed or activated
0081                         tmp_log.debug("Job not found to activate")
0082                     else:
0083                         # insert
0084                         self.cur.execute(sql2 + comment, job.valuesMap())
0085                         # update files
0086                         for file in job.Files:
0087                             sqlF = f"UPDATE ATLAS_PANDA.filesTable4 SET {file.bindUpdateChangesExpression()}" + "WHERE row_ID=:row_ID"
0088                             varMap = file.valuesMap(onlyChanged=True)
0089                             if varMap != {}:
0090                                 varMap[":row_ID"] = file.row_ID
0091                                 tmp_log.debug(sqlF + comment + str(varMap))
0092                                 self.cur.execute(sqlF + comment, varMap)
0093                         # job parameters
0094                         sqlJob = "UPDATE ATLAS_PANDA.jobParamsTable SET jobParameters=:param WHERE PandaID=:PandaID"
0095                         varMap = {}
0096                         varMap[":PandaID"] = job.PandaID
0097                         varMap[":param"] = job.jobParameters
0098                         self.cur.execute(sqlJob + comment, varMap)
0099                         updatedFlag = True
0100                         to_push = job.is_push_job()
0101                 else:
0102                     # update job
0103                     sqlJ = (
0104                         f"UPDATE ATLAS_PANDA.jobsDefined4 SET {job.bindUpdateChangesExpression()} "
0105                     ) + "WHERE PandaID=:PandaID AND (jobStatus=:oldJobStatus1 OR jobStatus=:oldJobStatus2)"
0106                     varMap = job.valuesMap(onlyChanged=True)
0107                     varMap[":PandaID"] = job.PandaID
0108                     varMap[":oldJobStatus1"] = "assigned"
0109                     varMap[":oldJobStatus2"] = "defined"
0110                     tmp_log.debug(sqlJ + comment + str(varMap))
0111                     self.cur.execute(sqlJ + comment, varMap)
0112                     n = self.cur.rowcount
0113                     if n == 0:
0114                         # already killed or activated
0115                         tmp_log.debug("Job not found to update")
0116                     else:
0117                         # update files
0118                         for file in job.Files:
0119                             sqlF = f"UPDATE ATLAS_PANDA.filesTable4 SET {file.bindUpdateChangesExpression()}" + "WHERE row_ID=:row_ID"
0120                             varMap = file.valuesMap(onlyChanged=True)
0121                             if varMap != {}:
0122                                 varMap[":row_ID"] = file.row_ID
0123                                 tmp_log.debug(sqlF + comment + str(varMap))
0124                                 self.cur.execute(sqlF + comment, varMap)
0125                         # job parameters
0126                         sqlJob = "UPDATE ATLAS_PANDA.jobParamsTable SET jobParameters=:param WHERE PandaID=:PandaID"
0127                         varMap = {}
0128                         varMap[":PandaID"] = job.PandaID
0129                         varMap[":param"] = job.jobParameters
0130                         self.cur.execute(sqlJob + comment, varMap)
0131                         updatedFlag = True
0132                 # commit
0133                 if not self._commit():
0134                     raise RuntimeError("Commit error")
0135                 # record status change
0136                 try:
0137                     if updatedFlag:
0138                         self.recordStatusChange(job.PandaID, job.jobStatus, jobInfo=job)
0139                 except Exception:
0140                     tmp_log.error("recordStatusChange failed")
0141                 self.push_job_status_message(job, job.PandaID, job.jobStatus)
0142                 # push job
0143                 if to_push:
0144                     mb_proxy_queue = self.get_mb_proxy("panda_pilot_queue")
0145                     mb_proxy_topic = self.get_mb_proxy("panda_pilot_topic")
0146                     if mb_proxy_queue and mb_proxy_topic:
0147                         tmp_log.debug("push job")
0148                         srv_msg_utils.send_job_message(mb_proxy_queue, mb_proxy_topic, job.jediTaskID, job.PandaID)
0149                     else:
0150                         tmp_log.debug("message queue/topic not configured")
0151                 tmp_log.debug("done")
0152                 return True
0153             except Exception as e:
0154                 # roll back
0155                 self._rollback()
0156                 if iTry + 1 < nTry:
0157                     tmp_log.debug(f"retry: {iTry}")
0158                     time.sleep(random.randint(10, 20))
0159                     continue
0160                 self.dump_error_message(tmp_log)
0161                 return False
0162 
0163     # send job to jobsWaiting
0164     def keepJob(self, job):
0165         comment = " /* DBProxy.keepJob */"
0166         tmp_log = self.create_tagged_logger(comment, f"PandaID={job.PandaID}")
0167         tmp_log.debug("start")
0168         # set status
0169         job.jobStatus = "waiting"
0170         sql1 = f"UPDATE ATLAS_PANDA.jobsDefined4 SET {job.bindUpdateChangesExpression()} "
0171         sql1 += "WHERE PandaID=:PandaID AND (jobStatus=:oldJobStatus1 OR jobStatus=:oldJobStatus2) AND commandToPilot IS NULL"
0172         # time information
0173         job.modificationTime = naive_utcnow()
0174         job.stateChangeTime = job.modificationTime
0175         updatedFlag = False
0176         nTry = 3
0177         for iTry in range(nTry):
0178             try:
0179                 # begin transaction
0180                 self.conn.begin()
0181                 # update
0182                 varMap = job.valuesMap(onlyChanged=True)
0183                 varMap[":PandaID"] = job.PandaID
0184                 varMap[":oldJobStatus1"] = "assigned"
0185                 varMap[":oldJobStatus2"] = "defined"
0186                 self.cur.execute(sql1 + comment, varMap)
0187                 n = self.cur.rowcount
0188                 if n == 0:
0189                     # already killed
0190                     tmp_log.debug(f"Not found")
0191                 else:
0192                     # update files
0193                     for file in job.Files:
0194                         sqlF = f"UPDATE ATLAS_PANDA.filesTable4 SET {file.bindUpdateChangesExpression()}" + "WHERE row_ID=:row_ID"
0195                         varMap = file.valuesMap(onlyChanged=True)
0196                         if varMap != {}:
0197                             varMap[":row_ID"] = file.row_ID
0198                             self.cur.execute(sqlF + comment, varMap)
0199                     # update parameters
0200                     sqlJob = "UPDATE ATLAS_PANDA.jobParamsTable SET jobParameters=:param WHERE PandaID=:PandaID"
0201                     varMap = {}
0202                     varMap[":PandaID"] = job.PandaID
0203                     varMap[":param"] = job.jobParameters
0204                     self.cur.execute(sqlJob + comment, varMap)
0205                     updatedFlag = True
0206                 # commit
0207                 if not self._commit():
0208                     raise RuntimeError("Commit error")
0209                 # record status change
0210                 try:
0211                     if updatedFlag:
0212                         self.recordStatusChange(job.PandaID, job.jobStatus, jobInfo=job)
0213                         self.push_job_status_message(job, job.PandaID, job.jobStatus)
0214                 except Exception:
0215                     tmp_log.error("recordStatusChange in keepJob")
0216                 return True
0217             except Exception:
0218                 # roll back
0219                 self._rollback()
0220                 if iTry + 1 < nTry:
0221                     tmp_log.debug(f"retry : {iTry}")
0222                     time.sleep(random.randint(10, 20))
0223                     continue
0224                 # dump error
0225                 self.dump_error_message(tmp_log)
0226                 return False
0227 
0228     # reset job in jobsActive
0229     def resetJob(
0230         self,
0231         pandaID,
0232         activeTable=True,
0233         keepSite=False,
0234         getOldSubs=False,
0235         forPending=True,
0236     ):
0237         comment = " /* DBProxy.resetJob */"
0238         tmp_log = self.create_tagged_logger(comment, f"PandaID={pandaID}")
0239         tmp_log.debug(f"activeTable={activeTable}")
0240         # select table
0241         table = "ATLAS_PANDA.jobsActive4"
0242         sql1 = f"SELECT {JobSpec.columnNames()} FROM {table} "
0243         sql1 += "WHERE PandaID=:PandaID"
0244         sql2 = f"DELETE FROM {table} "
0245         sql2 += "WHERE PandaID=:PandaID AND (jobStatus=:oldJobStatus1 OR jobStatus=:oldJobStatus2)"
0246         sql3 = f"INSERT INTO ATLAS_PANDA.jobsDefined4 ({JobSpec.columnNames()}) "
0247         sql3 += JobSpec.bindValuesExpression()
0248         try:
0249             # transaction causes Request ndbd time-out in ATLAS_PANDA.jobsActive4
0250             self.conn.begin()
0251             # select
0252             varMap = {}
0253             varMap[":PandaID"] = pandaID
0254             self.cur.arraysize = 10
0255             self.cur.execute(sql1 + comment, varMap)
0256             res = self.cur.fetchone()
0257             # not found
0258             if res is None:
0259                 # commit
0260                 if not self._commit():
0261                     raise RuntimeError("Commit error")
0262                 # return
0263                 return None
0264             # instantiate Job
0265             job = JobSpec()
0266             job.pack(res)
0267             # if already running
0268             if job.jobStatus != "waiting" and job.jobStatus != "activated" and (forPending and job.jobStatus != "pending"):
0269                 # commit
0270                 if not self._commit():
0271                     raise RuntimeError("Commit error")
0272                 # return
0273                 return None
0274             # do nothing for analysis jobs
0275             if job.prodSourceLabel in ["user", "panda"] and not forPending and job.jobStatus != "pending":
0276                 # commit
0277                 if not self._commit():
0278                     raise RuntimeError("Commit error")
0279                 # return
0280                 return None
0281             # delete
0282             varMap = {}
0283             varMap[":PandaID"] = pandaID
0284             if not forPending:
0285                 varMap[":oldJobStatus1"] = "waiting"
0286             else:
0287                 varMap[":oldJobStatus1"] = "pending"
0288             varMap[":oldJobStatus2"] = "activated"
0289             self.cur.execute(sql2 + comment, varMap)
0290             retD = self.cur.rowcount
0291             # delete failed
0292             tmp_log.debug(f"retD = {retD}")
0293             if retD != 1:
0294                 # commit
0295                 if not self._commit():
0296                     raise RuntimeError("Commit error")
0297                 return None
0298             # delete from jobsDefined4 just in case
0299             varMap = {}
0300             varMap[":PandaID"] = pandaID
0301             sqlD = "DELETE FROM ATLAS_PANDA.jobsDefined4 WHERE PandaID=:PandaID"
0302             self.cur.execute(sqlD + comment, varMap)
0303             # increase priority
0304             if job.jobStatus == "activated" and job.currentPriority < 100:
0305                 job.currentPriority = 100
0306             # reset computing site and dispatchDBlocks
0307             job.jobStatus = "defined"
0308             if job.prodSourceLabel not in ["user", "panda"]:
0309                 job.dispatchDBlock = None
0310                 # erase old assignment
0311                 if (not keepSite) and job.relocationFlag not in [1, 2]:
0312                     job.computingSite = None
0313                 job.computingElement = None
0314             # host and time information
0315             job.modificationHost = self.hostname
0316             job.modificationTime = naive_utcnow()
0317             job.stateChangeTime = job.modificationTime
0318             # reset
0319             job.brokerageErrorDiag = None
0320             job.brokerageErrorCode = None
0321             # insert
0322             self.cur.execute(sql3 + comment, job.valuesMap())
0323             # job parameters
0324             sqlJobP = "SELECT jobParameters FROM ATLAS_PANDA.jobParamsTable WHERE PandaID=:PandaID"
0325             self.cur.execute(sqlJobP + comment, varMap)
0326             for (clobJobP,) in self.cur:
0327                 try:
0328                     job.jobParameters = clobJobP.read()
0329                 except AttributeError:
0330                     job.jobParameters = str(clobJobP)
0331                 break
0332             # Files
0333             oldSubList = []
0334             sqlFile = f"SELECT {FileSpec.columnNames()} FROM ATLAS_PANDA.filesTable4 "
0335             sqlFile += "WHERE PandaID=:PandaID"
0336             self.cur.arraysize = 10000
0337             self.cur.execute(sqlFile + comment, varMap)
0338             resFs = self.cur.fetchall()
0339             for resF in resFs:
0340                 file = FileSpec()
0341                 file.pack(resF)
0342                 # reset GUID to trigger LRC/LFC scanning
0343                 if file.status == "missing":
0344                     file.GUID = None
0345                 # collect old subs
0346                 if job.prodSourceLabel in ["managed", "test"] and file.type in ["output", "log"] and re.search("_sub\d+$", file.destinationDBlock) is not None:
0347                     if file.destinationDBlock not in oldSubList:
0348                         oldSubList.append(file.destinationDBlock)
0349                 # reset status, destinationDBlock and dispatchDBlock
0350                 if job.lockedby != "jedi":
0351                     file.status = "unknown"
0352                 if job.prodSourceLabel not in ["user", "panda"]:
0353                     file.dispatchDBlock = None
0354                 file.destinationDBlock = re.sub("_sub\d+$", "", file.destinationDBlock)
0355                 # add file
0356                 job.addFile(file)
0357                 # update files
0358                 sqlF = f"UPDATE ATLAS_PANDA.filesTable4 SET {file.bindUpdateChangesExpression()}" + "WHERE row_ID=:row_ID"
0359                 varMap = file.valuesMap(onlyChanged=True)
0360                 if varMap != {}:
0361                     varMap[":row_ID"] = file.row_ID
0362                     tmp_log.debug(sqlF + comment + str(varMap))
0363                     self.cur.execute(sqlF + comment, varMap)
0364             # commit
0365             if not self._commit():
0366                 raise RuntimeError("Commit error")
0367             # record status change
0368             try:
0369                 self.recordStatusChange(job.PandaID, job.jobStatus, jobInfo=job)
0370             except Exception:
0371                 tmp_log.error("recordStatusChange in resetJobs")
0372             self.push_job_status_message(job, job.PandaID, job.jobStatus)
0373             tmp_log.debug(f"done with {job is not None}")
0374             if getOldSubs:
0375                 return job, oldSubList
0376             return job
0377         except Exception:
0378             # roll back
0379             self._rollback()
0380             # error report
0381             self.dump_error_message(tmp_log)
0382             return None
0383 
0384     # reset jobs in jobsDefined
0385     def resetDefinedJob(self, pandaID):
0386         comment = " /* DBProxy.resetDefinedJob */"
0387         tmp_log = self.create_tagged_logger(comment, f"PandaID={pandaID}")
0388         tmp_log.debug("start")
0389         sql1 = "UPDATE ATLAS_PANDA.jobsDefined4 SET "
0390         sql1 += "jobStatus=:newJobStatus,"
0391         sql1 += "modificationTime=CURRENT_DATE,"
0392         sql1 += "modificationHost=:modificationHost"
0393         sql1 += " WHERE PandaID=:PandaID AND jobStatus IN (:oldJobStatus1,:oldJobStatus2,:oldJobStatus3) "
0394         sql2 = f"SELECT {JobSpec.columnNames()} FROM ATLAS_PANDA.jobsDefined4 "
0395         sql2 += "WHERE PandaID=:PandaID"
0396         try:
0397             # begin transaction
0398             self.conn.begin()
0399             # update
0400             varMap = {}
0401             varMap[":PandaID"] = pandaID
0402             varMap[":newJobStatus"] = "defined"
0403             varMap[":oldJobStatus1"] = "assigned"
0404             varMap[":oldJobStatus2"] = "defined"
0405             varMap[":oldJobStatus3"] = "pending"
0406             varMap[":modificationHost"] = self.hostname
0407             self.cur.execute(sql1 + comment, varMap)
0408             retU = self.cur.rowcount
0409             # not found
0410             updatedFlag = True
0411             job = None
0412             if retU == 0:
0413                 tmp_log.debug("Not found for UPDATE")
0414                 updatedFlag = False
0415             else:
0416                 # select
0417                 varMap = {}
0418                 varMap[":PandaID"] = pandaID
0419                 self.cur.arraysize = 10
0420                 self.cur.execute(sql2 + comment, varMap)
0421                 res = self.cur.fetchone()
0422                 # not found
0423                 if res is None:
0424                     raise RuntimeError(f"Not found for SELECT")
0425                 # instantiate Job
0426                 job = JobSpec()
0427                 job.pack(res)
0428                 # job parameters
0429                 sqlJobP = "SELECT jobParameters FROM ATLAS_PANDA.jobParamsTable WHERE PandaID=:PandaID"
0430                 self.cur.execute(sqlJobP + comment, varMap)
0431                 for (clobJobP,) in self.cur:
0432                     try:
0433                         job.jobParameters = clobJobP.read()
0434                     except AttributeError:
0435                         job.jobParameters = str(clobJobP)
0436                     break
0437                 # Files
0438                 sqlFile = f"SELECT {FileSpec.columnNames()} FROM ATLAS_PANDA.filesTable4 "
0439                 sqlFile += "WHERE PandaID=:PandaID"
0440                 self.cur.arraysize = 10000
0441                 self.cur.execute(sqlFile + comment, varMap)
0442                 resFs = self.cur.fetchall()
0443                 for resF in resFs:
0444                     file = FileSpec()
0445                     file.pack(resF)
0446                     # add file
0447                     job.addFile(file)
0448             # commit
0449             if not self._commit():
0450                 raise RuntimeError("Commit error")
0451             # record status change
0452             try:
0453                 if updatedFlag:
0454                     self.recordStatusChange(job.PandaID, job.jobStatus, jobInfo=job)
0455                     self.push_job_status_message(job, job.PandaID, job.jobStatus)
0456             except Exception:
0457                 tmp_log.error("recordStatusChange in resetDefinedJobs")
0458             tmp_log.debug(f"done with {job is not None}")
0459             return job
0460         except Exception:
0461             self.dump_error_message(tmp_log)
0462             # roll back
0463             self._rollback()
0464             return None
0465 
0466     # peek at job
0467     def peekJob(self, pandaID, fromDefined, fromActive, fromArchived, fromWaiting, forAnal=False):
0468         comment = " /* DBProxy.peekJob */"
0469         tmp_log = self.create_tagged_logger(comment, f"PandaID={pandaID}")
0470         # return None for NULL PandaID
0471         if pandaID in ["NULL", "", "None", None]:
0472             return None
0473         # only int
0474         try:
0475             _ = int(pandaID)
0476         except Exception:
0477             tmp_log.debug(f"return None for {pandaID}:non-integer")
0478             return None
0479         sql1_0 = "SELECT %s FROM %s "
0480         sql1_1 = "WHERE PandaID=:PandaID"
0481         nTry = 3
0482         for iTry in range(nTry):
0483             try:
0484                 tables = []
0485                 if fromDefined or fromWaiting:
0486                     tables.append("ATLAS_PANDA.jobsDefined4")
0487                 if fromActive:
0488                     tables.append("ATLAS_PANDA.jobsActive4")
0489                 if fromArchived:
0490                     tables.append("ATLAS_PANDA.jobsArchived4")
0491                 if fromDefined:
0492                     # for jobs which are just reset
0493                     tables.append("ATLAS_PANDA.jobsDefined4")
0494                 # select
0495                 varMap = {}
0496                 varMap[":PandaID"] = pandaID
0497                 for table in tables:
0498                     # start transaction
0499                     self.conn.begin()
0500                     # select
0501                     sql = sql1_0 % (JobSpec.columnNames(), table) + sql1_1
0502                     self.cur.arraysize = 10
0503                     self.cur.execute(sql + comment, varMap)
0504                     res = self.cur.fetchall()
0505                     # commit
0506                     if not self._commit():
0507                         raise RuntimeError("Commit error")
0508                     if len(res) != 0:
0509                         # Job
0510                         job = JobSpec()
0511                         job.pack(res[0])
0512                         # Files
0513                         # start transaction
0514                         self.conn.begin()
0515                         # select
0516                         sqlFile = f"SELECT {FileSpec.columnNames()} FROM ATLAS_PANDA.filesTable4 "
0517                         sqlFile += "WHERE PandaID=:PandaID"
0518                         self.cur.arraysize = 10000
0519                         self.cur.execute(sqlFile + comment, varMap)
0520                         resFs = self.cur.fetchall()
0521                         # metadata
0522                         resMeta = None
0523                         if table == "ATLAS_PANDA.jobsArchived4" or forAnal:
0524                             # read metadata only for finished/failed production jobs
0525                             sqlMeta = "SELECT metaData FROM ATLAS_PANDA.metaTable WHERE PandaID=:PandaID"
0526                             self.cur.execute(sqlMeta + comment, varMap)
0527                             for (clobMeta,) in self.cur:
0528                                 if clobMeta is not None:
0529                                     try:
0530                                         resMeta = clobMeta.read()
0531                                     except AttributeError:
0532                                         resMeta = str(clobMeta)
0533                                 break
0534                         # job parameters
0535                         job.jobParameters = None
0536                         sqlJobP = "SELECT jobParameters FROM ATLAS_PANDA.jobParamsTable WHERE PandaID=:PandaID"
0537                         varMap = {}
0538                         varMap[":PandaID"] = job.PandaID
0539                         self.cur.execute(sqlJobP + comment, varMap)
0540                         for (clobJobP,) in self.cur:
0541                             if clobJobP is not None:
0542                                 try:
0543                                     job.jobParameters = clobJobP.read()
0544                                 except AttributeError:
0545                                     job.jobParameters = str(clobJobP)
0546                             break
0547                         # commit
0548                         if not self._commit():
0549                             raise RuntimeError("Commit error")
0550                         # set files
0551                         for resF in resFs:
0552                             file = FileSpec()
0553                             file.pack(resF)
0554                             job.addFile(file)
0555                         # set metadata
0556                         job.metadata = resMeta
0557                         return job
0558                 tmp_log.debug(f"not found")
0559                 return None
0560             except Exception:
0561                 # roll back
0562                 self._rollback()
0563                 if iTry + 1 < nTry:
0564                     tmp_log.debug(f"retry : {iTry}")
0565                     time.sleep(random.randint(10, 20))
0566                     continue
0567                 self.dump_error_message(tmp_log)
0568                 # return None for analysis
0569                 if forAnal:
0570                     return None
0571                 # return 'unknown'
0572                 job = JobSpec()
0573                 job.PandaID = pandaID
0574                 job.jobStatus = "unknown"
0575                 return job
0576 
0577     # get express jobs
0578     def getExpressJobs(self, dn):
0579         comment = " /* DBProxy.getExpressJobs */"
0580         tmp_log = self.create_tagged_logger(comment, f"DN={dn}")
0581         tmp_log.debug(f"start")
0582         sqlX = "SELECT specialHandling,COUNT(*) FROM %s "
0583         sqlX += "WHERE prodUserName=:prodUserName AND prodSourceLabel=:prodSourceLabel1 "
0584         sqlX += "AND specialHandling IS NOT NULL "
0585         sqlXJob = "SELECT PandaID,jobStatus,prodSourceLabel,modificationTime,jobDefinitionID,jobsetID,startTime,endTime FROM %s "
0586         sqlXJob += "WHERE prodUserName=:prodUserName AND prodSourceLabel=:prodSourceLabel1 "
0587         sqlXJob += "AND specialHandling IS NOT NULL AND specialHandling=:specialHandling "
0588         sqlQ = sqlX
0589         sqlQ += "GROUP BY specialHandling "
0590         sqlQJob = sqlXJob
0591         sqlA = sqlX
0592         sqlA += "AND modificationTime>:modificationTime GROUP BY specialHandling "
0593         sqlAJob = sqlXJob
0594         sqlAJob += "AND modificationTime>:modificationTime "
0595         try:
0596             # get compact DN
0597             compactDN = CoreUtils.clean_user_id(dn)
0598             if compactDN in ["", "NULL", None]:
0599                 compactDN = dn
0600             expressStr = "express"
0601             activeExpressU = []
0602             timeUsageU = datetime.timedelta(0)
0603             executionTimeU = datetime.timedelta(hours=1)
0604             jobCreditU = 3
0605             timeCreditU = executionTimeU * jobCreditU
0606             timeNow = naive_utcnow()
0607             timeLimit = timeNow - datetime.timedelta(hours=6)
0608             # loop over tables
0609             for table in [
0610                 "ATLAS_PANDA.jobsDefined4",
0611                 "ATLAS_PANDA.jobsActive4",
0612                 "ATLAS_PANDA.jobsArchived4",
0613             ]:
0614                 varMap = {}
0615                 varMap[":prodUserName"] = compactDN
0616                 varMap[":prodSourceLabel1"] = "user"
0617                 if table == "ATLAS_PANDA.jobsArchived4":
0618                     varMap[":modificationTime"] = timeLimit
0619                     sql = sqlA % table
0620                     sqlJob = sqlAJob % table
0621                 else:
0622                     sql = sqlQ % table
0623                     sqlJob = sqlQJob % table
0624                 # start transaction
0625                 self.conn.begin()
0626                 # get the number of jobs for each specialHandling
0627                 self.cur.arraysize = 10
0628                 tmp_log.debug(sql + comment + str(varMap))
0629                 self.cur.execute(sql + comment, varMap)
0630                 res = self.cur.fetchall()
0631                 tmp_log.debug(f"{str(res)}")
0632                 for specialHandling, countJobs in res:
0633                     if specialHandling is None:
0634                         continue
0635                     # look for express jobs
0636                     if expressStr in specialHandling:
0637                         varMap[":specialHandling"] = specialHandling
0638                         self.cur.arraysize = 1000
0639                         self.cur.execute(sqlJob + comment, varMap)
0640                         resJobs = self.cur.fetchall()
0641                         tmp_log.debug(f"{str(resJobs)}")
0642                         for (
0643                             tmp_PandaID,
0644                             tmp_jobStatus,
0645                             tmp_prodSourceLabel,
0646                             tmp_modificationTime,
0647                             tmp_jobDefinitionID,
0648                             tmp_jobsetID,
0649                             tmp_startTime,
0650                             tmp_endTime,
0651                         ) in resJobs:
0652                             # collect active jobs
0653                             if tmp_jobStatus not in [
0654                                 "finished",
0655                                 "failed",
0656                                 "cancelled",
0657                                 "closed",
0658                             ]:
0659                                 activeExpressU.append((tmp_PandaID, tmp_jobsetID, tmp_jobDefinitionID))
0660                             # get time usage
0661                             if tmp_jobStatus not in ["defined", "activated"]:
0662                                 # check only jobs which actually use or used CPU on WN
0663                                 if tmp_startTime is not None:
0664                                     # running or not
0665                                     if tmp_endTime is None:
0666                                         # job got started before/after the time limit
0667                                         if timeLimit > tmp_startTime:
0668                                             timeDelta = timeNow - timeLimit
0669                                         else:
0670                                             timeDelta = timeNow - tmp_startTime
0671                                     else:
0672                                         # job got started before/after the time limit
0673                                         if timeLimit > tmp_startTime:
0674                                             timeDelta = tmp_endTime - timeLimit
0675                                         else:
0676                                             timeDelta = tmp_endTime - tmp_startTime
0677                                     # add
0678                                     if timeDelta > datetime.timedelta(0):
0679                                         timeUsageU += timeDelta
0680                 # commit
0681                 if not self._commit():
0682                     raise RuntimeError("Commit error")
0683             # check quota
0684             rRet = True
0685             rRetStr = ""
0686             rQuota = 0
0687             if len(activeExpressU) >= jobCreditU:
0688                 rRetStr += f"The number of queued runXYZ exceeds the limit = {jobCreditU}. "
0689                 rRet = False
0690             if timeUsageU >= timeCreditU:
0691                 rRetStr += f"The total execution time for runXYZ exceeds the limit = {timeCreditU.seconds / 60} min. "
0692                 rRet = False
0693             # calculate available quota
0694             if rRet:
0695                 tmpQuota = jobCreditU - len(activeExpressU) - timeUsageU.seconds / executionTimeU.seconds
0696                 if tmpQuota < 0:
0697                     rRetStr += "Quota for runXYZ exceeds. "
0698                     rRet = False
0699                 else:
0700                     rQuota = tmpQuota
0701             # return
0702             retVal = {
0703                 "status": rRet,
0704                 "quota": rQuota,
0705                 "output": rRetStr,
0706                 "usage": timeUsageU,
0707                 "jobs": activeExpressU,
0708             }
0709             tmp_log.debug(f"{str(retVal)}")
0710             return retVal
0711         except Exception:
0712             # roll back
0713             self._rollback()
0714             self.dump_error_message(tmp_log)
0715             return None
0716 
0717     # get active debug jobs
0718     def getActiveDebugJobs(self, dn=None, workingGroup=None, prodRole=False):
0719         comment = " /* DBProxy.getActiveDebugJobs */"
0720         tmp_log = self.create_tagged_logger(comment, f"DN={dn}")
0721         tmp_log.debug(f"wg={workingGroup} prodRole={prodRole}")
0722         varMap = {}
0723         sqlX = "SELECT PandaID,jobStatus,specialHandling FROM %s "
0724         sqlX += "WHERE "
0725         if prodRole:
0726             pass
0727         elif workingGroup is not None:
0728             sqlX += "UPPER(workingGroup) IN (:wg1,:wg2) AND "
0729             varMap[":wg1"] = f"AP_{workingGroup.upper()}"
0730             varMap[":wg2"] = f"GP_{workingGroup.upper()}"
0731         else:
0732             sqlX += "prodUserName=:prodUserName AND "
0733             # get compact DN
0734             compactDN = CoreUtils.clean_user_id(dn)
0735             if compactDN in ["", "NULL", None]:
0736                 compactDN = dn
0737             varMap[":prodUserName"] = compactDN
0738         sqlX += "specialHandling IS NOT NULL "
0739         try:
0740             debugStr = "debug"
0741             activeDebugJobs = []
0742             # loop over tables
0743             for table in ["ATLAS_PANDA.jobsDefined4", "ATLAS_PANDA.jobsActive4"]:
0744                 sql = sqlX % table
0745                 # start transaction
0746                 self.conn.begin()
0747                 # get jobs with specialHandling
0748                 self.cur.arraysize = 100000
0749                 self.cur.execute(sql + comment, varMap)
0750                 res = self.cur.fetchall()
0751                 # commit
0752                 if not self._commit():
0753                     raise RuntimeError("Commit error")
0754                 # loop over all PandaIDs
0755                 for pandaID, jobStatus, specialHandling in res:
0756                     if specialHandling is None:
0757                         continue
0758                     # only active jobs
0759                     if jobStatus not in [
0760                         "defined",
0761                         "activated",
0762                         "running",
0763                         "sent",
0764                         "starting",
0765                     ]:
0766                         continue
0767                     # look for debug jobs
0768                     if debugStr in specialHandling and pandaID not in activeDebugJobs:
0769                         activeDebugJobs.append(pandaID)
0770             # return
0771             activeDebugJobs.sort()
0772             tmp_log.debug(f"{str(activeDebugJobs)}")
0773             return activeDebugJobs
0774         except Exception:
0775             # roll back
0776             self._rollback()
0777             self.dump_error_message(tmp_log)
0778             return None
0779 
0780     # set debug mode
0781     def setDebugMode(self, dn, pandaID, prodManager, modeOn, workingGroup):
0782         comment = " /* DBProxy.setDebugMode */"
0783         tmp_log = self.create_tagged_logger(comment, f"PandaID={pandaID}")
0784         tmp_log.debug(f"dn={dn} prod={prodManager} wg={workingGroup} mode={modeOn}")
0785         sqlX = "SELECT prodUserName,jobStatus,specialHandling,workingGroup FROM %s "
0786         sqlX += "WHERE PandaID=:PandaID "
0787         sqlU = "UPDATE %s SET specialHandling=:specialHandling "
0788         sqlU += "WHERE PandaID=:PandaID "
0789         try:
0790             # get compact DN
0791             compactDN = CoreUtils.clean_user_id(dn)
0792             if compactDN in ["", "NULL", None]:
0793                 compactDN = dn
0794             debugStr = "debug"
0795             retStr = ""
0796             retCode = False
0797             # loop over tables
0798             for table in ["ATLAS_PANDA.jobsDefined4", "ATLAS_PANDA.jobsActive4"]:
0799                 varMap = {}
0800                 varMap[":PandaID"] = pandaID
0801                 sql = sqlX % table
0802                 # start transaction
0803                 self.conn.begin()
0804                 # get jobs with specialHandling
0805                 self.cur.arraysize = 10
0806                 self.cur.execute(sql + comment, varMap)
0807                 res = self.cur.fetchone()
0808                 # not found
0809                 if res is None:
0810                     retStr = f"PandaID={pandaID} not found in active DB"
0811                     # commit
0812                     if not self._commit():
0813                         raise RuntimeError("Commit error")
0814                     continue
0815                 prodUserName, jobStatus, specialHandling, wGroup = res
0816                 # not active
0817                 changeableState = [
0818                     "defined",
0819                     "activated",
0820                     "running",
0821                     "sent",
0822                     "starting",
0823                     "assigned",
0824                 ]
0825                 if jobStatus not in changeableState:
0826                     retStr = f"Cannot set debugMode since the job status is {jobStatus} which is not in one of {str(changeableState)}"
0827                     # commit
0828                     if not self._commit():
0829                         raise RuntimeError("Commit error")
0830                     break
0831                 # extract workingGroup
0832                 try:
0833                     wGroup = wGroup.split("_")[-1]
0834                     wGroup = wGroup.lower()
0835                 except Exception:
0836                     pass
0837                 # not owner
0838                 notOwner = False
0839                 if not prodManager:
0840                     if workingGroup is not None:
0841                         if workingGroup.lower() != wGroup:
0842                             retStr = f"Permission denied. Not the production manager for workingGroup={wGroup}"
0843                             notOwner = True
0844                     else:
0845                         if prodUserName != compactDN:
0846                             retStr = "Permission denied. Not the owner or production manager"
0847                             notOwner = True
0848                     if notOwner:
0849                         # commit
0850                         if not self._commit():
0851                             raise RuntimeError("Commit error")
0852                         break
0853                 # set specialHandling
0854                 updateSH = True
0855                 if specialHandling in [None, ""]:
0856                     if modeOn:
0857                         # set debug mode
0858                         specialHandling = debugStr
0859                     else:
0860                         # already disabled debug mode
0861                         updateSH = False
0862                 elif debugStr in specialHandling:
0863                     if modeOn:
0864                         # already in debug mode
0865                         updateSH = False
0866                     else:
0867                         # disable debug mode
0868                         specialHandling = re.sub(debugStr, "", specialHandling)
0869                         specialHandling = re.sub(",,", ",", specialHandling)
0870                         specialHandling = re.sub("^,", "", specialHandling)
0871                         specialHandling = re.sub(",$", "", specialHandling)
0872                 else:
0873                     if modeOn:
0874                         # set debug mode
0875                         specialHandling = debugStr
0876                     else:
0877                         # already disabled debug mode
0878                         updateSH = False
0879 
0880                 # no update
0881                 if not updateSH:
0882                     retStr = "Already set accordingly"
0883                     # commit
0884                     if not self._commit():
0885                         raise RuntimeError("Commit error")
0886                     break
0887                 # update
0888                 varMap = {}
0889                 varMap[":PandaID"] = pandaID
0890                 varMap[":specialHandling"] = specialHandling
0891                 self.cur.execute((sqlU + comment) % table, varMap)
0892                 retD = self.cur.rowcount
0893                 # commit
0894                 if not self._commit():
0895                     raise RuntimeError("Commit error")
0896                 if retD == 0:
0897                     retStr = "Failed to update DB"
0898                 else:
0899                     retStr = "Succeeded"
0900                     break
0901             # return
0902             tmp_log.debug(f"{retStr}")
0903             return retStr
0904         except Exception:
0905             # roll back
0906             self._rollback()
0907             self.dump_error_message(tmp_log)
0908             return None
0909 
0910     # lock jobs for reassign
0911     def lockJobsForReassign(
0912         self,
0913         tableName,
0914         timeLimit,
0915         statList,
0916         labels,
0917         processTypes,
0918         sites,
0919         clouds,
0920         useJEDI=False,
0921         onlyReassignable=False,
0922         useStateChangeTime=False,
0923         getEventService=False,
0924     ):
0925         comment = " /* DBProxy.lockJobsForReassign */"
0926         tmp_log = self.create_tagged_logger(comment)
0927         tmp_log.debug(f"{tableName} {timeLimit} {statList} {labels} {processTypes} {sites} {clouds} {useJEDI}")
0928         try:
0929             # make sql
0930             if not useJEDI:
0931                 sql = f"SELECT PandaID FROM {tableName} "
0932             elif getEventService:
0933                 sql = f"SELECT PandaID,lockedby,eventService,attemptNr,computingSite FROM {tableName} "
0934             else:
0935                 sql = f"SELECT PandaID,lockedby FROM {tableName} "
0936             if not useStateChangeTime:
0937                 sql += "WHERE modificationTime<:modificationTime "
0938             else:
0939                 sql += "WHERE stateChangeTime<:modificationTime "
0940             varMap = {}
0941             varMap[":modificationTime"] = timeLimit
0942             if statList != []:
0943                 stat_var_names_str, stat_var_map = get_sql_IN_bind_variables(statList, prefix=":stat")
0944                 sql += f"AND jobStatus IN ({stat_var_names_str}) "
0945                 varMap.update(stat_var_map)
0946             if labels != []:
0947                 label_var_names_str, label_var_map = get_sql_IN_bind_variables(labels, prefix=":label")
0948                 sql += f"AND prodSourceLabel IN ({label_var_names_str}) "
0949                 varMap.update(label_var_map)
0950             if processTypes != []:
0951                 ptype_var_names_str, ptype_var_map = get_sql_IN_bind_variables(processTypes, prefix=":processType")
0952                 sql += f"AND processingType IN ({ptype_var_names_str}) "
0953                 varMap.update(ptype_var_map)
0954             if sites != []:
0955                 site_var_names_str, site_var_map = get_sql_IN_bind_variables(sites, prefix=":site")
0956                 sql += f"AND computingSite IN ({site_var_names_str}) "
0957                 varMap.update(site_var_map)
0958             if clouds != []:
0959                 cloud_var_names_str, cloud_var_map = get_sql_IN_bind_variables(clouds, prefix=":cloud")
0960                 sql += f"AND cloud IN ({cloud_var_names_str}) "
0961                 varMap.update(cloud_var_map)
0962             if onlyReassignable:
0963                 sql += "AND (relocationFlag IS NULL OR relocationFlag<>:relocationFlag) "
0964                 varMap[":relocationFlag"] = 2
0965             # sql for lock
0966             if not useStateChangeTime:
0967                 sqlLock = f"UPDATE {tableName} SET modificationTime=CURRENT_DATE WHERE PandaID=:PandaID"
0968             else:
0969                 sqlLock = f"UPDATE {tableName} SET stateChangeTime=CURRENT_DATE WHERE PandaID=:PandaID"
0970             # start transaction
0971             self.conn.begin()
0972             # select
0973             self.cur.arraysize = 1000000
0974             tmp_log.debug(sql + comment + str(varMap))
0975             self.cur.execute(sql + comment, varMap)
0976             resList = self.cur.fetchall()
0977             tmp_log.debug(f"found {len(resList)}")
0978             retList = []
0979             # lock
0980             for tmpItem in resList:
0981                 tmpID = tmpItem[0]
0982                 varLock = {":PandaID": tmpID}
0983                 self.cur.execute(sqlLock + comment, varLock)
0984                 retList.append(tmpItem)
0985             # commit
0986             if not self._commit():
0987                 raise RuntimeError("Commit error")
0988             # sort
0989             retList.sort()
0990             tmp_log.debug(f"return {len(retList)}")
0991             return True, retList
0992         except Exception:
0993             # roll back
0994             self._rollback()
0995             self.dump_error_message(tmp_log)
0996             # return empty
0997             return False, []
0998 
0999     # lock jobs for finisher
1000     def lockJobsForFinisher(self, timeNow, rownum, highPrio):
1001         comment = " /* DBProxy.lockJobsForFinisher */"
1002         tmp_log = self.create_tagged_logger(comment)
1003         tmp_log.debug(f"{timeNow} {rownum} {highPrio}")
1004         try:
1005             varMap = {}
1006             varMap[":jobStatus"] = "transferring"
1007             varMap[":currentPriority"] = 800
1008             varMap[":pLabel1"] = "managed"
1009             varMap[":pLabel2"] = "test"
1010             varMap[":esJumbo"] = EventServiceUtils.jumboJobFlagNumber
1011             # make sql
1012             sql = "SELECT PandaID FROM ATLAS_PANDA.jobsActive4 "
1013             sql += "WHERE jobStatus=:jobStatus AND modificationTime<:modificationTime AND prodSourceLabel IN (:pLabel1,:pLabel2) "
1014             sql += "AND (eventService IS NULL OR eventService<>:esJumbo) "
1015             if highPrio:
1016                 varMap[":modificationTime"] = timeNow - datetime.timedelta(hours=1)
1017                 sql += f"AND currentPriority>=:currentPriority AND rownum<={rownum} "
1018             else:
1019                 sql += f"AND currentPriority<:currentPriority AND rownum<={rownum} "
1020                 varMap[":modificationTime"] = timeNow - datetime.timedelta(hours=2)
1021             sql += "FOR UPDATE "
1022             # sql for lock
1023             sqlLock = "UPDATE ATLAS_PANDA.jobsActive4 SET modificationTime=CURRENT_DATE WHERE PandaID=:PandaID"
1024             # start transaction
1025             self.conn.begin()
1026             # select
1027             self.cur.arraysize = 1000
1028             self.cur.execute(sql + comment, varMap)
1029             resList = self.cur.fetchall()
1030             retList = []
1031             # lock
1032             for (tmpID,) in resList:
1033                 varLock = {":PandaID": tmpID}
1034                 self.cur.execute(sqlLock + comment, varLock)
1035                 retList.append(tmpID)
1036             # commit
1037             if not self._commit():
1038                 raise RuntimeError("Commit error")
1039             # sort
1040             retList.sort()
1041             tmp_log.debug(f"{len(retList)}")
1042             return True, retList
1043         except Exception:
1044             # roll back
1045             self._rollback()
1046             self.dump_error_message(tmp_log)
1047             # return empty
1048             return False, []
1049 
1050     # lock jobs for activator
1051     def lockJobsForActivator(self, timeLimit, rownum, prio):
1052         comment = " /* DBProxy.lockJobsForActivator */"
1053         tmp_log = self.create_tagged_logger(comment)
1054         tmp_log.debug("start")
1055         try:
1056             varMap = {}
1057             varMap[":jobStatus"] = "assigned"
1058             if prio > 0:
1059                 varMap[":currentPriority"] = prio
1060             varMap[":timeLimit"] = timeLimit
1061             # make sql
1062             sql = "SELECT PandaID FROM ATLAS_PANDA.jobsDefined4 "
1063             sql += "WHERE jobStatus=:jobStatus AND (prodDBUpdateTime IS NULL OR prodDBUpdateTime<:timeLimit) "
1064             if prio > 0:
1065                 sql += "AND currentPriority>=:currentPriority "
1066             sql += f"AND rownum<={rownum} "
1067             sql += "FOR UPDATE "
1068             # sql for lock
1069             sqlLock = "UPDATE ATLAS_PANDA.jobsDefined4 SET prodDBUpdateTime=CURRENT_DATE WHERE PandaID=:PandaID"
1070             # start transaction
1071             self.conn.begin()
1072             # select
1073             self.cur.arraysize = 1000
1074             self.cur.execute(sql + comment, varMap)
1075             resList = self.cur.fetchall()
1076             retList = []
1077             # lock
1078             for (tmpID,) in resList:
1079                 varLock = {":PandaID": tmpID}
1080                 self.cur.execute(sqlLock + comment, varLock)
1081                 retList.append(tmpID)
1082             # commit
1083             if not self._commit():
1084                 raise RuntimeError("Commit error")
1085             # sort
1086             retList.sort()
1087             tmp_log.debug(f"locked {len(retList)} jobs")
1088             return True, retList
1089         except Exception:
1090             # roll back
1091             self._rollback()
1092             # error
1093             self.dump_error_message(tmp_log)
1094             # return empty
1095             return False, []
1096 
1097     # add metadata
1098     def addMetadata(self, pandaID, metadata, newStatus):
1099         comment = " /* DBProxy.addMetaData */"
1100         tmp_log = self.create_tagged_logger(comment, f"PandaID={pandaID}")
1101         tmp_log.debug(f"start")
1102         # discard metadata for failed jobs
1103         if newStatus == "failed":
1104             tmp_log.debug("skip")
1105             return True
1106         sqlJ = "SELECT jobStatus FROM ATLAS_PANDA.jobsActive4 WHERE PandaID=:PandaID "
1107         sqlJ += "UNION "
1108         sqlJ += "SELECT jobStatus FROM ATLAS_PANDA.jobsArchived4 WHERE PandaID=:PandaID "
1109         sql0 = "SELECT PandaID FROM ATLAS_PANDA.metaTable WHERE PandaID=:PandaID"
1110         sql1 = "INSERT INTO ATLAS_PANDA.metaTable (PandaID,metaData) VALUES (:PandaID,:metaData)"
1111         nTry = 1
1112         regStart = naive_utcnow()
1113         for iTry in range(nTry):
1114             try:
1115                 # autocommit on
1116                 self.conn.begin()
1117                 self.cur.arraysize = 10
1118                 # check job status
1119                 varMap = {}
1120                 varMap[":PandaID"] = pandaID
1121                 self.cur.execute(sqlJ + comment, varMap)
1122                 resJ = self.cur.fetchone()
1123                 if resJ is not None:
1124                     (jobStatus,) = resJ
1125                 else:
1126                     jobStatus = "unknown"
1127                 if jobStatus in ["unknown"]:
1128                     tmp_log.debug(f"skip jobStatus={jobStatus}")
1129                     if not self._commit():
1130                         raise RuntimeError("Commit error")
1131                     return False
1132                 # skip if in final state
1133                 if jobStatus in ["cancelled", "closed", "finished", "failed"]:
1134                     tmp_log.debug(f"skip jobStatus={jobStatus}")
1135                     if not self._commit():
1136                         raise RuntimeError("Commit error")
1137                     # return True so that subsequent procedure can keep going
1138                     return True
1139                 # select
1140                 varMap = {}
1141                 varMap[":PandaID"] = pandaID
1142                 self.cur.arraysize = 10
1143                 self.cur.execute(sql0 + comment, varMap)
1144                 res = self.cur.fetchone()
1145                 # already exist
1146                 if res is not None:
1147                     tmp_log.debug(f"skip duplicated during jobStatus={jobStatus}")
1148                     if not self._commit():
1149                         raise RuntimeError("Commit error")
1150                     return True
1151                 # truncate
1152                 if metadata is not None:
1153                     origSize = len(metadata)
1154                 else:
1155                     origSize = 0
1156                 maxSize = 1024 * 1024
1157                 if newStatus in ["failed"] and origSize > maxSize:
1158                     metadata = metadata[:maxSize]
1159                 # insert
1160                 varMap = {}
1161                 varMap[":PandaID"] = pandaID
1162                 varMap[":metaData"] = metadata
1163                 self.cur.execute(sql1 + comment, varMap)
1164                 # commit
1165                 if not self._commit():
1166                     raise RuntimeError("Commit error")
1167                 regTime = naive_utcnow() - regStart
1168                 msgStr = f"done in jobStatus={jobStatus}->{newStatus} took {regTime.seconds} sec"
1169                 if metadata is not None:
1170                     msgStr += f" for {len(metadata)} (orig {origSize}) bytes"
1171                 tmp_log.debug(msgStr)
1172                 return True
1173             except Exception:
1174                 # roll back
1175                 self._rollback()
1176                 if iTry + 1 < nTry:
1177                     tmp_log.debug(f"retry : {iTry}")
1178                     time.sleep(random.randint(10, 20))
1179                     continue
1180                 self.dump_error_message(tmp_log)
1181                 return False
1182 
1183     # add stdout
1184     def addStdOut(self, pandaID, stdOut):
1185         comment = " /* DBProxy.addStdOut */"
1186         tmp_log = self.create_tagged_logger(comment, f"PandaID={pandaID}")
1187         tmp_log.debug(f"start")
1188         sqlJ = "SELECT PandaID FROM ATLAS_PANDA.jobsActive4 WHERE PandaID=:PandaID FOR UPDATE "
1189         sqlC = "SELECT PandaID FROM ATLAS_PANDA.jobsDebug WHERE PandaID=:PandaID "
1190         sqlI = "INSERT INTO ATLAS_PANDA.jobsDebug (PandaID,stdOut) VALUES (:PandaID,:stdOut) "
1191         sqlU = "UPDATE ATLAS_PANDA.jobsDebug SET stdOut=:stdOut WHERE PandaID=:PandaID "
1192         try:
1193             # autocommit on
1194             self.conn.begin()
1195             # select
1196             varMap = {}
1197             varMap[":PandaID"] = pandaID
1198             self.cur.arraysize = 10
1199             # check job table
1200             self.cur.execute(sqlJ + comment, varMap)
1201             res = self.cur.fetchone()
1202             if res is None:
1203                 tmp_log.debug(f"addStdOut : {pandaID} non active")
1204             else:
1205                 # check debug table
1206                 self.cur.execute(sqlC + comment, varMap)
1207                 res = self.cur.fetchone()
1208                 # already exist
1209                 if res is not None:
1210                     # update
1211                     sql = sqlU
1212                 else:
1213                     # insert
1214                     sql = sqlI
1215                 # write stdout
1216                 varMap = {}
1217                 varMap[":PandaID"] = pandaID
1218                 varMap[":stdOut"] = stdOut
1219                 self.cur.execute(sql + comment, varMap)
1220             # commit
1221             if not self._commit():
1222                 raise RuntimeError("Commit error")
1223             return True
1224         except Exception:
1225             # roll back
1226             self._rollback()
1227             self.dump_error_message(tmp_log)
1228             return False
1229 
1230     # get job statistics
1231     def getJobStatistics(self):
1232         comment = " /* DBProxy.getJobStatistics */"
1233         tmp_log = self.create_tagged_logger(comment)
1234         tmp_log.debug("start")
1235 
1236         # tables to query
1237         jobs_active_4_table = f"{panda_config.schemaPANDA}.jobsActive4"
1238         jobs_defined_4_table = f"{panda_config.schemaPANDA}.jobsDefined4"
1239         tables = [jobs_active_4_table, jobs_defined_4_table]
1240 
1241         # states that are necessary and irrelevant states
1242         included_states = ["assigned", "activated", "running"]
1243         excluded_states = ["merging"]
1244 
1245         # sql template for jobs table
1246         sql_template = f"SELECT computingSite, jobStatus, COUNT(*) FROM {{table_name}} GROUP BY computingSite, jobStatus"
1247 
1248         # sql template for statistics table (materialized view)
1249         sql_mv_template = sql_template.replace("COUNT(*)", "SUM(num_of_jobs)")
1250         sql_mv_template = sql_mv_template.replace("SELECT ", "SELECT /*+ RESULT_CACHE */ ")
1251         ret = {}
1252         max_retries = 3
1253 
1254         for retry in range(max_retries):
1255             try:
1256                 for table in tables:
1257                     # start transaction
1258                     self.conn.begin()
1259                     var_map = {}
1260                     self.cur.arraysize = 10000
1261 
1262                     # for active jobs we will query the summarized materialized view
1263                     if table == jobs_active_4_table:
1264                         table_name = f"{panda_config.schemaPANDA}.MV_JOBSACTIVE4_STATS"
1265                         sql = (sql_mv_template + comment).format(table_name=table_name)
1266                     # for defined jobs we will query the actual table
1267                     else:
1268                         table_name = table
1269                         sql = (sql_template + comment).format(table_name=table_name)
1270                     tmp_log.debug(f"Will execute: {sql} {str(var_map)}")
1271 
1272                     self.cur.execute(sql, var_map)
1273                     res = self.cur.fetchall()
1274                     if not self._commit():
1275                         raise RuntimeError("Commit error")
1276 
1277                     # create map
1278                     for computing_site, job_status, n_jobs in res:
1279                         if job_status in excluded_states:  # ignore some job status since they break APF
1280                             continue
1281 
1282                         ret.setdefault(computing_site, {}).setdefault(job_status, 0)
1283                         ret[computing_site][job_status] += n_jobs
1284 
1285                 # fill in missing states with 0
1286                 for site in ret:
1287                     for state in included_states:
1288                         ret[site].setdefault(state, 0)
1289 
1290                 tmp_log.debug(f"done")
1291                 return ret
1292 
1293             except Exception:
1294                 self._rollback()
1295 
1296                 if retry + 1 < max_retries:  # wait 2 seconds before the next retry
1297                     tmp_log.debug(f"retry: {retry}")
1298                     time.sleep(2)
1299                 else:  # reached max retries - leave
1300                     self.dump_error_message(tmp_log)
1301                     return {}
1302 
1303     # get detailed job statistics with resource_type and prodsourcelabel
1304     def getDetailedJobStatistics(self):
1305         comment = " /* DBProxy.getDetailedJobStatistics */"
1306         tmp_log = self.create_tagged_logger(comment)
1307         tmp_log.debug("start")
1308 
1309         # tables to query
1310         jobs_active_4_table = f"{panda_config.schemaPANDA}.jobsActive4"
1311         jobs_defined_4_table = f"{panda_config.schemaPANDA}.jobsDefined4"
1312         tables = [jobs_active_4_table, jobs_defined_4_table]
1313 
1314         # states that are necessary and irrelevant states
1315         included_states = ["assigned", "activated", "running"]
1316         excluded_states = ["merging"]
1317 
1318         # sql template for jobs table
1319         sql_template = f"SELECT computingSite, resource_type, prodSourceLabel, jobStatus, COUNT(*) FROM {{table_name}} GROUP BY computingSite, resource_type, prodSourceLabel, jobStatus"
1320         # sql template for statistics table (materialized view)
1321         sql_mv_template = sql_template.replace("COUNT(*)", "SUM(num_of_jobs)")
1322         sql_mv_template = sql_mv_template.replace("SELECT ", "SELECT /*+ RESULT_CACHE */ ")
1323         ret = {}
1324         max_retries = 3
1325 
1326         for retry in range(max_retries):
1327             try:
1328                 for table in tables:
1329                     # start transaction
1330                     self.conn.begin()
1331                     var_map = {}
1332                     self.cur.arraysize = 10000
1333 
1334                     # for active jobs we will query the summarized materialized view
1335                     if table == jobs_active_4_table:
1336                         table_name = f"{panda_config.schemaPANDA}.MV_JOBSACTIVE4_STATS"
1337                         sql = (sql_mv_template + comment).format(table_name=table_name)
1338                     # for defined jobs we will query the actual table
1339                     else:
1340                         table_name = table
1341                         sql = (sql_template + comment).format(table_name=table_name)
1342                     tmp_log.debug(f"Will execute: {sql} {str(var_map)}")
1343 
1344                     self.cur.execute(sql, var_map)
1345                     res = self.cur.fetchall()
1346                     if not self._commit():
1347                         raise RuntimeError("Commit error")
1348 
1349                     # create map
1350                     for computing_site, resource_type, prod_source_label, job_status, n_jobs in res:
1351                         if job_status in excluded_states:  # ignore some job status since they break APF
1352                             continue
1353 
1354                         ret.setdefault(computing_site, {}).setdefault(resource_type, {}).setdefault(prod_source_label, {}).setdefault(job_status, 0)
1355                         ret[computing_site][resource_type][prod_source_label][job_status] += n_jobs
1356 
1357                 # fill in missing states with 0
1358                 for site in ret:
1359                     for resource_type in ret[site]:
1360                         for prod_source_label in ret[site][resource_type]:
1361                             for state in included_states:
1362                                 ret[site][resource_type][prod_source_label].setdefault(state, 0)
1363 
1364                 tmp_log.debug(f"done")
1365                 return ret
1366 
1367             except Exception:
1368                 self._rollback()
1369 
1370                 if retry + 1 < max_retries:  # wait 2 seconds before the next retry
1371                     tmp_log.debug(f"retry: {retry}")
1372                     time.sleep(2)
1373                 else:  # reached max retries - leave
1374                     self.dump_error_message(tmp_log)
1375                     return {}
1376 
1377     # get job statistics per site and resource type (SCORE, MCORE, ...)
1378     def getJobStatisticsPerSiteResource(self, time_window):
1379         comment = " /* DBProxy.getJobStatisticsPerSiteResource */"
1380         tmp_log = self.create_tagged_logger(comment)
1381         tmp_log.debug("start")
1382 
1383         tables = ["ATLAS_PANDA.jobsActive4", "ATLAS_PANDA.jobsDefined4", "ATLAS_PANDA.jobsArchived4"]
1384 
1385         # basic SQL for active and defined jobs
1386         sql = "SELECT computingSite, jobStatus, resource_type, COUNT(*) FROM %s GROUP BY computingSite, jobStatus, resource_type "
1387 
1388         # SQL for archived table including time window
1389         sql_archive = (
1390             "SELECT /*+ INDEX_RS_ASC(tab (MODIFICATIONTIME PRODSOURCELABEL)) */ computingSite, jobStatus, resource_type, COUNT(*) "
1391             "FROM ATLAS_PANDA.jobsArchived4 tab WHERE modificationTime > :modificationTime "
1392             "GROUP BY computingSite, jobStatus, resource_type "
1393         )
1394 
1395         # sql for materialized view
1396         sql_mv = re.sub("COUNT\(\*\)", "SUM(njobs)", sql)
1397         sql_mv = re.sub("SELECT ", "SELECT /*+ RESULT_CACHE */ ", sql_mv)
1398 
1399         ret = dict()
1400         try:
1401             # calculate the time floor based on the window specified by the caller
1402             if time_window is None:
1403                 time_floor = naive_utcnow() - datetime.timedelta(hours=12)
1404             else:
1405                 time_floor = naive_utcnow() - datetime.timedelta(minutes=int(time_window))
1406 
1407             for table in tables:
1408                 # start transaction
1409                 self.conn.begin()
1410                 self.cur.arraysize = 10000
1411                 # select
1412 
1413                 var_map = {}
1414                 if table == "ATLAS_PANDA.jobsArchived4":
1415                     var_map[":modificationTime"] = time_floor
1416                     sql_tmp = sql_archive + comment
1417                 elif table == "ATLAS_PANDA.jobsActive4":
1418                     sql_tmp = (sql_mv + comment) % "ATLAS_PANDA.JOBS_SHARE_STATS"
1419                 else:
1420                     sql_tmp = (sql + comment) % table
1421 
1422                 self.cur.execute(sql_tmp, var_map)
1423                 res = self.cur.fetchall()
1424 
1425                 # commit
1426                 if not self._commit():
1427                     raise RuntimeError("Commit error")
1428 
1429                 # create map
1430                 for computing_site, job_status, resource_type, n_jobs in res:
1431                     ret.setdefault(computing_site, dict()).setdefault(resource_type, dict()).setdefault(job_status, 0)
1432                     ret[computing_site][resource_type][job_status] += n_jobs
1433 
1434             # fill in missing states with 0
1435             included_states = ["assigned", "activated", "running", "finished", "failed"]
1436             for computing_site in ret:
1437                 for resource_type in ret[computing_site]:
1438                     for job_status in included_states:
1439                         ret[computing_site][resource_type].setdefault(job_status, 0)
1440 
1441             tmp_log.debug("done")
1442             return ret
1443         except Exception:
1444             # roll back
1445             self._rollback()
1446             # error
1447             self.dump_error_message(tmp_log)
1448             return dict()
1449 
1450     # get the number of job for a user
1451     def getNumberJobsUser(self, dn, workingGroup=None):
1452         comment = " /* DBProxy.getNumberJobsUser */"
1453         tmp_log = self.create_tagged_logger(comment, f"DN={dn}")
1454         tmp_log.debug(f"workingGroup={workingGroup})")
1455 
1456         # get compact DN
1457         compact_dn = CoreUtils.clean_user_id(dn)
1458         if compact_dn in ["", "NULL", None]:
1459             compact_dn = dn
1460 
1461         if workingGroup is not None:
1462             sql_count_jobs = "SELECT COUNT(*) FROM %s WHERE prodUserName=:prodUserName AND prodSourceLabel=:prodSourceLabel AND workingGroup=:workingGroup "
1463         else:
1464             sql_count_jobs = "SELECT COUNT(*) FROM %s WHERE prodUserName=:prodUserName AND prodSourceLabel=:prodSourceLabel AND workingGroup IS NULL "
1465         sql_count_jobs += "AND NOT jobStatus IN (:failed,:merging) "
1466 
1467         n_try = 1
1468         n_jobs = 0
1469         for i_try in range(n_try):
1470             try:
1471                 for table in ("ATLAS_PANDA.jobsActive4", "ATLAS_PANDA.jobsDefined4"):
1472                     # start transaction
1473                     self.conn.begin()
1474                     # select
1475                     var_map = {":prodUserName": compact_dn, ":prodSourceLabel": "user", ":failed": "failed", ":merging": "merging"}
1476                     if workingGroup is not None:
1477                         var_map[":workingGroup"] = workingGroup
1478                     self.cur.arraysize = 10
1479                     self.cur.execute((sql_count_jobs + comment) % table, var_map)
1480                     rows = self.cur.fetchall()
1481                     # commit
1482                     if not self._commit():
1483                         raise RuntimeError("Commit error")
1484                     if len(rows) != 0:
1485                         n_jobs += rows[0][0]
1486                 # return
1487                 tmp_log.debug(f"{n_jobs}")
1488                 return n_jobs
1489             except Exception:
1490                 # roll back
1491                 self._rollback()
1492                 if i_try + 1 < n_try:
1493                     time.sleep(2)
1494                     continue
1495                 self.dump_error_message(tmp_log)
1496                 return 0
1497 
1498     # get job statistics for ExtIF. Source type is analysis or production
1499     def getJobStatisticsForExtIF(self, source_type=None):
1500         comment = " /* DBProxy.getJobStatisticsForExtIF */"
1501         tmp_log = self.create_tagged_logger(comment)
1502         tmp_log.debug(f"start source_type={source_type}")
1503 
1504         time_floor = naive_utcnow() - datetime.timedelta(hours=12)
1505 
1506         # analysis
1507         if source_type == "analysis":
1508             sql = "SELECT jobStatus, COUNT(*), cloud FROM %s WHERE prodSourceLabel IN (:prodSourceLabel1, :prodSourceLabel2) GROUP BY jobStatus, cloud"
1509 
1510             sql_archived = (
1511                 "SELECT /* use_json_type */ /*+ INDEX_RS_ASC(tab (MODIFICATIONTIME PRODSOURCELABEL)) */ "
1512                 "jobStatus, COUNT(*), tabS.data.cloud "
1513                 "FROM %s tab, ATLAS_PANDA.schedconfig_json tabS "
1514                 "WHERE prodSourceLabel IN (:prodSourceLabel1, :prodSourceLabel2) "
1515                 "AND tab.computingSite = tabS.panda_queue "
1516                 "AND modificationTime>:modificationTime GROUP BY tab.jobStatus,tabS.data.cloud"
1517             )
1518 
1519         # production
1520         else:
1521             prod_source_label_string = ":prodSourceLabel1, " + ", ".join(f":prodSourceLabel_{label}" for label in JobUtils.list_ptest_prod_sources)
1522             sql = (
1523                 "SELECT /* use_json_type */ tab.jobStatus, COUNT(*), tabS.data.cloud "
1524                 "FROM %s tab, ATLAS_PANDA.schedconfig_json tabS "
1525                 f"WHERE prodSourceLabel IN ({prod_source_label_string}) "
1526                 "AND tab.computingSite = tabS.panda_queue "
1527                 "GROUP BY tab.jobStatus, tabS.data.cloud"
1528             )
1529 
1530             sql_archived = (
1531                 "SELECT /* use_json_type */ /*+ INDEX_RS_ASC(tab (MODIFICATIONTIME PRODSOURCELABEL)) */ "
1532                 "jobStatus, COUNT(*), tabS.data.cloud "
1533                 "FROM %s tab, ATLAS_PANDA.schedconfig_json tabS "
1534                 f"WHERE prodSourceLabel IN ({prod_source_label_string}) "
1535                 "AND tab.computingSite = tabS.panda_queue "
1536                 "AND modificationTime>:modificationTime GROUP BY tab.jobStatus,tabS.data.cloud"
1537             )
1538 
1539         # sql for materialized view
1540         sql_active_mv = re.sub("COUNT\(\*\)", "SUM(num_of_jobs)", sql)
1541         sql_active_mv = re.sub("SELECT ", "SELECT /*+ RESULT_CACHE */ ", sql_active_mv)
1542 
1543         ret = {}
1544 
1545         tables = ["ATLAS_PANDA.jobsActive4", "ATLAS_PANDA.jobsArchived4", "ATLAS_PANDA.jobsDefined4"]
1546         try:
1547             for table in tables:
1548                 # start transaction
1549                 self.conn.begin()
1550 
1551                 # select
1552                 var_map = {}
1553                 if source_type == "analysis":
1554                     var_map[":prodSourceLabel1"] = "user"
1555                     var_map[":prodSourceLabel2"] = "panda"
1556                 else:
1557                     var_map[":prodSourceLabel1"] = "managed"
1558                     for tmp_label in JobUtils.list_ptest_prod_sources:
1559                         tmp_key = f":prodSourceLabel_{tmp_label}"
1560                         var_map[tmp_key] = tmp_label
1561 
1562                 if table != "ATLAS_PANDA.jobsArchived4":
1563                     self.cur.arraysize = 10000
1564                     # active uses materialized view
1565                     if table == "ATLAS_PANDA.jobsActive4":
1566                         self.cur.execute(
1567                             (sql_active_mv + comment) % "ATLAS_PANDA.MV_JOBSACTIVE4_STATS",
1568                             var_map,
1569                         )
1570                     # defined and waiting tables
1571                     else:
1572                         self.cur.execute((sql + comment) % table, var_map)
1573                 else:
1574                     var_map[":modificationTime"] = time_floor
1575                     self.cur.arraysize = 10000
1576                     self.cur.execute((sql_archived + comment) % table, var_map)
1577                 res = self.cur.fetchall()
1578 
1579                 # commit
1580                 if not self._commit():
1581                     raise RuntimeError("Commit error")
1582 
1583                 # create map
1584                 for job_status, count, cloud in res:
1585                     ret.setdefault(cloud, dict())
1586                     ret[cloud].setdefault(job_status, 0)
1587                     ret[cloud][job_status] += count
1588 
1589             # return
1590             tmp_log.debug(f"done")
1591             return ret
1592         except Exception:
1593             # roll back
1594             self._rollback()
1595             # error
1596             self.dump_error_message(tmp_log)
1597             return {}
1598 
1599     # get statistics for production jobs per processingType
1600     def getJobStatisticsPerProcessingType(self):
1601         comment = " /* DBProxy.getJobStatisticsPerProcessingType */"
1602         tmp_log = self.create_tagged_logger(comment)
1603         tmp_log.debug("start")
1604 
1605         time_floor = naive_utcnow() - datetime.timedelta(hours=12)
1606 
1607         # Job tables we are going to query
1608         tables = ["ATLAS_PANDA.jobsActive4", "ATLAS_PANDA.jobsArchived4", "ATLAS_PANDA.jobsDefined4"]
1609 
1610         # Define the prodSourceLabel list
1611         prod_source_labels = ", ".join([f":prodSourceLabel_{label}" for label in JobUtils.list_ptest_prod_sources])
1612 
1613         # Construct the SQL query for active jobs
1614         sql_active = (
1615             f"SELECT /* use_json_type */ jobStatus, COUNT(*), tabS.data.cloud, processingType "
1616             f"FROM %s tab, ATLAS_PANDA.schedconfig_json tabS "
1617             f"WHERE prodSourceLabel IN (:prodSourceLabelManaged, {prod_source_labels}) "
1618             f"AND computingSite=tabS.panda_queue "
1619             f"GROUP BY jobStatus, tabS.data.cloud, processingType"
1620         )
1621 
1622         # Construct the SQL query for archived jobs
1623         sql_archived = (
1624             f"SELECT /* use_json_type */ /*+ INDEX_RS_ASC(tab (MODIFICATIONTIME PRODSOURCELABEL)) */ "
1625             f"jobStatus, COUNT(*), tabS.data.cloud, processingType "
1626             f"FROM %s tab, ATLAS_PANDA.schedconfig_json tabS "
1627             f"WHERE prodSourceLabel IN (:prodSourceLabelManaged, {prod_source_labels}) "
1628             f"AND modificationTime > :modificationTime "
1629             f"AND computingSite = tabS.panda_queue "
1630             f"GROUP BY jobStatus, tabS.data.cloud, processingType"
1631         )
1632 
1633         # sql for materialized view
1634         sql_active_mv = re.sub("COUNT\(\*\)", "SUM(num_of_jobs)", sql_active)
1635         sql_active_mv = re.sub("SELECT ", "SELECT /*+ RESULT_CACHE */ ", sql_active_mv)
1636 
1637         ret = {}
1638         try:
1639             for table in tables:
1640                 # start transaction
1641                 self.conn.begin()
1642 
1643                 # select
1644                 self.cur.arraysize = 10000
1645                 var_map = {":prodSourceLabelManaged": "managed"}
1646                 var_map.update({f":prodSourceLabel_{label}": label for label in JobUtils.list_ptest_prod_sources})
1647 
1648                 if table == "ATLAS_PANDA.jobsArchived4":
1649                     var_map[":modificationTime"] = time_floor
1650                     self.cur.execute((sql_archived + comment) % table, var_map)
1651 
1652                 elif table == "ATLAS_PANDA.jobsActive4":
1653                     self.cur.execute(
1654                         (sql_active_mv + comment) % "ATLAS_PANDA.MV_JOBSACTIVE4_STATS",
1655                         var_map,
1656                     )
1657                 else:
1658                     # use real table since coreCount is unavailable in MatView
1659                     self.cur.execute((sql_active + comment) % table, var_map)
1660 
1661                 results = self.cur.fetchall()
1662 
1663                 if not self._commit():
1664                     raise RuntimeError("Commit error")
1665 
1666                 # create map
1667                 for row in results:
1668                     job_status, count, cloud, processing_type = row
1669                     ret.setdefault(cloud, {}).setdefault(processing_type, {}).setdefault(job_status, 0)
1670                     ret[cloud][processing_type][job_status] += count
1671 
1672             tmp_log.debug(f"done")
1673             return ret
1674         except Exception:
1675             # roll back
1676             self._rollback()
1677             # error
1678             self.dump_error_message(tmp_log)
1679             return {}
1680 
1681     # peek at job
1682     def peekJobLog(self, pandaID, days=None):
1683         comment = " /* DBProxy.peekJobLog */"
1684         tmp_log = self.create_tagged_logger(comment, f"PandaID={pandaID}")
1685         tmp_log.debug(f"days={days}")
1686 
1687         # return None for NULL PandaID
1688         if pandaID in ["NULL", "", "None", None]:
1689             return None
1690 
1691         sql_select = "SELECT %s FROM %s "
1692         sql_where = "WHERE PandaID=:PandaID AND modificationTime>(CURRENT_DATE-:days) "
1693 
1694         # select
1695         var_map = {":PandaID": pandaID}
1696         if days is None:
1697             days = 30
1698         var_map[":days"] = days
1699         n_try = 1
1700 
1701         for i_try in range(n_try):
1702             try:
1703                 # get list of archived tables
1704                 tables = [f"{panda_config.schemaPANDAARCH}.jobsArchived"]
1705                 # select
1706                 for table in tables:
1707                     # start transaction
1708                     self.conn.begin()
1709                     # select
1710                     sql = sql_select % (JobSpec.columnNames(), table) + sql_where
1711                     self.cur.arraysize = 10
1712                     self.cur.execute(sql + comment, var_map)
1713                     rows = self.cur.fetchall()
1714                     # commit
1715                     if not self._commit():
1716                         raise RuntimeError("Commit error")
1717                     if len(rows) != 0:
1718                         # Job
1719                         job = JobSpec()
1720                         job.pack(rows[0])
1721                         # Files
1722                         # start transaction
1723                         self.conn.begin()
1724                         # select
1725                         file_table_name = re.sub("jobsArchived", "filesTable_ARCH", table)
1726                         sql_get_files = (
1727                             f"SELECT /*+ INDEX(tab FILES_ARCH_PANDAID_IDX)*/ {FileSpec.columnNames()} "
1728                             f"FROM {file_table_name} tab "
1729                             "WHERE PandaID=:PandaID AND modificationTime>(CURRENT_DATE-:days)"
1730                         )
1731                         self.cur.arraysize = 10000
1732                         self.cur.execute(sql_get_files + comment, var_map)
1733                         file_rows = self.cur.fetchall()
1734 
1735                         # metadata
1736                         var_map = {}
1737                         var_map[":PandaID"] = job.PandaID
1738                         job.metadata = None
1739                         meta_table_name = re.sub("jobsArchived", "metaTable_ARCH", table)
1740                         sql_get_meta = f"SELECT metaData FROM {meta_table_name} WHERE PandaID=:PandaID"
1741                         self.cur.execute(sql_get_meta + comment, var_map)
1742                         for (clob_meta,) in self.cur:
1743                             if clob_meta is not None:
1744                                 try:
1745                                     job.metadata = clob_meta.read()
1746                                 except AttributeError:
1747                                     job.metadata = str(clob_meta)
1748                             break
1749 
1750                         # job parameters
1751                         job.jobParameters = None
1752                         job_param_table_name = re.sub("jobsArchived", "jobParamsTable_ARCH", table)
1753                         sql_get_job_params = f"SELECT jobParameters FROM {job_param_table_name} WHERE PandaID=:PandaID"
1754                         var_map = {}
1755                         var_map[":PandaID"] = job.PandaID
1756                         self.cur.execute(sql_get_job_params + comment, var_map)
1757                         for (clob_job_params,) in self.cur:
1758                             if clob_job_params is not None:
1759                                 try:
1760                                     job.jobParameters = clob_job_params.read()
1761                                 except AttributeError:
1762                                     job.jobParameters = str(clob_job_params)
1763                             break
1764                         # commit
1765                         if not self._commit():
1766                             raise RuntimeError("Commit error")
1767                         # set files
1768                         for file_row in file_rows:
1769                             file = FileSpec()
1770                             file.pack(file_row)
1771                             # remove redundant white spaces
1772                             try:
1773                                 file.md5sum = file.md5sum.strip()
1774                             except Exception:
1775                                 pass
1776                             try:
1777                                 file.checksum = file.checksum.strip()
1778                             except Exception:
1779                                 pass
1780                             job.addFile(file)
1781                         return job
1782                 tmp_log.debug(f"not found")
1783                 return None
1784             except Exception:
1785                 # roll back
1786                 self._rollback()
1787                 if i_try + 1 < n_try:
1788                     tmp_log.error(f"retry {i_try}")
1789                     time.sleep(random.randint(10, 20))
1790                     continue
1791                 self.dump_error_message(tmp_log)
1792                 # return None
1793                 return None
1794 
1795     # throttle user jobs
1796     def throttleUserJobs(self, prodUserName, workingGroup, get_dict):
1797         comment = " /* DBProxy.throttleUserJobs */"
1798         tmp_log = self.create_tagged_logger(comment, f"user={prodUserName} group={workingGroup}")
1799         tmp_log.debug("start")
1800         try:
1801             # sql to get tasks
1802             sql_get_tasks = (
1803                 "SELECT /*+ INDEX_RS_ASC(tab JOBSACTIVE4_PRODUSERNAMEST_IDX) */ DISTINCT jediTaskID "
1804                 "FROM ATLAS_PANDA.jobsActive4 tab "
1805                 "WHERE prodSourceLabel=:prodSourceLabel AND prodUserName=:prodUserName "
1806                 "AND jobStatus=:oldJobStatus AND relocationFlag=:oldRelFlag "
1807                 "AND maxCpuCount>:maxTime "
1808             )
1809 
1810             if workingGroup is not None:
1811                 sql_get_tasks += "AND workingGroup=:workingGroup "
1812             else:
1813                 sql_get_tasks += "AND workingGroup IS NULL "
1814 
1815             # sql to get jobs
1816             sql_get_jobs = (
1817                 "SELECT "
1818                 "PandaID, jediTaskID, cloud, computingSite, prodSourceLabel "
1819                 "FROM ATLAS_PANDA.jobsActive4 "
1820                 "WHERE jediTaskID=:jediTaskID "
1821                 "AND jobStatus=:oldJobStatus AND relocationFlag=:oldRelFlag "
1822                 "AND maxCpuCount>:maxTime "
1823             )
1824 
1825             # sql to update jobs
1826             sql_update_jobs = (
1827                 f"UPDATE {panda_config.schemaPANDA}.jobsActive4 SET jobStatus=:newJobStatus,relocationFlag=:newRelFlag "
1828                 f"WHERE jediTaskID=:jediTaskID AND jobStatus=:oldJobStatus AND maxCpuCount>:maxTime"
1829             )
1830 
1831             # start transaction
1832             self.conn.begin()
1833             # select
1834             self.cur.arraysize = 10
1835             var_map = {":prodSourceLabel": "user", ":oldRelFlag": 1, ":prodUserName": prodUserName, ":oldJobStatus": "activated", ":maxTime": 6 * 60 * 60}
1836             if workingGroup is not None:
1837                 var_map[":workingGroup"] = workingGroup
1838             # get tasks
1839             self.cur.execute(sql_get_tasks + comment, var_map)
1840             task_rows = self.cur.fetchall()
1841             # commit
1842             if not self._commit():
1843                 raise RuntimeError("Commit error")
1844             # loop over all tasks
1845             task_ids = [task_id for task_id, in task_rows]
1846             random.shuffle(task_ids)
1847             total_updated = 0
1848             updated_per_task = {}
1849             for task_id in task_ids:
1850                 tmp_log.debug(f"reset jediTaskID={task_id}")
1851                 # start transaction
1852                 self.conn.begin()
1853                 # get jobs
1854                 var_map = {":jediTaskID": task_id, ":oldRelFlag": 1, ":oldJobStatus": "activated", ":maxTime": 6 * 60 * 60}
1855                 self.cur.execute(sql_get_jobs + comment, var_map)
1856                 job_rows = self.cur.fetchall()
1857                 job_info_map = {
1858                     panda_id: {
1859                         "computingSite": computing_site,
1860                         "cloud": cloud,
1861                         "prodSourceLabel": prod_source_label,
1862                     }
1863                     for panda_id, _task_id, cloud, computing_site, prod_source_label in job_rows
1864                 }
1865                 # update jobs
1866                 var_map = {":jediTaskID": task_id, ":newRelFlag": 3, ":newJobStatus": "throttled", ":oldJobStatus": "activated", ":maxTime": 6 * 60 * 60}
1867                 self.cur.execute(sql_update_jobs + comment, var_map)
1868                 n_updated = self.cur.rowcount
1869                 tmp_log.debug(f"reset {n_updated} jobs")
1870                 if n_updated > 0:
1871                     total_updated += n_updated
1872                     updated_per_task[task_id] = n_updated
1873                 for panda_id, job_info in job_info_map.items():
1874                     self.recordStatusChange(
1875                         panda_id,
1876                         var_map[":newJobStatus"],
1877                         infoMap=job_info,
1878                         useCommit=False,
1879                     )
1880                 # commit
1881                 if not self._commit():
1882                     raise RuntimeError("Commit error")
1883             if get_dict:
1884                 tmp_log.debug(f"done with {updated_per_task}")
1885                 return updated_per_task
1886             tmp_log.debug(f"done with {total_updated}")
1887             return total_updated
1888         except Exception:
1889             # roll back
1890             self._rollback()
1891             # error
1892             self.dump_error_message(tmp_log)
1893             return None
1894 
1895     # unthrottle user jobs
1896     def unThrottleUserJobs(self, prodUserName, workingGroup, get_dict):
1897         comment = " /* DBProxy.unThrottleUserJobs */"
1898         tmp_log = self.create_tagged_logger(comment, f"user={prodUserName} group={workingGroup}")
1899         tmp_log.debug("start")
1900         try:
1901             # sql to get tasks
1902             sql_get_tasks = (
1903                 "SELECT /*+ INDEX_RS_ASC(tab JOBSACTIVE4_PRODUSERNAMEST_IDX) */ DISTINCT jediTaskID "
1904                 "FROM ATLAS_PANDA.jobsActive4 tab "
1905                 "WHERE prodSourceLabel=:prodSourceLabel AND prodUserName=:prodUserName "
1906                 "AND jobStatus=:oldJobStatus AND relocationFlag=:oldRelFlag "
1907             )
1908 
1909             if workingGroup is not None:
1910                 sql_get_tasks += "AND workingGroup=:workingGroup "
1911             else:
1912                 sql_get_tasks += "AND workingGroup IS NULL "
1913 
1914             # sql to get jobs
1915             sql_get_jobs = (
1916                 "SELECT "
1917                 "PandaID, jediTaskID, cloud, computingSite, prodSourceLabel "
1918                 "FROM ATLAS_PANDA.jobsActive4 "
1919                 "WHERE jediTaskID=:jediTaskID "
1920                 "AND jobStatus=:oldJobStatus AND relocationFlag=:oldRelFlag "
1921             )
1922 
1923             # sql to update jobs
1924             sql_update_jobs = (
1925                 f"UPDATE {panda_config.schemaPANDA}.jobsActive4 SET jobStatus=:newJobStatus,relocationFlag=:newRelFlag "
1926                 "WHERE jediTaskID=:jediTaskID AND jobStatus=:oldJobStatus "
1927             )
1928 
1929             # start transaction
1930             self.conn.begin()
1931             # select
1932             self.cur.arraysize = 10
1933             var_map = {":prodSourceLabel": "user", ":oldRelFlag": 3, ":prodUserName": prodUserName, ":oldJobStatus": "throttled"}
1934             if workingGroup is not None:
1935                 var_map[":workingGroup"] = workingGroup
1936             # get tasks
1937             self.cur.execute(sql_get_tasks + comment, var_map)
1938             task_rows = self.cur.fetchall()
1939             # commit
1940             if not self._commit():
1941                 raise RuntimeError("Commit error")
1942             # loop over all tasks
1943             task_ids = [task_id for task_id, in task_rows]
1944             random.shuffle(task_ids)
1945             total_updated = 0
1946             updated_per_task = {}
1947             for task_id in task_ids:
1948                 tmp_log.debug(f"reset jediTaskID={task_id}")
1949                 # start transaction
1950                 self.conn.begin()
1951                 # get jobs
1952                 var_map = {":jediTaskID": task_id, ":oldRelFlag": 3, ":oldJobStatus": "throttled"}
1953                 self.cur.execute(sql_get_jobs + comment, var_map)
1954                 job_rows = self.cur.fetchall()
1955                 job_info_map = {
1956                     panda_id: {
1957                         "computingSite": computing_site,
1958                         "cloud": cloud,
1959                         "prodSourceLabel": prod_source_label,
1960                     }
1961                     for panda_id, _task_id, cloud, computing_site, prod_source_label in job_rows
1962                 }
1963                 # update jobs
1964                 var_map = {":jediTaskID": task_id, ":newRelFlag": 1, ":newJobStatus": "activated", ":oldJobStatus": "throttled"}
1965                 self.cur.execute(sql_update_jobs + comment, var_map)
1966                 n_updated = self.cur.rowcount
1967                 tmp_log.debug(f"reset {n_updated} jobs")
1968                 if n_updated > 0:
1969                     total_updated += n_updated
1970                     updated_per_task[task_id] = n_updated
1971                 for panda_id, job_info in job_info_map.items():
1972                     self.recordStatusChange(
1973                         panda_id,
1974                         var_map[":newJobStatus"],
1975                         infoMap=job_info,
1976                         useCommit=False,
1977                     )
1978                 # commit
1979                 if not self._commit():
1980                     raise RuntimeError("Commit error")
1981             if get_dict:
1982                 tmp_log.debug(f"done with {updated_per_task}")
1983                 return updated_per_task
1984             tmp_log.debug(f"done with {total_updated}")
1985             return total_updated
1986         except Exception:
1987             # roll back
1988             self._rollback()
1989             # error
1990             self.dump_error_message(tmp_log)
1991             return None
1992 
1993     # get the list of jobdefIDs for failed jobs in a task
1994     def getJobdefIDsForFailedJob(self, jediTaskID):
1995         comment = " /* DBProxy.getJobdefIDsForFailedJob */"
1996         tmp_log = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID}")
1997         tmp_log.debug(f"start")
1998         try:
1999             # begin transaction
2000             self.conn.begin()
2001             # dql to get jobDefIDs
2002             sqlGF = "SELECT distinct jobDefinitionID FROM ATLAS_PANDA.jobsActive4 "
2003             sqlGF += "WHERE jediTaskID=:jediTaskID AND jobStatus=:jobStatus "
2004             sqlGF += "AND attemptNr<maxAttempt "
2005             varMap = {}
2006             varMap[":jediTaskID"] = jediTaskID
2007             varMap[":jobStatus"] = "failed"
2008             self.cur.execute(sqlGF + comment, varMap)
2009             resGF = self.cur.fetchall()
2010             retList = []
2011             for (jobDefinitionID,) in resGF:
2012                 retList.append(jobDefinitionID)
2013             # commit
2014             if not self._commit():
2015                 raise RuntimeError("Commit error")
2016             tmp_log.debug(f"{str(retList)}")
2017             return retList
2018         except Exception:
2019             # roll back
2020             self._rollback()
2021             # error
2022             self.dump_error_message(tmp_log)
2023             return []
2024 
2025     # check validity of merge job
2026     def isValidMergeJob(self, pandaID, jediTaskID):
2027         comment = " /* DBProxy.isValidMergeJob */"
2028         tmp_log = self.create_tagged_logger(comment, f"PandaID={pandaID} jediTaskID={jediTaskID}")
2029         tmp_log.debug("start")
2030         try:
2031             retVal = True
2032             retMsg = ""
2033             # sql to check if merge job is active
2034             sqlJ = "SELECT jobStatus FROM ATLAS_PANDA.jobsDefined4 WHERE PandaID=:PandaID "
2035             sqlJ += "UNION "
2036             sqlJ += "SELECT jobStatus FROM ATLAS_PANDA.jobsActive4 WHERE PandaID=:PandaID "
2037             # sql to get input files
2038             sqlF = "SELECT datasetID,fileID FROM ATLAS_PANDA.filesTable4 "
2039             sqlF += "WHERE PandaID=:PandaID AND type IN (:type1,:type2) "
2040             # sql to get PandaIDs for pre-merged jobs
2041             sqlP = "SELECT outPandaID FROM ATLAS_PANDA.JEDI_Dataset_Contents "
2042             sqlP += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID=:fileID AND type<>:type1"
2043             # sql to check if pre-merge job is active
2044             sqlC = "SELECT jobStatus FROM ATLAS_PANDA.jobsActive4 WHERE PandaID=:PandaID "
2045             # begin transaction
2046             self.conn.begin()
2047             # check if merge job is active
2048             varMap = {}
2049             varMap[":PandaID"] = pandaID
2050             self.cur.execute(sqlJ + comment, varMap)
2051             resJ = self.cur.fetchone()
2052             if resJ is None:
2053                 tmp_log.debug("merge job not found")
2054             else:
2055                 # get input files
2056                 varMap = {}
2057                 varMap[":PandaID"] = pandaID
2058                 varMap[":type1"] = "input"
2059                 varMap[":type2"] = "pseudo_input"
2060                 self.cur.execute(sqlF + comment, varMap)
2061                 resF = self.cur.fetchall()
2062                 firstDatasetID = None
2063                 fileIDsMap = {}
2064                 for datasetID, fileID in resF:
2065                     if datasetID not in fileIDsMap:
2066                         fileIDsMap[datasetID] = set()
2067                     fileIDsMap[datasetID].add(fileID)
2068                 # get PandaIDs for pre-merged jobs
2069                 pandaIDs = set()
2070                 for datasetID in fileIDsMap:
2071                     fileIDs = fileIDsMap[datasetID]
2072                     for fileID in fileIDs:
2073                         varMap = {}
2074                         varMap[":jediTaskID"] = jediTaskID
2075                         varMap[":datasetID"] = datasetID
2076                         varMap[":fileID"] = fileID
2077                         varMap[":type1"] = "lib"
2078                         self.cur.execute(sqlP + comment, varMap)
2079                         resP = self.cur.fetchone()
2080                         if resP is not None and resP[0] is not None:
2081                             pandaIDs.add(resP[0])
2082                     # only files in the first dataset are enough
2083                     if len(pandaIDs) > 0:
2084                         break
2085                 # check pre-merge job
2086                 for tmpPandaID in pandaIDs:
2087                     varMap = {}
2088                     varMap[":PandaID"] = tmpPandaID
2089                     self.cur.execute(sqlC + comment, varMap)
2090                     resC = self.cur.fetchone()
2091                     if resC is None:
2092                         # not found
2093                         tmp_log.debug(f"pre-merge job {tmpPandaID} not found")
2094                         retVal = False
2095                         retMsg = tmpPandaID
2096                         break
2097                     elif resC[0] != "merging":
2098                         # not in merging
2099                         tmp_log.debug("pre-merge job in {0} != merging".format(tmpPandaID, resC[0]))
2100                         retVal = False
2101                         retMsg = tmpPandaID
2102                         break
2103             # commit
2104             if not self._commit():
2105                 raise RuntimeError("Commit error")
2106             tmp_log.debug(f"ret={retVal}")
2107             return retVal, retMsg
2108         except Exception:
2109             # roll back
2110             self._rollback()
2111             # error
2112             self.dump_error_message(tmp_log)
2113             return None, ""
2114 
2115     # check Job status
2116     def checkJobStatus(self, pandaID):
2117         comment = " /* DBProxy.checkJobStatus */"
2118         tmp_log = self.create_tagged_logger(comment, f"PandaID={pandaID}")
2119         tmp_log.debug("start")
2120         retVal = {"command": None, "status": None}
2121         try:
2122             sqlC = (
2123                 "SELECT jobStatus,commandToPilot FROM ATLAS_PANDA.jobsActive4 "
2124                 "WHERE PandaID=:pandaID "
2125                 "UNION "
2126                 "SELECT /*+ INDEX_RS_ASC(JOBSARCHIVED4 PART_JOBSARCHIVED4_PK) */ "
2127                 "jobStatus,commandToPilot FROM ATLAS_PANDA.jobsArchived4 "
2128                 "WHERE PandaID=:pandaID AND modificationTime>:timeLimit "
2129             )
2130             varMap = dict()
2131             varMap[":pandaID"] = int(pandaID)
2132             varMap[":timeLimit"] = naive_utcnow() - datetime.timedelta(hours=1)
2133             # begin transaction
2134             self.conn.begin()
2135             # select
2136             self.cur.arraysize = 10
2137             self.cur.execute(sqlC + comment, varMap)
2138             res = self.cur.fetchone()
2139             if res is not None:
2140                 retVal["status"], retVal["command"] = res
2141             else:
2142                 retVal["status"], retVal["command"] = "unknown", "tobekilled"
2143             # commit
2144             if not self._commit():
2145                 raise RuntimeError("Commit error")
2146             tmp_log.debug(f"done with {str(retVal)}")
2147             return retVal
2148         except Exception:
2149             # roll back
2150             self._rollback()
2151             self.dump_error_message(tmp_log)
2152             return retVal
2153 
2154     # get active job attribute
2155     def getActiveJobAttributes(self, pandaID, attrs):
2156         comment = " /* DBProxy.getActiveJobAttributes */"
2157         tmp_log = self.create_tagged_logger(comment, f"PandaID={pandaID}")
2158         tmp_log.debug("start")
2159         try:
2160             sqlS = f"SELECT {','.join(attrs)} FROM ATLAS_PANDA.jobsActive4 "
2161             sqlS += "WHERE PandaID=:PandaID "
2162             # start transaction
2163             self.conn.begin()
2164             varMap = {}
2165             varMap[":PandaID"] = pandaID
2166             self.cur.execute(sqlS + comment, varMap)
2167             res = self.cur.fetchone()
2168             if res is not None:
2169                 retMap = dict()
2170                 for idx, attr in enumerate(attrs):
2171                     retMap[attr] = res[idx]
2172             else:
2173                 retMap = None
2174             # commit
2175             if not self._commit():
2176                 raise RuntimeError("Commit error")
2177             tmp_log.debug(f"got {str(retMap)}")
2178             return retMap
2179         except Exception:
2180             # roll back
2181             self._rollback()
2182             # error
2183             self.dump_error_message(tmp_log)
2184             return None
2185 
2186     # get user job metadata
2187     def getUserJobMetadata(self, jediTaskID):
2188         comment = " /* DBProxy.getUserJobMetadata */"
2189         tmp_log = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID}")
2190         tmp_log.debug("start")
2191         try:
2192             # sql to get workers
2193             sqlC = "SELECT j.PandaID,m.metaData FROM {0} j,{1} m "
2194             sqlC += "WHERE j.jediTaskID=:jediTaskID AND j.jobStatus=:jobStatus AND m.PandaID=j.PandaID AND j.prodSourceLabel=:label "
2195             retMap = dict()
2196             for a, m in [
2197                 ("ATLAS_PANDA.jobsArchived4", "ATLAS_PANDA.metaTable"),
2198                 ("ATLAS_PANDAARCH.jobsArchived", "ATLAS_PANDAARCH.metaTable_ARCH"),
2199             ]:
2200                 sql = sqlC.format(a, m)
2201                 varMap = {}
2202                 varMap[":jediTaskID"] = jediTaskID
2203                 varMap[":label"] = "user"
2204                 varMap[":jobStatus"] = "finished"
2205                 # start transaction
2206                 self.conn.begin()
2207                 self.cur.execute(sql + comment, varMap)
2208                 resCs = self.cur.fetchall()
2209                 for pandaID, clobMeta in resCs:
2210                     try:
2211                         metadata = clobMeta.read()
2212                     except AttributeError:
2213                         metadata = str(clobMeta)
2214                     try:
2215                         retMap[pandaID] = json.loads(metadata)
2216                     except Exception:
2217                         pass
2218                 # commit
2219                 if not self._commit():
2220                     raise RuntimeError("Commit error")
2221             tmp_log.debug(f"got {len(retMap)} data blocks")
2222             return retMap
2223         except Exception:
2224             # roll back
2225             self._rollback()
2226             # error
2227             self.dump_error_message(tmp_log)
2228             return {}
2229 
2230     # insert job output report
2231     def insertJobOutputReport(self, panda_id, prod_source_label, job_status, attempt_nr, data):
2232         comment = " /* DBProxy.insertJobOutputReport */"
2233         tmp_log = self.create_tagged_logger(comment, f"PandaID={panda_id} attemptNr={attempt_nr}")
2234         tmp_log.debug("start")
2235         # sql to insert
2236         sqlI = (
2237             "INSERT INTO {0}.Job_Output_Report "
2238             "(PandaID, prodSourceLabel, jobStatus, attemptNr, data, timeStamp) "
2239             "VALUES(:PandaID, :prodSourceLabel, :jobStatus, :attemptNr, :data, :timeStamp) "
2240         ).format(panda_config.schemaPANDA)
2241         try:
2242             retVal = False
2243             # start transaction
2244             self.conn.begin()
2245             # insert
2246             varMap = {}
2247             varMap[":PandaID"] = panda_id
2248             varMap[":prodSourceLabel"] = prod_source_label
2249             varMap[":jobStatus"] = job_status
2250             varMap[":attemptNr"] = attempt_nr
2251             varMap[":data"] = data
2252             varMap[":timeStamp"] = naive_utcnow()
2253             self.cur.execute(sqlI + comment, varMap)
2254             tmp_log.debug("successfully inserted")
2255             retVal = True
2256             # commit
2257             if not self._commit():
2258                 raise RuntimeError("Commit error")
2259             tmp_log.debug("done")
2260             return retVal
2261         except Exception:
2262             # roll back
2263             self._rollback()
2264             # error
2265             self.dump_error_message(tmp_log)
2266             return retVal
2267 
2268     # update data of job output report
2269     def updateJobOutputReport(self, panda_id, attempt_nr, data):
2270         comment = " /* DBProxy.updateJobOutputReport */"
2271         tmp_log = self.create_tagged_logger(comment, f"PandaID={panda_id} attemptNr={attempt_nr}")
2272         tmp_log.debug("start")
2273         # try to lock
2274         try:
2275             retVal = False
2276             # sql to update
2277             sqlU = f"UPDATE {panda_config.schemaPANDA}.Job_Output_Report SET data=:data, timeStamp=:timeStamp WHERE PandaID=:PandaID AND attemptNr=:attemptNr "
2278             # start transaction
2279             self.conn.begin()
2280             # update
2281             varMap = {}
2282             varMap[":PandaID"] = panda_id
2283             varMap[":attemptNr"] = attempt_nr
2284             varMap[":data"] = data
2285             varMap[":timeStamp"] = naive_utcnow()
2286             self.cur.execute(sqlU + comment, varMap)
2287             nRow = self.cur.rowcount
2288             if nRow == 1:
2289                 tmp_log.debug("successfully updated")
2290                 retVal = True
2291             elif nRow == 0:
2292                 tmp_log.debug("entry not found, not updated")
2293             else:
2294                 tmp_log.warning(f"updated unspecific number of rows: {nRow}")
2295             # commit
2296             if not self._commit():
2297                 raise RuntimeError("Commit error")
2298             tmp_log.debug("done")
2299             return retVal
2300         except Exception:
2301             # roll back
2302             self._rollback()
2303             # error
2304             self.dump_error_message(tmp_log)
2305             return retVal
2306 
    # delete job output report
2308     def deleteJobOutputReport(self, panda_id, attempt_nr):
2309         comment = " /* DBProxy.deleteJobOutputReport */"
2310         tmp_log = self.create_tagged_logger(comment, f"PandaID={panda_id} attemptNr={attempt_nr}")
2311         tmp_log.debug("start")
2312         # sql to delete
2313         sqlD = f"DELETE FROM {panda_config.schemaPANDA}.Job_Output_Report WHERE PandaID=:PandaID AND attemptNr=:attemptNr "
2314         try:
2315             retVal = False
2316             # start transaction
2317             self.conn.begin()
2318             # delete
2319             varMap = {}
2320             varMap[":PandaID"] = panda_id
2321             varMap[":attemptNr"] = attempt_nr
2322             self.cur.execute(sqlD + comment, varMap)
2323             tmp_log.debug("successfully deleted")
2324             retVal = True
2325             # commit
2326             if not self._commit():
2327                 raise RuntimeError("Commit error")
2328             tmp_log.debug("done")
2329             return retVal
2330         except Exception:
2331             # roll back
2332             self._rollback()
2333             # error
2334             self.dump_error_message(tmp_log)
2335             return retVal
2336 
2337     # get record of a job output report
2338     def getJobOutputReport(self, panda_id, attempt_nr):
2339         comment = " /* DBProxy.getJobOutputReport */"
2340         tmp_log = self.create_tagged_logger(comment, f"PandaID={panda_id} attemptNr={attempt_nr}")
2341         tmp_log.debug("start")
2342         # try to lock
2343         try:
2344             retVal = {}
2345             # sql to get records
2346             sqlGR = (
2347                 "SELECT PandaID,prodSourceLabel,jobStatus,attemptNr,data,timeStamp,lockedBy,lockedTime "
2348                 "FROM {0}.Job_Output_Report "
2349                 "WHERE PandaID=:PandaID AND attemptNr=:attemptNr "
2350             ).format(panda_config.schemaPANDA)
2351             # start transaction
2352             self.conn.begin()
2353             # check
2354             varMap = {}
2355             varMap[":PandaID"] = panda_id
2356             varMap[":attemptNr"] = attempt_nr
2357             self.cur.execute(sqlGR + comment, varMap)
2358             resGR = self.cur.fetchall()
2359             if not resGR:
2360                 tmp_log.debug("record does not exist, skipped")
2361             for (
2362                 PandaID,
2363                 prodSourceLabel,
2364                 jobStatus,
2365                 attemptNr,
2366                 data,
2367                 timeStamp,
2368                 lockedBy,
2369                 lockedTime,
2370             ) in resGR:
2371                 # fill result
2372                 retVal = {
2373                     "PandaID": PandaID,
2374                     "jobStatus": jobStatus,
2375                     "attemptNr": attemptNr,
2376                     "timeStamp": timeStamp,
2377                     "data": data,
2378                     "lockedBy": lockedBy,
2379                     "lockedTime": lockedTime,
2380                 }
2381                 tmp_log.debug("got record")
2382                 break
2383             # commit
2384             if not self._commit():
2385                 raise RuntimeError("Commit error")
2386             tmp_log.debug("done")
2387             return retVal
2388         except Exception:
2389             # roll back
2390             self._rollback()
2391             # error
2392             self.dump_error_message(tmp_log)
2393             return retVal
2394 
2395     # lock job output report
2396     def lockJobOutputReport(self, panda_id, attempt_nr, pid, time_limit, take_over_from=None):
2397         comment = " /* DBProxy.lockJobOutputReport */"
2398         tmp_log = self.create_tagged_logger(comment, f"PandaID={panda_id} attemptNr={attempt_nr}")
2399         tmp_log.debug("start")
2400         # try to lock
2401         try:
2402             retVal = []
2403             # sql to get lock
2404             sqlGL = (
2405                 "SELECT PandaID,attemptNr "
2406                 "FROM {0}.Job_Output_Report "
2407                 "WHERE PandaID=:PandaID AND attemptNr=:attemptNr "
2408                 "AND (lockedBy IS NULL OR lockedBy=:lockedBy OR lockedTime<:lockedTime) "
2409                 "FOR UPDATE NOWAIT "
2410             ).format(panda_config.schemaPANDA)
2411             # sql to update lock
2412             sqlUL = (
2413                 "UPDATE {0}.Job_Output_Report " "SET lockedBy=:lockedBy, lockedTime=:lockedTime " "WHERE PandaID=:PandaID AND attemptNr=:attemptNr "
2414             ).format(panda_config.schemaPANDA)
2415             # start transaction
2416             self.conn.begin()
2417             # check
2418             varMap = {}
2419             varMap[":PandaID"] = panda_id
2420             varMap[":attemptNr"] = attempt_nr
2421             if take_over_from is None:
2422                 varMap[":lockedBy"] = pid
2423             else:
2424                 varMap[":lockedBy"] = take_over_from
2425             varMap[":lockedTime"] = naive_utcnow() - datetime.timedelta(minutes=time_limit)
2426             utc_now = naive_utcnow()
2427             try:
2428                 self.cur.execute(sqlGL + comment, varMap)
2429                 resGL = self.cur.fetchall()
2430                 if not resGL:
2431                     tmp_log.debug("record already locked by other thread, skipped")
2432             except Exception:
2433                 resGL = None
2434                 tmp_log.debug("record skipped due to NOWAIT")
2435             if resGL:
2436                 for panda_id, attempt_nr in resGL:
2437                     # lock
2438                     varMap = {}
2439                     varMap[":PandaID"] = panda_id
2440                     varMap[":attemptNr"] = attempt_nr
2441                     varMap[":lockedBy"] = pid
2442                     varMap[":lockedTime"] = utc_now
2443                     self.cur.execute(sqlUL + comment, varMap)
2444                     if take_over_from is None:
2445                         tmp_log.debug(f"successfully locked record by {pid}")
2446                     else:
2447                         tmp_log.debug(f"successfully took over locked record from {take_over_from} by {pid}")
2448                     retVal = True
2449                     break
2450             # commit
2451             if not self._commit():
2452                 raise RuntimeError("Commit error")
2453             tmp_log.debug("done")
2454             return retVal
2455         except Exception:
2456             # roll back
2457             self._rollback()
2458             # error
2459             self.dump_error_message(tmp_log)
2460             return retVal
2461 
2462     # unlock job output report
2463     def unlockJobOutputReport(self, panda_id, attempt_nr, pid, lock_offset):
2464         comment = " /* DBProxy.unlockJobOutputReport */"
2465         tmp_log = self.create_tagged_logger(comment, f"PandaID={panda_id} attemptNr={attempt_nr}")
2466         tmp_log.debug("start")
2467         # try to lock
2468         try:
2469             retVal = []
2470             # sql to get lock
2471             sqlGL = (
2472                 "SELECT PandaID,attemptNr "
2473                 "FROM {0}.Job_Output_Report "
2474                 "WHERE PandaID=:PandaID AND attemptNr=:attemptNr "
2475                 "AND lockedBy=:lockedBy "
2476                 "FOR UPDATE"
2477             ).format(panda_config.schemaPANDA)
2478             # sql to update lock
2479             sqlUL = f"UPDATE {panda_config.schemaPANDA}.Job_Output_Report SET lockedTime=:lockedTime WHERE PandaID=:PandaID AND attemptNr=:attemptNr "
2480             # start transaction
2481             self.conn.begin()
2482             # check
2483             varMap = {}
2484             varMap[":PandaID"] = panda_id
2485             varMap[":attemptNr"] = attempt_nr
2486             varMap[":lockedBy"] = pid
2487             self.cur.execute(sqlGL + comment, varMap)
2488             resGL = self.cur.fetchall()
2489             if not resGL:
2490                 tmp_log.debug("record not locked by this thread, skipped")
2491             else:
2492                 for panda_id, attempt_nr in resGL:
2493                     # lock
2494                     varMap = {}
2495                     varMap[":PandaID"] = panda_id
2496                     varMap[":attemptNr"] = attempt_nr
2497                     varMap[":lockedTime"] = naive_utcnow() - datetime.timedelta(minutes=lock_offset)
2498                     self.cur.execute(sqlUL + comment, varMap)
2499                     tmp_log.debug("successfully unlocked record")
2500                     retVal = True
2501                     break
2502             # commit
2503             if not self._commit():
2504                 raise RuntimeError("Commit error")
2505             tmp_log.debug("done")
2506             return retVal
2507         except Exception:
2508             # roll back
2509             self._rollback()
2510             # error
2511             self.dump_error_message(tmp_log)
2512             return retVal
2513 
2514     # list pandaID, jobStatus, attemptNr, timeStamp of job output report
2515     def listJobOutputReport(self, only_unlocked, time_limit, limit, grace_period, labels, anti_labels):
2516         comment = " /* DBProxy.listJobOutputReport */"
2517         tmp_log = self.create_tagged_logger(comment)
2518         tmp_log.debug(f"start label={str(labels)} limit={limit} anti_label={str(anti_labels)}")
2519         try:
2520             retVal = None
2521             if only_unlocked:
2522                 # try to get only records unlocked or with expired lock
2523                 varMap = {}
2524                 varMap[":limit"] = limit * 10
2525                 varMap[":lockedTime"] = naive_utcnow() - datetime.timedelta(minutes=time_limit)
2526                 varMap[":timeStamp"] = naive_utcnow() - datetime.timedelta(seconds=grace_period)
2527                 # sql to get record
2528                 sqlGR = (
2529                     "SELECT * "
2530                     "FROM ( "
2531                     "SELECT PandaID,jobStatus,attemptNr,timeStamp "
2532                     "FROM {0}.Job_Output_Report "
2533                     "WHERE (lockedBy IS NULL OR lockedTime<:lockedTime) "
2534                     "AND timeStamp<:timeStamp ".format(panda_config.schemaPANDA)
2535                 )
2536                 if labels is not None:
2537                     label_var_names_str, label_var_map = get_sql_IN_bind_variables(labels, prefix=":l_", value_as_suffix=True)
2538                     sqlGR += f"AND prodSourceLabel IN ({label_var_names_str}) "
2539                     varMap.update(label_var_map)
2540                 if anti_labels is not None:
2541                     anti_label_var_names_str, anti_label_var_map = get_sql_IN_bind_variables(anti_labels, prefix=":al_", value_as_suffix=True)
2542                     sqlGR += f"AND prodSourceLabel NOT IN ({anti_label_var_names_str}) "
2543                     varMap.update(anti_label_var_map)
2544                 sqlGR += "ORDER BY timeStamp " ") " "WHERE rownum<=:limit "
2545                 # start transaction
2546                 self.conn.begin()
2547                 # check
2548                 self.cur.execute(sqlGR + comment, varMap)
2549                 separator = limit // 10
2550                 retVal = self.cur.fetchall()
2551                 # shuffle tail to avoid conflict
2552                 ret_head = retVal[:separator]
2553                 ret_tail = retVal[separator:]
2554                 random.shuffle(ret_tail)
2555                 retVal = ret_head + ret_tail
2556                 retVal = retVal[:limit]
2557                 tmp_log.debug(f"listed {len(retVal)} unlocked records")
2558             else:
2559                 # sql to select
2560                 sqlS = (
2561                     "SELECT * "
2562                     "FROM ( "
2563                     "SELECT PandaID,jobStatus,attemptNr,timeStamp "
2564                     "FROM {0}.Job_Output_Report "
2565                     "ORDER BY timeStamp "
2566                     ") "
2567                     "WHERE rownum<=:limit "
2568                 ).format(panda_config.schemaPANDA)
2569                 # start transaction
2570                 self.conn.begin()
2571                 varMap = {}
2572                 varMap[":limit"] = limit
2573                 # check
2574                 self.cur.execute(sqlS + comment, varMap)
2575                 retVal = self.cur.fetchall()
2576                 tmp_log.debug(f"listed {len(retVal)} records")
2577             # commit
2578             if not self._commit():
2579                 raise RuntimeError("Commit error")
2580             tmp_log.debug("done")
2581             return retVal
2582         except Exception:
2583             # roll back
2584             self._rollback()
2585             # error
2586             self.dump_error_message(tmp_log)
2587             return retVal
2588 
2589     # send command to a job
2590     def send_command_to_job(self, panda_id, com):
2591         comment = " /* DBProxy.send_command_to_job */"
2592         tmp_log = self.create_tagged_logger(comment, f"PandaID={panda_id}")
2593         tmp_log.debug("start")
2594         retVal = None
2595         try:
2596             # check length
2597             new_com = JobSpec.truncateStringAttr("commandToPilot", com)
2598             if len(new_com) != len(com):
2599                 retVal = (
2600                     False,
2601                     f"command string too long. must be less than {len(new_com)} chars",
2602                 )
2603             else:
2604                 sqlR = "SELECT commandToPilot FROM ATLAS_PANDA.{} WHERE PandaID=:PandaID FOR UPDATE "
2605                 sqlU = "UPDATE ATLAS_PANDA.{} SET commandToPilot=:commandToPilot " "WHERE PandaID=:PandaID "
2606                 for table in ["jobsDefined4", "jobsActive4"]:
2607                     # start transaction
2608                     self.conn.begin()
2609                     # read
2610                     varMap = {}
2611                     varMap[":PandaID"] = panda_id
2612                     self.cur.execute(sqlR.format(table) + comment, varMap)
2613                     data = self.cur.fetchone()
2614                     if data is not None:
2615                         (commandToPilot,) = data
2616                         if commandToPilot == "tobekilled":
2617                             retVal = (False, "job is being killed")
2618                         else:
2619                             varMap = {}
2620                             varMap[":PandaID"] = panda_id
2621                             varMap[":commandToPilot"] = com
2622                             self.cur.execute(sqlU.format(table) + comment, varMap)
2623                             nRow = self.cur.rowcount
2624                             if nRow:
2625                                 retVal = (True, "command received")
2626                     # commit
2627                     if not self._commit():
2628                         raise RuntimeError("Commit error")
2629                     if retVal is not None:
2630                         break
2631             if retVal is None:
2632                 retVal = (False, f"no active job with PandaID={panda_id}")
2633             tmp_log.debug(f"done with {str(retVal)}")
2634             return retVal
2635         except Exception:
2636             # roll back
2637             self._rollback()
2638             # error
2639             self.dump_error_message(tmp_log)
2640             return False, "database error"
2641 
2642     def get_distinct_resource_types_per_site(self, jedi_task_id: int, threshold: float = 20.0) -> dict[str, set[str]]:
2643         """Get distinct resource types per computingSite from jobsActive4 and jobsDefined4 for a given task,
2644         ignoring resource types whose share of total jobs at that site is below the threshold percentage.
2645 
2646         :param jedi_task_id: the jediTaskID to filter jobs by
2647         :param threshold: minimum percentage (0.0~100.0) of site jobs required to include a resource type (0.0 = no filtering)
2648         :return: dict mapping computingSite to a set of resource_type strings, or empty dict on error
2649         """
2650         comment = " /* DBProxy.get_distinct_resource_types_per_site */"
2651         tmp_log = self.create_tagged_logger(comment)
2652         tmp_log.debug("start")
2653         try:
2654             # start transaction
2655             self.conn.begin()
2656             var_map = {":jediTaskID": jedi_task_id}
2657             sql = (
2658                 "SELECT computingSite, resource_type, COUNT(*) "
2659                 "FROM ATLAS_PANDA.jobsActive4 "
2660                 "WHERE jediTaskID=:jediTaskID AND resource_type IS NOT NULL "
2661                 "GROUP BY computingSite, resource_type "
2662                 "UNION ALL "
2663                 "SELECT computingSite, resource_type, COUNT(*) "
2664                 "FROM ATLAS_PANDA.jobsDefined4 "
2665                 "WHERE jediTaskID=:jediTaskID AND resource_type IS NOT NULL "
2666                 "GROUP BY computingSite, resource_type"
2667             )
2668             self.cur.execute(sql + comment, var_map)
2669             res = self.cur.fetchall()
2670             # aggregate counts per (site, resource_type) across both tables
2671             count_map: dict[tuple[str, str], int] = {}
2672             site_totals: dict[str, int] = {}
2673             for computing_site, resource_type, cnt in res:
2674                 count_map[(computing_site, resource_type)] = count_map.get((computing_site, resource_type), 0) + cnt
2675                 site_totals[computing_site] = site_totals.get(computing_site, 0) + cnt
2676             # apply percentage threshold
2677             ret: dict[str, set[str]] = {}
2678             for (computing_site, resource_type), cnt in count_map.items():
2679                 if cnt * 100.0 / site_totals[computing_site] >= threshold:
2680                     ret.setdefault(computing_site, set()).add(resource_type)
2681             if not self._commit():
2682                 raise RuntimeError("Commit error")
2683             tmp_log.debug(f"done, {len(ret)} sites found")
2684             return ret
2685         except Exception:
2686             self._rollback()
2687             self.dump_error_message(tmp_log)
2688             return {}