Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:39:04

0001 import copy
0002 import datetime
0003 import glob
0004 import json
0005 import os
0006 import random
0007 import re
0008 import time
0009 from typing import Dict, List, Tuple
0010 
0011 from pandacommon.pandalogger.LogWrapper import LogWrapper
0012 from pandacommon.pandautils.PandaUtils import get_sql_IN_bind_variables, naive_utcnow
0013 
0014 from pandaserver.config import panda_config
0015 from pandaserver.srvcore import CoreUtils, srv_msg_utils
0016 from pandaserver.taskbuffer import EventServiceUtils, PrioUtil
0017 from pandaserver.taskbuffer.DatasetSpec import DatasetSpec
0018 from pandaserver.taskbuffer.db_proxy_mods.base_module import (
0019     BaseModule,
0020     SQL_QUEUE_TOPIC_async_dataset_update,
0021     memoize,
0022     varNUMBER,
0023 )
0024 from pandaserver.taskbuffer.FileSpec import FileSpec
0025 from pandaserver.taskbuffer.JediDatasetSpec import (
0026     INPUT_TYPES_var_map,
0027     INPUT_TYPES_var_str,
0028     JediDatasetSpec,
0029 )
0030 from pandaserver.taskbuffer.JediTaskSpec import JediTaskSpec
0031 from pandaserver.taskbuffer.JobSpec import JobSpec
0032 
0033 
0034 # Module class to define miscellaneous standalone methods that are independent of another module's methods
0035 class MiscStandaloneModule(BaseModule):
0036     # constructor
    def __init__(self, log_stream: LogWrapper):
        """Initialize the module by delegating to the ``BaseModule`` constructor.

        Args:
            log_stream: Logger wrapper passed through unchanged to the parent constructor.
        """
        super().__init__(log_stream)
0039 
0040     # get PandaIDs with TaskID
0041     def getPandaIDsWithTaskID(self, jediTaskID: int, scout_only: bool = False, unsuccessful_only: bool = False) -> List[int]:
0042         """Get PanDA job IDs for a task.
0043 
0044         Args:
0045             jediTaskID: JEDI task ID.
0046             scout_only: When True, return only jobs with the ``sj`` token in the
0047                 comma-separated ``specialHandling`` field.
0048             unsuccessful_only: When True, return only jobs with status in
0049                 ``failed``, ``cancelled``, or ``closed``.
0050 
0051         Returns:
0052             List[int]: PanDA job IDs found in defined, active, and archived tables.
0053         """
0054         comment = " /* DBProxy.getPandaIDsWithTaskID */"
0055         tmp_log = self.create_tagged_logger(comment, f"<jediTaskID={jediTaskID} scout_only={scout_only} unsuccessful_only={unsuccessful_only}>")
0056         tmp_log.debug("start")
0057         # SQL
0058         scout_filter = "AND INSTR(','||NVL(specialHandling, '')||',',',sj,')>0 "
0059         unsuccessful_filter = "AND jobStatus IN ('failed','cancelled','closed') "
0060         sql = "SELECT PandaID FROM ATLAS_PANDA.jobsDefined4 "
0061         sql += "WHERE jediTaskID=:jediTaskID "
0062         if scout_only:
0063             sql += scout_filter
0064         if unsuccessful_only:
0065             sql += unsuccessful_filter
0066         sql += "UNION "
0067         sql += "SELECT PandaID FROM ATLAS_PANDA.jobsActive4 "
0068         sql += "WHERE jediTaskID=:jediTaskID "
0069         if scout_only:
0070             sql += scout_filter
0071         if unsuccessful_only:
0072             sql += unsuccessful_filter
0073         sql += "UNION "
0074         sql += "SELECT PandaID FROM ATLAS_PANDA.jobsArchived4 "
0075         sql += "WHERE jediTaskID=:jediTaskID "
0076         if scout_only:
0077             sql += scout_filter
0078         if unsuccessful_only:
0079             sql += unsuccessful_filter
0080         varMap = {}
0081         varMap[":jediTaskID"] = jediTaskID
0082         try:
0083             # start transaction
0084             self.conn.begin()
0085             # select
0086             self.cur.arraysize = 1000000
0087             self.cur.execute(sql + comment, varMap)
0088             res = self.cur.fetchall()
0089             # commit
0090             if not self._commit():
0091                 raise RuntimeError("Commit error")
0092             retList = []
0093             for (pandaID,) in res:
0094                 retList.append(pandaID)
0095 
0096             if scout_only:
0097                 tmp_log.debug(f"found {len(retList)} scout IDs")
0098             elif unsuccessful_only:
0099                 tmp_log.debug(f"found {len(retList)} unsuccessful IDs")
0100             else:
0101                 tmp_log.debug(f"found {len(retList)} IDs")
0102             return retList
0103         except Exception:
0104             # roll back
0105             self._rollback()
0106             # error
0107             self.dump_error_message(tmp_log)
0108             return []
0109 
0110     # change task priority
0111     def changeTaskPriorityPanda(self, jediTaskID, newPriority):
0112         comment = " /* DBProxy.changeTaskPriorityPanda */"
0113         tmp_log = self.create_tagged_logger(comment, f"<jediTaskID={jediTaskID}>")
0114         tmp_log.debug(f"newPrio={newPriority}")
0115         try:
0116             # sql to update JEDI task table
0117             sqlT = f"UPDATE {panda_config.schemaJEDI}.JEDI_Tasks SET currentPriority=:newPriority WHERE jediTaskID=:jediTaskID "
0118             # sql to update DEFT task table
0119             schemaDEFT = panda_config.schemaDEFT
0120             sqlD = f"UPDATE {schemaDEFT}.T_TASK SET current_priority=:newPriority,timestamp=CURRENT_DATE WHERE taskid=:jediTaskID "
0121             # update job priorities
0122             sqlJ = "UPDATE ATLAS_PANDA.{0} SET currentPriority=:newPriority WHERE jediTaskID=:jediTaskID "
0123             # start transaction
0124             self.conn.begin()
0125             # select
0126             self.cur.arraysize = 10
0127             varMap = {}
0128             varMap[":jediTaskID"] = jediTaskID
0129             varMap[":newPriority"] = newPriority
0130             # update JEDI
0131             self.cur.execute(sqlT + comment, varMap)
0132             nRow = self.cur.rowcount
0133             if nRow == 1:
0134                 # update jobs
0135                 for tableName in ["jobsActive4", "jobsDefined4"]:
0136                     self.cur.execute(sqlJ.format(tableName) + comment, varMap)
0137             # update DEFT
0138             self.cur.execute(sqlD + comment, varMap)
0139             # commit
0140             if not self._commit():
0141                 raise RuntimeError("Commit error")
0142             tmp_log.debug(f"done with {nRow}")
0143             return nRow
0144         except Exception:
0145             # roll back
0146             self._rollback()
0147             # error
0148             self.dump_error_message(tmp_log)
0149             return None
0150 
0151     # get jediTaskID from taskName
0152     def getTaskIDwithTaskNameJEDI(self, userName, taskName):
0153         comment = " /* DBProxy.getTaskIDwithTaskNameJEDI */"
0154         tmp_log = self.create_tagged_logger(comment, f"<userName={userName} taskName={taskName}")
0155         tmp_log.debug(f"start")
0156         try:
0157             # begin transaction
0158             self.conn.begin()
0159             # sql to get jediTaskID
0160             sqlGF = f"SELECT MAX(jediTaskID) FROM {panda_config.schemaJEDI}.JEDI_Tasks "
0161             sqlGF += "WHERE userName=:userName AND taskName=:taskName "
0162             varMap = {}
0163             varMap[":userName"] = userName
0164             varMap[":taskName"] = taskName
0165             self.cur.execute(sqlGF + comment, varMap)
0166             resFJ = self.cur.fetchone()
0167             if resFJ is not None:
0168                 (jediTaskID,) = resFJ
0169             else:
0170                 jediTaskID = None
0171             # commit
0172             if not self._commit():
0173                 raise RuntimeError("Commit error")
0174             tmp_log.debug(f"jediTaskID={jediTaskID}")
0175             return jediTaskID
0176         except Exception:
0177             # roll back
0178             self._rollback()
0179             # error
0180             self.dump_error_message(tmp_log)
0181             return None
0182 
0183     # update modificationtime for a jediTaskID to trigger subsequent process
0184     def updateTaskModTimeJEDI(self, jediTaskID, newStatus):
0185         comment = " /* DBProxy.updateTaskErrorDialogJEDI */"
0186         tmp_log = self.create_tagged_logger(comment, f"<jediTaskID={jediTaskID}>")
0187         tmp_log.debug(f"start")
0188         try:
0189             # begin transaction
0190             self.conn.begin()
0191             # update mod time
0192             varMap = {}
0193             varMap[":jediTaskID"] = jediTaskID
0194             if newStatus is not None:
0195                 varMap[":newStatus"] = newStatus
0196             sqlUE = f"UPDATE {panda_config.schemaJEDI}.JEDI_Tasks SET "
0197             sqlUE += "modificationTime=CURRENT_DATE-1,"
0198             if newStatus is not None:
0199                 sqlUE += "status=:newStatus,oldStatus=NULL,"
0200             sqlUE = sqlUE[:-1]
0201             sqlUE += " WHERE jediTaskID=:jediTaskID "
0202             self.cur.execute(sqlUE + comment, varMap)
0203             # commit
0204             if not self._commit():
0205                 raise RuntimeError("Commit error")
0206             tmp_log.debug(f"done")
0207             return True
0208         except Exception:
0209             # roll back
0210             self._rollback()
0211             # error
0212             self.dump_error_message(tmp_log)
0213             return False
0214 
0215     def initialize_cpu_time_task(self, job_id, task_id, site_id, files, active):
0216         """
0217         Increases the CPU time of a task
0218         walltime = basewalltime + cpuefficiency*CPUTime*nEvents/Corepower/Corecount
0219 
0220         CPU time: execution time per event
0221         Walltime: time for a job
0222         Corepower: HS06 score
0223         Basewalltime: Setup time, time to download, etc. taken by the pilot
0224         """
0225         comment = " /* DBProxy.initialize_cpu_time_task */"
0226         tmp_log = self.create_tagged_logger(comment, f"PandaID={job_id}; jediTaskID={task_id}; siteID={site_id}")
0227         tmp_log.debug("start")
0228 
0229         # See if there are successful jobs for this task. If yes, skip this method
0230         sql = (
0231             f"SELECT 1 FROM "
0232             f"(SELECT 1 FROM atlas_panda.jobsarchived4 "
0233             f"WHERE jeditaskid = :jedi_task_id AND jobstatus = 'finished' AND transformation NOT LIKE '%build%' AND ROWNUM = 1 "
0234             f"UNION ALL "
0235             f"SELECT 1 FROM atlas_pandaarch.jobsarchived "
0236             f"WHERE jeditaskid = :jedi_task_id AND jobstatus = 'finished' AND transformation NOT LIKE '%build%' AND ROWNUM = 1) "
0237             f"WHERE ROWNUM = 1"
0238         )
0239         var_map = {":jedi_task_id": task_id}
0240         self.cur.execute(sql + comment, var_map)
0241         exists = False
0242         if self.cur.fetchone():
0243             exists = True
0244 
0245         # if we found a successful job, we skip the CPU time increase
0246         if exists:
0247             tmp_log.debug(f"Task {task_id} already has successful jobs, skipping CPU time increase and leaving it up to the scouting mechanism")
0248             return None, None
0249 
0250         # Get the site information from schedconfig
0251         sql = (
0252             "SELECT /* use_json_type */ sc.data.maxtime, sc.data.corepower, NVL(TO_NUMBER(sc.data.corecount), 1) AS corecount "
0253             "FROM ATLAS_PANDA.schedconfig_json sc "
0254             "WHERE sc.panda_queue= :site_id "
0255         )
0256 
0257         var_map = {":site_id": site_id}
0258         self.cur.execute(sql + comment, var_map)
0259         site_parameters = self.cur.fetchone()
0260 
0261         if not site_parameters:
0262             tmp_log.debug(f"No site parameters retrieved for {site_id}")
0263 
0264         (max_time_site, core_power_site, core_count_site) = site_parameters
0265         tmp_log.debug(f"site_id {site_id} has parameters: max_time_site {max_time_site}, core_power_site {core_power_site}, core_count_site {core_count_site}")
0266         if (not max_time_site) or (not core_power_site) or (not core_count_site):
0267             tmp_log.debug(f"One or more site parameters are not defined for {site_id}... nothing to do")
0268             return None, None
0269         else:
0270             (max_time_site, core_power_site, core_count_site) = (
0271                 int(max_time_site),
0272                 float(core_power_site),
0273                 int(core_count_site),
0274             )
0275 
0276         # Get the task information
0277         sql = """
0278         SELECT jt.cputime, jt.walltime, jt.basewalltime, jt.cpuefficiency, jt.cputimeunit
0279         FROM ATLAS_PANDA.jedi_tasks jt
0280         WHERE jt.jeditaskid=:jeditaskid
0281         """
0282         var_map = {":jeditaskid": task_id}
0283         self.cur.execute(sql + comment, var_map)
0284         task_parameters = self.cur.fetchone()
0285 
0286         if not task_parameters:
0287             tmp_log.debug(f"No task parameters retrieved for jeditaskid {task_id}... nothing to do")
0288             return None, None
0289 
0290         (old_cputime, walltime, basewalltime, cpuefficiency, old_cputime_unit) = task_parameters
0291         if not cpuefficiency or not basewalltime:
0292             tmp_log.debug(f"CPU efficiency and/or basewalltime are not defined for task {task_id}... nothing to do")
0293             return None, None
0294 
0295         tmp_log.debug(
0296             f"task {task_id} has parameters: cputime {old_cputime} {old_cputime_unit}, walltime {walltime}, "
0297             f"basewalltime {basewalltime}, cpuefficiency {cpuefficiency}"
0298         )
0299 
0300         old_cputime_normalized = None
0301         if old_cputime is not None:
0302             if old_cputime_unit == "HS06sPerEvent":
0303                 old_cputime_normalized = old_cputime
0304             elif old_cputime_unit == "mHS06sPerEvent":
0305                 old_cputime_normalized = old_cputime / 1000  # convert to HS06sPerEvent
0306 
0307         # Get the file information
0308         input_types = ("input", "pseudo_input", "pp_input", "trn_log", "trn_output")
0309         input_files = list(
0310             filter(
0311                 lambda pandafile: pandafile.type in input_types and re.search("DBRelease", pandafile.lfn) is None,
0312                 files,
0313             )
0314         )
0315         input_fileIDs = [input_file.fileID for input_file in input_files]
0316         input_datasetIDs = [input_file.datasetID for input_file in input_files]
0317 
0318         if input_fileIDs:
0319             var_map = {":taskID": task_id}
0320 
0321             # Bind the files
0322             file_var_names_str, file_var_map = get_sql_IN_bind_variables(input_fileIDs, prefix=":file")
0323             var_map.update(file_var_map)
0324 
0325             # Bind the datasets
0326             ds_var_names_str, ds_var_map = get_sql_IN_bind_variables(input_datasetIDs, prefix=":dataset")
0327             var_map.update(ds_var_map)
0328 
0329             sql_select = f"""
0330             SELECT jdc.fileid, jdc.nevents, jdc.startevent, jdc.endevent
0331             FROM ATLAS_PANDA.JEDI_Dataset_Contents jdc, ATLAS_PANDA.JEDI_Datasets jd
0332             WHERE jdc.JEDITaskID = :taskID
0333             AND jdc.datasetID IN ({ds_var_names_str})
0334             AND jdc.fileID IN ({file_var_names_str})
0335             AND jd.datasetID = jdc.datasetID
0336             AND jd.masterID IS NULL
0337             """
0338             self.cur.execute(sql_select + comment, var_map)
0339 
0340             result_list = self.cur.fetchall()
0341             n_events_total = 0
0342             for fileid, n_events, start_event, end_event in result_list:
0343                 tmp_log.debug(f"event information: fileid {fileid}, n_events {n_events}, start_event {start_event}, end_event {end_event}")
0344 
0345                 start_event = start_event if start_event is not None else 0
0346                 end_event = end_event if end_event is not None else 0
0347                 if end_event and start_event:
0348                     n_events_total += end_event - start_event
0349                 elif n_events:
0350                     n_events_total += n_events
0351 
0352             if not n_events_total:
0353                 tmp_log.debug(f"n_events could not be calculated for job {job_id}... nothing to do")
0354                 return None, None
0355         else:
0356             tmp_log.debug(f"No input files for job {job_id}, so could not update CPU time for task {task_id}")
0357             return None, None
0358 
0359         # Get the corecount from the job spec
0360         var_map = {":task_id": task_id, ":job_id": job_id}
0361         sql_select = f"""
0362         SELECT jact4.corecount 
0363         FROM ATLAS_PANDA.jobsactive4 jact4
0364         WHERE jeditaskid = :task_id AND pandaid = :job_id
0365         UNION
0366         SELECT jarc4.corecount 
0367         FROM ATLAS_PANDA.jobsarchived4 jarc4
0368         WHERE jeditaskid = :task_id AND pandaid = :job_id
0369         UNION
0370         SELECT jarch.corecount 
0371         FROM ATLAS_PANDAARCH.jobsarchived jarch
0372         WHERE jeditaskid = :task_id AND pandaid = :job_id
0373         """
0374         self.cur.execute(sql_select + comment, var_map)
0375 
0376         results = self.cur.fetchone()
0377         try:
0378             core_count_job = results[0]
0379         except (IndexError, TypeError):
0380             core_count_job = None
0381 
0382         if not core_count_job:
0383             core_count_job = 1  # Default to 1 if no core_count is defined in the job spec
0384         tmp_log.debug(f"core_count_job: {core_count_job}")
0385 
0386         # Calculate the new CPU time
0387         try:
0388             new_cputime_unit = "HS06sPerEvent"
0389             new_cputime = ((max_time_site - basewalltime) * core_power_site * core_count_job * 1.1 / (cpuefficiency / 100.0) / n_events_total) * 1.5
0390             new_cputime_normalized = new_cputime
0391 
0392             if new_cputime and new_cputime < 10:
0393                 new_cputime = new_cputime * 1000
0394                 new_cputime_unit = "mHS06sPerEvent"
0395 
0396             # the entry is stored without decimals in the DB
0397             new_cputime = int(new_cputime)
0398 
0399             tmp_log.debug(f"Old CPU time is {old_cputime} {old_cputime_unit} and possible new CPU time is {new_cputime} {new_cputime_unit}")
0400 
0401             if old_cputime_normalized is not None and new_cputime_normalized is not None and old_cputime_normalized > new_cputime_normalized:
0402                 tmp_log.debug(
0403                     f"Skipping CPU time increase since old CPU time {old_cputime_normalized} HS06sPerEvent "
0404                     f"> new CPU time {new_cputime_normalized} HS06sPerEvent"
0405                 )
0406                 return None, None
0407 
0408             if active:  # only run the update if active mode. Otherwise return what would have been done
0409                 sql_update_cputime = """
0410                 UPDATE ATLAS_PANDA.jedi_tasks SET cputime=:new_cputime, cputimeunit=:new_cputime_unit
0411                 WHERE jeditaskid=:jeditaskid
0412                 """
0413                 var_map = {":new_cputime": new_cputime, ":new_cputime_unit": new_cputime_unit, ":jeditaskid": task_id}
0414                 self.conn.begin()
0415                 self.cur.execute(sql_update_cputime + comment, var_map)
0416                 if not self._commit():
0417                     raise RuntimeError("Commit error")
0418 
0419                 tmp_log.debug(f"Successfully updated the task CPU time from {old_cputime} to {new_cputime} {new_cputime_unit}")
0420             else:
0421                 tmp_log.debug("Not updating the task CPU time since active mode is False.")
0422 
0423             return new_cputime, new_cputime_unit
0424 
0425         except (ZeroDivisionError, TypeError) as e:
0426             tmp_log.debug(f"Exception while updating the task CPU time: {e}")
0427             return None, None
0428 
0429     def requestTaskParameterRecalculation(self, taskID):
0430         """
0431         Requests the recalculation of the CPU time of a task:
0432          1. set the walltimeUnit to NULL and the modificationTime to Now
0433          2. AtlasProdWatchDog > JediDBProxy.setScoutJobDataToTasks will pick up tasks with walltimeUnit == NULL
0434             and modificationTime > Now - 24h. This will trigger a recalculation of the task parameters (outDiskCount,
0435             outDiskUnit, outDiskCount, walltime, walltimeUnit, cpuTime, ioIntensity, ioIntensityUnit, ramCount, ramUnit,
0436             workDiskCount, workDiskUnit, workDiskCount)
0437         """
0438         comment = " /* DBProxy.requestTaskParameterRecalculation */"
0439         tmp_log = self.create_tagged_logger(comment, f"jediTaskID={taskID}")
0440         tmp_log.debug("start")
0441 
0442         timeNow = naive_utcnow()
0443         timeLimit = timeNow - datetime.timedelta(minutes=30)
0444 
0445         # update the task if it was not already updated in the last 30 minutes (avoid continuous recalculation)
0446         sql = """
0447                UPDATE ATLAS_PANDA.jedi_tasks
0448                SET walltimeUnit=NULL, modificationTime=:timeNow
0449                WHERE jediTaskId=:taskID AND modificationTime < :timeLimit
0450                """
0451         varMap = {":taskID": taskID, ":timeNow": timeNow, ":timeLimit": timeLimit}
0452         self.conn.begin()
0453         self.cur.execute(sql, varMap)
0454 
0455         rowcount = self.cur.rowcount
0456 
0457         if not self._commit():
0458             raise RuntimeError("Commit error")
0459 
0460         tmp_log.debug("Forced recalculation of CPUTime")
0461         return rowcount
0462 
0463     # get task parameters
0464     def getTaskParamsPanda(self, jediTaskID):
0465         comment = " /* DBProxy.getTaskParamsPanda */"
0466         tmp_log = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID}")
0467         tmp_log.debug("start")
0468         try:
0469             # sql to get task parameters
0470             sqlRR = f"SELECT jedi_task_parameters FROM {panda_config.schemaDEFT}.T_TASK "
0471             sqlRR += "WHERE taskid=:jediTaskID "
0472             varMap = {}
0473             varMap[":jediTaskID"] = jediTaskID
0474             # start transaction
0475             self.conn.begin()
0476             self.cur.execute(sqlRR + comment, varMap)
0477             # commit
0478             if not self._commit():
0479                 raise RuntimeError("Commit error")
0480             # read clob
0481             taskParams = ""
0482             for (clobJobP,) in self.cur:
0483                 if clobJobP is not None:
0484                     try:
0485                         taskParams = clobJobP.read()
0486                     except AttributeError:
0487                         taskParams = str(clobJobP)
0488                 break
0489             tmp_log.debug("done")
0490             return taskParams
0491         except Exception:
0492             # roll back
0493             self._rollback()
0494             # error
0495             self.dump_error_message(tmp_log)
0496             return ""
0497 
0498     # get task attributes
0499     def getTaskAttributesPanda(self, jediTaskID, attrs):
0500         comment = " /* DBProxy.getTaskAttributesPanda */"
0501         tmp_log = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID}")
0502         tmp_log.debug("start")
0503         try:
0504             # sql to get task attributes
0505             sqlRR = "SELECT "
0506             for attr in attrs:
0507                 sqlRR += f"{attr},"
0508             sqlRR = sqlRR[:-1]
0509             sqlRR += f" FROM {panda_config.schemaJEDI}.JEDI_Tasks "
0510             sqlRR += "WHERE jediTaskID=:jediTaskID "
0511             varMap = {}
0512             varMap[":jediTaskID"] = jediTaskID
0513             # start transaction
0514             self.conn.begin()
0515             self.cur.execute(sqlRR + comment, varMap)
0516             resRR = self.cur.fetchone()
0517             # commit
0518             if not self._commit():
0519                 raise RuntimeError("Commit error")
0520             retVal = {}
0521             if resRR is not None:
0522                 for idx, attr in enumerate(attrs):
0523                     retVal[attr] = resRR[idx]
0524             tmp_log.debug(f"done {str(retVal)}")
0525             return retVal
0526         except Exception:
0527             # roll back
0528             self._rollback()
0529             # error
0530             self.dump_error_message(tmp_log)
0531             return {}
0532 
0533     # get task status
0534     def getTaskStatus(self, jediTaskID):
0535         comment = " /* DBProxy.getTaskStatus */"
0536         tmp_log = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID}")
0537         tmp_log.debug("start")
0538         try:
0539             # sql to update input file status
0540             varMap = {}
0541             varMap[":jediTaskID"] = jediTaskID
0542             sql = f"SELECT status FROM {panda_config.schemaJEDI}.JEDI_Tasks "
0543             sql += "WHERE jediTaskID=:jediTaskID "
0544 
0545             # start transaction
0546             self.conn.begin()
0547             self.cur.arraysize = 1000
0548             self.cur.execute(sql + comment, varMap)
0549             res = self.cur.fetchone()
0550             # commit
0551             if not self._commit():
0552                 raise RuntimeError("Commit error")
0553             if res:
0554                 tmp_log.debug(f"task {jediTaskID} has status: {res[0]} ")
0555             else:
0556                 res = []
0557                 tmp_log.debug(f"task {jediTaskID} not found")
0558             return res
0559         except Exception:
0560             # roll back
0561             self._rollback()
0562             # error
0563             self.dump_error_message(tmp_log)
0564             return []
0565 
0566     # get task status and superstatus
0567     def getTaskStatusSuperstatus(self, jediTaskID):
0568         comment = " /* DBProxy.getTaskStatusSuperstatus */"
0569         tmp_log = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID}")
0570         tmp_log.debug("start")
0571         try:
0572             # sql to update input file status
0573             varMap = {}
0574             varMap[":jediTaskID"] = jediTaskID
0575             sql = f"SELECT status,superStatus FROM {panda_config.schemaJEDI}.JEDI_Tasks "
0576             sql += "WHERE jediTaskID=:jediTaskID "
0577             # start transaction
0578             self.conn.begin()
0579             self.cur.arraysize = 1000
0580             self.cur.execute(sql + comment, varMap)
0581             res = self.cur.fetchone()
0582             # commit
0583             if not self._commit():
0584                 raise RuntimeError("Commit error")
0585             if res:
0586                 tmp_log.debug(f"task {jediTaskID} has status={res[0]} superstatus={res[1]}")
0587             else:
0588                 res = []
0589                 tmp_log.debug(f"task {jediTaskID} not found")
0590             return res
0591         except Exception:
0592             # roll back
0593             self._rollback()
0594             # error
0595             self.dump_error_message(tmp_log)
0596             return []
0597 
0598     # reactivate task
0599     def reactivateTask(self, jediTaskID, keep_attempt_nr=False, trigger_job_generation=False):
0600         comment = " /* DBProxy.reactivateTask */"
0601         tmp_log = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID}")
0602         tmp_log.debug("start")
0603         try:
0604             # sql to update task status
0605             sql = f"UPDATE {panda_config.schemaJEDI}.JEDI_Tasks "
0606             sql += "SET status=:status "
0607             sql += "WHERE jediTaskID=:jediTaskID "
0608             # sql to get datasetIDs for master
0609             sqlM = f"SELECT datasetID FROM {panda_config.schemaJEDI}.JEDI_Datasets "
0610             sqlM += "WHERE jediTaskID=:jediTaskID AND type IN (:type1,:type2,:type3) "
0611             # sql to increase attempt numbers and update status
0612             sqlAB = f"UPDATE {panda_config.schemaJEDI}.JEDI_Dataset_Contents "
0613             if keep_attempt_nr:
0614                 sqlAB += "SET status=:status "
0615             else:
0616                 sqlAB += "SET status=:status,attemptNr=0 "
0617             sqlAB += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID "
0618             # sql to update datasets
0619             sqlD = f"UPDATE {panda_config.schemaJEDI}.JEDI_Datasets "
0620             sqlD += "SET status=:status,nFilesUsed=0,nFilesTobeUsed=nFiles,nFilesFinished=0,nFilesFailed=0 "
0621             sqlD += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID "
0622             # update task status
0623             varMap = {}
0624             varMap[":jediTaskID"] = jediTaskID
0625             varMap[":status"] = "ready"
0626             self.cur.execute(sql + comment, varMap)
0627             res = self.cur.rowcount
0628             # get datasetIDs for master
0629             varMap = {}
0630             varMap[":jediTaskID"] = jediTaskID
0631             varMap[":type1"] = "input"
0632             varMap[":type2"] = "pseudo_input"
0633             varMap[":type3"] = "random_seed"
0634             self.cur.execute(sqlM + comment, varMap)
0635             resM = self.cur.fetchall()
0636             total_nFiles = 0
0637 
0638             for (datasetID,) in resM:
0639                 # increase attempt numbers
0640                 varMap = {}
0641                 varMap[":jediTaskID"] = jediTaskID
0642                 varMap[":datasetID"] = datasetID
0643                 varMap[":status"] = "ready"
0644 
0645                 # update status and attempt number for datasets
0646                 self.cur.execute(sqlAB + comment, varMap)
0647                 nFiles = self.cur.rowcount
0648 
0649                 # update dataset
0650                 if nFiles > 0:
0651                     varMap = {}
0652                     varMap[":jediTaskID"] = jediTaskID
0653                     varMap[":datasetID"] = datasetID
0654                     varMap[":status"] = "ready"
0655                     tmp_log.debug(sqlD + comment + str(varMap))
0656                     self.cur.execute(sqlD + comment, varMap)
0657                     total_nFiles += nFiles
0658 
0659             tmpMsg = f"updated {total_nFiles} inputs and task {jediTaskID} was reactivated "
0660             tmp_log.debug(tmpMsg)
0661             tmp_log.sendMsg(tmpMsg, "jedi", "pandasrv")
0662             retVal = 0, tmpMsg
0663             # commit
0664             if not self._commit():
0665                 raise RuntimeError("Commit error")
0666             # send message
0667             if trigger_job_generation:
0668                 # message
0669                 msg = srv_msg_utils.make_message("generate_job", taskid=jediTaskID)
0670                 mb_proxy = self.get_mb_proxy("panda_jedi")
0671                 if mb_proxy:
0672                     mb_proxy.send(msg)
0673                     tmp_log.debug(f"sent generate_job message: {msg}")
0674                 else:
0675                     tmp_log.debug("message queue is not configured")
0676             tmp_log.debug("done")
0677             return retVal
0678         except Exception:
0679             # roll back
0680             self._rollback()
0681             # error
0682             self.dump_error_message(tmp_log)
0683             return None, "DB error"
0684 
0685     # get event statistics
0686     def getEventStat(self, jediTaskID, PandaID):
0687         comment = " /* DBProxy.getEventStat */"
0688         tmp_log = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID} PandaID={PandaID}")
0689         tmp_log.debug("start")
0690         try:
0691             # sql to get event stats
0692             sql = f"SELECT status,COUNT(*) FROM {panda_config.schemaJEDI}.JEDI_Events "
0693             sql += "WHERE jediTaskID=:jediTaskID AND PandaID=:PandaID "
0694             sql += "GROUP BY status "
0695             # start transaction
0696             self.conn.begin()
0697             self.cur.arraysize = 10000
0698             # get stats
0699             varMap = {}
0700             varMap[":jediTaskID"] = jediTaskID
0701             varMap[":PandaID"] = PandaID
0702             self.cur.execute(sql + comment, varMap)
0703             resM = self.cur.fetchall()
0704             retMap = {}
0705             for eventStatus, cnt in resM:
0706                 retMap[eventStatus] = cnt
0707             # commit
0708             if not self._commit():
0709                 raise RuntimeError("Commit error")
0710             tmp_log.debug(f"done with {str(retMap)}")
0711             return retMap
0712         except Exception:
0713             # roll back
0714             self._rollback()
0715             # error
0716             self.dump_error_message(tmp_log)
0717             return {}
0718 
0719     # update error dialog for a jediTaskID
0720     def updateTaskErrorDialogJEDI(self, jediTaskID, msg):
0721         comment = " /* DBProxy.updateTaskErrorDialogJEDI */"
0722         tmp_log = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID}")
0723         tmp_log.debug(f"start")
0724         try:
0725             # begin transaction
0726             self.conn.begin()
0727             # get existing dialog
0728             sqlGF = f"SELECT errorDialog FROM {panda_config.schemaJEDI}.JEDI_Tasks "
0729             sqlGF += "WHERE jediTaskID=:jediTaskID "
0730             varMap = {}
0731             varMap[":jediTaskID"] = jediTaskID
0732             self.cur.execute(sqlGF + comment, varMap)
0733             resFJ = self.cur.fetchone()
0734             if resFJ is not None:
0735                 # update existing dialog
0736                 (errorDialog,) = resFJ
0737                 errorDialog = msg
0738                 varMap = {}
0739                 varMap[":jediTaskID"] = jediTaskID
0740                 varMap[":errorDialog"] = errorDialog
0741                 sqlUE = f"UPDATE {panda_config.schemaJEDI}.JEDI_Tasks SET errorDialog=:errorDialog,modificationTime=CURRENT_DATE "
0742                 sqlUE += "WHERE jediTaskID=:jediTaskID "
0743                 self.cur.execute(sqlUE + comment, varMap)
0744             # commit
0745             if not self._commit():
0746                 raise RuntimeError("Commit error")
0747             tmp_log.debug(f"done")
0748             return True
0749         except Exception:
0750             # roll back
0751             self._rollback()
0752             # error
0753             self.dump_error_message(tmp_log)
0754             return False
0755 
0756     # increase attempt number for unprocessed files
0757     def increaseAttemptNrPanda(self, jediTaskID, increasedNr):
0758         comment = " /* DBProxy.increaseAttemptNrPanda */"
0759         tmp_log = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID}")
0760         tmp_log.debug(f"increasedNr={increasedNr}")
0761         try:
0762             # sql to check task status
0763             sqlT = f"SELECT status,oldStatus FROM {panda_config.schemaJEDI}.JEDI_Tasks "
0764             sqlT += "WHERE jediTaskID=:jediTaskID FOR UPDATE "
0765             # start transaction
0766             self.conn.begin()
0767             # select
0768             self.cur.arraysize = 10
0769             varMap = {}
0770             varMap[":jediTaskID"] = jediTaskID
0771             # get task status
0772             self.cur.execute(sqlT + comment, varMap)
0773             resT = self.cur.fetchone()
0774             if resT is None:
0775                 tmpMsg = f"jediTaskID={jediTaskID} not found"
0776                 tmp_log.debug(tmpMsg)
0777                 retVal = 1, tmpMsg
0778             else:
0779                 taskStatus, oldStatus = resT
0780                 # check task status
0781                 okStatusList = ["running", "scouting", "ready"]
0782                 if taskStatus not in okStatusList and oldStatus not in okStatusList:
0783                     tmpMsg = f"command rejected since status={taskStatus} or oldStatus={oldStatus} not in {str(okStatusList)}"
0784                     tmp_log.debug(tmpMsg)
0785                     retVal = 2, tmpMsg
0786                 else:
0787                     # sql to get datasetIDs for master
0788                     sqlM = f"SELECT datasetID FROM {panda_config.schemaJEDI}.JEDI_Datasets "
0789                     sqlM += "WHERE jediTaskID=:jediTaskID AND type IN (:type1,:type2) "
0790                     # sql to increase attempt numbers
0791                     sqlAB = f"UPDATE {panda_config.schemaJEDI}.JEDI_Dataset_Contents "
0792                     sqlAB += "SET maxAttempt=CASE WHEN maxAttempt > attemptNr THEN maxAttempt+:increasedNr ELSE attemptNr+:increasedNr END "
0793                     sqlAB += ",proc_status=CASE WHEN maxAttempt > attemptNr AND maxFailure > failedAttempt THEN proc_status ELSE :proc_status END "
0794                     sqlAB += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND status=:status AND keepTrack=:keepTrack "
0795                     # sql to increase attempt numbers and failure counts
0796                     sqlAF = f"UPDATE {panda_config.schemaJEDI}.JEDI_Dataset_Contents "
0797                     sqlAF += "SET maxAttempt=CASE WHEN maxAttempt > attemptNr THEN maxAttempt+:increasedNr ELSE attemptNr+:increasedNr END "
0798                     sqlAF += ",maxFailure=maxFailure+:increasedNr "
0799                     sqlAF += ",proc_status=CASE WHEN maxAttempt > attemptNr AND maxFailure > failedAttempt THEN proc_status ELSE :proc_status END "
0800                     sqlAF += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND status=:status AND keepTrack=:keepTrack "
0801                     # sql to update datasets
0802                     sqlD = f"UPDATE {panda_config.schemaJEDI}.JEDI_Datasets "
0803                     sqlD += "SET nFilesUsed=nFilesUsed-:nFilesReset,nFilesFailed=nFilesFailed-:nFilesReset "
0804                     sqlD += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID "
0805                     # get datasetIDs for master
0806                     varMap = {}
0807                     varMap[":jediTaskID"] = jediTaskID
0808                     varMap[":type1"] = "input"
0809                     varMap[":type2"] = "pseudo_input"
0810                     self.cur.execute(sqlM + comment, varMap)
0811                     resM = self.cur.fetchall()
0812                     total_nFilesIncreased = 0
0813                     total_nFilesReset = 0
0814                     for (datasetID,) in resM:
0815                         # increase attempt numbers
0816                         varMap = {}
0817                         varMap[":jediTaskID"] = jediTaskID
0818                         varMap[":datasetID"] = datasetID
0819                         varMap[":status"] = "ready"
0820                         varMap[":proc_status"] = "ready"
0821                         varMap[":keepTrack"] = 1
0822                         varMap[":increasedNr"] = increasedNr
0823                         nFilesIncreased = 0
0824                         nFilesReset = 0
0825                         # still active and maxFailure is undefined
0826                         sqlA = sqlAB + "AND maxAttempt>attemptNr AND maxFailure IS NULL "
0827                         self.cur.execute(sqlA + comment, varMap)
0828                         nRow = self.cur.rowcount
0829                         nFilesIncreased += nRow
0830                         # still active and maxFailure is defined
0831                         sqlA = sqlAF + "AND maxAttempt>attemptNr AND (maxFailure IS NOT NULL AND maxFailure>failedAttempt) "
0832                         self.cur.execute(sqlA + comment, varMap)
0833                         nRow = self.cur.rowcount
0834                         nFilesIncreased += nRow
0835                         # already done and maxFailure is undefined
0836                         sqlA = sqlAB + "AND maxAttempt<=attemptNr AND maxFailure IS NULL "
0837                         self.cur.execute(sqlA + comment, varMap)
0838                         nRow = self.cur.rowcount
0839                         nFilesReset += nRow
0840                         nFilesIncreased += nRow
0841                         # already done and maxFailure is defined
0842                         sqlA = sqlAF + "AND (maxAttempt<=attemptNr OR (maxFailure IS NOT NULL AND maxFailure=failedAttempt)) "
0843                         self.cur.execute(sqlA + comment, varMap)
0844                         nRow = self.cur.rowcount
0845                         nFilesReset += nRow
0846                         nFilesIncreased += nRow
0847                         # update dataset
0848                         if nFilesReset > 0:
0849                             varMap = {}
0850                             varMap[":jediTaskID"] = jediTaskID
0851                             varMap[":datasetID"] = datasetID
0852                             varMap[":nFilesReset"] = nFilesReset
0853                             tmp_log.debug(sqlD + comment + str(varMap))
0854                             self.cur.execute(sqlD + comment, varMap)
0855                         total_nFilesIncreased += nFilesIncreased
0856                         total_nFilesReset += nFilesReset
0857                     tmpMsg = f"increased attemptNr for {total_nFilesIncreased} inputs ({total_nFilesReset} reactivated)"
0858                     tmp_log.debug(tmpMsg)
0859                     tmp_log.sendMsg(tmpMsg, "jedi", "pandasrv")
0860                     retVal = 0, tmpMsg
0861             # commit
0862             if not self._commit():
0863                 raise RuntimeError("Commit error")
0864             tmp_log.debug("done")
0865             return retVal
0866         except Exception:
0867             # roll back
0868             self._rollback()
0869             # error
0870             self.dump_error_message(tmp_log)
0871             return None, "DB error"
0872 
0873     # insert sandbox file info
0874     def insertSandboxFileInfo(self, userName, hostName, fileName, fileSize, checkSum):
0875         comment = " /* DBProxy.insertSandboxFileInfo */"
0876         tmp_log = self.create_tagged_logger(comment, f"userName={userName}")
0877         sqlC = "SELECT userName,fileSize,checkSum FROM ATLAS_PANDAMETA.userCacheUsage "
0878         sqlC += "WHERE hostName=:hostName AND fileName=:fileName FOR UPDATE"
0879 
0880         sql = "INSERT INTO ATLAS_PANDAMETA.userCacheUsage "
0881         sql += "(userName,hostName,fileName,fileSize,checkSum,creationTime,modificationTime) "
0882         sql += "VALUES (:userName,:hostName,:fileName,:fileSize,:checkSum,CURRENT_DATE,CURRENT_DATE) "
0883 
0884         try:
0885             # begin transaction
0886             self.conn.begin()
0887             # check if it already exists
0888             varMap = {}
0889             varMap[":hostName"] = hostName
0890             varMap[":fileName"] = fileName
0891             self.cur.arraysize = 10
0892             self.cur.execute(sqlC + comment, varMap)
0893             res = self.cur.fetchall()
0894             if len(res) != 0:
0895                 tmp_log.debug(f"skip {hostName} {fileName} since already exists")
0896                 # commit
0897                 if not self._commit():
0898                     raise RuntimeError("Commit error")
0899                 return "WARNING: file exist"
0900             # insert
0901             varMap = {}
0902             varMap[":userName"] = userName
0903             varMap[":hostName"] = hostName
0904             varMap[":fileName"] = fileName
0905             varMap[":fileSize"] = fileSize
0906             varMap[":checkSum"] = checkSum
0907             self.cur.execute(sql + comment, varMap)
0908             # commit
0909             if not self._commit():
0910                 raise RuntimeError("Commit error")
0911             return "OK"
0912         except Exception:
0913             # roll back
0914             self._rollback()
0915             # error
0916             self.dump_error_message(tmp_log)
0917             return "ERROR: DB failure"
0918 
0919     # get and lock sandbox files
0920     def getLockSandboxFiles(self, time_limit, n_files):
0921         comment = " /* DBProxy.getLockSandboxFiles */"
0922         tmp_log = self.create_tagged_logger(comment)
0923         sqlC = (
0924             "SELECT * FROM ("
0925             "SELECT userName,hostName,fileName,creationTime,modificationTime FROM ATLAS_PANDAMETA.userCacheUsage "
0926             "WHERE modificationTime<:timeLimit AND (fileName like 'sources%' OR fileName like 'jobO%') ) "
0927             "WHERE rownum<:nRows "
0928         )
0929         sqlU = "UPDATE ATLAS_PANDAMETA.userCacheUsage SET modificationTime=CURRENT_DATE " "WHERE userName=:userName AND fileName=:fileName "
0930         try:
0931             tmp_log.debug("start")
0932             # begin transaction
0933             self.conn.begin()
0934             # check if it already exists
0935             varMap = {}
0936             varMap[":timeLimit"] = time_limit
0937             varMap[":nRows"] = n_files
0938             self.cur.execute(sqlC + comment, varMap)
0939             res = self.cur.fetchall()
0940             retList = []
0941             for userName, hostName, fileName, creationTime, modificationTime in res:
0942                 retList.append((userName, hostName, fileName, creationTime, modificationTime))
0943                 # lock
0944                 varMap = dict()
0945                 varMap[":userName"] = userName
0946                 varMap[":fileName"] = fileName
0947                 self.cur.execute(sqlU + comment, varMap)
0948             # commit
0949             if not self._commit():
0950                 raise RuntimeError("Commit error")
0951             tmp_log.debug(f"locked {len(retList)} files")
0952             return retList
0953         except Exception:
0954             # roll back
0955             self._rollback()
0956             # error
0957             self.dump_error_message(tmp_log)
0958             return None
0959 
0960     # check duplicated sandbox file
0961     def checkSandboxFile(self, dn, fileSize, checkSum):
0962         comment = " /* DBProxy.checkSandboxFile */"
0963         tmp_log = self.create_tagged_logger(comment)
0964         tmp_log.debug(f"dn={dn} size={fileSize} checksum={checkSum}")
0965         sqlC = "SELECT hostName,fileName FROM ATLAS_PANDAMETA.userCacheUsage "
0966         sqlC += "WHERE userName=:userName AND fileSize=:fileSize AND checkSum=:checkSum "
0967         sqlC += "AND hostName<>:ngHostName AND creationTime>CURRENT_DATE-3 "
0968         sqlC += "AND creationTime>CURRENT_DATE-3 "
0969         try:
0970             retStr = "NOTFOUND"
0971             # get compact DN
0972             compactDN = CoreUtils.clean_user_id(dn)
0973             if compactDN in ["", "NULL", None]:
0974                 compactDN = dn
0975             # begin transaction
0976             self.conn.begin()
0977             # check if it already exists
0978             varMap = {}
0979             varMap[":userName"] = compactDN
0980             varMap[":fileSize"] = fileSize
0981             varMap[":checkSum"] = str(checkSum)
0982             varMap[":ngHostName"] = "localhost.localdomain"
0983             self.cur.arraysize = 10
0984             self.cur.execute(sqlC + comment, varMap)
0985             res = self.cur.fetchall()
0986             # commit
0987             if not self._commit():
0988                 raise RuntimeError("Commit error")
0989             if len(res) != 0:
0990                 hostName, fileName = res[0]
0991                 retStr = f"FOUND:{hostName}:{fileName}"
0992             tmp_log.debug(f"{retStr}")
0993             return retStr
0994         except Exception:
0995             # roll back
0996             self._rollback()
0997             # error
0998             self.dump_error_message(tmp_log)
0999             return "ERROR: DB failure"
1000 
1001     # insert dataset
1002     def insertDataset(self, dataset, tablename="ATLAS_PANDA.Datasets"):
1003         comment = " /* DBProxy.insertDataset */"
1004         tmp_log = self.create_tagged_logger(comment, f"dataset={dataset.name}")
1005         tmp_log.debug("start")
1006         sql0 = f"SELECT COUNT(*) FROM {tablename} WHERE vuid=:vuid "
1007         sql1 = f"INSERT INTO {tablename} "
1008         sql1 += f"({DatasetSpec.columnNames()}) "
1009         sql1 += DatasetSpec.bindValuesExpression()
1010         sql2 = f"SELECT name FROM {tablename} WHERE vuid=:vuid "
1011         # time information
1012         dataset.creationdate = naive_utcnow()
1013         dataset.modificationdate = dataset.creationdate
1014         try:
1015             # subtype
1016             if dataset.subType in ["", "NULL", None]:
1017                 # define using name
1018                 if re.search("_dis\d+$", dataset.name) is not None:
1019                     dataset.subType = "dis"
1020                 elif re.search("_sub\d+$", dataset.name) is not None:
1021                     dataset.subType = "sub"
1022                 else:
1023                     dataset.subType = "top"
1024             # begin transaction
1025             self.conn.begin()
1026             # check if it already exists
1027             varMap = {}
1028             varMap[":vuid"] = dataset.vuid
1029             self.cur.execute(sql0 + comment, varMap)
1030             (nDS,) = self.cur.fetchone()
1031             tmp_log.debug(f"nDS={nDS} with {dataset.vuid}")
1032             if nDS == 0:
1033                 # insert
1034                 tmp_log.debug(sql1 + comment + str(dataset.valuesMap()))
1035                 self.cur.execute(sql1 + comment, dataset.valuesMap())
1036                 # check name in DB
1037                 varMap = {}
1038                 varMap[":vuid"] = dataset.vuid
1039                 self.cur.execute(sql2 + comment, varMap)
1040                 (nameInDB,) = self.cur.fetchone()
1041                 tmp_log.debug(f"inDB -> {nameInDB} {dataset.name == nameInDB}")
1042             # commit
1043             if not self._commit():
1044                 raise RuntimeError("Commit error")
1045             return True
1046         except Exception:
1047             # roll back
1048             self._rollback()
1049             # error
1050             self.dump_error_message(tmp_log)
1051             return False
1052 
1053     # get and lock dataset with a query
1054     def getLockDatasets(self, sqlQuery, varMapGet, modTimeOffset="", getVersion=False):
1055         comment = " /* DBProxy.getLockDatasets */"
1056         tmp_log = self.create_tagged_logger(comment)
1057         tmp_log.debug(f"{sqlQuery},{str(varMapGet)},{modTimeOffset}")
1058         sqlGet = (
1059             "SELECT /*+ INDEX_RS_ASC(tab(STATUS,TYPE,MODIFICATIONDATE)) */ vuid,name,modificationdate,version,transferStatus FROM ATLAS_PANDA.Datasets tab WHERE "
1060             + sqlQuery
1061         )
1062         sqlLock = "UPDATE ATLAS_PANDA.Datasets SET modificationdate=CURRENT_DATE"
1063         if modTimeOffset != "":
1064             sqlLock += f"+{modTimeOffset}"
1065         sqlLock += ",transferStatus=MOD(transferStatus+1,10)"
1066         if getVersion:
1067             sqlLock += ",version=:version"
1068         sqlLock += " WHERE vuid=:vuid AND transferStatus=:transferStatus"
1069         retList = []
1070         try:
1071             # begin transaction
1072             self.conn.begin()
1073             # get datasets
1074             self.cur.arraysize = 1000000
1075             self.cur.execute(sqlGet + comment, varMapGet)
1076             res = self.cur.fetchall()
1077             # commit
1078             if not self._commit():
1079                 raise RuntimeError("Commit error")
1080             # loop over all datasets
1081             if res is not None and len(res) != 0:
1082                 for vuid, name, modificationdate, version, transferStatus in res:
1083                     # lock
1084                     varMapLock = {}
1085                     varMapLock[":vuid"] = vuid
1086                     varMapLock[":transferStatus"] = transferStatus
1087                     if getVersion:
1088                         try:
1089                             varMapLock[":version"] = str(int(version) + 1)
1090                         except Exception:
1091                             varMapLock[":version"] = str(1)
1092                     # begin transaction
1093                     self.conn.begin()
1094                     # update for lock
1095                     self.cur.execute(sqlLock + comment, varMapLock)
1096                     retU = self.cur.rowcount
1097                     # commit
1098                     if not self._commit():
1099                         raise RuntimeError("Commit error")
1100                     if retU > 0:
1101                         # append
1102                         if not getVersion:
1103                             retList.append((vuid, name, modificationdate))
1104                         else:
1105                             retList.append((vuid, name, modificationdate, version))
1106             # commit
1107             if not self._commit():
1108                 raise RuntimeError("Commit error")
1109             # return
1110             return retList
1111         except Exception:
1112             # roll back
1113             self._rollback()
1114             # error
1115             self.dump_error_message(tmp_log)
1116             return []
1117 
1118     # query dataset with map
1119     def queryDatasetWithMap(self, map):
1120         comment = " /* DBProxy.queryDatasetWithMap */"
1121         tmp_log = self.create_tagged_logger(comment)
1122         tmp_log.debug(f"{map}")
1123         if "name" in map:
1124             sql1 = """SELECT /*+ BEGIN_OUTLINE_DATA """
1125             sql1 += """INDEX_RS_ASC(@"SEL$1" "TAB"@"SEL$1" ("DATASETS"."NAME")) """
1126             sql1 += """OUTLINE_LEAF(@"SEL$1") ALL_ROWS """
1127             sql1 += """IGNORE_OPTIM_EMBEDDED_HINTS """
1128             sql1 += """END_OUTLINE_DATA */ """
1129             sql1 += f"{DatasetSpec.columnNames()} FROM ATLAS_PANDA.Datasets tab"
1130         else:
1131             sql1 = f"SELECT {DatasetSpec.columnNames()} FROM ATLAS_PANDA.Datasets"
1132         varMap = {}
1133         for key in map:
1134             if len(varMap) == 0:
1135                 sql1 += f" WHERE {key}=:{key}"
1136             else:
1137                 sql1 += f" AND {key}=:{key}"
1138             varMap[f":{key}"] = map[key]
1139         try:
1140             # start transaction
1141             self.conn.begin()
1142             # select
1143             self.cur.arraysize = 100
1144             tmp_log.debug(sql1 + comment + str(varMap))
1145             self.cur.execute(sql1 + comment, varMap)
1146             res = self.cur.fetchall()
1147             # commit
1148             if not self._commit():
1149                 raise RuntimeError("Commit error")
1150             # instantiate Dataset
1151             if res is not None and len(res) != 0:
1152                 dataset = DatasetSpec()
1153                 dataset.pack(res[0])
1154                 return dataset
1155             tmp_log.error(f"dataset not found")
1156             return None
1157         except Exception:
1158             # roll back
1159             self._rollback()
1160             self.dump_error_message(tmp_log)
1161             return None
1162 
1163     # update dataset
1164     def updateDataset(self, datasets, withLock, withCriteria, criteriaMap):
1165         comment = " /* DBProxy.updateDataset */"
1166         tmp_log = self.create_tagged_logger(comment)
1167         tmp_log.debug("start")
1168         sql1 = f"UPDATE ATLAS_PANDA.Datasets SET {DatasetSpec.bindUpdateExpression()} "
1169         sql1 += "WHERE vuid=:vuid"
1170         if withCriteria != "":
1171             sql1 += f" AND {withCriteria}"
1172         retList = []
1173         try:
1174             # start transaction
1175             self.conn.begin()
1176             for dataset in datasets:
1177                 tmp_log.debug(f"dataset={dataset.name} status={dataset.status})")
1178                 # time information
1179                 dataset.modificationdate = naive_utcnow()
1180                 # update
1181                 varMap = dataset.valuesMap()
1182                 varMap[":vuid"] = dataset.vuid
1183                 for cKey in criteriaMap:
1184                     varMap[cKey] = criteriaMap[cKey]
1185                 tmp_log.debug(sql1 + comment + str(varMap))
1186                 self.cur.execute(sql1 + comment, varMap)
1187                 retU = self.cur.rowcount
1188                 if retU != 0 and retU != 1:
1189                     raise RuntimeError(f"Invalid rerun {retU}")
1190                 retList.append(retU)
1191             # commit
1192             if not self._commit():
1193                 raise RuntimeError("Commit error")
1194             tmp_log.debug(f"ret:{retList}")
1195             return retList
1196         except Exception:
1197             # roll back
1198             self._rollback()
1199             self.dump_error_message(tmp_log)
1200             return []
1201 
1202     # trigger cleanup of internal datasets used by a task
1203     def trigger_cleanup_internal_datasets(self, task_id: int) -> bool:
1204         """
1205         Set deleting flag to dispatch datasets used by a task, which triggers deletion in datasetManager
1206         """
1207         comment = " /* DBProxy.trigger_cleanup_internal_datasets */"
1208         tmp_log = self.create_tagged_logger(comment, f"jediTaskID={task_id}")
1209         tmp_log.debug("start")
1210         sql1 = (
1211             f"UPDATE {panda_config.schemaPANDA}.Datasets SET status=:newStatus,modificationdate=CURRENT_DATE "
1212             "WHERE type=:type AND MoverID=:taskID AND status IN (:status_d,:status_c) "
1213         )
1214         try:
1215             # start transaction
1216             self.conn.begin()
1217             # update
1218             var_map = {
1219                 ":type": "dispatch",
1220                 ":newStatus": "deleting",
1221                 ":taskID": task_id,
1222                 ":status_d": "defined",
1223                 ":status_c": "completed",
1224             }
1225             self.cur.execute(sql1 + comment, var_map)
1226             ret_u = self.cur.rowcount
1227             # commit
1228             if not self._commit():
1229                 raise RuntimeError("Commit error")
1230             tmp_log.debug(f"set flag to {ret_u} dispatch datasets")
1231             return True
1232         except Exception:
1233             # roll back
1234             self._rollback()
1235             self.dump_error_message(tmp_log)
1236             return False
1237 
1238     # get serial number for dataset, insert dummy datasets to increment SN
1239     def getSerialNumber(self, datasetname, definedFreshFlag=None):
1240         comment = " /* DBProxy.getSerialNumber */"
1241         tmp_log = self.create_tagged_logger(comment, f"datasetname={datasetname}")
1242         try:
1243             tmp_log.debug(f"fresh={definedFreshFlag}")
1244             if isinstance(datasetname, str):
1245                 datasetname = datasetname.encode("ascii", "ignore")
1246                 tmp_log.debug(f"converted unicode for {datasetname}")
1247             # start transaction
1248             self.conn.begin()
1249             # check freshness
1250             if definedFreshFlag is None:
1251                 # select
1252                 varMap = {}
1253                 varMap[":name"] = datasetname
1254                 varMap[":type"] = "output"
1255                 sql = "SELECT /*+ INDEX_RS_ASC(TAB (DATASETS.NAME)) */ COUNT(*) FROM ATLAS_PANDA.Datasets tab WHERE type=:type AND name=:name"
1256                 self.cur.arraysize = 100
1257                 self.cur.execute(sql + comment, varMap)
1258                 res = self.cur.fetchone()
1259                 # fresh dataset or not
1260                 if res is not None and len(res) != 0 and res[0] > 0:
1261                     freshFlag = False
1262                 else:
1263                     freshFlag = True
1264             else:
1265                 # use predefined flag
1266                 freshFlag = definedFreshFlag
1267             # get serial number
1268             if self.backend == "oracle":
1269                 sql = "SELECT ATLAS_PANDA.SUBCOUNTER_SUBID_SEQ.nextval FROM dual"
1270                 self.cur.arraysize = 100
1271                 self.cur.execute(sql + comment, {})
1272                 (sn,) = self.cur.fetchone()
1273             elif self.backend == "postgres":
1274                 sql = f"SELECT {panda_config.schemaPANDA}.SUBCOUNTER_SUBID_SEQ.nextval"
1275                 self.cur.arraysize = 100
1276                 self.cur.execute(sql + comment, {})
1277                 (sn,) = self.cur.fetchone()
1278             else:
1279                 # panda_config.backend == 'mysql'
1280                 # fake sequence
1281                 sql = " INSERT INTO ATLAS_PANDA.SUBCOUNTER_SUBID_SEQ (col) VALUES (NULL) "
1282                 self.cur.arraysize = 100
1283                 self.cur.execute(sql + comment, {})
1284                 sql2 = """ SELECT LAST_INSERT_ID() """
1285                 self.cur.execute(sql2 + comment, {})
1286                 (sn,) = self.cur.fetchone()
1287             # commit
1288             if not self._commit():
1289                 raise RuntimeError("Commit error")
1290             tmp_log.debug(f"SN={sn} {freshFlag}")
1291             return (sn, freshFlag)
1292         except Exception:
1293             # roll back
1294             self._rollback()
1295             # error
1296             self.dump_error_message(tmp_log)
1297             return (-1, False)
1298 
1299     # count the number of files with map
1300     def countFilesWithMap(self, map):
1301         comment = " /* DBProxy.countFilesWithMap */"
1302         tmp_log = self.create_tagged_logger(comment)
1303         sql1 = "SELECT /*+ index(tab FILESTABLE4_DESTDBLOCK_IDX) */ COUNT(*) FROM ATLAS_PANDA.filesTable4 tab"
1304         varMap = {}
1305         for key in map:
1306             if len(varMap) == 0:
1307                 sql1 += f" WHERE {key}=:{key}"
1308             else:
1309                 sql1 += f" AND {key}=:{key}"
1310             varMap[f":{key}"] = map[key]
1311         nTry = 3
1312         for iTry in range(nTry):
1313             try:
1314                 # start transaction
1315                 self.conn.begin()
1316                 # select
1317                 tmp_log.debug(f"{sql1} {str(map)}")
1318                 self.cur.arraysize = 10
1319                 retS = self.cur.execute(sql1 + comment, varMap)
1320                 res = self.cur.fetchone()
1321                 tmp_log.debug(f"{retS} {str(res)}")
1322                 # commit
1323                 if not self._commit():
1324                     raise RuntimeError("Commit error")
1325                 nFiles = 0
1326                 if res is not None:
1327                     nFiles = res[0]
1328                 return nFiles
1329             except Exception:
1330                 # roll back
1331                 self._rollback()
1332                 if iTry + 1 < nTry:
1333                     tmp_log.debug(f"retry : {iTry}")
1334                     time.sleep(random.randint(10, 20))
1335                     continue
1336                 self.dump_error_message(tmp_log)
1337                 return -1
1338 
1339     # update input files and return corresponding PandaIDs
1340     def updateInFilesReturnPandaIDs(self, dataset, status, fileLFN=""):
1341         comment = " /* DBProxy.updateInFilesReturnPandaIDs */"
1342         tmp_log = self.create_tagged_logger(comment, f"dataset={dataset}")
1343         tmp_log.debug(f"{fileLFN})")
1344         sql0 = "SELECT /*+ index(tab FILESTABLE4_DISPDBLOCK_IDX) */ row_ID,PandaID FROM ATLAS_PANDA.filesTable4 tab WHERE status<>:status AND dispatchDBlock=:dispatchDBlock"
1345         sql1 = "UPDATE /*+ index(tab FILESTABLE4_DISPDBLOCK_IDX) */ ATLAS_PANDA.filesTable4 tab SET status=:status WHERE status<>:status AND dispatchDBlock=:dispatchDBlock"
1346         varMap = {}
1347         varMap[":status"] = status
1348         varMap[":dispatchDBlock"] = dataset
1349         if fileLFN != "":
1350             sql0 += " AND lfn=:lfn"
1351             sql1 += " AND lfn=:lfn"
1352             varMap[":lfn"] = fileLFN
1353         for iTry in range(self.nTry):
1354             try:
1355                 # start transaction
1356                 self.conn.begin()
1357                 # select
1358                 self.cur.arraysize = 10000
1359                 retS = self.cur.execute(sql0 + comment, varMap)
1360                 resS = self.cur.fetchall()
1361                 # update
1362                 retU = self.cur.execute(sql1 + comment, varMap)
1363                 # commit
1364                 if not self._commit():
1365                     raise RuntimeError("Commit error")
1366                 # collect PandaIDs
1367                 retList = []
1368                 for tmpRowID, tmpPandaID in resS:
1369                     # append
1370                     if tmpPandaID not in retList:
1371                         retList.append(tmpPandaID)
1372                 # return
1373                 tmp_log.debug(f"ret={str(retList)}")
1374                 return retList
1375             except Exception:
1376                 # roll back
1377                 self._rollback()
1378                 # error report
1379                 if iTry + 1 < self.nTry:
1380                     tmp_log.debug(f"retry : {iTry}")
1381                     time.sleep(random.randint(10, 20))
1382                     continue
1383                 self.dump_error_message(tmp_log)
1384         return []
1385 
1386     # update input files for jobs at certain sites and return corresponding PandaIDs
1387     def update_input_files_at_sites_and_get_panda_ids(self, filename: str, sites: list) -> list:
1388         """
1389         Update input files with a LFN for jobs at certain sites and return corresponding PandaIDs
1390 
1391         :param filename: LFN of the input file to be updated
1392         :param sites: List of site names where the jobs are running
1393 
1394         :return: List of PandaIDs of the jobs whose input files were updated
1395         """
1396         comment = " /* DBProxy.update_input_files_at_sites_and_get_panda_ids */"
1397         tmp_log = self.create_tagged_logger(comment, f"lfn={filename}")
1398         tmp_log.debug("start at sites: " + ",".join(sites))
1399         sql_to_get_ids = (
1400             "SELECT tabJ.PandaID FROM ATLAS_PANDA.jobsDefined4 tabJ, ATLAS_PANDA.filesTable4 tabF WHERE tabF.PandaID=tabJ.PandaID AND tabF.lfn=:lfn "
1401             "AND tabF.type='input' AND tabF.status IN ('defined', 'unknown', 'pending') AND tabJ.computingSite IN ("
1402         )
1403         var_map = {":lfn": filename}
1404         for idx, site in enumerate(sites):
1405             tmp_key = f":site_{idx}"
1406             var_map[tmp_key] = site
1407             sql_to_get_ids += f"{tmp_key},"
1408         sql_to_get_ids = sql_to_get_ids[:-1] + ") "
1409         return_list = []
1410         try:
1411             # start transaction
1412             self.conn.begin()
1413             # get IDs
1414             self.cur.arraysize = 10000
1415             self.cur.execute(sql_to_get_ids + comment, var_map)
1416             res = self.cur.fetchall()
1417             # commit
1418             if not self._commit():
1419                 raise RuntimeError("Commit error")
1420             # sql to update files
1421             sql_to_update_files = (
1422                 "UPDATE ATLAS_PANDA.filesTable4 tab SET status='ready' WHERE PandaID=:PandaID AND lfn=:lfn AND type='input' AND status<>'ready' "
1423             )
1424             for (tmp_pandaID,) in res:
1425                 var_map = {":PandaID": tmp_pandaID, ":lfn": filename}
1426                 # start transaction
1427                 self.conn.begin()
1428                 self.cur.execute(sql_to_update_files + comment, var_map)
1429                 if self.cur.rowcount > 0:
1430                     tmp_log.debug(f"updated input file status to 'ready' for PandaID={tmp_pandaID}")
1431                     return_list.append(tmp_pandaID)
1432                 # commit
1433                 if not self._commit():
1434                     raise RuntimeError("Commit error")
1435             # return
1436             tmp_log.debug(f"done")
1437             return return_list
1438         except Exception:
1439             # roll back
1440             self._rollback()
1441             self.dump_error_message(tmp_log)
1442         return []
1443 
1444     # update output files and return corresponding PandaIDs
1445     def updateOutFilesReturnPandaIDs(self, dataset, fileLFN=""):
1446         comment = " /* DBProxy.updateOutFilesReturnPandaIDs */"
1447         tmp_log = self.create_tagged_logger(comment, f"dataset={dataset}")
1448         tmp_log.debug(f"{fileLFN}")
1449         sql0 = "SELECT /*+ index(tab FILESTABLE4_DESTDBLOCK_IDX) */ row_ID,PandaID FROM ATLAS_PANDA.filesTable4 tab WHERE destinationDBlock=:destinationDBlock AND status=:status"
1450         sql1 = "UPDATE /*+ index(tab FILESTABLE4_DESTDBLOCK_IDX) */ ATLAS_PANDA.filesTable4 tab SET status='ready' WHERE destinationDBlock=:destinationDBlock AND status=:status"
1451         varMap = {}
1452         varMap[":status"] = "transferring"
1453         varMap[":destinationDBlock"] = dataset
1454         if fileLFN != "":
1455             sql0 += " AND lfn=:lfn"
1456             sql1 += " AND lfn=:lfn"
1457             varMap[":lfn"] = fileLFN
1458         for iTry in range(self.nTry):
1459             try:
1460                 # start transaction
1461                 self.conn.begin()
1462                 # select
1463                 self.cur.arraysize = 10000
1464                 retS = self.cur.execute(sql0 + comment, varMap)
1465                 resS = self.cur.fetchall()
1466                 # update
1467                 retList = []
1468                 retU = self.cur.execute(sql1 + comment, varMap)
1469                 # commit
1470                 if not self._commit():
1471                     raise RuntimeError("Commit error")
1472                 # collect PandaIDs
1473                 retList = []
1474                 for tmpRowID, tmpPandaID in resS:
1475                     # append
1476                     if tmpPandaID not in retList:
1477                         retList.append(tmpPandaID)
1478                 # return
1479                 tmp_log.debug(f"ret={str(retList)}")
1480                 return retList
1481             except Exception:
1482                 # roll back
1483                 self._rollback()
1484                 # error report
1485                 if iTry + 1 < self.nTry:
1486                     tmp_log.debug(f"retry : {iTry}")
1487                     time.sleep(random.randint(10, 20))
1488                     continue
1489                 self.dump_error_message(tmp_log)
1490         return []
1491 
1492     # get _dis datasets associated to _sub
1493     def getAssociatedDisDatasets(self, subDsName):
1494         comment = " /* DBProxy.getAssociatedDisDatasets */"
1495         tmp_log = self.create_tagged_logger(comment, f"subDsName={subDsName}")
1496         tmp_log.debug(f"start")
1497         sqlF = (
1498             "SELECT /*+ index(tab FILESTABLE4_DESTDBLOCK_IDX) */ distinct PandaID FROM ATLAS_PANDA.filesTable4 tab WHERE destinationDBlock=:destinationDBlock"
1499         )
1500         sqlJ = "SELECT distinct dispatchDBlock FROM ATLAS_PANDA.filesTable4 WHERE PandaID=:PandaID AND type=:type"
1501         try:
1502             # start transaction
1503             self.conn.begin()
1504             # get PandaIDs
1505             varMap = {}
1506             varMap[":destinationDBlock"] = subDsName
1507             self.cur.arraysize = 10000
1508             self.cur.execute(sqlF + comment, varMap)
1509             resS = self.cur.fetchall()
1510             # commit
1511             if not self._commit():
1512                 raise RuntimeError("Commit error")
1513             # loop over all PandaIDs
1514             retList = []
1515             for (pandaID,) in resS:
1516                 # start transaction
1517                 self.conn.begin()
1518                 # get _dis name
1519                 varMap = {}
1520                 varMap[":type"] = "input"
1521                 varMap[":PandaID"] = pandaID
1522                 self.cur.arraysize = 1000
1523                 self.cur.execute(sqlJ + comment, varMap)
1524                 resD = self.cur.fetchall()
1525                 # commit
1526                 if not self._commit():
1527                     raise RuntimeError("Commit error")
1528                 # append
1529                 for (disName,) in resD:
1530                     if disName is not None and disName not in retList:
1531                         retList.append(disName)
1532             # return
1533             tmp_log.debug(f"ret={str(retList)}")
1534             return retList
1535         except Exception:
1536             # roll back
1537             self._rollback()
1538             self.dump_error_message(tmp_log)
1539             return []
1540 
1541     # set GUIDs
1542     def setGUIDs(self, files):
1543         comment = " /* DBProxy.setGUIDs */"
1544         tmp_log = self.create_tagged_logger(comment)
1545         tmp_log.debug(f"{files}")
1546         sql0 = "UPDATE ATLAS_PANDA.filesTable4 SET GUID=:GUID,fsize=:fsize,checksum=:checksum,scope=:scope WHERE lfn=:lfn"
1547         for iTry in range(self.nTry):
1548             try:
1549                 # start transaction
1550                 self.conn.begin()
1551                 self.cur.arraysize = 1000000
1552                 # update
1553                 for file in files:
1554                     varMap = {}
1555                     varMap[":GUID"] = file["guid"]
1556                     varMap[":lfn"] = file["lfn"]
1557                     if file["checksum"] in ["", "NULL"]:
1558                         varMap[":checksum"] = None
1559                     else:
1560                         varMap[":checksum"] = file["checksum"]
1561                     varMap[":fsize"] = file["fsize"]
1562                     if "scope" not in file or file["scope"] in ["", "NULL"]:
1563                         varMap[":scope"] = None
1564                     else:
1565                         varMap[":scope"] = file["scope"]
1566                     self.cur.execute(sql0 + comment, varMap)
1567                     retU = self.cur.rowcount
1568                     tmp_log.debug(f"retU {retU}")
1569                     if retU < 0:
1570                         raise RuntimeError("SQL error")
1571                 # commit
1572                 if not self._commit():
1573                     raise RuntimeError("Commit error")
1574                 return True
1575             except Exception:
1576                 # roll back
1577                 self._rollback()
1578                 # error report
1579                 if iTry + 1 < self.nTry:
1580                     tmp_log.debug(f"retry : {iTry}")
1581                     time.sleep(random.randint(10, 20))
1582                     continue
1583                 self.dump_error_message(tmp_log)
1584         return False
1585 
1586     # get special dispatcher parameters
1587     def get_special_dispatch_params(self):
1588         """
1589         Get the following special parameters for dispatcher.Z
1590           Authorized name lists for proxy, key-pair, and token-key retrieval
1591           Key pairs
1592           Token keys
1593         """
1594         comment = " /* DBProxy.get_special_dispatch_params */"
1595         tmp_log = self.create_tagged_logger(comment)
1596         tmp_log.debug("start")
1597         try:
1598             return_map = {}
1599             # set autocommit on
1600             self.conn.begin()
1601             self.cur.arraysize = 100000
1602             # get token keys
1603             token_keys = {}
1604             sql = f"SELECT dn, credname FROM {panda_config.schemaMETA}.proxykey WHERE expires>:limit ORDER BY expires DESC "
1605             var_map = {":limit": naive_utcnow()}
1606             self.cur.execute(sql + comment, var_map)
1607             res_list = self.cur.fetchall()
1608             for client_name, token_key in res_list:
1609                 token_keys.setdefault(client_name, {"fullList": [], "latest": token_key})
1610                 token_keys[client_name]["fullList"].append(token_key)
1611             return_map["tokenKeys"] = token_keys
1612             tmp_list = [f"""{k}:{len(token_keys[k]["fullList"])}""" for k in token_keys]
1613             tmp_log.debug(f"""got token keys {",".join(tmp_list)}""")
1614             # select to get the list of authorized users
1615             allow_key = []
1616             allow_proxy = []
1617             allow_token = []
1618             sql = "SELECT DISTINCT name, gridpref FROM ATLAS_PANDAMETA.users " "WHERE (status IS NULL OR status<>:ngStatus) AND gridpref IS NOT NULL "
1619             var_map = {":ngStatus": "disabled"}
1620             self.cur.execute(sql + comment, var_map)
1621             res_list = self.cur.fetchall()
1622             # commit
1623             if not self._commit():
1624                 raise RuntimeError("Commit error")
1625             for compactDN, gridpref in res_list:
1626                 # users authorized for proxy retrieval
1627                 if PrioUtil.PERMISSION_PROXY in gridpref:
1628                     if compactDN not in allow_proxy:
1629                         allow_proxy.append(compactDN)
1630                 # users authorized for key-pair retrieval
1631                 if PrioUtil.PERMISSION_KEY in gridpref:
1632                     if compactDN not in allow_key:
1633                         allow_key.append(compactDN)
1634                 # users authorized for token-key retrieval
1635                 if PrioUtil.PERMISSION_TOKEN_KEY in gridpref:
1636                     if compactDN not in allow_token:
1637                         allow_token.append(compactDN)
1638             return_map["allowKeyPair"] = allow_key
1639             return_map["allowProxy"] = allow_proxy
1640             return_map["allowTokenKey"] = allow_token
1641             tmp_log.debug(
1642                 f"got authed users key-pair:{len(return_map['allowKeyPair'])}, proxy:{len(return_map['allowProxy'])}, token-key:{len(return_map['allowTokenKey'])}"
1643             )
1644             # read key pairs
1645             keyPair = {}
1646             try:
1647                 keyFileNames = glob.glob(panda_config.keyDir + "/*")
1648                 for keyName in keyFileNames:
1649                     tmpF = open(keyName)
1650                     keyPair[os.path.basename(keyName)] = tmpF.read()
1651                     tmpF.close()
1652             except Exception as e:
1653                 tmp_log.error(f"failed read key-pairs with {str(e)}")
1654             return_map["keyPair"] = keyPair
1655             tmp_log.debug(f"got {len(return_map['keyPair'])} key-pair files")
1656             tmp_log.debug("done")
1657             return return_map
1658         except Exception:
1659             # roll back
1660             self._rollback()
1661             # error
1662             self.dump_error_message(tmp_log)
1663             return {}
1664 
1665     # get original consumers
1666     def getOriginalConsumers(self, jediTaskID, jobsetID, pandaID):
1667         comment = " /* DBProxy.getOriginalConsumers */"
1668         tmp_log = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID} jobsetID={jobsetID} PandaID={pandaID}")
1669         tmp_log.debug("start")
1670         try:
1671             # sql to get sites where consumers are active
1672             sqlA = "SELECT computingSite FROM ATLAS_PANDA.jobsActive4 WHERE jediTaskID=:jediTaskID AND jobsetID=:jobsetID "
1673             sqlA += "UNION "
1674             sqlA += "SELECT computingSite FROM ATLAS_PANDA.jobsDefined4 WHERE jediTaskID=:jediTaskID AND jobsetID=:jobsetID "
1675             # sql to get original IDs
1676             sqlG = f"SELECT oldPandaID FROM {panda_config.schemaJEDI}.JEDI_Job_Retry_History "
1677             sqlG += "WHERE jediTaskID=:jediTaskID AND newPandaID=:jobsetID AND relationType=:relationType "
1678             # sql to check computingSite
1679             sqlC1 = "SELECT computingSite FROM ATLAS_PANDA.jobsArchived4 WHERE PandaID=:PandaID "
1680             sqlC2 = "SELECT computingSite FROM ATLAS_PANDAARCH.jobsArchived WHERE PandaID=:PandaID AND modificationTime>(CURRENT_DATE-30) "
1681             # sql to get job info
1682             sqlJ = f"SELECT {JobSpec.columnNames()} "
1683             sqlJ += "FROM {0} "
1684             sqlJ += "WHERE PandaID=:PandaID AND modificationTime>(CURRENT_DATE-30) "
1685             sqlF = f"SELECT {FileSpec.columnNames()} FROM ATLAS_PANDA.filesTable4 "
1686             sqlF += "WHERE PandaID=:PandaID "
1687             sqlP = "SELECT jobParameters FROM {0} WHERE PandaID=:PandaID "
1688             # get sites
1689             aSites = set()
1690             varMap = dict()
1691             varMap[":jediTaskID"] = jediTaskID
1692             varMap[":jobsetID"] = jobsetID
1693             self.cur.execute(sqlA + comment, varMap)
1694             resA = self.cur.fetchall()
1695             for (computingSite,) in resA:
1696                 aSites.add(computingSite)
1697             # get original IDs
1698             varMap = dict()
1699             varMap[":jediTaskID"] = jediTaskID
1700             varMap[":jobsetID"] = jobsetID
1701             varMap[":relationType"] = EventServiceUtils.relationTypeJS_Map
1702             self.cur.execute(sqlG + comment, varMap)
1703             resG = self.cur.fetchall()
1704             jobList = []
1705             for (pandaID,) in resG:
1706                 # check computingSite
1707                 varMap = dict()
1708                 varMap[":PandaID"] = pandaID
1709                 self.cur.execute(sqlC1 + comment, varMap)
1710                 resC = self.cur.fetchone()
1711                 if resC is None:
1712                     # try archived
1713                     self.cur.execute(sqlC2 + comment, varMap)
1714                     resC = self.cur.fetchone()
1715                     inArchived = True
1716                 else:
1717                     inArchived = False
1718                 # skip since it is not yet archived and thus is still active
1719                 if resC is None:
1720                     continue
1721                 (computingSite,) = resC
1722                 # skip since there is an active consumer at the site
1723                 if computingSite in aSites:
1724                     continue
1725                 # get job
1726                 if inArchived:
1727                     self.cur.execute(sqlJ.format("ATLAS_PANDAARCH.jobsArchived") + comment, varMap)
1728                 else:
1729                     self.cur.execute(sqlJ.format("ATLAS_PANDA.jobsArchived4") + comment, varMap)
1730                 resJ = self.cur.fetchone()
1731                 if resJ is not None:
1732                     jobSpec = JobSpec()
1733                     jobSpec.pack(resJ)
1734                     # get files
1735                     self.cur.execute(sqlF + comment, varMap)
1736                     resFs = self.cur.fetchall()
1737                     if len(resFs) == 0:
1738                         continue
1739                     for resF in resFs:
1740                         fileSpec = FileSpec()
1741                         fileSpec.pack(resF)
1742                         jobSpec.addFile(fileSpec)
1743                     # get job params
1744                     if inArchived:
1745                         self.cur.execute(
1746                             sqlP.format("ATLAS_PANDAARCH.jobParamsTable_ARCH") + comment,
1747                             varMap,
1748                         )
1749                     else:
1750                         self.cur.execute(sqlP.format("ATLAS_PANDA.jobParamsTable") + comment, varMap)
1751                     for (clobJobP,) in self.cur:
1752                         if clobJobP is not None:
1753                             try:
1754                                 jobSpec.jobParameters = clobJobP.read()
1755                             except AttributeError:
1756                                 jobSpec.jobParameters = str(clobJobP)
1757                         break
1758                     # add
1759                     jobList.append(jobSpec)
1760                     aSites.add(computingSite)
1761             tmp_log.debug(f"got {len(jobList)} consumers")
1762             return jobList
1763         except Exception:
1764             # error
1765             self.dump_error_message(tmp_log)
1766             return []
1767 
1768     # update unmerged datasets to trigger merging
1769     def updateUnmergedDatasets(self, job, finalStatusDS, updateCompleted=False):
1770         comment = " /* JediDBProxy.updateUnmergedDatasets */"
1771         tmp_log = self.create_tagged_logger(comment, f"PandaID={job.PandaID}")
1772         # get PandaID which produced unmerged files
1773         umPandaIDs = []
1774         umCheckedIDs = []
1775         # sql to get file counts
1776         sqlGFC = "SELECT status,PandaID,outPandaID FROM ATLAS_PANDA.JEDI_Dataset_Contents "
1777         sqlGFC += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND PandaID IS NOT NULL "
1778         # sql to update nFiles in JEDI datasets
1779         sqlUNF = "UPDATE ATLAS_PANDA.JEDI_Datasets "
1780         sqlUNF += "SET nFilesOnHold=0,nFiles=:nFiles,"
1781         sqlUNF += "nFilesUsed=:nFilesUsed,nFilesTobeUsed=:nFilesTobeUsed "
1782         sqlUNF += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID "
1783         # sql to check nFiles
1784         sqlUCF = "SELECT nFilesTobeUsed,nFilesUsed FROM ATLAS_PANDA.JEDI_Datasets "
1785         sqlUCF += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID "
1786         # sql to update dataset status
1787         sqlUDS = "UPDATE ATLAS_PANDA.JEDI_Datasets "
1788         sqlUDS += "SET status=:status,modificationTime=CURRENT_DATE "
1789         sqlUDS += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID "
1790         # sql to update dataset status in panda
1791         sqlUDP = "UPDATE ATLAS_PANDA.Datasets "
1792         sqlUDP += "SET status=:status "
1793         sqlUDP += "WHERE vuid=:vuid AND NOT status IN (:statusR,:statusD) "
1794         try:
1795             tmp_log.debug(f"start")
1796             # begin transaction
1797             self.conn.begin()
1798             # update dataset in panda
1799             toSkip = False
1800             for datasetSpec in finalStatusDS:
1801                 varMap = {}
1802                 varMap[":vuid"] = datasetSpec.vuid
1803                 varMap[":status"] = "tobeclosed"
1804                 varMap[":statusR"] = "tobeclosed"
1805                 if not updateCompleted:
1806                     varMap[":statusD"] = "completed"
1807                 else:
1808                     varMap[":statusD"] = "dummy"
1809                 tmp_log.debug(sqlUDP + comment + str(varMap))
1810                 self.cur.execute(sqlUDP + comment, varMap)
1811                 nRow = self.cur.rowcount
1812                 if nRow != 1:
1813                     toSkip = True
1814                     tmp_log.debug(f"failed to lock {datasetSpec.name}")
1815             # look for unmerged files
1816             if not toSkip:
1817                 updatedDS = []
1818                 for tmpFile in job.Files:
1819                     if tmpFile.isUnMergedOutput():
1820                         if tmpFile.datasetID in updatedDS:
1821                             continue
1822                         updatedDS.append(tmpFile.datasetID)
1823                         # get file counts
1824                         varMap = {}
1825                         varMap[":jediTaskID"] = tmpFile.jediTaskID
1826                         varMap[":datasetID"] = tmpFile.datasetID
1827                         self.cur.arraysize = 100000
1828                         tmp_log.debug(sqlGFC + comment + str(varMap))
1829                         self.cur.execute(sqlGFC + comment, varMap)
1830                         resListGFC = self.cur.fetchall()
1831                         varMap = {}
1832                         tmpNumFiles = 0
1833                         tmpNumReady = 0
1834                         for tmpFileStatus, tmpPandaID, tmpOutPandaID in resListGFC:
1835                             if tmpFileStatus in [
1836                                 "finished",
1837                                 "failed",
1838                                 "cancelled",
1839                                 "notmerged",
1840                                 "ready",
1841                                 "lost",
1842                                 "broken",
1843                                 "picked",
1844                                 "nooutput",
1845                             ]:
1846                                 pass
1847                             elif tmpFileStatus == "running" and tmpPandaID != tmpOutPandaID:
1848                                 pass
1849                             else:
1850                                 continue
1851                             tmpNumFiles += 1
1852                             if tmpFileStatus in ["ready"]:
1853                                 tmpNumReady += 1
1854                         # update nFiles
1855                         varMap = {}
1856                         varMap[":jediTaskID"] = tmpFile.jediTaskID
1857                         varMap[":datasetID"] = tmpFile.datasetID
1858                         varMap[":nFiles"] = tmpNumFiles
1859                         varMap[":nFilesTobeUsed"] = tmpNumFiles
1860                         varMap[":nFilesUsed"] = tmpNumFiles - tmpNumReady
1861                         self.cur.arraysize = 10
1862                         tmp_log.debug(sqlUNF + comment + str(varMap))
1863                         self.cur.execute(sqlUNF + comment, varMap)
1864                         nRow = self.cur.rowcount
1865                         if nRow == 1:
1866                             # check nFilesTobeUsed
1867                             varMap = {}
1868                             varMap[":jediTaskID"] = tmpFile.jediTaskID
1869                             varMap[":datasetID"] = tmpFile.datasetID
1870                             self.cur.execute(sqlUCF + comment, varMap)
1871                             resUCF = self.cur.fetchone()
1872                             if resUCF is not None:
1873                                 nFilesTobeUsed, nFilesUsed = resUCF
1874                                 varMap = {}
1875                                 varMap[":jediTaskID"] = tmpFile.jediTaskID
1876                                 varMap[":datasetID"] = tmpFile.datasetID
1877                                 if nFilesTobeUsed - nFilesUsed > 0:
1878                                     varMap[":status"] = "ready"
1879                                 else:
1880                                     varMap[":status"] = "done"
1881                                 # update dataset status
1882                                 tmp_log.debug(sqlUDS + comment + str(varMap))
1883                                 self.cur.execute(sqlUDS + comment, varMap)
1884                         else:
1885                             tmp_log.debug(f"skip jediTaskID={tmpFile.jediTaskID} datasetID={tmpFile.datasetID}")
1886             # commit
1887             if not self._commit():
1888                 raise RuntimeError("Commit error")
1889             tmp_log.debug(f"done")
1890             return True
1891         except Exception:
1892             # roll back
1893             self._rollback()
1894             # error
1895             self.dump_error_message(tmp_log)
1896             return False
1897 
1898     # get throttled users
1899     def getThrottledUsers(self):
1900         comment = " /* DBProxy.getThrottledUsers */"
1901         tmp_log = self.create_tagged_logger(comment)
1902         tmp_log.debug("start")
1903         retVal = set()
1904         try:
1905             # sql to get users
1906             sqlT = "SELECT distinct prodUserName,workingGroup FROM ATLAS_PANDA.jobsActive4 "
1907             sqlT += "WHERE prodSourceLabel=:prodSourceLabel AND jobStatus=:jobStatus AND relocationFlag=:relocationFlag "
1908             # start transaction
1909             self.conn.begin()
1910             # select
1911             self.cur.arraysize = 10
1912             varMap = {}
1913             varMap[":prodSourceLabel"] = "user"
1914             varMap[":relocationFlag"] = 3
1915             varMap[":jobStatus"] = "throttled"
1916             # get datasets
1917             self.cur.execute(sqlT + comment, varMap)
1918             resPs = self.cur.fetchall()
1919             for prodUserName, workingGroup in resPs:
1920                 retVal.add((prodUserName, workingGroup))
1921             # commit
1922             if not self._commit():
1923                 raise RuntimeError("Commit error")
1924             tmp_log.debug(f"done with {str(retVal)}")
1925             return retVal
1926         except Exception:
1927             # roll back
1928             self._rollback()
1929             # error
1930             self.dump_error_message(tmp_log)
1931             return []
1932 
1933     # reset files in JEDI
    def resetFileStatusInJEDI(
        self, dn: str, is_prod_manager: bool, dataset_name: str, lost_files: List[str], recover_parent: bool, simul: bool
    ) -> Tuple[bool, int | None, Dict[str, List[str]] | None, str | None]:
        """
        Reset file status in JEDI for lost output files

        1) resolve jediTaskID and datasetID from the dataset name, looking only at 'log' and 'output' datasets
        2) check ownership: unless is_prod_manager is True, userName of the task must be equal to the
           cleaned-up DN of the requester
        3) for each lost LFN, look up the PandaID recorded for it and change all output files of that PandaID
           from 'finished' to 'lost' in JEDI_Dataset_Contents
        4) update nFilesFinished, state, and nEvents of the affected output datasets in JEDI_Datasets
        5) change the input files of those PandaIDs from 'finished' to 'ready', or to 'lost' when recover_parent
           is True and the file belongs to the master input dataset. attemptNr is incremented in both cases
        6) if useJumbo of the task is not NULL, cancel finished/done/merged events of the reset input files
           in JEDI_Events
        7) update nFilesUsed, nFilesFinished, and nEventsUsed of the affected input datasets
        8) change the task status from 'done' to 'finished'

        If simul is True, the UPDATE statements are only logged and not executed, while the SELECT statements
        and the final commit are still executed.

        :param dn: DN of the requester
        :param is_prod_manager: whether the requester is a production manager
        :param dataset_name: name of the dataset. The part after the last ':' is also tried as the name
        :param lost_files: list of lost LFNs
        :param recover_parent: whether to recover parent files
        :param simul: whether to simulate without executing the updates
        :return: tuple of
            success flag (False only when an exception occurred. It is True even when the request was skipped
                          since the task was not found or the requester was not the owner, so the error message
                          must be checked too),
            jediTaskID (None if the task was not found or an exception occurred),
            dictionary with input dataset names as keys and lists of LFNs in the master input dataset as values,
                which is filled only when recover_parent is True (None if an exception occurred),
            error message (None if nothing was skipped, "database error" if an exception occurred)
        """
        comment = " /* DBProxy.resetFileStatusInJEDI */"
        tmp_log = self.create_tagged_logger(comment, f"datasetName={dataset_name}")
        tmp_log.debug("start")
        error_message = None
        try:
            # list of lost input files
            lostInputFiles = {}
            # get compact DN, falling back to the raw DN when the cleaned-up value is empty
            compactDN = CoreUtils.clean_user_id(dn)
            if compactDN in ["", "NULL", None]:
                compactDN = dn
            tmp_log.debug(f"userName={compactDN}")
            # set to True when the request must be ignored (unknown task or wrong owner)
            toSkip = False
            # begin transaction
            self.conn.begin()
            # get jediTaskID, trying the dataset name both with and without the part before ':'
            varMap = {}
            varMap[":type1"] = "log"
            varMap[":type2"] = "output"
            varMap[":name1"] = dataset_name
            varMap[":name2"] = dataset_name.split(":")[-1]
            sqlGI = f"SELECT jediTaskID,datasetID FROM {panda_config.schemaJEDI}.JEDI_Datasets "
            sqlGI += "WHERE type IN (:type1,:type2) AND datasetName IN (:name1,:name2) "
            self.cur.execute(sqlGI + comment, varMap)
            resGI = self.cur.fetchall()
            # use the largest jediTaskID and datasetID since broken tasks might have been retried
            jediTaskID = None
            datasetID = None
            for tmpJediTaskID, tmpDatasetID in resGI:
                if jediTaskID is None or jediTaskID < tmpJediTaskID:
                    jediTaskID = tmpJediTaskID
                    datasetID = tmpDatasetID
                # NOTE(review): this branch is also taken for rows of a task with a smaller jediTaskID, so it
                # relies on such rows never having a larger datasetID than the selected task - confirm
                elif datasetID < tmpDatasetID:
                    datasetID = tmpDatasetID
            if jediTaskID is None:
                error_message = "jediTaskID not found"
                tmp_log.debug(error_message)
                toSkip = True
            if not toSkip:
                # get task status and owner
                tmp_log.debug(f"jediTaskID={jediTaskID} datasetID={datasetID}")
                varMap = {}
                varMap[":jediTaskID"] = jediTaskID
                sqlOW = f"SELECT status,userName,useJumbo FROM {panda_config.schemaJEDI}.JEDI_Tasks "
                sqlOW += "WHERE jediTaskID=:jediTaskID "
                self.cur.execute(sqlOW + comment, varMap)
                resOW = self.cur.fetchone()
                # unpacking fails if no task row was found, which ends up in the except clause below
                taskStatus, ownerName, useJumbo = resOW
                # check ownership
                if not is_prod_manager and ownerName != compactDN:
                    error_message = f"Not the owner: {ownerName} != {compactDN}"
                    tmp_log.debug(error_message)
                    toSkip = True
            if not toSkip:
                # sql to get the PandaID recorded for a lost file
                sqlLP = f"SELECT pandaID FROM {panda_config.schemaJEDI}.JEDI_Dataset_Contents "
                sqlLP += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND lfn=:lfn "
                # sql to get files of a PandaID to update status
                sql_get_files = (
                    f"SELECT datasetID,fileID FROM {panda_config.schemaJEDI}.JEDI_Dataset_Contents "
                    "WHERE jediTaskID=:jediTaskID AND type=:type AND status=:oldStatus AND PandaID=:PandaID "
                )
                # sql to update file status
                sqlUFO = f"UPDATE {panda_config.schemaJEDI}.JEDI_Dataset_Contents "
                sqlUFO += "SET status=:newStatus "
                sqlUFO += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID=:fileID AND status=:oldStatus AND PandaID=:PandaID "
                # sql to cancel events
                sqlCE = "UPDATE /*+ INDEX_RS_ASC(tab JEDI_EVENTS_FILEID_IDX) NO_INDEX_FFS(tab JEDI_EVENTS_PK) NO_INDEX_SS(tab JEDI_EVENTS_PK) */ "
                sqlCE += f"{panda_config.schemaJEDI}.JEDI_Events tab "
                sqlCE += "SET status=:status "
                sqlCE += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID=:fileID "
                sqlCE += "AND status IN (:esFinished,:esDone,:esMerged) "
                # get affected PandaIDs
                lostPandaIDs = set([])
                # number of files changed to lost per output datasetID
                nDiff = {}
                for lostFile in lost_files:
                    varMap = {}
                    varMap[":jediTaskID"] = jediTaskID
                    varMap[":datasetID"] = datasetID
                    varMap[":lfn"] = lostFile
                    self.cur.execute(sqlLP + comment, varMap)
                    # only the first matching row is used. LFNs unknown to the dataset are silently ignored
                    resLP = self.cur.fetchone()
                    if resLP is not None:
                        (pandaID,) = resLP
                        lostPandaIDs.add(pandaID)
                        # get the file and co-produced files to lost
                        varMap = {}
                        varMap[":jediTaskID"] = jediTaskID
                        varMap[":PandaID"] = pandaID
                        varMap[":type"] = "output"
                        varMap[":oldStatus"] = "finished"
                        self.cur.execute(sql_get_files + comment, varMap)
                        res_files = self.cur.fetchall()
                        if res_files:
                            for tmpDatasetID, tmpFileID in res_files:
                                # update file status
                                varMap = {}
                                varMap[":jediTaskID"] = jediTaskID
                                varMap[":datasetID"] = tmpDatasetID
                                varMap[":fileID"] = tmpFileID
                                varMap[":PandaID"] = pandaID
                                varMap[":newStatus"] = "lost"
                                varMap[":oldStatus"] = "finished"
                                tmp_log.debug(sqlUFO + comment + str(varMap))
                                if not simul:
                                    self.cur.execute(sqlUFO + comment, varMap)
                                    # count the file only when the row was really changed
                                    nRow = self.cur.rowcount
                                    if nRow > 0:
                                        nDiff[tmpDatasetID] = nDiff.get(tmpDatasetID, 0) + 1
                                else:
                                    # in simulation every selected file is counted as changed
                                    nDiff[tmpDatasetID] = nDiff.get(tmpDatasetID, 0) + 1
                tmp_log.debug(f"PandaIDs produced lost files = {str(lostPandaIDs)}")
                tmp_log.debug("update output datasets")
                # sql to get nEvents, i.e. the sum over the files still in the given status
                sqlGNE = "SELECT SUM(c.nEvents) "
                sqlGNE += "FROM {0}.JEDI_Datasets d,{0}.JEDI_Dataset_Contents c ".format(panda_config.schemaJEDI)
                sqlGNE += "WHERE c.jediTaskID=d.jediTaskID AND c.datasetID=d.datasetID "
                sqlGNE += "AND d.jediTaskID=:jediTaskID AND d.datasetID=:datasetID AND c.status=:status "
                # sql to update output dataset statistics
                sqlUDO = f"UPDATE {panda_config.schemaJEDI}.JEDI_Datasets "
                sqlUDO += "SET nFilesFinished=nFilesFinished-:nDiff,state=NULL,nEvents=:nEvents "
                sqlUDO += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID "
                for tmpDatasetID, n_diff_value in nDiff.items():
                    # get nEvents
                    varMap = {}
                    varMap[":jediTaskID"] = jediTaskID
                    varMap[":datasetID"] = tmpDatasetID
                    varMap[":status"] = "finished"
                    self.cur.execute(sqlGNE + comment, varMap)
                    (tmp_counts,) = self.cur.fetchone()
                    # update nFilesFinished
                    varMap = {}
                    varMap[":jediTaskID"] = jediTaskID
                    varMap[":datasetID"] = tmpDatasetID
                    varMap[":nDiff"] = n_diff_value
                    varMap[":nEvents"] = tmp_counts
                    tmp_log.debug(sqlUDO + comment + str(varMap))
                    if not simul:
                        self.cur.execute(sqlUDO + comment, varMap)
                # get input datasets
                tmp_log.debug("update input datasets")
                sqlID = f"SELECT datasetID,datasetName,masterID FROM {panda_config.schemaJEDI}.JEDI_Datasets "
                sqlID += "WHERE jediTaskID=:jediTaskID AND type=:type "
                varMap = {}
                varMap[":jediTaskID"] = jediTaskID
                varMap[":type"] = "input"
                self.cur.execute(sqlID + comment, varMap)
                resID = self.cur.fetchall()
                # datasetID -> datasetName for all input datasets of the task
                inputDatasets = {}
                # the input dataset without masterID is regarded as the master. The last one wins if there are many
                masterID = None
                for tmpDatasetID, tmpDatasetName, tmpMasterID in resID:
                    inputDatasets[tmpDatasetID] = tmpDatasetName
                    if tmpMasterID is None:
                        masterID = tmpDatasetID
                # sql to get affected inputs
                if useJumbo is None:
                    sqlAI = f"SELECT fileID,datasetID,lfn,outPandaID FROM {panda_config.schemaJEDI}.JEDI_Dataset_Contents "
                    sqlAI += "WHERE jediTaskID=:jediTaskID AND type IN (:type1,:type2,:type3) AND PandaID=:PandaID "
                else:
                    # NOTE(review): the second "sqlAI =" below overwrites the string instead of appending to it,
                    # so the first SELECT and the UNION are discarded and only the query with the modificationTime
                    # condition is executed. Confirm whether "+=" and/or another table was intended
                    sqlAI = f"SELECT fileID,datasetID,lfn,NULL FROM {panda_config.schemaPANDA}.filesTable4 "
                    sqlAI += "WHERE PandaID=:PandaID AND type IN (:type1,:type2) "
                    sqlAI += "UNION "
                    sqlAI = f"SELECT fileID,datasetID,lfn,NULL FROM {panda_config.schemaPANDA}.filesTable4 "
                    sqlAI += "WHERE PandaID=:PandaID AND type IN (:type1,:type2) AND modificationTime>CURRENT_TIMESTAMP-365 "
                # sql to update input file status
                sqlUFI = f"UPDATE {panda_config.schemaJEDI}.JEDI_Dataset_Contents "
                sqlUFI += "SET status=:newStatus,attemptNr=attemptNr+1 "
                sqlUFI += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID=:fileID AND status=:oldStatus "
                # get affected inputs
                # number of reset files per input datasetID
                datasetCountMap = {}
                for lostPandaID in lostPandaIDs:
                    # the set of bind variables depends on which form of sqlAI is used
                    varMap = {}
                    if useJumbo is None:
                        varMap[":jediTaskID"] = jediTaskID
                        varMap[":PandaID"] = lostPandaID
                        varMap[":type1"] = "input"
                        varMap[":type2"] = "pseudo_input"
                        varMap[":type3"] = "output"
                    else:
                        varMap[":PandaID"] = lostPandaID
                        varMap[":type1"] = "input"
                        varMap[":type2"] = "pseudo_input"
                    self.cur.execute(sqlAI + comment, varMap)
                    resAI = self.cur.fetchall()
                    newResAI = []
                    for tmpItem in resAI:
                        tmpFileID, tmpDatasetID, tmpLFN, tmpOutPandaID = tmpItem
                        # skip output file
                        if lostPandaID == tmpOutPandaID:
                            continue
                        # input for merged files: use the inputs of the job given by outPandaID instead.
                        # This branch is reached only with the JEDI_Dataset_Contents form of sqlAI since
                        # the other form always returns NULL in the fourth column
                        if tmpOutPandaID is not None:
                            varMap = {}
                            varMap[":jediTaskID"] = jediTaskID
                            varMap[":PandaID"] = tmpOutPandaID
                            varMap[":type1"] = "input"
                            varMap[":type2"] = "pseudo_input"
                            # fills the third bind variable; presumably 'dummy' matches no real file type
                            varMap[":type3"] = "dummy"
                            self.cur.execute(sqlAI + comment, varMap)
                            resAI2 = self.cur.fetchall()
                            for tmpItem in resAI2:
                                newResAI.append(tmpItem)
                        else:
                            newResAI.append(tmpItem)
                    for tmpFileID, tmpDatasetID, tmpLFN, tmpOutPandaID in newResAI:
                        # files in the master input dataset are reported back to the caller and set to lost
                        # instead of ready when parent recovery is requested
                        is_lost = False
                        if recover_parent and tmpDatasetID == masterID:
                            lostInputFiles.setdefault(inputDatasets[tmpDatasetID], [])
                            lostInputFiles[inputDatasets[tmpDatasetID]].append(tmpLFN)
                            is_lost = True
                        # reset file status
                        if tmpDatasetID not in datasetCountMap:
                            datasetCountMap[tmpDatasetID] = 0
                        varMap = {}
                        varMap[":jediTaskID"] = jediTaskID
                        varMap[":datasetID"] = tmpDatasetID
                        varMap[":fileID"] = tmpFileID
                        if is_lost:
                            varMap[":newStatus"] = "lost"
                        else:
                            varMap[":newStatus"] = "ready"
                        varMap[":oldStatus"] = "finished"
                        if not simul:
                            self.cur.execute(sqlUFI + comment, varMap)
                            nRow = self.cur.rowcount
                        else:
                            # in simulation the update is only logged and is assumed to change one row
                            tmp_log.debug(sqlUFI + comment + str(varMap))
                            nRow = 1
                        if nRow > 0:
                            datasetCountMap[tmpDatasetID] += 1
                            if useJumbo is not None:
                                # cancel events
                                varMap = {}
                                varMap[":jediTaskID"] = jediTaskID
                                varMap[":datasetID"] = tmpDatasetID
                                varMap[":fileID"] = tmpFileID
                                varMap[":status"] = EventServiceUtils.ST_cancelled
                                varMap[":esFinished"] = EventServiceUtils.ST_finished
                                varMap[":esDone"] = EventServiceUtils.ST_done
                                varMap[":esMerged"] = EventServiceUtils.ST_merged
                                if not simul:
                                    self.cur.execute(sqlCE + comment, varMap)
                                else:
                                    tmp_log.debug(sqlCE + comment + str(varMap))
                # update dataset statistics. nEventsUsed is recalculated from the files still in the given status
                sqlUDI = f"UPDATE {panda_config.schemaJEDI}.JEDI_Datasets "
                sqlUDI += "SET nFilesUsed=nFilesUsed-:nDiff,nFilesFinished=nFilesFinished-:nDiff,"
                sqlUDI += "nEventsUsed=(SELECT SUM(CASE WHEN startEvent IS NULL THEN nEvents ELSE endEvent-startEvent+1 END) "
                sqlUDI += f"FROM {panda_config.schemaJEDI}.JEDI_Dataset_Contents "
                sqlUDI += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND status=:status) "
                sqlUDI += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID "
                for tmpDatasetID in datasetCountMap:
                    # nDiff is rebound from the per-dataset dictionary to a plain number here.
                    # The dictionary is no longer used at this point
                    nDiff = datasetCountMap[tmpDatasetID]
                    varMap = {}
                    varMap[":jediTaskID"] = jediTaskID
                    varMap[":datasetID"] = tmpDatasetID
                    varMap[":nDiff"] = nDiff
                    varMap[":status"] = "finished"
                    tmp_log.debug(sqlUDI + comment + str(varMap))
                    if not simul:
                        self.cur.execute(sqlUDI + comment, varMap)
                # update task status
                if taskStatus == "done":
                    sqlUT = f"UPDATE {panda_config.schemaJEDI}.JEDI_Tasks SET status=:newStatus WHERE jediTaskID=:jediTaskID "
                    varMap = {}
                    varMap[":jediTaskID"] = jediTaskID
                    varMap[":newStatus"] = "finished"
                    if not simul:
                        self.cur.execute(sqlUT + comment, varMap)
            # commit. This is executed also when the request was skipped or simulated
            if not self._commit():
                raise RuntimeError("Commit error")
            tmp_log.debug("done")
            # True is returned also when the request was skipped. error_message tells the reason in that case
            return True, jediTaskID, lostInputFiles, error_message
        except Exception:
            # roll back
            self._rollback()
            # error
            self.dump_error_message(tmp_log)
            return False, None, None, "database error"
2239 
2240     # copy file records
2241     def copy_file_records(self, new_lfns, file_spec):
2242         comment = " /* DBProxy.copy_file_records */"
2243         tmp_log = self.create_tagged_logger(comment, f"PandaID={file_spec.PandaID} oldLFN={file_spec.lfn}")
2244         tmp_log.debug(f"start with {len(new_lfns)} files")
2245         try:
2246             # begin transaction
2247             self.conn.begin()
2248             for idx_lfn, new_lfn in enumerate(new_lfns):
2249                 # reset rowID
2250                 tmpFileSpec = copy.copy(file_spec)
2251                 tmpFileSpec.lfn = new_lfn
2252                 if idx_lfn > 0:
2253                     tmpFileSpec.row_ID = None
2254                 # insert file in JEDI
2255                 if idx_lfn > 0 and tmpFileSpec.jediTaskID not in [None, "NULL"] and tmpFileSpec.fileID not in ["", "NULL", None]:
2256                     # get fileID
2257                     sqlFileID = "SELECT ATLAS_PANDA.JEDI_DATASET_CONT_FILEID_SEQ.nextval FROM dual "
2258                     self.cur.execute(sqlFileID + comment)
2259                     (newFileID,) = self.cur.fetchone()
2260                     # read file in JEDI
2261                     varMap = {}
2262                     varMap[":jediTaskID"] = tmpFileSpec.jediTaskID
2263                     varMap[":datasetID"] = tmpFileSpec.datasetID
2264                     varMap[":fileID"] = tmpFileSpec.fileID
2265                     sqlGI = f"SELECT * FROM {panda_config.schemaJEDI}.JEDI_Dataset_Contents "
2266                     sqlGI += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID=:fileID "
2267                     self.cur.execute(sqlGI + comment, varMap)
2268                     resGI = self.cur.fetchone()
2269                     tmpFileSpec.fileID = newFileID
2270                     if resGI is not None:
2271                         # make sql and map
2272                         sqlJI = f"INSERT INTO {panda_config.schemaJEDI}.JEDI_Dataset_Contents "
2273                         sqlJI += "VALUES ("
2274                         varMap = {}
2275                         for columDesc, columVal in zip(self.cur.description, resGI):
2276                             columName = columDesc[0]
2277                             # overwrite fileID
2278                             if columName.upper() == "FILEID":
2279                                 columVal = tmpFileSpec.fileID
2280                             keyName = f":{columName}"
2281                             varMap[keyName] = columVal
2282                             sqlJI += f"{keyName},"
2283                         sqlJI = sqlJI[:-1]
2284                         sqlJI += ") "
2285                         # insert file in JEDI
2286                         self.cur.execute(sqlJI + comment, varMap)
2287                 if idx_lfn > 0:
2288                     # insert file in Panda
2289                     sqlFile = f"INSERT INTO ATLAS_PANDA.filesTable4 ({FileSpec.columnNames()}) "
2290                     sqlFile += FileSpec.bindValuesExpression(useSeq=True)
2291                     varMap = tmpFileSpec.valuesMap(useSeq=True)
2292                     self.cur.execute(sqlFile + comment, varMap)
2293                 else:
2294                     # update LFN
2295                     sqlFSF = "UPDATE ATLAS_PANDA.filesTable4 SET lfn=:lfn "
2296                     sqlFSF += "WHERE row_ID=:row_ID "
2297                     varMap = {}
2298                     varMap[":lfn"] = tmpFileSpec.lfn
2299                     varMap[":row_ID"] = tmpFileSpec.row_ID
2300                     self.cur.execute(sqlFSF + comment, varMap)
2301                 # update LFN in JEDI
2302                 if tmpFileSpec.fileID not in ["", "NULL", None]:
2303                     sqlJF = f"UPDATE {panda_config.schemaJEDI}.JEDI_Dataset_Contents "
2304                     sqlJF += "SET lfn=:lfn "
2305                     sqlJF += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID=:fileID "
2306                     varMap = {}
2307                     varMap[":lfn"] = tmpFileSpec.lfn
2308                     varMap[":jediTaskID"] = tmpFileSpec.jediTaskID
2309                     varMap[":datasetID"] = tmpFileSpec.datasetID
2310                     varMap[":fileID"] = tmpFileSpec.fileID
2311                     self.cur.execute(sqlJF + comment, varMap)
2312             # commit
2313             if not self._commit():
2314                 raise RuntimeError("Commit error")
2315             tmp_log.debug("done")
2316             return True
2317         except Exception:
2318             # roll back
2319             self._rollback()
2320             # error
2321             self.dump_error_message(tmp_log)
2322             return False
2323 
2324     # get error definitions from DB (values cached for 1 hour)
2325     @memoize
2326     def getRetrialRules(self):
2327         # Logging
2328         comment = " /* DBProxy.getRetrialRules */"
2329         tmp_log = self.create_tagged_logger(comment)
2330         tmp_log.debug("start")
2331 
2332         # SQL to extract the error definitions
2333         sql = """
2334         SELECT re.retryerror_id, re.errorsource, re.errorcode, re.errorDiag, re.parameters, re.architecture, re.release, re.workqueue_id, ra.retry_action, re.active, ra.active
2335         FROM ATLAS_PANDA.RETRYERRORS re, ATLAS_PANDA.RETRYACTIONS ra
2336         WHERE re.retryaction=ra.retryaction_id
2337         AND (CURRENT_TIMESTAMP < re.expiration_date or re.expiration_date IS NULL)
2338         """
2339         self.cur.execute(sql + comment, {})
2340         definitions = self.cur.fetchall()  # example of output: [('pilotErrorCode', 1, None, None, None, None, 'no_retry', 'Y', 'Y'),...]
2341 
2342         # commit
2343         if not self._commit():
2344             raise RuntimeError("Commit error")
2345 
2346         # tmp_log.debug("definitions %s"%(definitions))
2347 
2348         retrial_rules = {}
2349         for definition in definitions:
2350             (
2351                 retryerror_id,
2352                 error_source,
2353                 error_code,
2354                 error_diag,
2355                 parameters,
2356                 architecture,
2357                 release,
2358                 wqid,
2359                 action,
2360                 e_active,
2361                 a_active,
2362             ) = definition
2363 
2364             # Convert the parameter string into a dictionary
2365             try:
2366                 # 1. Convert a string like "key1=value1&key2=value2" into [[key1, value1],[key2,value2]]
2367                 params_list = map(
2368                     lambda key_value_pair: key_value_pair.split("="),
2369                     parameters.split("&"),
2370                 )
2371                 # 2. Convert a list [[key1, value1],[key2,value2]] into {key1: value1, key2: value2}
2372                 params_dict = dict((key, value) for (key, value) in params_list)
2373             except AttributeError:
2374                 params_dict = {}
2375             except ValueError:
2376                 params_dict = {}
2377 
2378             # Calculate if action and error combination should be active
2379             if e_active == "Y" and a_active == "Y":
2380                 active = True  # Apply the action for this error
2381             else:
2382                 active = False  # Do not apply the action for this error, only log
2383 
2384             retrial_rules.setdefault(error_source, {})
2385             retrial_rules[error_source].setdefault(error_code, [])
2386             retrial_rules[error_source][error_code].append(
2387                 {
2388                     "error_id": retryerror_id,
2389                     "error_diag": error_diag,
2390                     "action": action,
2391                     "params": params_dict,
2392                     "architecture": architecture,
2393                     "release": release,
2394                     "wqid": wqid,
2395                     "active": active,
2396                 }
2397             )
2398         # tmp_log.debug("Loaded retrial rules from DB: %s" %retrial_rules)
2399         return retrial_rules
2400 
2401     def setMaxAttempt(self, jobID, taskID, files, maxAttempt):
2402         # Logging
2403         comment = " /* DBProxy.setMaxAttempt */"
2404         tmp_log = self.create_tagged_logger(comment, f"jobID={jobID} taskID={taskID}")
2405         tmp_log.debug("start")
2406 
2407         # Update the file entries to avoid JEDI generating new jobs
2408         input_types = ("input", "pseudo_input", "pp_input", "trn_log", "trn_output")
2409         input_files = list(
2410             filter(
2411                 lambda pandafile: pandafile.type in input_types and re.search("DBRelease", pandafile.lfn) is None,
2412                 files,
2413             )
2414         )
2415         input_fileIDs = [input_file.fileID for input_file in input_files]
2416         input_datasetIDs = [input_file.datasetID for input_file in input_files]
2417 
2418         if input_fileIDs:
2419             try:
2420                 # Start transaction
2421                 self.conn.begin()
2422 
2423                 varMap = {}
2424                 varMap[":taskID"] = taskID
2425                 varMap[":pandaID"] = jobID
2426 
2427                 # Bind the files
2428                 file_var_names_str, file_var_map = get_sql_IN_bind_variables(input_fileIDs, prefix=":file")
2429                 varMap.update(file_var_map)
2430 
2431                 # Bind the datasets
2432                 ds_var_names_str, ds_var_map = get_sql_IN_bind_variables(input_datasetIDs, prefix=":dataset")
2433                 varMap.update(ds_var_map)
2434 
2435                 # Get the minimum maxAttempt value of the files
2436                 sql_select = f"""
2437                 select min(maxattempt) from ATLAS_PANDA.JEDI_Dataset_Contents
2438                 WHERE JEDITaskID = :taskID
2439                 AND datasetID IN ({ds_var_names_str})
2440                 AND fileID IN ({file_var_names_str})
2441                 AND pandaID = :pandaID
2442                 """
2443                 self.cur.execute(sql_select + comment, varMap)
2444                 try:
2445                     maxAttempt_select = self.cur.fetchone()[0]
2446                 except (TypeError, IndexError):
2447                     maxAttempt_select = None
2448 
2449                 # Don't update the maxAttempt if the new value is higher than the old value
2450                 if maxAttempt_select and maxAttempt_select > maxAttempt:
2451                     varMap[":maxAttempt"] = min(maxAttempt, maxAttempt_select)
2452 
2453                     sql_update = f"""
2454                     UPDATE ATLAS_PANDA.JEDI_Dataset_Contents
2455                     SET maxAttempt=:maxAttempt
2456                     WHERE JEDITaskID = :taskID
2457                     AND datasetID IN ({ds_var_names_str})
2458                     AND fileID IN ({file_var_names_str})
2459                     AND pandaID = :pandaID
2460                     """
2461 
2462                     self.cur.execute(sql_update + comment, varMap)
2463 
2464                 # Commit updates
2465                 if not self._commit():
2466                     raise RuntimeError("Commit error")
2467             except Exception:
2468                 # roll back
2469                 self._rollback()
2470                 # error
2471                 self.dump_error_message(tmp_log)
2472                 return False
2473 
2474         tmp_log.debug("done")
2475         return True
2476 
2477     def increase_max_failure(self, job_id, task_id, files):
2478         """Increase the max failure number by one for specific files."""
2479         comment = " /* DBProxy.increase_max_failure */"
2480         tmp_log = self.create_tagged_logger(comment, f"PandaID={job_id} jediTaskID={task_id}")
2481         tmp_log.debug("start")
2482 
2483         # Update the file entries to increase the max attempt number by one
2484         input_types = ("input", "pseudo_input", "pp_input", "trn_log", "trn_output")
2485         input_files = [pandafile for pandafile in files if pandafile.type in input_types and re.search("DBRelease", pandafile.lfn) is None]
2486         input_file_ids = [input_file.fileID for input_file in input_files]
2487         input_dataset_ids = [input_file.datasetID for input_file in input_files]
2488 
2489         if input_file_ids:
2490             try:
2491                 # Start transaction
2492                 self.conn.begin()
2493 
2494                 var_map = {
2495                     ":taskID": task_id,
2496                     ":pandaID": job_id,
2497                 }
2498 
2499                 # Bind the files
2500                 file_var_names_str, file_var_map = get_sql_IN_bind_variables(input_file_ids, prefix=":file")
2501                 var_map.update(file_var_map)
2502 
2503                 # Bind the datasets
2504                 ds_var_names_str, ds_var_map = get_sql_IN_bind_variables(input_dataset_ids, prefix=":dataset")
2505                 var_map.update(ds_var_map)
2506 
2507                 sql_update = f"""
2508                 UPDATE ATLAS_PANDA.JEDI_Dataset_Contents
2509                 SET maxFailure = maxFailure + 1
2510                 WHERE JEDITaskID = :taskID
2511                 AND datasetID IN ({ds_var_names_str})
2512                 AND fileID IN ({file_var_names_str})
2513                 AND pandaID = :pandaID
2514                 """
2515 
2516                 self.cur.execute(sql_update + comment, var_map)
2517 
2518                 # Commit updates
2519                 if not self._commit():
2520                     raise RuntimeError("Commit error")
2521 
2522             except Exception:
2523                 # Roll back
2524                 self._rollback()
2525                 # Log error
2526                 self.dump_error_message(tmp_log)
2527                 return False
2528 
2529         tmp_log.debug("done")
2530         return True
2531 
2532     def setNoRetry(self, jobID, taskID, files):
2533         # Logging
2534         comment = " /* DBProxy.setNoRetry */"
2535         tmp_log = self.create_tagged_logger(comment, f"PandaID={jobID} jediTaskID={taskID}")
2536         tmp_log.debug("start")
2537 
2538         # Update the file entries to avoid JEDI generating new jobs
2539         input_types = ("input", "pseudo_input", "pp_input", "trn_log", "trn_output")
2540         input_files = list(
2541             filter(
2542                 lambda pandafile: pandafile.type in input_types and re.search("DBRelease", pandafile.lfn) is None,
2543                 files,
2544             )
2545         )
2546         input_fileIDs = [input_file.fileID for input_file in input_files]
2547         input_datasetIDs = [input_file.datasetID for input_file in input_files]
2548 
2549         if input_fileIDs:
2550             try:
2551                 # Start transaction
2552                 self.conn.begin()
2553 
2554                 # loop over all datasets
2555                 for datasetID in input_datasetIDs:
2556                     varMap = {}
2557                     varMap[":taskID"] = taskID
2558                     varMap[":datasetID"] = datasetID
2559                     varMap[":keepTrack"] = 1
2560 
2561                     # Bind the files
2562                     file_var_names_str, file_var_map = get_sql_IN_bind_variables(input_fileIDs, prefix=":file")
2563                     varMap.update(file_var_map)
2564 
2565                     sql_update = f"""
2566                     UPDATE ATLAS_PANDA.JEDI_Dataset_Contents
2567                     SET maxAttempt=attemptNr
2568                     WHERE JEDITaskID = :taskID
2569                     AND datasetID=:datasetID
2570                     AND fileID IN ({file_var_names_str})
2571                     AND maxAttempt IS NOT NULL AND attemptNr IS NOT NULL
2572                     AND maxAttempt > attemptNr
2573                     AND (maxFailure IS NULL OR failedAttempt IS NULL OR maxFailure > failedAttempt)
2574                     AND keepTrack=:keepTrack
2575                     AND status=:status
2576                     """
2577 
2578                     # update files in 'running' status. These files do NOT need to be counted for the nFiles*
2579                     varMap[":status"] = "running"
2580                     self.cur.execute(sql_update + comment, varMap)
2581 
2582                     # update files in 'ready' status. These files need to be counted for the nFiles*
2583                     varMap[":status"] = "ready"
2584                     self.cur.execute(sql_update + comment, varMap)
2585                     rowcount = self.cur.rowcount
2586 
2587                     # update datasets
2588                     if rowcount > 0:
2589                         sql_dataset = "UPDATE ATLAS_PANDA.JEDI_Datasets "
2590                         sql_dataset += "SET nFilesUsed=nFilesUsed+:nDiff,nFilesFailed=nFilesFailed+:nDiff "
2591                         sql_dataset += "WHERE jediTaskID=:taskID AND datasetID=:datasetID "
2592                         varMap = dict()
2593                         varMap[":taskID"] = taskID
2594                         varMap[":datasetID"] = datasetID
2595                         varMap[":nDiff"] = rowcount
2596                         self.cur.execute(sql_dataset + comment, varMap)
2597 
2598                 # Commit updates
2599                 if not self._commit():
2600                     raise RuntimeError("Commit error")
2601             except Exception:
2602                 # roll back
2603                 self._rollback()
2604                 # error
2605                 self.dump_error_message(tmp_log)
2606                 return False
2607 
2608         tmp_log.debug("done")
2609         return True
2610 
2611     # add associate sub datasets for single consumer job
2612     def getDestDBlocksWithSingleConsumer(self, jediTaskID, PandaID, ngDatasets):
2613         comment = " /* DBProxy.getDestDBlocksWithSingleConsumer */"
2614         tmp_log = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID} PandaID={PandaID}")
2615         tmp_log.debug("start")
2616         try:
2617             retMap = {}
2618             checkedDS = set()
2619             # sql to get files
2620             sqlF = "SELECT datasetID,fileID FROM ATLAS_PANDA.JEDI_Events "
2621             sqlF += "WHERE jediTaskID=:jediTaskID AND PandaID=:PandaID "
2622             # sql to get PandaIDs
2623             sqlP = "SELECT distinct PandaID FROM ATLAS_PANDA.filesTable4 "
2624             sqlP += "WHERE jediTaskID=:jediTaskID ANd datasetID=:datasetID AND fileID=:fileID "
2625             # sql to get sub datasets
2626             sqlD = "SELECT destinationDBlock,datasetID FROM ATLAS_PANDA.filesTable4 "
2627             sqlD += "WHERE PandaID=:PandaID AND type IN (:type1,:type2) "
2628             # sql to get PandaIDs in merging
2629             sqlM = "SELECT distinct PandaID FROM ATLAS_PANDA.filesTable4 "
2630             sqlM += "WHERE jediTaskID=:jediTaskID ANd datasetID=:datasetID AND status=:status "
2631             varMap = {}
2632             varMap[":jediTaskID"] = jediTaskID
2633             varMap[":PandaID"] = PandaID
2634             # begin transaction
2635             self.conn.begin()
2636             # get files
2637             self.cur.execute(sqlF + comment, varMap)
2638             resF = self.cur.fetchall()
2639             for datasetID, fileID in resF:
2640                 # get parallel jobs
2641                 varMap = {}
2642                 varMap[":jediTaskID"] = jediTaskID
2643                 varMap[":datasetID"] = datasetID
2644                 varMap[":fileID"] = fileID
2645                 self.cur.execute(sqlP + comment, varMap)
2646                 resP = self.cur.fetchall()
2647                 for (sPandaID,) in resP:
2648                     if sPandaID == PandaID:
2649                         continue
2650                     # get sub datasets of parallel jobs
2651                     varMap = {}
2652                     varMap[":PandaID"] = sPandaID
2653                     varMap[":type1"] = "output"
2654                     varMap[":type2"] = "log"
2655                     self.cur.execute(sqlD + comment, varMap)
2656                     resD = self.cur.fetchall()
2657                     subDatasets = []
2658                     subDatasetID = None
2659                     for destinationDBlock, datasetID in resD:
2660                         if destinationDBlock in ngDatasets:
2661                             continue
2662                         if destinationDBlock in checkedDS:
2663                             continue
2664                         checkedDS.add(destinationDBlock)
2665                         subDatasets.append(destinationDBlock)
2666                         subDatasetID = datasetID
2667                     if subDatasets == []:
2668                         continue
2669                     # get merging PandaID which uses sub dataset
2670                     varMap = {}
2671                     varMap[":jediTaskID"] = jediTaskID
2672                     varMap[":datasetID"] = datasetID
2673                     varMap[":status"] = "merging"
2674                     self.cur.execute(sqlM + comment, varMap)
2675                     resM = self.cur.fetchone()
2676                     if resM is not None:
2677                         (mPandaID,) = resM
2678                         retMap[mPandaID] = subDatasets
2679             # commit
2680             if not self._commit():
2681                 raise RuntimeError("Commit error")
2682             tmp_log.debug(f"got {len(retMap)} jobs")
2683             return retMap
2684         except Exception:
2685             # roll back
2686             self._rollback()
2687             # error
2688             self.dump_error_message(tmp_log)
2689             return {}
2690 
2691     # get dispatch datasets per user
2692     def getDispatchDatasetsPerUser(self, vo, prodSourceLabel, onlyActive, withSize):
2693         comment = " /* DBProxy.getDispatchDatasetsPerUser */"
2694         tmp_log = self.create_tagged_logger(comment, f"vo={vo} label={prodSourceLabel}")
2695         tmp_log.debug("start")
2696         # mapping for table and job status
2697         tableStatMap = {"jobsDefined4": ["defined", "assigned"]}
2698         if not onlyActive:
2699             tableStatMap["jobsActive4"] = None
2700             tableStatMap["jobsArchived4"] = None
2701         try:
2702             userDispMap = {}
2703             for tableName in tableStatMap:
2704                 statusList = tableStatMap[tableName]
2705                 # make sql to get dispatch datasets
2706                 varMap = {}
2707                 varMap[":vo"] = vo
2708                 varMap[":label"] = prodSourceLabel
2709                 varMap[":dType"] = "dispatch"
2710                 sqlJ = "SELECT distinct prodUserName,dispatchDBlock,jediTaskID,currentFiles "
2711                 sqlJ += "FROM {0}.{1} j, {0}.Datasets d ".format(panda_config.schemaPANDA, tableName)
2712                 sqlJ += "WHERE vo=:vo AND prodSourceLabel=:label "
2713                 if statusList is not None:
2714                     jobstat_var_names_str, jobstat_var_map = get_sql_IN_bind_variables(statusList, prefix=":jobStat_", value_as_suffix=True)
2715                     sqlJ += f"AND jobStatus IN ({jobstat_var_names_str}) "
2716                     varMap.update(jobstat_var_map)
2717                 sqlJ += "AND dispatchDBlock IS NOT NULL "
2718                 sqlJ += "AND d.name=j.dispatchDBlock AND d.modificationDate>CURRENT_DATE-14 "
2719                 sqlJ += "AND d.type=:dType "
2720                 # begin transaction
2721                 self.conn.begin()
2722                 # get dispatch datasets
2723                 self.cur.execute(sqlJ + comment, varMap)
2724                 resJ = self.cur.fetchall()
2725                 if not self._commit():
2726                     raise RuntimeError("Commit error")
2727                 # make map
2728                 for prodUserName, dispatchDBlock, jediTaskID, dsSize in resJ:
2729                     transferType = "transfer"
2730                     try:
2731                         if dispatchDBlock.split(".")[4] == "prestaging":
2732                             transferType = "prestaging"
2733                     except Exception:
2734                         pass
2735                     userDispMap.setdefault(prodUserName, {})
2736                     userDispMap[prodUserName].setdefault(transferType, {"datasets": set(), "size": 0, "tasks": set()})
2737                     if dispatchDBlock not in userDispMap[prodUserName][transferType]["datasets"]:
2738                         userDispMap[prodUserName][transferType]["datasets"].add(dispatchDBlock)
2739                         userDispMap[prodUserName][transferType]["tasks"].add(jediTaskID)
2740                         userDispMap[prodUserName][transferType]["size"] += dsSize
2741             tmp_log.debug("done")
2742             return userDispMap
2743         except Exception:
2744             # roll back
2745             self._rollback()
2746             # error
2747             self.dump_error_message(tmp_log)
2748             return {}
2749 
2750     # bulk fetch PandaIDs
2751     def bulk_fetch_panda_ids(self, num_ids):
2752         comment = " /* JediDBProxy.bulk_fetch_panda_ids */"
2753         tmp_log = self.create_tagged_logger(comment, f"num_ids={num_ids}")
2754         tmp_log.debug("start")
2755         try:
2756             new_ids = []
2757             var_map = {}
2758             var_map[":nIDs"] = num_ids
2759             # sql to get fileID
2760             sqlFID = "SELECT ATLAS_PANDA.JOBSDEFINED4_PANDAID_SEQ.nextval FROM "
2761             sqlFID += "(SELECT level FROM dual CONNECT BY level<=:nIDs) "
2762             # start transaction
2763             self.conn.begin()
2764             self.cur.arraysize = 10000
2765             self.cur.execute(sqlFID + comment, var_map)
2766             resFID = self.cur.fetchall()
2767             for (id,) in resFID:
2768                 new_ids.append(id)
2769             # commit
2770             if not self._commit():
2771                 raise RuntimeError("Commit error")
2772             tmp_log.debug(f"got {len(new_ids)} IDs")
2773             return sorted(new_ids)
2774         except Exception:
2775             # roll back
2776             self._rollback()
2777             # error
2778             self.dump_error_message(tmp_log)
2779             return []
2780 
2781     # bulk fetch fileIDs
2782     def bulkFetchFileIDsPanda(self, nIDs):
2783         comment = " /* JediDBProxy.bulkFetchFileIDsPanda */"
2784         tmp_log = self.create_tagged_logger(comment, f"nIDs={nIDs}")
2785         tmp_log.debug("start")
2786         try:
2787             newFileIDs = []
2788             varMap = {}
2789             varMap[":nIDs"] = nIDs
2790             # sql to get fileID
2791             sqlFID = "SELECT ATLAS_PANDA.FILESTABLE4_ROW_ID_SEQ.nextval FROM "
2792             sqlFID += "(SELECT level FROM dual CONNECT BY level<=:nIDs) "
2793             # start transaction
2794             self.conn.begin()
2795             self.cur.arraysize = 10000
2796             self.cur.execute(sqlFID + comment, varMap)
2797             resFID = self.cur.fetchall()
2798             for (fileID,) in resFID:
2799                 newFileIDs.append(fileID)
2800             # commit
2801             if not self._commit():
2802                 raise RuntimeError("Commit error")
2803             tmp_log.debug(f"got {len(newFileIDs)} IDs")
2804             return newFileIDs
2805         except Exception:
2806             # roll back
2807             self._rollback()
2808             # error
2809             self.dump_error_message(tmp_log)
2810             return []
2811 
    # get LFNs for jumbo job
2813     def getLFNsForJumbo(self, jediTaskID):
2814         comment = " /* DBProxy.getLFNsForJumbo */"
2815         tmp_log = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID}")
2816         tmp_log.debug("start")
2817         try:
2818             sqlS = "SELECT lfn,scope FROM {0}.JEDI_Datasets d, {0}.JEDI_Dataset_Contents c ".format(panda_config.schemaJEDI)
2819             sqlS += "WHERE d.jediTaskID=c.jediTaskID AND d.datasetID=c.datasetID AND d.jediTaskID=:jediTaskID "
2820             sqlS += "AND d.type IN (:type1,:type2) AND d.masterID IS NULL "
2821             retSet = set()
2822             # start transaction
2823             self.conn.begin()
2824             varMap = {}
2825             varMap[":jediTaskID"] = jediTaskID
2826             varMap[":type1"] = "input"
2827             varMap[":type2"] = "pseudo_input"
2828             self.cur.execute(sqlS + comment, varMap)
2829             res = self.cur.fetchall()
2830             for tmpLFN, tmpScope in res:
2831                 name = f"{tmpScope}:{tmpLFN}"
2832                 retSet.add(name)
2833             # commit
2834             if not self._commit():
2835                 raise RuntimeError("Commit error")
2836             tmp_log.debug(f"has {len(retSet)} LFNs")
2837             return retSet
2838         except Exception:
2839             # roll back
2840             self._rollback()
2841             # error
2842             self.dump_error_message(tmp_log)
2843             return []
2844 
2845     # get number of started events
2846     def getNumStartedEvents(self, jobSpec):
2847         comment = " /* DBProxy.getNumStartedEvents */"
2848         tmp_log = self.create_tagged_logger(comment, f"PandaID={jobSpec.PandaID}")
2849         tmp_log.debug("start")
2850         try:
2851             # count the number of started ranges
2852             sqlCDO = "SELECT /*+ INDEX_RS_ASC(tab JEDI_EVENTS_FILEID_IDX) NO_INDEX_FFS(tab JEDI_EVENTS_PK) NO_INDEX_SS(tab JEDI_EVENTS_PK) */ "
2853             sqlCDO += f"COUNT(*) FROM {panda_config.schemaJEDI}.JEDI_Events tab "
2854             sqlCDO += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID=:fileID "
2855             sqlCDO += "AND status IN (:esSent,:esRunning,:esFinished,:esDone) "
2856             # start transaction
2857             self.conn.begin()
2858             nEvt = 0
2859             for fileSpec in jobSpec.Files:
2860                 if fileSpec.type != "input":
2861                     continue
2862                 varMap = {}
2863                 varMap[":jediTaskID"] = fileSpec.jediTaskID
2864                 varMap[":datasetID"] = fileSpec.datasetID
2865                 varMap[":fileID"] = fileSpec.fileID
2866                 varMap[":esSent"] = EventServiceUtils.ST_sent
2867                 varMap[":esRunning"] = EventServiceUtils.ST_running
2868                 varMap[":esFinished"] = EventServiceUtils.ST_finished
2869                 varMap[":esDone"] = EventServiceUtils.ST_done
2870                 self.cur.execute(sqlCDO + comment, varMap)
2871                 res = self.cur.fetchone()
2872                 if res is not None:
2873                     nEvt += res[0]
2874             # commit
2875             if not self._commit():
2876                 raise RuntimeError("Commit error")
2877             tmp_log.debug(f"{nEvt} events started")
2878             return nEvt
2879         except Exception:
2880             # roll back
2881             self._rollback()
2882             # error
2883             self.dump_error_message(tmp_log)
2884             return None
2885 
2886     # get JEDI file attributes
2887     def getJediFileAttributes(self, PandaID, jediTaskID, datasetID, fileID, attrs):
2888         comment = " /* DBProxy.getJediFileAttributes */"
2889         tmp_log = self.create_tagged_logger(comment, f"PandaID={PandaID}")
2890         tmp_log.debug(f"start for jediTaskID={jediTaskID} datasetId={datasetID} fileID={fileID}")
2891         try:
2892             # sql to get task attributes
2893             sqlRR = "SELECT "
2894             for attr in attrs:
2895                 sqlRR += f"{attr},"
2896             sqlRR = sqlRR[:-1]
2897             sqlRR += f" FROM {panda_config.schemaJEDI}.JEDI_Dataset_Contents "
2898             sqlRR += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID=:fileID "
2899             varMap = {}
2900             varMap[":jediTaskID"] = jediTaskID
2901             varMap[":datasetID"] = datasetID
2902             varMap[":fileID"] = fileID
2903             # start transaction
2904             self.conn.begin()
2905             self.cur.execute(sqlRR + comment, varMap)
2906             resRR = self.cur.fetchone()
2907             # commit
2908             if not self._commit():
2909                 raise RuntimeError("Commit error")
2910             retVal = {}
2911             if resRR is not None:
2912                 for idx, attr in enumerate(attrs):
2913                     retVal[attr] = resRR[idx]
2914             tmp_log.debug(f"done {str(retVal)}")
2915             return retVal
2916         except Exception:
2917             # roll back
2918             self._rollback()
2919             # error
2920             self.dump_error_message(tmp_log)
2921             return {}
2922 
2923     # get jumbo job datasets
2924     def getJumboJobDatasets(self, n_days, grace_period):
2925         comment = " /* DBProxy.getJumboJobDatasets */"
2926         tmp_log = self.create_tagged_logger(comment, f"nDays={n_days}")
2927         tmp_log.debug("start")
2928         try:
2929             # sql to get workers
2930             sqlC = "SELECT t.jediTaskID,d.datasetName,t.status FROM ATLAS_PANDA.JEDI_Tasks t,ATLAS_PANDA.JEDI_Datasets d "
2931             sqlC += "WHERE t.prodSourceLabel='managed' AND t.useJumbo IS NOT NULL "
2932             sqlC += "AND t.modificationTime>CURRENT_DATE-:days AND t.modificationTime<CURRENT_DATE-:grace_period "
2933             sqlC += "AND t.status IN ('finished','done') "
2934             sqlC += "AND d.jediTaskID=t.jediTaskID AND d.type='output' "
2935             varMap = {}
2936             varMap[":days"] = n_days
2937             varMap[":grace_period"] = grace_period
2938             # start transaction
2939             self.conn.begin()
2940             self.cur.execute(sqlC + comment, varMap)
2941             resCs = self.cur.fetchall()
2942             retMap = dict()
2943             nDS = 0
2944             for jediTaskID, datasetName, status in resCs:
2945                 retMap.setdefault(jediTaskID, {"status": status, "datasets": []})
2946                 retMap[jediTaskID]["datasets"].append(datasetName)
2947                 nDS += 1
2948             # commit
2949             if not self._commit():
2950                 raise RuntimeError("Commit error")
2951             tmp_log.debug(f"got {nDS} datasets")
2952             return retMap
2953         except Exception:
2954             # roll back
2955             self._rollback()
2956             # error
2957             self.dump_error_message(tmp_log)
2958             return {}
2959 
2960     # get output datasets
2961     def getOutputDatasetsJEDI(self, panda_id):
2962         comment = " /* DBProxy.getOutputDatasetsJEDI */"
2963         tmp_log = self.create_tagged_logger(comment, f"PandaID={panda_id}")
2964         tmp_log.debug("start")
2965         try:
2966             # sql to get workers
2967             sqlC = "SELECT d.datasetID,d.datasetName FROM ATLAS_PANDA.filesTable4 f,ATLAS_PANDA.JEDI_Datasets d "
2968             sqlC += "WHERE f.PandaID=:PandaID AND f.type IN (:type1,:type2) AND d.jediTaskID=f.jediTaskID AND d.datasetID=f.datasetID "
2969             varMap = {}
2970             varMap[":PandaID"] = panda_id
2971             varMap[":type1"] = "output"
2972             varMap[":type2"] = "log"
2973             # start transaction
2974             self.conn.begin()
2975             self.cur.execute(sqlC + comment, varMap)
2976             retMap = dict()
2977             resCs = self.cur.fetchall()
2978             for datasetID, datasetName in resCs:
2979                 retMap[datasetID] = datasetName
2980             # commit
2981             if not self._commit():
2982                 raise RuntimeError("Commit error")
2983             tmp_log.debug(f"got {len(retMap)}")
2984             return retMap
2985         except Exception:
2986             # roll back
2987             self._rollback()
2988             # error
2989             self.dump_error_message(tmp_log)
2990             return {}
2991 
2992     # lock process
2993     def lockProcess_PANDA(self, component, pid, time_limit, force=False):
2994         comment = " /* DBProxy.lockProcess_PANDA */"
2995         tmp_log = self.create_tagged_logger(comment, f"component={component} pid={pid}")
2996         # defaults
2997         vo = "default"
2998         prodSourceLabel = "default"
2999         cloud = "default"
3000         workqueue_id = 0
3001         resource_name = "default"
3002         tmp_log.debug("start")
3003         try:
3004             retVal = False
3005             # sql to check
3006             sqlCT = (
3007                 "SELECT lockedBy "
3008                 "FROM {0}.JEDI_Process_Lock "
3009                 "WHERE vo=:vo AND prodSourceLabel=:prodSourceLabel "
3010                 "AND cloud=:cloud AND workqueue_id=:workqueue_id "
3011                 "AND resource_type=:resource_name AND component=:component "
3012                 "AND lockedTime>:lockedTime "
3013                 "FOR UPDATE"
3014             ).format(panda_config.schemaJEDI)
3015             # sql to delete
3016             sqlCD = (
3017                 "DELETE FROM {0}.JEDI_Process_Lock "
3018                 "WHERE vo=:vo AND prodSourceLabel=:prodSourceLabel "
3019                 "AND cloud=:cloud AND workqueue_id=:workqueue_id "
3020                 "AND resource_type=:resource_name AND component=:component "
3021             ).format(panda_config.schemaJEDI)
3022             # sql to insert
3023             sqlFR = (
3024                 "INSERT INTO {0}.JEDI_Process_Lock "
3025                 "(vo, prodSourceLabel, cloud, workqueue_id, resource_type, component, lockedBy, lockedTime) "
3026                 "VALUES(:vo, :prodSourceLabel, :cloud, :workqueue_id, :resource_name, :component, :lockedBy, CURRENT_DATE) "
3027             ).format(panda_config.schemaJEDI)
3028             # start transaction
3029             self.conn.begin()
3030             # check
3031             if not force:
3032                 varMap = {}
3033                 varMap[":vo"] = vo
3034                 varMap[":prodSourceLabel"] = prodSourceLabel
3035                 varMap[":cloud"] = cloud
3036                 varMap[":workqueue_id"] = workqueue_id
3037                 varMap[":resource_name"] = resource_name
3038                 varMap[":component"] = component
3039                 varMap[":lockedTime"] = naive_utcnow() - datetime.timedelta(minutes=time_limit)
3040                 self.cur.execute(sqlCT + comment, varMap)
3041                 resCT = self.cur.fetchone()
3042             else:
3043                 resCT = None
3044             if resCT is not None:
3045                 tmp_log.debug(f"skipped, locked by {resCT[0]}")
3046             else:
3047                 # delete
3048                 varMap = {}
3049                 varMap[":vo"] = vo
3050                 varMap[":prodSourceLabel"] = prodSourceLabel
3051                 varMap[":cloud"] = cloud
3052                 varMap[":workqueue_id"] = workqueue_id
3053                 varMap[":resource_name"] = resource_name
3054                 varMap[":component"] = component
3055                 self.cur.execute(sqlCD + comment, varMap)
3056                 # insert
3057                 varMap = {}
3058                 varMap[":vo"] = vo
3059                 varMap[":prodSourceLabel"] = prodSourceLabel
3060                 varMap[":cloud"] = cloud
3061                 varMap[":workqueue_id"] = workqueue_id
3062                 varMap[":resource_name"] = resource_name
3063                 varMap[":component"] = component
3064                 varMap[":lockedBy"] = pid
3065                 self.cur.execute(sqlFR + comment, varMap)
3066                 tmp_log.debug("successfully locked")
3067                 retVal = True
3068             # commit
3069             if not self._commit():
3070                 raise RuntimeError("Commit error")
3071             return retVal
3072         except Exception:
3073             # roll back
3074             self._rollback()
3075             # error
3076             self.dump_error_message(tmp_log)
3077             return retVal
3078 
3079     # unlock process
3080     def unlockProcess_PANDA(self, component, pid):
3081         comment = " /* DBProxy.unlockProcess_PANDA */"
3082         tmp_log = self.create_tagged_logger(comment, f"component={component} pid={pid}")
3083         # defaults
3084         vo = "default"
3085         prodSourceLabel = "default"
3086         cloud = "default"
3087         workqueue_id = 0
3088         resource_name = "default"
3089         tmp_log.debug("start")
3090         try:
3091             retVal = False
3092             # sql to delete
3093             sqlCD = (
3094                 "DELETE FROM {0}.JEDI_Process_Lock "
3095                 "WHERE vo=:vo AND prodSourceLabel=:prodSourceLabel AND cloud=:cloud "
3096                 "AND workqueue_id=:workqueue_id AND lockedBy=:lockedBy "
3097                 "AND resource_type=:resource_name AND component=:component "
3098             ).format(panda_config.schemaJEDI)
3099             # start transaction
3100             self.conn.begin()
3101             # check
3102             varMap = {}
3103             varMap[":vo"] = vo
3104             varMap[":prodSourceLabel"] = prodSourceLabel
3105             varMap[":cloud"] = cloud
3106             varMap[":workqueue_id"] = workqueue_id
3107             varMap[":resource_name"] = resource_name
3108             varMap[":component"] = component
3109             varMap[":lockedBy"] = pid
3110             self.cur.execute(sqlCD + comment, varMap)
3111             # commit
3112             if not self._commit():
3113                 raise RuntimeError("Commit error")
3114             tmp_log.debug("done")
3115             retVal = True
3116             return retVal
3117         except Exception:
3118             # roll back
3119             self._rollback()
3120             # error
3121             self.dump_error_message(tmp_log)
3122             return retVal
3123 
3124     # check process lock
3125     def checkProcessLock_PANDA(self, component, pid, time_limit, check_base=False):
3126         comment = " /* DBProxy.checkProcessLock_PANDA */"
3127         tmp_log = self.create_tagged_logger(comment, f"component={component} pid={pid}")
3128         # defaults
3129         vo = "default"
3130         prodSourceLabel = "default"
3131         cloud = "default"
3132         workqueue_id = 0
3133         resource_name = "default"
3134         tmp_log.debug("start")
3135         try:
3136             retVal = False, None
3137             # sql to check
3138             sqlCT = (
3139                 "SELECT lockedBy, lockedTime "
3140                 "FROM {0}.JEDI_Process_Lock "
3141                 "WHERE vo=:vo AND prodSourceLabel=:prodSourceLabel "
3142                 "AND cloud=:cloud AND workqueue_id=:workqueue_id "
3143                 "AND resource_type=:resource_name AND component=:component "
3144                 "AND lockedTime>:lockedTime "
3145             ).format(panda_config.schemaJEDI)
3146             # start transaction
3147             self.conn.begin()
3148             # check
3149             varMap = {}
3150             varMap[":vo"] = vo
3151             varMap[":prodSourceLabel"] = prodSourceLabel
3152             varMap[":cloud"] = cloud
3153             varMap[":workqueue_id"] = workqueue_id
3154             varMap[":resource_name"] = resource_name
3155             varMap[":component"] = component
3156             varMap[":lockedTime"] = naive_utcnow() - datetime.timedelta(minutes=time_limit)
3157             self.cur.execute(sqlCT + comment, varMap)
3158             resCT = self.cur.fetchone()
3159             if resCT is not None:
3160                 lockedBy, lockedTime = resCT
3161                 if check_base:
3162                     # check only base part
3163                     if not lockedBy.startswith(pid):
3164                         retVal = True, lockedTime
3165                 else:
3166                     # check whole string
3167                     if lockedBy != pid:
3168                         retVal = True, lockedTime
3169                 if retVal[0]:
3170                     tmp_log.debug(f"found locked by {lockedBy} at {lockedTime.strftime('%Y-%m-%d_%H:%M:%S')}")
3171                 else:
3172                     tmp_log.debug("found unlocked")
3173             # commit
3174             if not self._commit():
3175                 raise RuntimeError("Commit error")
3176             tmp_log.debug("done")
3177             return retVal
3178         except Exception:
3179             # roll back
3180             self._rollback()
3181             # error
3182             self.dump_error_message(tmp_log)
3183             return retVal
3184 
3185     # update problematic resource info for user
3186     def update_problematic_resource_info(self, user_name, jedi_task_id, resource, problem_type):
3187         comment = " /* DBProxy.update_problematic_resource_info */"
3188         tmp_log = self.create_tagged_logger(comment, f"user={user_name} jediTaskID={jedi_task_id}")
3189         tmp_log.debug("start")
3190         retVal = False
3191         try:
3192             if problem_type not in ["dest", None]:
3193                 tmp_log.debug(f"unknown problem type: {problem_type}")
3194                 return None
3195             sqlR = "SELECT pagecache FROM ATLAS_PANDAMETA.users " "WHERE name=:name "
3196             sqlW = "UPDATE ATLAS_PANDAMETA.users SET pagecache=:data " "WHERE name=:name "
3197             # string to use a dict key
3198             jedi_task_id = str(jedi_task_id)
3199             # start transaction
3200             self.conn.begin()
3201             # read
3202             varMap = {}
3203             varMap[":name"] = user_name
3204             self.cur.execute(sqlR + comment, varMap)
3205             data = self.cur.fetchone()
3206             if data is None:
3207                 tmp_log.debug("user not found")
3208             else:
3209                 try:
3210                     data = json.loads(data[0])
3211                 except Exception:
3212                     data = {}
3213                 if problem_type is not None:
3214                     data.setdefault(problem_type, {})
3215                     data[problem_type].setdefault(jedi_task_id, {})
3216                     data[problem_type][jedi_task_id].setdefault(resource, None)
3217                     old = data[problem_type][jedi_task_id][resource]
3218                     if old is None or datetime.datetime.now(datetime.timezone.utc) - datetime.datetime.fromtimestamp(
3219                         old, datetime.timezone.utc
3220                     ) > datetime.timedelta(days=1):
3221                         retVal = True
3222                         data[problem_type][jedi_task_id][resource] = time.time()
3223                 # delete old data
3224                 for p in list(data):
3225                     for t in list(data[p]):
3226                         for r in list(data[p][t]):
3227                             ts = data[p][t][r]
3228                             if datetime.datetime.now(datetime.timezone.utc) - datetime.datetime.fromtimestamp(ts, datetime.timezone.utc) > datetime.timedelta(
3229                                 days=7
3230                             ):
3231                                 del data[p][t][r]
3232                         if not data[p][t]:
3233                             del data[p][t]
3234                     if not data[p]:
3235                         del data[p]
3236                 # update
3237                 varMap = {}
3238                 varMap[":name"] = user_name
3239                 varMap[":data"] = json.dumps(data)
3240                 self.cur.execute(sqlW + comment, varMap)
3241             # commit
3242             if not self._commit():
3243                 raise RuntimeError("Commit error")
3244             tmp_log.debug(f"done with {retVal} : {str(data)}")
3245             return retVal
3246         except Exception:
3247             # roll back
3248             self._rollback()
3249             # error
3250             self.dump_error_message(tmp_log)
3251             return None
3252 
3253     # get LFNs in datasets
3254     def get_files_in_datasets(self, task_id, dataset_types):
3255         comment = " /* DBProxy.get_lfns_in_datasets */"
3256         tmp_log = self.create_tagged_logger(comment, f"jediTaskID={task_id}")
3257         tmp_log.debug("start")
3258         try:
3259             varMap = {}
3260             varMap[":jediTaskID"] = task_id
3261             sqlD = f"SELECT datasetName,datasetID FROM {panda_config.schemaJEDI}.JEDI_Datasets WHERE jediTaskID=:jediTaskID "
3262             # Old API expects comma separated types, while new API is taking directly a tuple of dataset types
3263             if type(dataset_types) == str:
3264                 dataset_types = dataset_types.split(",")
3265             dstype_var_names_str, dstype_var_map = get_sql_IN_bind_variables(dataset_types, prefix=":", value_as_suffix=True)
3266             sqlD += f"AND type IN ({dstype_var_names_str}) "
3267             varMap.update(dstype_var_map)
3268             sqlS = f"SELECT lfn,scope,fileID,status FROM {panda_config.schemaJEDI}.JEDI_Dataset_Contents "
3269             sqlS += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID ORDER BY fileID "
3270             retVal = []
3271             # start transaction
3272             self.conn.begin()
3273             self.cur.execute(sqlD + comment, varMap)
3274             res = self.cur.fetchall()
3275             for datasetName, datasetID in res:
3276                 datasetDict = {}
3277                 datasetDict["name"] = datasetName
3278                 datasetDict["id"] = datasetID
3279                 # read files
3280                 varMap = {}
3281                 varMap[":jediTaskID"] = task_id
3282                 varMap[":datasetID"] = datasetID
3283                 self.cur.execute(sqlS + comment, varMap)
3284                 resF = self.cur.fetchall()
3285                 fileList = []
3286                 for lfn, fileScope, fileID, status in resF:
3287                     fileDict = {
3288                         "lfn": lfn,
3289                         "scope": fileScope,
3290                         "id": fileID,
3291                         "status": status,
3292                     }
3293                     fileList.append(fileDict)
3294                 retVal.append({"dataset": datasetDict, "files": fileList})
3295             # commit
3296             if not self._commit():
3297                 raise RuntimeError("Commit error")
3298             tmp_log.debug("done")
3299             return retVal
3300         except Exception:
3301             # roll back
3302             self._rollback()
3303             # error
3304             self.dump_error_message(tmp_log)
3305             return None
3306 
3307     # update datasets asynchronously outside propagateResultToJEDI to avoid row contentions
    def async_update_datasets(self, panda_id):
        """Replay dataset-update queries that were queued in SQL_QUEUE.

        When ``panda_id`` is given only that job is processed; otherwise all
        distinct PandaIDs with queue entries of the async-dataset-update topic
        created more than one minute ago are collected first. For every ID the
        queued rows are locked (``FOR UPDATE NOWAIT``), each row's JSON payload is
        decoded into ``(sql, var_map)`` and executed in ``execution_order``, the
        queue rows of that PandaID are deleted, and the transaction is committed.
        Each ID is attempted up to five times with a one-second pause between
        failed attempts.

        Args:
            panda_id: a single PandaID to process, or None to scan the queue.

        Returns:
            None when there is nothing to process, True once all IDs have been
            attempted (even if some of them failed every attempt), and False
            when an unexpected exception aborted the processing.
        """
        comment = " /* DBProxy.async_update_datasets */"
        g_tmp_log = self.create_tagged_logger(comment)
        g_tmp_log.debug("start")
        try:
            if panda_id is not None:
                panda_id_list = [panda_id]
            else:
                # get PandaIDs
                sql = f"SELECT DISTINCT PandaID FROM {panda_config.schemaPANDA}.SQL_QUEUE WHERE topic=:topic AND creationTime<:timeLimit"
                varMap = {
                    ":topic": SQL_QUEUE_TOPIC_async_dataset_update,
                    ":timeLimit": naive_utcnow() - datetime.timedelta(minutes=1),
                }
                # start transaction
                self.conn.begin()
                self.cur.arraysize = 10000
                self.cur.execute(sql + comment, varMap)
                panda_id_list = [i[0] for i in self.cur.fetchall()]
                # commit
                if not self._commit():
                    raise RuntimeError("Commit error")
            if not panda_id_list:
                g_tmp_log.debug("done since no IDs are available")
                return None
            # loop over all IDs
            for tmp_id in panda_id_list:
                tmp_log = self.create_tagged_logger(comment, f"PandaID={tmp_id}")
                # sql to lock the queued queries of this job; NOWAIT makes the SELECT fail immediately if rows are locked elsewhere
                sqlL = "SELECT data FROM {0}.SQL_QUEUE WHERE topic=:topic AND PandaID=:PandaID ORDER BY " "execution_order FOR UPDATE NOWAIT ".format(
                    panda_config.schemaPANDA
                )
                # sql to remove the queued queries once replayed
                # NOTE(review): unlike sqlL this has no topic filter, so rows of every topic are deleted for the PandaID - confirm this is intended
                sqlD = f"DELETE FROM {panda_config.schemaPANDA}.SQL_QUEUE WHERE PandaID=:PandaID "
                n_try = 5
                all_ok = True
                for i_try in range(n_try):
                    all_ok = True
                    # queries executed in this attempt, kept to log them after a successful commit
                    query_list = []
                    tmp_log.debug(f"Trying PandaID={tmp_id} {i_try+1}/{n_try}")
                    tmp_data_list = None
                    # start transaction
                    self.conn.begin()
                    # lock queries
                    try:
                        var_map = {
                            ":topic": SQL_QUEUE_TOPIC_async_dataset_update,
                            ":PandaID": tmp_id,
                        }
                        self.cur.execute(sqlL + comment, var_map)
                        tmp_data_list = self.cur.fetchall()
                    except Exception:
                        tmp_log.debug("cannot lock queries")
                        all_ok = False
                    if tmp_data_list:
                        # execute queries
                        if all_ok:
                            for (tmp_data,) in tmp_data_list:
                                # NOTE(review): json.loads sits outside the inner try, so an undecodable payload
                                # propagates to the outer handler and ends the whole run with False
                                sql, var_map = json.loads(tmp_data)
                                query_list.append((sql, var_map))
                                try:
                                    self.cur.execute(sql + comment, var_map)
                                except Exception:
                                    tmp_log.error(f'failed to execute "{sql}" var={str(var_map)}')
                                    self.dump_error_message(tmp_log)
                                    all_ok = False
                                    break
                        # delete queries
                        if all_ok:
                            var_map = {":PandaID": tmp_id}
                            self.cur.execute(sqlD + comment, var_map)
                    # commit
                    if all_ok:
                        if not self._commit():
                            raise RuntimeError("Commit error")
                        for sql, var_map in query_list:
                            tmp_log.debug(sql + str(var_map))
                        tmp_log.debug("done")
                        break
                    else:
                        # undo partial work of this attempt and pause before the next one, except after the last attempt
                        self._rollback()
                        if i_try + 1 < n_try:
                            time.sleep(1)
                if not all_ok:
                    tmp_log.error("all attempts failed")
            g_tmp_log.debug(f"processed {len(panda_id_list)} IDs")
            return True
        except Exception:
            # roll back
            self._rollback()
            # error
            self.dump_error_message(g_tmp_log)
            return False
3399 
3400     # get datasets of input and lib, to update data locality records
3401     def get_tasks_inputdatasets_JEDI(self, vo):
3402         comment = " /* JediDBProxy.get_tasks_inputdatasets_JEDI */"
3403         # last update time
3404         tmpLog = self.create_tagged_logger(comment, f"vo={vo}")
3405         tmpLog.debug("start")
3406         now_ts = naive_utcnow()
3407         try:
3408             retVal = None
3409             # sql to get all jediTaskID and datasetID of input
3410             sql = (
3411                 "SELECT tabT.jediTaskID,datasetID, tabD.datasetName "
3412                 "FROM {0}.JEDI_Tasks tabT,{0}.JEDI_Datasets tabD,{0}.JEDI_AUX_Status_MinTaskID tabA "
3413                 "WHERE tabT.status=tabA.status AND tabT.jediTaskID>=tabA.min_jediTaskID AND tabT.jediTaskID=tabD.jediTaskID "
3414                 "AND tabT.vo=:vo AND tabT.status IN ('running', 'ready', 'scouting', 'pending') "
3415                 "AND tabD.type IN ('input') AND tabD.masterID IS NULL "
3416             ).format(panda_config.schemaJEDI)
3417             # start transaction
3418             self.conn.begin()
3419             # get
3420             varMap = {}
3421             varMap[":vo"] = vo
3422             self.cur.execute(sql + comment, varMap)
3423             res = self.cur.fetchall()
3424             nRows = self.cur.rowcount
3425             # commit
3426             if not self._commit():
3427                 raise RuntimeError("Commit error")
3428             # return
3429             retVal = res
3430             tmpLog.debug(f"done with {nRows} rows")
3431             return retVal
3432         except Exception:
3433             # roll back
3434             self._rollback()
3435             # error
3436             self.dump_error_message(tmpLog)
3437             return retVal
3438 
3439     # update dataset locality
3440     def updateDatasetLocality_JEDI(self, jedi_taskid, datasetid, rse):
3441         comment = " /* JediDBProxy.updateDatasetLocality_JEDI */"
3442         # last update time
3443         timestamp = naive_utcnow()
3444         timestamp_str = timestamp.strftime("%Y-%m-%d_%H:%M:%S")
3445         tmpLog = self.create_tagged_logger(comment, f"taskID={jedi_taskid} datasetID={datasetid} rse={rse} timestamp={timestamp_str}")
3446         # tmpLog.debug('start')
3447         try:
3448             retVal = False
3449             # sql to check
3450             sqlC = f"SELECT timestamp FROM {panda_config.schemaJEDI}.JEDI_Dataset_Locality WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND rse=:rse "
3451             # sql to insert
3452             sqlI = (
3453                 "INSERT INTO {0}.JEDI_Dataset_Locality " "(jediTaskID, datasetID, rse, timestamp) " "VALUES (:jediTaskID, :datasetID, :rse, :timestamp)"
3454             ).format(panda_config.schemaJEDI)
3455             # sql to update
3456             sqlU = (
3457                 "UPDATE {0}.JEDI_Dataset_Locality " "SET timestamp=:timestamp " "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND rse=:rse "
3458             ).format(panda_config.schemaJEDI)
3459             # start transaction
3460             self.conn.begin()
3461             # check
3462             varMap = {}
3463             varMap[":jediTaskID"] = jedi_taskid
3464             varMap[":datasetID"] = datasetid
3465             varMap[":rse"] = rse
3466             self.cur.execute(sqlC + comment, varMap)
3467             resC = self.cur.fetchone()
3468             varMap[":timestamp"] = timestamp
3469             if resC is None:
3470                 # insert if missing
3471                 tmpLog.debug("insert")
3472                 self.cur.execute(sqlI + comment, varMap)
3473             else:
3474                 # update
3475                 tmpLog.debug("update")
3476                 self.cur.execute(sqlU + comment, varMap)
3477             # commit
3478             if not self._commit():
3479                 raise RuntimeError("Commit error")
3480             # return
3481             retVal = True
3482             # tmpLog.debug('done')
3483             return retVal
3484         except Exception:
3485             # roll back
3486             self._rollback()
3487             # error
3488             self.dump_error_message(tmpLog)
3489             return retVal
3490 
3491     # delete outdated dataset locality records
3492     def deleteOutdatedDatasetLocality_JEDI(self, before_timestamp):
3493         comment = " /* JediDBProxy.deleteOutdatedDatasetLocality_JEDI */"
3494         # last update time
3495         before_timestamp_str = before_timestamp.strftime("%Y-%m-%d_%H:%M:%S")
3496         tmpLog = self.create_tagged_logger(comment, f"before_timestamp={before_timestamp_str}")
3497         tmpLog.debug("start")
3498         try:
3499             retVal = 0
3500             # sql to delete
3501             sqlD = f"DELETE FROM {panda_config.schemaJEDI}.Jedi_Dataset_Locality WHERE timestamp<=:timestamp "
3502             # start transaction
3503             self.conn.begin()
3504             # check
3505             varMap = {}
3506             varMap[":timestamp"] = before_timestamp
3507             # delete
3508             self.cur.execute(sqlD + comment, varMap)
3509             retVal = self.cur.rowcount
3510             # commit
3511             if not self._commit():
3512                 raise RuntimeError("Commit error")
3513             # return
3514             tmpLog.debug(f"done, deleted {retVal} records")
3515             return retVal
3516         except Exception:
3517             # roll back
3518             self._rollback()
3519             # error
3520             self.dump_error_message(tmpLog)
3521             return retVal
3522 
3523     # append input datasets for incremental execution
3524     def appendDatasets_JEDI(self, jediTaskID, inMasterDatasetSpecList, inSecDatasetSpecList):
3525         comment = " /* JediDBProxy.appendDatasets_JEDI */"
3526         tmpLog = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID}")
3527         tmpLog.debug("start")
3528         goDefined = False
3529         refreshContents = False
3530         commandStr = "incexec"
3531         try:
3532             # start transaction
3533             self.conn.begin()
3534             self.cur.arraysize = 100000
3535             # check task status
3536             varMap = {}
3537             varMap[":jediTaskID"] = jediTaskID
3538             sqlTK = f"SELECT status FROM {panda_config.schemaJEDI}.JEDI_Tasks WHERE jediTaskID=:jediTaskID FOR UPDATE "
3539             self.cur.execute(sqlTK + comment, varMap)
3540             resTK = self.cur.fetchone()
3541             if resTK is None:
3542                 # task not found
3543                 msgStr = "task not found"
3544                 tmpLog.debug(msgStr)
3545             else:
3546                 (taskStatus,) = resTK
3547                 # invalid status
3548                 if taskStatus != JediTaskSpec.commandStatusMap()[commandStr]["done"]:
3549                     msgStr = f"invalid status={taskStatus} for dataset appending"
3550                     tmpLog.debug(msgStr)
3551                 else:
3552                     timeNow = naive_utcnow()
3553                     # list of master dataset names
3554                     master_dataset_names = [datasetSpec.datasetName for datasetSpec in inMasterDatasetSpecList]
3555                     # get existing input datasets
3556                     varMap = {}
3557                     varMap[":jediTaskID"] = jediTaskID
3558                     sqlDS = "SELECT datasetName,datasetID,status,nFilesTobeUsed,nFilesUsed,masterID "
3559                     sqlDS += f"FROM {panda_config.schemaJEDI}.JEDI_Datasets "
3560                     sqlDS += f"WHERE jediTaskID=:jediTaskID AND type IN ({INPUT_TYPES_var_str}) "
3561                     varMap.update(INPUT_TYPES_var_map)
3562                     self.cur.execute(sqlDS + comment, varMap)
3563                     resDS = self.cur.fetchall()
3564                     # check if existing datasets are available, and update status if necessary
3565                     sql_ex = f"UPDATE {panda_config.schemaJEDI}.JEDI_Datasets SET status=:status WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID "
3566                     existingDatasets = {}
3567                     for datasetName, dataset_id, datasetStatus, nFilesTobeUsed, nFilesUsed, masterID in resDS:
3568                         # only master datasets with remaining files
3569                         try:
3570                             if masterID is None and (nFilesTobeUsed - nFilesUsed > 0 or datasetStatus in JediDatasetSpec.statusToUpdateContents()):
3571                                 # the dataset was removed before and then added to the container again
3572                                 to_update_status = False
3573                                 if datasetStatus == "removed" and datasetName in master_dataset_names:
3574                                     to_update_status = True
3575                                     var_map = {":status": "defined", ":jediTaskID": jediTaskID, ":datasetID": dataset_id}
3576                                 # the dataset was removed
3577                                 elif datasetStatus != "removed" and datasetName not in master_dataset_names:
3578                                     to_update_status = True
3579                                     var_map = {":status": "removed", ":jediTaskID": jediTaskID, ":datasetID": dataset_id}
3580                                 # update status for removed/recovered dataset
3581                                 if to_update_status:
3582                                     tmpLog.debug(f"""set status={var_map[":status"]} from {datasetStatus} for {datasetName}""")
3583                                     self.cur.execute(sql_ex + comment, var_map)
3584                                     datasetStatus = var_map[":status"]
3585                                 if datasetStatus != "removed":
3586                                     goDefined = True
3587                                     if datasetStatus in JediDatasetSpec.statusToUpdateContents():
3588                                         refreshContents = True
3589                         except Exception:
3590                             pass
3591                         existingDatasets[datasetName] = datasetStatus
3592                     # insert datasets
3593                     sqlID = f"INSERT INTO {panda_config.schemaJEDI}.JEDI_Datasets ({JediDatasetSpec.columnNames()}) "
3594                     sqlID += JediDatasetSpec.bindValuesExpression()
3595                     sqlID += " RETURNING datasetID INTO :newDatasetID"
3596                     for datasetSpec in inMasterDatasetSpecList:
3597                         # skip existing datasets
3598                         if datasetSpec.datasetName in existingDatasets:
3599                             # check dataset status and remaining files
3600                             if existingDatasets[datasetSpec.datasetName] in JediDatasetSpec.statusToUpdateContents():
3601                                 goDefined = True
3602                             continue
3603                         datasetSpec.creationTime = timeNow
3604                         datasetSpec.modificationTime = timeNow
3605                         varMap = datasetSpec.valuesMap(useSeq=True)
3606                         varMap[":newDatasetID"] = self.cur.var(varNUMBER)
3607                         # insert dataset
3608                         tmpLog.debug(f"append {datasetSpec.datasetName}")
3609                         self.cur.execute(sqlID + comment, varMap)
3610                         val = self.getvalue_corrector(self.cur.getvalue(varMap[":newDatasetID"]))
3611                         datasetID = int(val)
3612                         masterID = datasetID
3613                         datasetSpec.datasetID = datasetID
3614                         # insert secondary datasets
3615                         for datasetSpec in inSecDatasetSpecList:
3616                             datasetSpec.creationTime = timeNow
3617                             datasetSpec.modificationTime = timeNow
3618                             datasetSpec.masterID = masterID
3619                             varMap = datasetSpec.valuesMap(useSeq=True)
3620                             varMap[":newDatasetID"] = self.cur.var(varNUMBER)
3621                             # insert dataset
3622                             self.cur.execute(sqlID + comment, varMap)
3623                             val = self.getvalue_corrector(self.cur.getvalue(varMap[":newDatasetID"]))
3624                             datasetID = int(val)
3625                             datasetSpec.datasetID = datasetID
3626                         goDefined = True
3627                     # update task
3628                     deft_staus = None
3629                     sqlUT = f"UPDATE {panda_config.schemaJEDI}.JEDI_Tasks "
3630                     sqlUT += "SET status=:status,lockedBy=NULL,lockedTime=NULL,modificationtime=:updateTime,stateChangeTime=CURRENT_DATE "
3631                     sqlUT += "WHERE jediTaskID=:jediTaskID "
3632                     varMap = {}
3633                     varMap[":jediTaskID"] = jediTaskID
3634                     if goDefined:
3635                         # no new datasets
3636                         if inMasterDatasetSpecList == [] and not refreshContents:
3637                             # pass to JobGenerator
3638                             varMap[":status"] = "ready"
3639                             deft_staus = "ready"
3640                         else:
3641                             # pass to ContentsFeeder
3642                             varMap[":status"] = "defined"
3643                             deft_staus = "registered"
3644                     else:
3645                         # go to finalization since no datasets are appended
3646                         varMap[":status"] = "prepared"
3647                     # set old update time to trigger subsequent process
3648                     varMap[":updateTime"] = naive_utcnow() - datetime.timedelta(hours=6)
3649                     tmpLog.debug(f"set taskStatus={varMap[':status']}")
3650                     self.cur.execute(sqlUT + comment, varMap)
3651                     # update DEFT status
3652                     if deft_staus is not None:
3653                         self.setDeftStatus_JEDI(jediTaskID, deft_staus)
3654                         self.setSuperStatus_JEDI(jediTaskID, deft_staus)
3655                     # add missing record_task_status_change and push_task_status_message updates
3656                     self.record_task_status_change(jediTaskID)
3657                     self.push_task_status_message(None, jediTaskID, varMap[":status"])
3658 
3659             # commit
3660             if not self._commit():
3661                 raise RuntimeError("Commit error")
3662             # return
3663             tmpLog.debug("done")
3664             return True
3665         except Exception:
3666             # roll back
3667             self._rollback()
3668             # error
3669             self.dump_error_message(tmpLog)
3670             return False
3671 
3672     # insert dataset to the JEDI datasets table
3673     def insertDataset_JEDI(self, datasetSpec):
3674         comment = " /* JediDBProxy.insertDataset_JEDI */"
3675         tmpLog = self.create_tagged_logger(comment)
3676         tmpLog.debug("start")
3677         try:
3678             # set attributes
3679             timeNow = naive_utcnow()
3680             datasetSpec.creationTime = timeNow
3681             datasetSpec.modificationTime = timeNow
3682             # sql
3683             sql = f"INSERT INTO {panda_config.schemaJEDI}.JEDI_Datasets ({JediDatasetSpec.columnNames()}) "
3684             sql += JediDatasetSpec.bindValuesExpression()
3685             sql += " RETURNING datasetID INTO :newDatasetID"
3686             varMap = datasetSpec.valuesMap(useSeq=True)
3687             varMap[":newDatasetID"] = self.cur.var(varNUMBER)
3688             # begin transaction
3689             self.conn.begin()
3690             # insert dataset
3691             self.cur.execute(sql + comment, varMap)
3692             # commit
3693             if not self._commit():
3694                 raise RuntimeError("Commit error")
3695             tmpLog.debug("done")
3696             val = self.getvalue_corrector(self.cur.getvalue(varMap[":newDatasetID"]))
3697             return True, int(val)
3698         except Exception:
3699             # roll back
3700             self._rollback()
3701             # error
3702             self.dump_error_message(tmpLog)
3703             return False, None
3704 
3705     # update JEDI dataset
3706     def updateDataset_JEDI(self, datasetSpec, criteria, lockTask):
3707         comment = " /* JediDBProxy.updateDataset_JEDI */"
3708         tmpLog = self.create_tagged_logger(comment, f"datasetID={datasetSpec.datasetID}")
3709         tmpLog.debug("start")
3710         # return value for failure
3711         failedRet = False, 0
3712         # no criteria
3713         if criteria == {}:
3714             tmpLog.error("no selection criteria")
3715             return failedRet
3716         # check criteria
3717         for tmpKey in criteria.keys():
3718             if not hasattr(datasetSpec, tmpKey):
3719                 tmpLog.error(f"unknown attribute {tmpKey} is used in criteria")
3720                 return failedRet
3721         try:
3722             # set attributes
3723             timeNow = naive_utcnow()
3724             datasetSpec.modificationTime = timeNow
3725             # values for UPDATE
3726             varMap = datasetSpec.valuesMap(useSeq=False, onlyChanged=True)
3727             # sql for update
3728             sql = f"UPDATE {panda_config.schemaJEDI}.JEDI_Datasets SET {datasetSpec.bindUpdateChangesExpression()} WHERE "
3729             useAND = False
3730             for tmpKey, tmpVal in criteria.items():
3731                 crKey = f":cr_{tmpKey}"
3732                 if useAND:
3733                     sql += " AND"
3734                 else:
3735                     useAND = True
3736                 sql += f" {tmpKey}={crKey}"
3737                 varMap[crKey] = tmpVal
3738 
3739             # sql for loc
3740             varMapLock = {}
3741             varMapLock[":jediTaskID"] = datasetSpec.jediTaskID
3742             sqlLock = f"SELECT 1 FROM {panda_config.schemaJEDI}.JEDI_Tasks WHERE jediTaskID=:jediTaskID FOR UPDATE"
3743             # begin transaction
3744             self.conn.begin()
3745             # lock task
3746             if lockTask:
3747                 self.cur.execute(sqlLock + comment, varMapLock)
3748             # update dataset
3749             tmpLog.debug(sql + comment + str(varMap))
3750             self.cur.execute(sql + comment, varMap)
3751             # the number of updated rows
3752             nRows = self.cur.rowcount
3753             # commit
3754             if not self._commit():
3755                 raise RuntimeError("Commit error")
3756             tmpLog.debug(f"updated {nRows} rows")
3757             return True, nRows
3758         except Exception:
3759             # roll back
3760             self._rollback()
3761             # error
3762             self.dump_error_message(tmpLog)
3763             return failedRet
3764 
3765     # update JEDI dataset attributes
3766     def updateDatasetAttributes_JEDI(self, jediTaskID, datasetID, attributes):
3767         comment = " /* JediDBProxy.updateDatasetAttributes_JEDI */"
3768         tmpLog = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID} datasetID={datasetID}")
3769         tmpLog.debug("start")
3770         # return value for failure
3771         failedRet = False
3772         try:
3773             # sql for update
3774             sql = f"UPDATE {panda_config.schemaJEDI}.JEDI_Datasets SET "
3775             # values for UPDATE
3776             varMap = {}
3777             varMap[":jediTaskID"] = jediTaskID
3778             varMap[":datasetID"] = datasetID
3779             for tmpKey, tmpVal in attributes.items():
3780                 crKey = f":{tmpKey}"
3781                 sql += f"{tmpKey}={crKey},"
3782                 varMap[crKey] = tmpVal
3783             sql = sql[:-1]
3784             sql += " "
3785             sql += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID "
3786             # begin transaction
3787             self.conn.begin()
3788             # update dataset
3789             tmpLog.debug(sql + comment + str(varMap))
3790             self.cur.execute(sql + comment, varMap)
3791             # the number of updated rows
3792             nRows = self.cur.rowcount
3793             # commit
3794             if not self._commit():
3795                 raise RuntimeError("Commit error")
3796             tmpLog.debug(f"updated {nRows} rows")
3797             return True, nRows
3798         except Exception:
3799             # roll back
3800             self._rollback()
3801             # error
3802             self.dump_error_message(tmpLog)
3803             return failedRet
3804 
3805     # get JEDI dataset attributes
3806     def getDatasetAttributes_JEDI(self, jediTaskID, datasetID, attributes):
3807         comment = " /* JediDBProxy.getDatasetAttributes_JEDI */"
3808         tmpLog = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID} datasetID={datasetID}")
3809         tmpLog.debug("start")
3810         # return value for failure
3811         failedRet = {}
3812         try:
3813             # sql for get attributes
3814             sql = "SELECT "
3815             for tmpKey in attributes:
3816                 sql += f"{tmpKey},"
3817             sql = sql[:-1] + " "
3818             sql += f"FROM {panda_config.schemaJEDI}.JEDI_Datasets "
3819             sql += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID "
3820             # values for UPDATE
3821             varMap = {}
3822             varMap[":jediTaskID"] = jediTaskID
3823             varMap[":datasetID"] = datasetID
3824             # begin transaction
3825             self.conn.begin()
3826             # select
3827             self.cur.execute(sql + comment, varMap)
3828             res = self.cur.fetchone()
3829             # commit
3830             if not self._commit():
3831                 raise RuntimeError("Commit error")
3832             # make return
3833             retMap = {}
3834             if res is not None:
3835                 for tmpIdx, tmpKey in enumerate(attributes):
3836                     retMap[tmpKey] = res[tmpIdx]
3837             tmpLog.debug(f"got {str(retMap)}")
3838             return retMap
3839         except Exception:
3840             # roll back
3841             self._rollback()
3842             # error
3843             self.dump_error_message(tmpLog)
3844             return failedRet
3845 
3846     # get JEDI dataset attributes with map
3847     def getDatasetAttributesWithMap_JEDI(self, jediTaskID, criteria, attributes):
3848         comment = " /* JediDBProxy.getDatasetAttributesWithMap_JEDI */"
3849         tmpLog = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID} criteria={str(criteria)}")
3850         tmpLog.debug("start")
3851         # return value for failure
3852         failedRet = {}
3853         try:
3854             varMap = {}
3855             varMap[":jediTaskID"] = jediTaskID
3856             # sql for get attributes
3857             sql = "SELECT "
3858             for tmpKey in attributes:
3859                 sql += f"{tmpKey},"
3860             sql = sql[:-1] + " "
3861             sql += f"FROM {panda_config.schemaJEDI}.JEDI_Datasets "
3862             sql += "WHERE jediTaskID=:jediTaskID "
3863             for crKey, crVal in criteria.items():
3864                 sql += "AND {0}=:{0} ".format(crKey)
3865                 varMap[f":{crKey}"] = crVal
3866             # begin transaction
3867             self.conn.begin()
3868             # select
3869             self.cur.execute(sql + comment, varMap)
3870             res = self.cur.fetchone()
3871             # commit
3872             if not self._commit():
3873                 raise RuntimeError("Commit error")
3874             # make return
3875             retMap = {}
3876             if res is not None:
3877                 for tmpIdx, tmpKey in enumerate(attributes):
3878                     retMap[tmpKey] = res[tmpIdx]
3879             tmpLog.debug(f"got {str(retMap)}")
3880             return retMap
3881         except Exception:
3882             # roll back
3883             self._rollback()
3884             # error
3885             self.dump_error_message(tmpLog)
3886             return failedRet
3887 
3888     # get JEDI dataset with datasetID
3889     def getDatasetWithID_JEDI(self, jediTaskID, datasetID):
3890         comment = " /* JediDBProxy.getDatasetWithID_JEDI */"
3891         tmpLog = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID} datasetID={datasetID}")
3892         tmpLog.debug("start")
3893         # return value for failure
3894         failedRet = False, None
3895         try:
3896             # sql
3897             sql = f"SELECT {JediDatasetSpec.columnNames()} "
3898             sql += f"FROM {panda_config.schemaJEDI}.JEDI_Datasets WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID "
3899             varMap = {}
3900             varMap[":jediTaskID"] = jediTaskID
3901             varMap[":datasetID"] = datasetID
3902             # begin transaction
3903             self.conn.begin()
3904             # select
3905             self.cur.execute(sql + comment, varMap)
3906             res = self.cur.fetchone()
3907             # commit
3908             if not self._commit():
3909                 raise RuntimeError("Commit error")
3910             if res is not None:
3911                 datasetSpec = JediDatasetSpec()
3912                 datasetSpec.pack(res)
3913             else:
3914                 datasetSpec = None
3915             tmpLog.debug("done")
3916             return True, datasetSpec
3917         except Exception:
3918             # roll back
3919             self._rollback()
3920             # error
3921             self.dump_error_message(tmpLog)
3922             return failedRet
3923 
3924     # get JEDI datasets with jediTaskID
3925     def getDatasetsWithJediTaskID_JEDI(self, jediTaskID, datasetTypes=None):
3926         comment = " /* JediDBProxy.getDatasetsWithJediTaskID_JEDI */"
3927         tmpLog = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID} datasetTypes={datasetTypes}")
3928         tmpLog.debug("start")
3929         # return value for failure
3930         failedRet = False, None
3931         try:
3932             # sql
3933             varMap = {}
3934             varMap[":jediTaskID"] = jediTaskID
3935             sql = f"SELECT {JediDatasetSpec.columnNames()} "
3936             sql += f"FROM {panda_config.schemaJEDI}.JEDI_Datasets WHERE jediTaskID=:jediTaskID "
3937             if datasetTypes is not None:
3938                 dstype_var_names_str, dstype_var_map = get_sql_IN_bind_variables(datasetTypes, prefix=":type_", value_as_suffix=True)
3939                 sql += f"AND type IN ({dstype_var_names_str}) "
3940                 varMap.update(dstype_var_map)
3941             # begin transaction
3942             self.conn.begin()
3943             self.cur.arraysize = 10000
3944             # select
3945             self.cur.execute(sql + comment, varMap)
3946             tmpResList = self.cur.fetchall()
3947             # commit
3948             if not self._commit():
3949                 raise RuntimeError("Commit error")
3950             # make file specs
3951             datasetSpecList = []
3952             for tmpRes in tmpResList:
3953                 datasetSpec = JediDatasetSpec()
3954                 datasetSpec.pack(tmpRes)
3955                 datasetSpecList.append(datasetSpec)
3956             tmpLog.debug(f"done with {len(datasetSpecList)} datasets")
3957             return True, datasetSpecList
3958         except Exception:
3959             # roll back
3960             self._rollback()
3961             # error
3962             self.dump_error_message(tmpLog)
3963             return failedRet
3964 
3965     # get jediTaskIDs with dataset attributes
3966     def get_task_ids_with_dataset_attributes(self, dataset_attributes: dict, only_active_tasks: bool = True) -> tuple[bool, list[int] | None]:
3967         """Get jediTaskIDs with dataset attributes.
3968         Args:
3969             dataset_attributes (dict): A dictionary of dataset attributes to filter on.
3970             only_active_tasks (bool): If True, only consider active tasks.
3971         Returns:
3972             tuple: A tuple containing a boolean indicating success, and a list of jediTaskIDs or None.
3973         """
3974         comment = " /* JediDBProxy.get_task_ids_with_dataset_attributes */"
3975         tmp_log = self.create_tagged_logger(comment, f"dataset_attributes={dataset_attributes} only_active_tasks={only_active_tasks}")
3976         tmp_log.debug("start")
3977         try:
3978             # sql
3979             sql = (
3980                 f"SELECT DISTINCT  tabT.jediTaskID FROM {panda_config.schemaJEDI}.JEDI_Tasks tabT,{panda_config.schemaJEDI}.JEDI_AUX_Status_MinTaskID tabA,{panda_config.schemaJEDI}.JEDI_Datasets tabD "
3981                 "WHERE tabT.status=tabA.status AND tabT.jediTaskID>=tabA.min_jediTaskID AND tabT.jediTaskID=tabD.jediTaskID "
3982             )
3983             for k, v in dataset_attributes.items():
3984                 sql += f"AND tabD.{k}=:{k} "
3985             var_map = copy.copy(dataset_attributes)
3986             if only_active_tasks:
3987                 task_var_names_str, task_var_map = get_sql_IN_bind_variables(
3988                     JediTaskSpec.statusToRejectExtChange(), prefix=":task_status_", value_as_suffix=True
3989                 )
3990                 sql += f"AND tabT.status NOT IN ({task_var_names_str}) "
3991                 var_map.update(task_var_map)
3992             # begin transaction
3993             self.conn.begin()
3994             self.cur.arraysize = 10000
3995             # select
3996             print(sql, str(var_map))
3997             self.cur.execute(sql + comment, var_map)
3998             tmp_res = self.cur.fetchall()
3999             # commit
4000             if not self._commit():
4001                 raise RuntimeError("Commit error")
4002             task_id_list = [row[0] for row in tmp_res]
4003             tmp_log.debug(f"done with {len(task_id_list)} tasks")
4004             return True, task_id_list
4005         except Exception:
4006             # roll back
4007             self._rollback()
4008             # error
4009             self.dump_error_message(tmp_log)
4010             return False, None
4011 
4012     # extend lifetime of sandbox file
4013     def extendSandboxLifetime_JEDI(self, jedi_taskid, file_name):
4014         comment = " /* JediDBProxy.extendSandboxLifetime_JEDI */"
4015         tmpLog = self.create_tagged_logger(comment, f"jediTaskID={jedi_taskid}")
4016         try:
4017             self.conn.begin()
4018             retVal = False
4019             # sql to update
4020             sqlC = f"UPDATE {panda_config.schemaMETA}.userCacheUsage SET creationTime=CURRENT_DATE WHERE fileName=:fileName "
4021             varMap = {}
4022             varMap[":fileName"] = file_name
4023             self.cur.execute(sqlC + comment, varMap)
4024             nRows = self.cur.rowcount
4025             if not self._commit():
4026                 raise RuntimeError("Commit error")
4027             tmpLog.debug(f"done {file_name} with {nRows}")
4028             # return
4029             return nRows
4030         except Exception:
4031             # roll back
4032             self._rollback()
4033             # error
4034             self.dump_error_message(tmpLog)
4035             return None
4036 
4037     # lock process
4038     def lockProcess_JEDI(self, vo, prodSourceLabel, cloud, workqueue_id, resource_name, component, pid, forceOption, timeLimit):
4039         comment = " /* JediDBProxy.lockProcess_JEDI */"
4040         # defaults
4041         if cloud is None:
4042             cloud = "default"
4043         if workqueue_id is None:
4044             workqueue_id = 0
4045         if resource_name is None:
4046             resource_name = "default"
4047         if component is None:
4048             component = "default"
4049         tmpLog = self.create_tagged_logger(
4050             comment, f"vo={vo} label={prodSourceLabel} cloud={cloud} queue={workqueue_id} resource_type={resource_name} component={component} pid={pid}"
4051         )
4052         tmpLog.debug("start")
4053         try:
4054             retVal = False
4055             # sql to check
4056             sqlCT = "SELECT lockedBy "
4057             sqlCT += f"FROM {panda_config.schemaJEDI}.JEDI_Process_Lock "
4058             sqlCT += "WHERE vo=:vo AND prodSourceLabel=:prodSourceLabel AND cloud=:cloud AND workqueue_id=:workqueue_id "
4059             sqlCT += "AND resource_type=:resource_name AND component=:component "
4060             sqlCT += "AND lockedTime>:timeLimit "
4061             sqlCT += "FOR UPDATE"
4062             # sql to delete
4063             sqlCD = f"DELETE FROM {panda_config.schemaJEDI}.JEDI_Process_Lock "
4064             sqlCD += "WHERE vo=:vo AND prodSourceLabel=:prodSourceLabel AND cloud=:cloud AND workqueue_id=:workqueue_id "
4065             sqlCD += "AND resource_type=:resource_name AND component=:component "
4066             # sql to insert
4067             sqlFR = f"INSERT INTO {panda_config.schemaJEDI}.JEDI_Process_Lock "
4068             sqlFR += "(vo, prodSourceLabel, cloud, workqueue_id, resource_type, component, lockedBy, lockedTime) "
4069             sqlFR += "VALUES(:vo, :prodSourceLabel, :cloud, :workqueue_id, :resource_name, :component, :lockedBy, CURRENT_DATE) "
4070             # start transaction
4071             self.conn.begin()
4072             # check
4073             if not forceOption:
4074                 varMap = {}
4075                 varMap[":vo"] = vo
4076                 varMap[":prodSourceLabel"] = prodSourceLabel
4077                 varMap[":cloud"] = cloud
4078                 varMap[":workqueue_id"] = workqueue_id
4079                 varMap[":resource_name"] = resource_name
4080                 varMap[":component"] = component
4081                 varMap[":timeLimit"] = naive_utcnow() - datetime.timedelta(minutes=timeLimit)
4082                 self.cur.execute(sqlCT + comment, varMap)
4083                 resCT = self.cur.fetchone()
4084             else:
4085                 resCT = None
4086             if resCT is not None:
4087                 tmpLog.debug(f"skipped, locked by {resCT[0]}")
4088             else:
4089                 # delete
4090                 varMap = {}
4091                 varMap[":vo"] = vo
4092                 varMap[":prodSourceLabel"] = prodSourceLabel
4093                 varMap[":cloud"] = cloud
4094                 varMap[":workqueue_id"] = workqueue_id
4095                 varMap[":resource_name"] = resource_name
4096                 varMap[":component"] = component
4097                 self.cur.execute(sqlCD + comment, varMap)
4098                 # insert
4099                 varMap = {}
4100                 varMap[":vo"] = vo
4101                 varMap[":prodSourceLabel"] = prodSourceLabel
4102                 varMap[":cloud"] = cloud
4103                 varMap[":workqueue_id"] = workqueue_id
4104                 varMap[":resource_name"] = resource_name
4105                 varMap[":component"] = component
4106                 varMap[":lockedBy"] = pid
4107                 self.cur.execute(sqlFR + comment, varMap)
4108                 tmpLog.debug("successfully locked")
4109                 retVal = True
4110             # commit
4111             if not self._commit():
4112                 raise RuntimeError("Commit error")
4113             return retVal
4114         except Exception:
4115             # roll back
4116             self._rollback()
4117             # error
4118             self.dump_error_message(tmpLog)
4119             return retVal
4120 
4121     # unlock process
4122     def unlockProcess_JEDI(self, vo, prodSourceLabel, cloud, workqueue_id, resource_name, component, pid):
4123         comment = " /* JediDBProxy.unlockProcess_JEDI */"
4124         # defaults
4125         if cloud is None:
4126             cloud = "default"
4127         if workqueue_id is None:
4128             workqueue_id = 0
4129         if resource_name is None:
4130             resource_name = "default"
4131         if component is None:
4132             component = "default"
4133         tmpLog = self.create_tagged_logger(
4134             comment, f"vo={vo} label={prodSourceLabel} cloud={cloud} queue={workqueue_id} resource_type={resource_name} component={component} pid={pid}"
4135         )
4136         tmpLog.debug("start")
4137         try:
4138             retVal = False
4139             # sql to delete
4140             sqlCD = f"DELETE FROM {panda_config.schemaJEDI}.JEDI_Process_Lock "
4141             sqlCD += "WHERE vo=:vo AND prodSourceLabel=:prodSourceLabel AND cloud=:cloud "
4142             sqlCD += "AND workqueue_id=:workqueue_id AND lockedBy=:lockedBy "
4143             sqlCD += "AND resource_type=:resource_name AND component=:component "
4144             # start transaction
4145             self.conn.begin()
4146             # check
4147             varMap = {}
4148             varMap[":vo"] = vo
4149             varMap[":prodSourceLabel"] = prodSourceLabel
4150             varMap[":cloud"] = cloud
4151             varMap[":workqueue_id"] = workqueue_id
4152             varMap[":resource_name"] = resource_name
4153             varMap[":component"] = component
4154             varMap[":lockedBy"] = pid
4155             self.cur.execute(sqlCD + comment, varMap)
4156             # commit
4157             if not self._commit():
4158                 raise RuntimeError("Commit error")
4159             tmpLog.debug("done")
4160             retVal = True
4161             return retVal
4162         except Exception:
4163             # roll back
4164             self._rollback()
4165             # error
4166             self.dump_error_message(tmpLog)
4167             return retVal
4168 
4169     # unlock process with PID
4170     def unlockProcessWithPID_JEDI(self, vo, prodSourceLabel, workqueue_id, resource_name, pid, useBase):
4171         comment = " /* JediDBProxy.unlockProcessWithPID_JEDI */"
4172         tmpLog = self.create_tagged_logger(
4173             comment, f"vo={vo} label={prodSourceLabel} queue={workqueue_id} resource_type={resource_name} pid={pid} useBase={useBase}"
4174         )
4175         tmpLog.debug("start")
4176         try:
4177             retVal = False
4178             # sql to delete
4179             sqlCD = f"DELETE FROM {panda_config.schemaJEDI}.JEDI_Process_Lock "
4180             sqlCD += "WHERE vo=:vo AND prodSourceLabel=:prodSourceLabel "
4181             sqlCD += "AND workqueue_id=:workqueue_id "
4182             sqlCD += "AND resource_name=:resource_name "
4183             if useBase:
4184                 sqlCD += "AND lockedBy LIKE :lockedBy "
4185             else:
4186                 sqlCD += "AND lockedBy=:lockedBy "
4187             # start transaction
4188             self.conn.begin()
4189             # delete
4190             varMap = {}
4191             varMap[":vo"] = vo
4192             varMap[":prodSourceLabel"] = prodSourceLabel
4193             varMap[":workqueue_id"] = workqueue_id
4194             varMap[":resource_name"] = resource_name
4195             if useBase:
4196                 varMap[":lockedBy"] = pid + "%"
4197             else:
4198                 varMap[":lockedBy"] = pid
4199             self.cur.execute(sqlCD + comment, varMap)
4200             # commit
4201             if not self._commit():
4202                 raise RuntimeError("Commit error")
4203             tmpLog.debug("done")
4204             retVal = True
4205             return retVal
4206         except Exception:
4207             # roll back
4208             self._rollback()
4209             # error
4210             self.dump_error_message(tmpLog)
4211             return retVal
4212 
4213     # check process lock
4214     def checkProcessLock_JEDI(self, vo, prodSourceLabel, cloud, workqueue_id, resource_name, component, pid, checkBase):
4215         comment = " /* JediDBProxy.checkProcessLock_JEDI */"
4216         # defaults
4217         if cloud is None:
4218             cloud = "default"
4219         if workqueue_id is None:
4220             workqueue_id = 0
4221         if resource_name is None:
4222             resource_name = "default"
4223         if component is None:
4224             component = "default"
4225         tmpLog = self.create_tagged_logger(
4226             comment, f"vo={vo} label={prodSourceLabel} cloud={cloud} queue={workqueue_id} resource_type={resource_name} component={component} pid={pid}"
4227         )
4228         tmpLog.debug("start")
4229         try:
4230             retVal = False
4231             # sql to check
4232             sqlCT = "SELECT lockedBy "
4233             sqlCT += f"FROM {panda_config.schemaJEDI}.JEDI_Process_Lock "
4234             sqlCT += "WHERE vo=:vo AND prodSourceLabel=:prodSourceLabel AND cloud=:cloud AND workqueue_id=:workqueue_id "
4235             sqlCT += "AND resource_type=:resource_name AND component=:component "
4236             sqlCT += "AND lockedTime>:timeLimit "
4237             # start transaction
4238             self.conn.begin()
4239             # check
4240             varMap = {}
4241             varMap[":vo"] = vo
4242             varMap[":prodSourceLabel"] = prodSourceLabel
4243             varMap[":cloud"] = cloud
4244             varMap[":workqueue_id"] = workqueue_id
4245             varMap[":resource_name"] = resource_name
4246             varMap[":component"] = component
4247             varMap[":timeLimit"] = naive_utcnow() - datetime.timedelta(minutes=5)
4248             self.cur.execute(sqlCT + comment, varMap)
4249             resCT = self.cur.fetchone()
4250             if resCT is not None:
4251                 (lockedBy,) = resCT
4252                 if checkBase:
4253                     # check only base part
4254                     if not lockedBy.startswith(pid):
4255                         retVal = True
4256                 else:
4257                     # check whole string
4258                     if lockedBy != pid:
4259                         retVal = True
4260                 if retVal is True:
4261                     tmpLog.debug(f"skipped locked by {lockedBy}")
4262             # commit
4263             if not self._commit():
4264                 raise RuntimeError("Commit error")
4265             tmpLog.debug(f"done with {retVal}")
4266             return retVal
4267         except Exception:
4268             # roll back
4269             self._rollback()
4270             # error
4271             self.dump_error_message(tmpLog)
4272             return retVal