File indexing completed on 2026-04-10 08:39:04
0001 import copy
0002 import datetime
0003 import glob
0004 import json
0005 import os
0006 import random
0007 import re
0008 import time
0009 from typing import Dict, List, Tuple
0010
0011 from pandacommon.pandalogger.LogWrapper import LogWrapper
0012 from pandacommon.pandautils.PandaUtils import get_sql_IN_bind_variables, naive_utcnow
0013
0014 from pandaserver.config import panda_config
0015 from pandaserver.srvcore import CoreUtils, srv_msg_utils
0016 from pandaserver.taskbuffer import EventServiceUtils, PrioUtil
0017 from pandaserver.taskbuffer.DatasetSpec import DatasetSpec
0018 from pandaserver.taskbuffer.db_proxy_mods.base_module import (
0019 BaseModule,
0020 SQL_QUEUE_TOPIC_async_dataset_update,
0021 memoize,
0022 varNUMBER,
0023 )
0024 from pandaserver.taskbuffer.FileSpec import FileSpec
0025 from pandaserver.taskbuffer.JediDatasetSpec import (
0026 INPUT_TYPES_var_map,
0027 INPUT_TYPES_var_str,
0028 JediDatasetSpec,
0029 )
0030 from pandaserver.taskbuffer.JediTaskSpec import JediTaskSpec
0031 from pandaserver.taskbuffer.JobSpec import JobSpec
0032
0033
0034
0035 class MiscStandaloneModule(BaseModule):
0036
def __init__(self, log_stream: LogWrapper):
    """Initialise the module by delegating to the next ``__init__`` in the MRO.

    Args:
        log_stream: Logger wrapper, forwarded unchanged to the parent initialiser.
    """
    super().__init__(log_stream)
0039
0040
def getPandaIDsWithTaskID(self, jediTaskID: int, scout_only: bool = False, unsuccessful_only: bool = False) -> List[int]:
    """Collect the PanDA job IDs that belong to a task.

    The same SELECT is issued against the defined, active and archived job
    tables and the three results are combined with ``UNION``.

    Args:
        jediTaskID: JEDI task ID.
        scout_only: When True, keep only jobs whose comma-separated
            ``specialHandling`` field contains the ``sj`` token.
        unsuccessful_only: When True, keep only jobs whose status is
            ``failed``, ``cancelled`` or ``closed``.

    Returns:
        List[int]: The matching PanDA job IDs; an empty list on DB error.
    """
    comment = " /* DBProxy.getPandaIDsWithTaskID */"
    tmp_log = self.create_tagged_logger(comment, f"<jediTaskID={jediTaskID} scout_only={scout_only} unsuccessful_only={unsuccessful_only}>")
    tmp_log.debug("start")

    # optional predicates, appended identically to each per-table SELECT
    extra_filters = ""
    if scout_only:
        extra_filters += "AND INSTR(','||NVL(specialHandling, '')||',',',sj,')>0 "
    if unsuccessful_only:
        extra_filters += "AND jobStatus IN ('failed','cancelled','closed') "

    per_table_selects = [
        f"SELECT PandaID FROM ATLAS_PANDA.{table_name} WHERE jediTaskID=:jediTaskID {extra_filters}"
        for table_name in ("jobsDefined4", "jobsActive4", "jobsArchived4")
    ]
    sql = "UNION ".join(per_table_selects)
    bind_vars = {":jediTaskID": jediTaskID}
    try:
        self.conn.begin()
        # a task can own a very large number of jobs
        self.cur.arraysize = 1000000
        self.cur.execute(sql + comment, bind_vars)
        rows = self.cur.fetchall()
        if not self._commit():
            raise RuntimeError("Commit error")
        panda_ids = [panda_id for (panda_id,) in rows]

        # scout_only takes precedence in the log wording when both flags are set
        if scout_only:
            what = "scout IDs"
        elif unsuccessful_only:
            what = "unsuccessful IDs"
        else:
            what = "IDs"
        tmp_log.debug(f"found {len(panda_ids)} {what}")
        return panda_ids
    except Exception:
        self._rollback()
        self.dump_error_message(tmp_log)
        return []
0109
0110
def changeTaskPriorityPanda(self, jediTaskID, newPriority):
    """Set a new current priority on a task and propagate it.

    The JEDI task row is updated first. Only when exactly one task row was
    changed are the jobs of the task in ``jobsActive4`` and ``jobsDefined4``
    updated too. The DEFT ``T_TASK`` row is updated afterwards.

    Args:
        jediTaskID: JEDI task ID.
        newPriority: Priority value to store.

    Returns:
        The number of JEDI task rows updated, or None on DB error.
    """
    comment = " /* DBProxy.changeTaskPriorityPanda */"
    tmp_log = self.create_tagged_logger(comment, f"<jediTaskID={jediTaskID}>")
    tmp_log.debug(f"newPrio={newPriority}")
    try:
        task_update = f"UPDATE {panda_config.schemaJEDI}.JEDI_Tasks SET currentPriority=:newPriority WHERE jediTaskID=:jediTaskID "
        deft_update = f"UPDATE {panda_config.schemaDEFT}.T_TASK SET current_priority=:newPriority,timestamp=CURRENT_DATE WHERE taskid=:jediTaskID "
        # the same bind variables serve every statement below
        bind_vars = {":jediTaskID": jediTaskID, ":newPriority": newPriority}

        self.conn.begin()
        self.cur.arraysize = 10

        self.cur.execute(task_update + comment, bind_vars)
        n_updated = self.cur.rowcount
        if n_updated == 1:
            for job_table in ("jobsActive4", "jobsDefined4"):
                job_update = f"UPDATE ATLAS_PANDA.{job_table} SET currentPriority=:newPriority WHERE jediTaskID=:jediTaskID "
                self.cur.execute(job_update + comment, bind_vars)

        self.cur.execute(deft_update + comment, bind_vars)

        if not self._commit():
            raise RuntimeError("Commit error")
        tmp_log.debug(f"done with {n_updated}")
        return n_updated
    except Exception:
        self._rollback()
        self.dump_error_message(tmp_log)
        return None
0150
0151
def getTaskIDwithTaskNameJEDI(self, userName, taskName):
    """Look up the newest JEDI task ID for a user name / task name pair.

    Args:
        userName: Value matched against ``JEDI_Tasks.userName``.
        taskName: Value matched against ``JEDI_Tasks.taskName``.

    Returns:
        The largest ``jediTaskID`` matching both values, or None when nothing
        matches or a DB error occurred.
    """
    comment = " /* DBProxy.getTaskIDwithTaskNameJEDI */"
    # the tag is closed with '>' (the original tag was left unbalanced)
    tmp_log = self.create_tagged_logger(comment, f"<userName={userName} taskName={taskName}>")
    tmp_log.debug("start")
    try:
        self.conn.begin()

        sqlGF = f"SELECT MAX(jediTaskID) FROM {panda_config.schemaJEDI}.JEDI_Tasks WHERE userName=:userName AND taskName=:taskName "
        varMap = {":userName": userName, ":taskName": taskName}
        self.cur.execute(sqlGF + comment, varMap)
        resFJ = self.cur.fetchone()
        # MAX() yields NULL when no row matches, so the value can be None even when a row is returned
        jediTaskID = resFJ[0] if resFJ is not None else None

        if not self._commit():
            raise RuntimeError("Commit error")
        tmp_log.debug(f"jediTaskID={jediTaskID}")
        return jediTaskID
    except Exception:
        self._rollback()
        self.dump_error_message(tmp_log)
        return None
0182
0183
def updateTaskModTimeJEDI(self, jediTaskID, newStatus):
    """Back-date a task's modification time and optionally force a new status.

    ``modificationTime`` is set to ``CURRENT_DATE-1``. When ``newStatus`` is
    given, ``status`` is set to it and ``oldStatus`` is cleared in the same
    statement.

    Args:
        jediTaskID: JEDI task ID.
        newStatus: Status to store, or None to leave the status untouched.

    Returns:
        bool: True when the update was committed, False on DB error.
    """
    # the SQL/log tag names this method (it used to carry the name of updateTaskErrorDialogJEDI)
    comment = " /* DBProxy.updateTaskModTimeJEDI */"
    tmp_log = self.create_tagged_logger(comment, f"<jediTaskID={jediTaskID}>")
    tmp_log.debug("start")
    try:
        self.conn.begin()

        varMap = {":jediTaskID": jediTaskID}
        assignments = ["modificationTime=CURRENT_DATE-1"]
        if newStatus is not None:
            varMap[":newStatus"] = newStatus
            assignments += ["status=:newStatus", "oldStatus=NULL"]
        sqlUE = f"UPDATE {panda_config.schemaJEDI}.JEDI_Tasks SET " + ",".join(assignments) + " WHERE jediTaskID=:jediTaskID "
        self.cur.execute(sqlUE + comment, varMap)

        if not self._commit():
            raise RuntimeError("Commit error")
        tmp_log.debug("done")
        return True
    except Exception:
        self._rollback()
        self.dump_error_message(tmp_log)
        return False
0214
def initialize_cpu_time_task(self, job_id, task_id, site_id, files, active):
    """
    Propose (and optionally store) an increased CPU time per event for a task.

    Background relation:
    walltime = basewalltime + cpuefficiency*CPUTime*nEvents/Corepower/Corecount

    CPU time: execution time per event
    Walltime: time for a job
    Corepower: HS06 score
    Basewalltime: Setup time, time to download, etc. taken by the pilot

    The proposed value is computed as
    ``(maxtime_site - basewalltime) * corepower_site * corecount_job * 1.1 / (cpuefficiency / 100) / n_events * 1.5``
    and is expressed in ``mHS06sPerEvent`` instead of ``HS06sPerEvent`` when it is below 10.

    Nothing is changed, and ``(None, None)`` is returned, when
    - the task already has a finished non-build job (left to the scouting mechanism),
    - the site has no schedconfig entry or lacks maxtime/corepower/corecount,
    - the task is unknown or lacks cpuefficiency/basewalltime,
    - the job has no usable input files or their number of events cannot be determined,
    - the current task CPU time is already larger than the proposed one.

    Args:
        job_id: PandaID of the job that triggered the calculation.
        task_id: JEDI task ID.
        site_id: PanDA queue name used to look up the site parameters.
        files: Iterable of file objects exposing ``type``, ``lfn``, ``fileID`` and ``datasetID``.
        active: When truthy the new value is written to ``jedi_tasks``; otherwise it is only computed.

    Returns:
        tuple: ``(new_cputime, new_cputime_unit)``, or ``(None, None)`` when nothing was done.
    """
    comment = " /* DBProxy.initialize_cpu_time_task */"
    tmp_log = self.create_tagged_logger(comment, f"PandaID={job_id}; jediTaskID={task_id}; siteID={site_id}")
    tmp_log.debug("start")

    # A task with at least one finished non-build job is left to the scouting mechanism
    sql = (
        "SELECT 1 FROM "
        "(SELECT 1 FROM atlas_panda.jobsarchived4 "
        "WHERE jeditaskid = :jedi_task_id AND jobstatus = 'finished' AND transformation NOT LIKE '%build%' AND ROWNUM = 1 "
        "UNION ALL "
        "SELECT 1 FROM atlas_pandaarch.jobsarchived "
        "WHERE jeditaskid = :jedi_task_id AND jobstatus = 'finished' AND transformation NOT LIKE '%build%' AND ROWNUM = 1) "
        "WHERE ROWNUM = 1"
    )
    var_map = {":jedi_task_id": task_id}
    self.cur.execute(sql + comment, var_map)
    if self.cur.fetchone():
        tmp_log.debug(f"Task {task_id} already has successful jobs, skipping CPU time increase and leaving it up to the scouting mechanism")
        return None, None

    # Site parameters
    sql = (
        "SELECT /* use_json_type */ sc.data.maxtime, sc.data.corepower, NVL(TO_NUMBER(sc.data.corecount), 1) AS corecount "
        "FROM ATLAS_PANDA.schedconfig_json sc "
        "WHERE sc.panda_queue= :site_id "
    )
    var_map = {":site_id": site_id}
    self.cur.execute(sql + comment, var_map)
    site_parameters = self.cur.fetchone()

    if not site_parameters:
        # without a row there is nothing to unpack; stop here instead of failing with a TypeError
        tmp_log.debug(f"No site parameters retrieved for {site_id}")
        return None, None

    (max_time_site, core_power_site, core_count_site) = site_parameters
    tmp_log.debug(f"site_id {site_id} has parameters: max_time_site {max_time_site}, core_power_site {core_power_site}, core_count_site {core_count_site}")
    if (not max_time_site) or (not core_power_site) or (not core_count_site):
        tmp_log.debug(f"One or more site parameters are not defined for {site_id}... nothing to do")
        return None, None
    max_time_site = int(max_time_site)
    core_power_site = float(core_power_site)
    core_count_site = int(core_count_site)

    # Task parameters
    sql = """
    SELECT jt.cputime, jt.walltime, jt.basewalltime, jt.cpuefficiency, jt.cputimeunit
    FROM ATLAS_PANDA.jedi_tasks jt
    WHERE jt.jeditaskid=:jeditaskid
    """
    var_map = {":jeditaskid": task_id}
    self.cur.execute(sql + comment, var_map)
    task_parameters = self.cur.fetchone()

    if not task_parameters:
        tmp_log.debug(f"No task parameters retrieved for jeditaskid {task_id}... nothing to do")
        return None, None

    (old_cputime, walltime, basewalltime, cpuefficiency, old_cputime_unit) = task_parameters
    if not cpuefficiency or not basewalltime:
        tmp_log.debug(f"CPU efficiency and/or basewalltime are not defined for task {task_id}... nothing to do")
        return None, None

    tmp_log.debug(
        f"task {task_id} has parameters: cputime {old_cputime} {old_cputime_unit}, walltime {walltime}, "
        f"basewalltime {basewalltime}, cpuefficiency {cpuefficiency}"
    )

    # Express the current CPU time in HS06sPerEvent so that it can be compared with the new value
    old_cputime_normalized = None
    if old_cputime is not None:
        if old_cputime_unit == "HS06sPerEvent":
            old_cputime_normalized = old_cputime
        elif old_cputime_unit == "mHS06sPerEvent":
            old_cputime_normalized = old_cputime / 1000

    # Input-like files of the job, ignoring DBRelease files
    input_types = ("input", "pseudo_input", "pp_input", "trn_log", "trn_output")
    input_files = [
        panda_file
        for panda_file in files
        if panda_file.type in input_types and re.search("DBRelease", panda_file.lfn) is None
    ]
    if not input_files:
        tmp_log.debug(f"No input files for job {job_id}, so could not update CPU time for task {task_id}")
        return None, None

    input_fileIDs = [input_file.fileID for input_file in input_files]
    input_datasetIDs = [input_file.datasetID for input_file in input_files]

    var_map = {":taskID": task_id}
    file_var_names_str, file_var_map = get_sql_IN_bind_variables(input_fileIDs, prefix=":file")
    var_map.update(file_var_map)
    ds_var_names_str, ds_var_map = get_sql_IN_bind_variables(input_datasetIDs, prefix=":dataset")
    var_map.update(ds_var_map)

    # Event ranges of those files, restricted to datasets without a master (masterID IS NULL)
    sql_select = f"""
    SELECT jdc.fileid, jdc.nevents, jdc.startevent, jdc.endevent
    FROM ATLAS_PANDA.JEDI_Dataset_Contents jdc, ATLAS_PANDA.JEDI_Datasets jd
    WHERE jdc.JEDITaskID = :taskID
    AND jdc.datasetID IN ({ds_var_names_str})
    AND jdc.fileID IN ({file_var_names_str})
    AND jd.datasetID = jdc.datasetID
    AND jd.masterID IS NULL
    """
    self.cur.execute(sql_select + comment, var_map)

    n_events_total = 0
    for fileid, n_events, start_event, end_event in self.cur.fetchall():
        tmp_log.debug(f"event information: fileid {fileid}, n_events {n_events}, start_event {start_event}, end_event {end_event}")
        start_event = start_event if start_event is not None else 0
        end_event = end_event if end_event is not None else 0
        # the event range is used only when both bounds are non-zero; otherwise fall back to nevents
        if end_event and start_event:
            n_events_total += end_event - start_event
        elif n_events:
            n_events_total += n_events

    if not n_events_total:
        tmp_log.debug(f"n_events could not be calculated for job {job_id}... nothing to do")
        return None, None

    # Core count of the job, looked up in the active and archived job tables
    var_map = {":task_id": task_id, ":job_id": job_id}
    sql_select = """
    SELECT jact4.corecount
    FROM ATLAS_PANDA.jobsactive4 jact4
    WHERE jeditaskid = :task_id AND pandaid = :job_id
    UNION
    SELECT jarc4.corecount
    FROM ATLAS_PANDA.jobsarchived4 jarc4
    WHERE jeditaskid = :task_id AND pandaid = :job_id
    UNION
    SELECT jarch.corecount
    FROM ATLAS_PANDAARCH.jobsarchived jarch
    WHERE jeditaskid = :task_id AND pandaid = :job_id
    """
    self.cur.execute(sql_select + comment, var_map)

    results = self.cur.fetchone()
    try:
        core_count_job = results[0]
    except (IndexError, TypeError):
        core_count_job = None

    # a missing or zero core count is treated as a single core
    if not core_count_job:
        core_count_job = 1
    tmp_log.debug(f"core_count_job: {core_count_job}")

    try:
        new_cputime_unit = "HS06sPerEvent"
        new_cputime = ((max_time_site - basewalltime) * core_power_site * core_count_job * 1.1 / (cpuefficiency / 100.0) / n_events_total) * 1.5
        new_cputime_normalized = new_cputime

        # small values are stored in milli-units so that the integer conversion below keeps precision
        if new_cputime and new_cputime < 10:
            new_cputime = new_cputime * 1000
            new_cputime_unit = "mHS06sPerEvent"

        new_cputime = int(new_cputime)

        tmp_log.debug(f"Old CPU time is {old_cputime} {old_cputime_unit} and possible new CPU time is {new_cputime} {new_cputime_unit}")

        if old_cputime_normalized is not None and new_cputime_normalized is not None and old_cputime_normalized > new_cputime_normalized:
            tmp_log.debug(
                f"Skipping CPU time increase since old CPU time {old_cputime_normalized} HS06sPerEvent "
                f"> new CPU time {new_cputime_normalized} HS06sPerEvent"
            )
            return None, None

        if active:
            sql_update_cputime = """
            UPDATE ATLAS_PANDA.jedi_tasks SET cputime=:new_cputime, cputimeunit=:new_cputime_unit
            WHERE jeditaskid=:jeditaskid
            """
            var_map = {":new_cputime": new_cputime, ":new_cputime_unit": new_cputime_unit, ":jeditaskid": task_id}
            self.conn.begin()
            self.cur.execute(sql_update_cputime + comment, var_map)
            if not self._commit():
                raise RuntimeError("Commit error")

            tmp_log.debug(f"Successfully updated the task CPU time from {old_cputime} to {new_cputime} {new_cputime_unit}")
        else:
            tmp_log.debug("Not updating the task CPU time since active mode is False.")

        return new_cputime, new_cputime_unit

    except (ZeroDivisionError, TypeError) as e:
        tmp_log.debug(f"Exception while updating the task CPU time: {e}")
        return None, None
0428
def requestTaskParameterRecalculation(self, taskID):
    """
    Requests the recalculation of the CPU time of a task:
    1. set the walltimeUnit to NULL and the modificationTime to Now
    2. AtlasProdWatchDog > JediDBProxy.setScoutJobDataToTasks will pick up tasks with walltimeUnit == NULL
    and modificationTime > Now - 24h. This will trigger a recalculation of the task parameters (outDiskCount,
    outDiskUnit, walltime, walltimeUnit, cpuTime, ioIntensity, ioIntensityUnit, ramCount, ramUnit,
    workDiskCount, workDiskUnit)

    The update only touches tasks whose modificationTime is older than 30 minutes.

    Args:
        taskID: JEDI task ID.

    Returns:
        The number of task rows updated, as reported by the cursor.

    Raises:
        RuntimeError: If the commit fails.
        Exception: Any error raised while executing the update is re-raised after a rollback.
    """
    comment = " /* DBProxy.requestTaskParameterRecalculation */"
    tmp_log = self.create_tagged_logger(comment, f"jediTaskID={taskID}")
    tmp_log.debug("start")

    timeNow = naive_utcnow()
    timeLimit = timeNow - datetime.timedelta(minutes=30)

    sql = """
    UPDATE ATLAS_PANDA.jedi_tasks
    SET walltimeUnit=NULL, modificationTime=:timeNow
    WHERE jediTaskId=:taskID AND modificationTime < :timeLimit
    """
    varMap = {":taskID": taskID, ":timeNow": timeNow, ":timeLimit": timeLimit}
    try:
        self.conn.begin()
        # tag the statement with the method name like the other queries of this module
        self.cur.execute(sql + comment, varMap)
        rowcount = self.cur.rowcount
        if not self._commit():
            raise RuntimeError("Commit error")
    except Exception:
        # do not leave the transaction open; the caller still sees the original exception
        self._rollback()
        raise

    tmp_log.debug(f"Forced recalculation of CPUTime ({rowcount} row(s) updated)")
    return rowcount
0462
0463
def getTaskParamsPanda(self, jediTaskID):
    """Fetch the stored task parameters of a task from the DEFT ``T_TASK`` table.

    Args:
        jediTaskID: Task ID, matched against ``T_TASK.taskid``.

    Returns:
        str: The ``jedi_task_parameters`` value of the first returned row as
        text; an empty string when there is no row, the value is NULL, or a
        DB error occurred.
    """
    comment = " /* DBProxy.getTaskParamsPanda */"
    tmp_log = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID}")
    tmp_log.debug("start")
    try:
        query = f"SELECT jedi_task_parameters FROM {panda_config.schemaDEFT}.T_TASK WHERE taskid=:jediTaskID "
        bind_vars = {":jediTaskID": jediTaskID}

        self.conn.begin()
        self.cur.execute(query + comment, bind_vars)
        if not self._commit():
            raise RuntimeError("Commit error")

        # only the first row is looked at
        params_text = ""
        for (clob_value,) in self.cur:
            if clob_value is not None:
                try:
                    # LOB-like objects expose read(); anything else is converted with str()
                    params_text = clob_value.read()
                except AttributeError:
                    params_text = str(clob_value)
            break
        tmp_log.debug("done")
        return params_text
    except Exception:
        self._rollback()
        self.dump_error_message(tmp_log)
        return ""
0497
0498
def getTaskAttributesPanda(self, jediTaskID, attrs):
    """Read selected columns of a task row from ``JEDI_Tasks``.

    Args:
        jediTaskID: JEDI task ID.
        attrs: Iterable of column names to read. The names are interpolated
            into the SQL text as-is, so they must come from trusted code.

    Returns:
        dict: Maps each requested attribute to its value. Empty when no
        attribute was requested, the task does not exist, or a DB error
        occurred.
    """
    comment = " /* DBProxy.getTaskAttributesPanda */"
    tmp_log = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID}")
    tmp_log.debug("start")
    try:
        # materialise once so one-shot iterables serve both the SQL text and the result mapping
        attr_names = list(attrs)
        if not attr_names:
            # an empty column list would produce invalid SQL ("SELECT FROM ..."); nothing to look up
            tmp_log.debug("done {}")
            return {}

        column_list = ",".join(f"{attr}" for attr in attr_names)
        sqlRR = f"SELECT {column_list} FROM {panda_config.schemaJEDI}.JEDI_Tasks WHERE jediTaskID=:jediTaskID "
        varMap = {":jediTaskID": jediTaskID}

        self.conn.begin()
        self.cur.execute(sqlRR + comment, varMap)
        resRR = self.cur.fetchone()

        if not self._commit():
            raise RuntimeError("Commit error")
        retVal = dict(zip(attr_names, resRR)) if resRR is not None else {}
        tmp_log.debug(f"done {str(retVal)}")
        return retVal
    except Exception:
        self._rollback()
        self.dump_error_message(tmp_log)
        return {}
0532
0533
def getTaskStatus(self, jediTaskID):
    """Read the status of a task.

    Args:
        jediTaskID: JEDI task ID.

    Returns:
        The fetched row (a one-element sequence holding the status) when the
        task exists; an empty list when it does not or on DB error.
    """
    comment = " /* DBProxy.getTaskStatus */"
    tmp_log = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID}")
    tmp_log.debug("start")
    try:
        bind_vars = {":jediTaskID": jediTaskID}
        query = f"SELECT status FROM {panda_config.schemaJEDI}.JEDI_Tasks WHERE jediTaskID=:jediTaskID "

        self.conn.begin()
        self.cur.arraysize = 1000
        self.cur.execute(query + comment, bind_vars)
        row = self.cur.fetchone()

        if not self._commit():
            raise RuntimeError("Commit error")
        if not row:
            tmp_log.debug(f"task {jediTaskID} not found")
            return []
        tmp_log.debug(f"task {jediTaskID} has status: {row[0]} ")
        return row
    except Exception:
        self._rollback()
        self.dump_error_message(tmp_log)
        return []
0565
0566
def getTaskStatusSuperstatus(self, jediTaskID):
    """Read the status and super-status of a task.

    Args:
        jediTaskID: JEDI task ID.

    Returns:
        The fetched row ``(status, superStatus)`` when the task exists; an
        empty list when it does not or on DB error.
    """
    comment = " /* DBProxy.getTaskStatusSuperstatus */"
    tmp_log = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID}")
    tmp_log.debug("start")
    try:
        bind_vars = {":jediTaskID": jediTaskID}
        query = f"SELECT status,superStatus FROM {panda_config.schemaJEDI}.JEDI_Tasks WHERE jediTaskID=:jediTaskID "

        self.conn.begin()
        self.cur.arraysize = 1000
        self.cur.execute(query + comment, bind_vars)
        row = self.cur.fetchone()

        if not self._commit():
            raise RuntimeError("Commit error")
        if not row:
            tmp_log.debug(f"task {jediTaskID} not found")
            return []
        tmp_log.debug(f"task {jediTaskID} has status={row[0]} superstatus={row[1]}")
        return row
    except Exception:
        self._rollback()
        self.dump_error_message(tmp_log)
        return []
0597
0598
def reactivateTask(self, jediTaskID, keep_attempt_nr=False, trigger_job_generation=False):
    """Put a task and its input datasets back to ``ready``.

    The task status is set to ``ready``. For every dataset of type ``input``,
    ``pseudo_input`` or ``random_seed`` the files are set to ``ready`` and,
    when at least one file was touched, the dataset counters are reset
    (``nFilesUsed``, ``nFilesFinished``, ``nFilesFailed`` to 0 and
    ``nFilesTobeUsed`` to ``nFiles``).

    Args:
        jediTaskID: JEDI task ID.
        keep_attempt_nr: When False, ``attemptNr`` of the files is reset to 0 as well.
        trigger_job_generation: When True, a ``generate_job`` message is sent
            through the ``panda_jedi`` message-broker proxy after the commit,
            provided such a proxy is configured.

    Returns:
        tuple: ``(0, message)`` on success, ``(None, "DB error")`` on failure.
    """
    comment = " /* DBProxy.reactivateTask */"
    tmp_log = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID}")
    tmp_log.debug("start")
    try:
        schema = panda_config.schemaJEDI
        # sql to update the task status
        sql = f"UPDATE {schema}.JEDI_Tasks SET status=:status WHERE jediTaskID=:jediTaskID "
        # sql to get the input-like datasets
        sqlM = f"SELECT datasetID FROM {schema}.JEDI_Datasets WHERE jediTaskID=:jediTaskID AND type IN (:type1,:type2,:type3) "
        # sql to reset the files of one dataset
        set_clause = "SET status=:status " if keep_attempt_nr else "SET status=:status,attemptNr=0 "
        sqlAB = f"UPDATE {schema}.JEDI_Dataset_Contents " + set_clause + "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID "
        # sql to reset the counters of one dataset
        sqlD = f"UPDATE {schema}.JEDI_Datasets "
        sqlD += "SET status=:status,nFilesUsed=0,nFilesTobeUsed=nFiles,nFilesFinished=0,nFilesFailed=0 "
        sqlD += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID "

        # start the transaction explicitly, like every other writing method of this module
        self.conn.begin()

        # update the task status
        varMap = {":jediTaskID": jediTaskID, ":status": "ready"}
        self.cur.execute(sql + comment, varMap)

        # find the datasets to reset
        varMap = {":jediTaskID": jediTaskID, ":type1": "input", ":type2": "pseudo_input", ":type3": "random_seed"}
        self.cur.execute(sqlM + comment, varMap)
        resM = self.cur.fetchall()

        total_nFiles = 0
        for (datasetID,) in resM:
            # the file and dataset updates take the same bind variables
            varMap = {":jediTaskID": jediTaskID, ":datasetID": datasetID, ":status": "ready"}
            self.cur.execute(sqlAB + comment, varMap)
            nFiles = self.cur.rowcount

            # reset the dataset counters only when some of its files were touched
            if nFiles > 0:
                tmp_log.debug(sqlD + comment + str(varMap))
                self.cur.execute(sqlD + comment, varMap)
                total_nFiles += nFiles

        tmpMsg = f"updated {total_nFiles} inputs and task {jediTaskID} was reactivated "
        tmp_log.debug(tmpMsg)
        tmp_log.sendMsg(tmpMsg, "jedi", "pandasrv")
        retVal = 0, tmpMsg

        if not self._commit():
            raise RuntimeError("Commit error")

        if trigger_job_generation:
            # ask JEDI to generate jobs for the task right away
            msg = srv_msg_utils.make_message("generate_job", taskid=jediTaskID)
            mb_proxy = self.get_mb_proxy("panda_jedi")
            if mb_proxy:
                mb_proxy.send(msg)
                tmp_log.debug(f"sent generate_job message: {msg}")
            else:
                tmp_log.debug("message queue is not configured")
        tmp_log.debug("done")
        return retVal
    except Exception:
        self._rollback()
        self.dump_error_message(tmp_log)
        return None, "DB error"
0684
0685
def getEventStat(self, jediTaskID, PandaID):
    """Count the events of a job per event status.

    Args:
        jediTaskID: JEDI task ID.
        PandaID: PanDA job ID.

    Returns:
        dict: Maps each event status found in ``JEDI_Events`` for the job to
        its number of rows; empty on DB error.
    """
    comment = " /* DBProxy.getEventStat */"
    tmp_log = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID} PandaID={PandaID}")
    tmp_log.debug("start")
    try:
        query = (
            f"SELECT status,COUNT(*) FROM {panda_config.schemaJEDI}.JEDI_Events "
            "WHERE jediTaskID=:jediTaskID AND PandaID=:PandaID "
            "GROUP BY status "
        )
        bind_vars = {":jediTaskID": jediTaskID, ":PandaID": PandaID}

        self.conn.begin()
        self.cur.arraysize = 10000
        self.cur.execute(query + comment, bind_vars)
        counts_by_status = {event_status: n_events for event_status, n_events in self.cur.fetchall()}

        if not self._commit():
            raise RuntimeError("Commit error")
        tmp_log.debug(f"done with {str(counts_by_status)}")
        return counts_by_status
    except Exception:
        self._rollback()
        self.dump_error_message(tmp_log)
        return {}
0718
0719
def updateTaskErrorDialogJEDI(self, jediTaskID, msg):
    """Replace the error dialog of a task.

    The task row is looked up first. When it exists, ``errorDialog`` is
    overwritten with ``msg`` and ``modificationTime`` is set to
    ``CURRENT_DATE``. A missing task is not treated as an error.

    Args:
        jediTaskID: JEDI task ID.
        msg: New error dialog text.

    Returns:
        bool: True when the transaction was committed, False on DB error.
    """
    comment = " /* DBProxy.updateTaskErrorDialogJEDI */"
    tmp_log = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID}")
    tmp_log.debug("start")
    try:
        self.conn.begin()

        # existence check; the previous dialog text itself is not used
        lookup_sql = f"SELECT errorDialog FROM {panda_config.schemaJEDI}.JEDI_Tasks WHERE jediTaskID=:jediTaskID "
        self.cur.execute(lookup_sql + comment, {":jediTaskID": jediTaskID})
        if self.cur.fetchone() is not None:
            update_sql = (
                f"UPDATE {panda_config.schemaJEDI}.JEDI_Tasks SET errorDialog=:errorDialog,modificationTime=CURRENT_DATE "
                "WHERE jediTaskID=:jediTaskID "
            )
            bind_vars = {":jediTaskID": jediTaskID, ":errorDialog": msg}
            self.cur.execute(update_sql + comment, bind_vars)

        if not self._commit():
            raise RuntimeError("Commit error")
        tmp_log.debug("done")
        return True
    except Exception:
        self._rollback()
        self.dump_error_message(tmp_log)
        return False
0755
0756
def increaseAttemptNrPanda(self, jediTaskID, increasedNr):
    """Raise the retry limits of the ``ready`` input files of a task.

    The task row is locked with ``FOR UPDATE`` and the command is accepted only
    when the task status or its old status is ``running``, ``scouting`` or
    ``ready``. For every ``input``/``pseudo_input`` dataset, four UPDATEs raise
    ``maxAttempt`` (and ``maxFailure`` where it is set) of the tracked files
    (``keepTrack=1``) in status ``ready``. Files that had run out of attempts
    are counted as reset and subtracted from the dataset's ``nFilesUsed`` and
    ``nFilesFailed`` counters.

    Args:
        jediTaskID: JEDI task ID.
        increasedNr: Number of additional attempts to grant.

    Returns:
        tuple: ``(0, message)`` on success, ``(1, message)`` when the task is
        not found, ``(2, message)`` when the task status does not allow the
        command, ``(None, "DB error")`` on failure.
    """
    comment = " /* DBProxy.increaseAttemptNrPanda */"
    tmp_log = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID}")
    tmp_log.debug(f"increasedNr={increasedNr}")
    try:
        # sql to read the task status; FOR UPDATE locks the task row
        sqlT = f"SELECT status,oldStatus FROM {panda_config.schemaJEDI}.JEDI_Tasks "
        sqlT += "WHERE jediTaskID=:jediTaskID FOR UPDATE "

        # start transaction
        self.conn.begin()

        self.cur.arraysize = 10
        varMap = {}
        varMap[":jediTaskID"] = jediTaskID

        # get the task status
        self.cur.execute(sqlT + comment, varMap)
        resT = self.cur.fetchone()
        if resT is None:
            tmpMsg = f"jediTaskID={jediTaskID} not found"
            tmp_log.debug(tmpMsg)
            retVal = 1, tmpMsg
        else:
            taskStatus, oldStatus = resT

            # rejected only when neither the current nor the old status is acceptable
            okStatusList = ["running", "scouting", "ready"]
            if taskStatus not in okStatusList and oldStatus not in okStatusList:
                tmpMsg = f"command rejected since status={taskStatus} or oldStatus={oldStatus} not in {str(okStatusList)}"
                tmp_log.debug(tmpMsg)
                retVal = 2, tmpMsg
            else:
                # sql to get the input datasets
                sqlM = f"SELECT datasetID FROM {panda_config.schemaJEDI}.JEDI_Datasets "
                sqlM += "WHERE jediTaskID=:jediTaskID AND type IN (:type1,:type2) "

                # base update that raises maxAttempt only; proc_status is set to :proc_status
                # unless the file still has attempts and failures left
                sqlAB = f"UPDATE {panda_config.schemaJEDI}.JEDI_Dataset_Contents "
                sqlAB += "SET maxAttempt=CASE WHEN maxAttempt > attemptNr THEN maxAttempt+:increasedNr ELSE attemptNr+:increasedNr END "
                sqlAB += ",proc_status=CASE WHEN maxAttempt > attemptNr AND maxFailure > failedAttempt THEN proc_status ELSE :proc_status END "
                sqlAB += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND status=:status AND keepTrack=:keepTrack "

                # base update that raises maxAttempt and maxFailure
                sqlAF = f"UPDATE {panda_config.schemaJEDI}.JEDI_Dataset_Contents "
                sqlAF += "SET maxAttempt=CASE WHEN maxAttempt > attemptNr THEN maxAttempt+:increasedNr ELSE attemptNr+:increasedNr END "
                sqlAF += ",maxFailure=maxFailure+:increasedNr "
                sqlAF += ",proc_status=CASE WHEN maxAttempt > attemptNr AND maxFailure > failedAttempt THEN proc_status ELSE :proc_status END "
                sqlAF += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND status=:status AND keepTrack=:keepTrack "

                # sql to take the reset files out of the dataset's used/failed counters
                sqlD = f"UPDATE {panda_config.schemaJEDI}.JEDI_Datasets "
                sqlD += "SET nFilesUsed=nFilesUsed-:nFilesReset,nFilesFailed=nFilesFailed-:nFilesReset "
                sqlD += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID "

                # get the input datasets
                varMap = {}
                varMap[":jediTaskID"] = jediTaskID
                varMap[":type1"] = "input"
                varMap[":type2"] = "pseudo_input"
                self.cur.execute(sqlM + comment, varMap)
                resM = self.cur.fetchall()
                total_nFilesIncreased = 0
                total_nFilesReset = 0
                for (datasetID,) in resM:
                    # bind variables shared by the four file updates below
                    varMap = {}
                    varMap[":jediTaskID"] = jediTaskID
                    varMap[":datasetID"] = datasetID
                    varMap[":status"] = "ready"
                    varMap[":proc_status"] = "ready"
                    varMap[":keepTrack"] = 1
                    varMap[":increasedNr"] = increasedNr
                    nFilesIncreased = 0
                    nFilesReset = 0

                    # NOTE: the four updates run in sequence, so each one sees the rows as changed by the previous ones
                    # 1) files with attempts left and no failure limit
                    sqlA = sqlAB + "AND maxAttempt>attemptNr AND maxFailure IS NULL "
                    self.cur.execute(sqlA + comment, varMap)
                    nRow = self.cur.rowcount
                    nFilesIncreased += nRow

                    # 2) files with attempts left and failures left
                    sqlA = sqlAF + "AND maxAttempt>attemptNr AND (maxFailure IS NOT NULL AND maxFailure>failedAttempt) "
                    self.cur.execute(sqlA + comment, varMap)
                    nRow = self.cur.rowcount
                    nFilesIncreased += nRow

                    # 3) files without attempts left and no failure limit: counted as reset
                    sqlA = sqlAB + "AND maxAttempt<=attemptNr AND maxFailure IS NULL "
                    self.cur.execute(sqlA + comment, varMap)
                    nRow = self.cur.rowcount
                    nFilesReset += nRow
                    nFilesIncreased += nRow

                    # 4) files without attempts left, or whose failedAttempt equals maxFailure: counted as reset
                    sqlA = sqlAF + "AND (maxAttempt<=attemptNr OR (maxFailure IS NOT NULL AND maxFailure=failedAttempt)) "
                    self.cur.execute(sqlA + comment, varMap)
                    nRow = self.cur.rowcount
                    nFilesReset += nRow
                    nFilesIncreased += nRow

                    # update the dataset counters only when files were reset
                    if nFilesReset > 0:
                        varMap = {}
                        varMap[":jediTaskID"] = jediTaskID
                        varMap[":datasetID"] = datasetID
                        varMap[":nFilesReset"] = nFilesReset
                        tmp_log.debug(sqlD + comment + str(varMap))
                        self.cur.execute(sqlD + comment, varMap)
                    total_nFilesIncreased += nFilesIncreased
                    total_nFilesReset += nFilesReset
                tmpMsg = f"increased attemptNr for {total_nFilesIncreased} inputs ({total_nFilesReset} reactivated)"
                tmp_log.debug(tmpMsg)
                tmp_log.sendMsg(tmpMsg, "jedi", "pandasrv")
                retVal = 0, tmpMsg

        # commit (this also ends the FOR UPDATE lock taken above)
        if not self._commit():
            raise RuntimeError("Commit error")
        tmp_log.debug("done")
        return retVal
    except Exception:
        # roll back
        self._rollback()
        # error
        self.dump_error_message(tmp_log)
        return None, "DB error"
0872
0873
def insertSandboxFileInfo(self, userName, hostName, fileName, fileSize, checkSum):
    """Register a sandbox file in ``userCacheUsage`` unless it is already known.

    A file is considered known when a row with the same ``hostName`` and
    ``fileName`` exists; in that case nothing is inserted.

    Args:
        userName: Owner of the file.
        hostName: Host that stores the file.
        fileName: Name of the file.
        fileSize: Size of the file.
        checkSum: Checksum of the file.

    Returns:
        str: ``"OK"`` after inserting, ``"WARNING: file exist"`` when the file
        is already registered, ``"ERROR: DB failure"`` on DB error.
    """
    comment = " /* DBProxy.insertSandboxFileInfo */"
    tmp_log = self.create_tagged_logger(comment, f"userName={userName}")
    lookup_sql = (
        "SELECT userName,fileSize,checkSum FROM ATLAS_PANDAMETA.userCacheUsage "
        "WHERE hostName=:hostName AND fileName=:fileName FOR UPDATE"
    )
    insert_sql = (
        "INSERT INTO ATLAS_PANDAMETA.userCacheUsage "
        "(userName,hostName,fileName,fileSize,checkSum,creationTime,modificationTime) "
        "VALUES (:userName,:hostName,:fileName,:fileSize,:checkSum,CURRENT_DATE,CURRENT_DATE) "
    )
    try:
        self.conn.begin()

        # look for an existing entry of the same file on the same host
        self.cur.arraysize = 10
        self.cur.execute(lookup_sql + comment, {":hostName": hostName, ":fileName": fileName})
        existing = self.cur.fetchall()
        if len(existing) > 0:
            tmp_log.debug(f"skip {hostName} {fileName} since already exists")
            if not self._commit():
                raise RuntimeError("Commit error")
            return "WARNING: file exist"

        new_row = {
            ":userName": userName,
            ":hostName": hostName,
            ":fileName": fileName,
            ":fileSize": fileSize,
            ":checkSum": checkSum,
        }
        self.cur.execute(insert_sql + comment, new_row)

        if not self._commit():
            raise RuntimeError("Commit error")
        return "OK"
    except Exception:
        self._rollback()
        self.dump_error_message(tmp_log)
        return "ERROR: DB failure"
0918
0919
def getLockSandboxFiles(self, time_limit, n_files):
    """Select old sandbox files and mark them as just modified.

    Rows of ``userCacheUsage`` whose ``modificationTime`` is older than
    ``time_limit`` and whose file name starts with ``sources`` or ``jobO`` are
    selected. For each of them ``modificationTime`` is set to
    ``CURRENT_DATE``, matching on ``userName`` and ``fileName``.

    NOTE(review): the row limit is ``rownum<:nRows``, a strict comparison, so
    at most ``n_files - 1`` rows are selected; confirm this is intended.

    Args:
        time_limit: Upper bound (exclusive) for ``modificationTime``.
        n_files: Value bound to the ``rownum`` limit.

    Returns:
        list: Tuples ``(userName, hostName, fileName, creationTime,
        modificationTime)`` of the selected files; None on DB error.
    """
    comment = " /* DBProxy.getLockSandboxFiles */"
    tmp_log = self.create_tagged_logger(comment)
    select_sql = (
        "SELECT * FROM ("
        "SELECT userName,hostName,fileName,creationTime,modificationTime FROM ATLAS_PANDAMETA.userCacheUsage "
        "WHERE modificationTime<:timeLimit AND (fileName like 'sources%' OR fileName like 'jobO%') ) "
        "WHERE rownum<:nRows "
    )
    touch_sql = "UPDATE ATLAS_PANDAMETA.userCacheUsage SET modificationTime=CURRENT_DATE WHERE userName=:userName AND fileName=:fileName "
    try:
        tmp_log.debug("start")
        self.conn.begin()

        self.cur.execute(select_sql + comment, {":timeLimit": time_limit, ":nRows": n_files})
        candidates = self.cur.fetchall()

        locked_files = []
        for owner, host, file_name, created_at, modified_at in candidates:
            locked_files.append((owner, host, file_name, created_at, modified_at))
            # refresh the timestamp so the file no longer matches the time limit
            self.cur.execute(touch_sql + comment, {":userName": owner, ":fileName": file_name})

        if not self._commit():
            raise RuntimeError("Commit error")
        tmp_log.debug(f"locked {len(locked_files)} files")
        return locked_files
    except Exception:
        self._rollback()
        self.dump_error_message(tmp_log)
        return None
0959
0960
def checkSandboxFile(self, dn, fileSize, checkSum):
    """Look for a recent sandbox file with the same owner, size and checksum.

    The lookup is done in ATLAS_PANDAMETA.userCacheUsage, excluding rows whose
    hostName is ``localhost.localdomain`` and rows with creationTime not later
    than CURRENT_DATE-3.

    Args:
        dn: User DN. It is cleaned with ``CoreUtils.clean_user_id``; the raw
            value is used when the cleaned one is empty, ``"NULL"`` or None.
        fileSize: Value bound to ``:fileSize``.
        checkSum: Checksum, bound as ``str(checkSum)``.

    Returns:
        str: ``"FOUND:<hostName>:<fileName>"`` built from the first matching
        row, ``"NOTFOUND"`` when there is no match, or ``"ERROR: DB failure"``
        when an exception occurred (after rollback).
    """
    comment = " /* DBProxy.checkSandboxFile */"
    tmp_log = self.create_tagged_logger(comment)
    tmp_log.debug(f"dn={dn} size={fileSize} checksum={checkSum}")
    # the creationTime predicate used to be appended twice; once is enough
    sqlC = (
        "SELECT hostName,fileName FROM ATLAS_PANDAMETA.userCacheUsage "
        "WHERE userName=:userName AND fileSize=:fileSize AND checkSum=:checkSum "
        "AND hostName<>:ngHostName AND creationTime>CURRENT_DATE-3 "
    )
    try:
        retStr = "NOTFOUND"
        # get compact DN
        compactDN = CoreUtils.clean_user_id(dn)
        if compactDN in ["", "NULL", None]:
            compactDN = dn
        # start transaction
        self.conn.begin()
        # look for a matching file
        varMap = {
            ":userName": compactDN,
            ":fileSize": fileSize,
            ":checkSum": str(checkSum),
            ":ngHostName": "localhost.localdomain",
        }
        self.cur.arraysize = 10
        self.cur.execute(sqlC + comment, varMap)
        res = self.cur.fetchall()
        # commit
        if not self._commit():
            raise RuntimeError("Commit error")
        if len(res) != 0:
            hostName, fileName = res[0]
            retStr = f"FOUND:{hostName}:{fileName}"
        tmp_log.debug(f"{retStr}")
        return retStr
    except Exception:
        # roll back
        self._rollback()
        # error
        self.dump_error_message(tmp_log)
        return "ERROR: DB failure"
1000
1001
def insertDataset(self, dataset, tablename="ATLAS_PANDA.Datasets"):
    """Insert a dataset record unless a row with the same vuid already exists.

    ``dataset.creationdate`` and ``dataset.modificationdate`` are set to
    ``naive_utcnow()``. When ``dataset.subType`` is empty, ``"NULL"`` or None
    it is derived from the dataset name: ``dis`` for names ending with
    ``_dis<digits>``, ``sub`` for names ending with ``_sub<digits>``,
    otherwise ``top``.

    Args:
        dataset: Dataset object providing ``name``, ``vuid``, ``subType`` and
            ``valuesMap()``.
        tablename: Target table, interpolated into the SQL text.

    Returns:
        bool: True when the transaction was committed (including the case
        where the vuid already existed and nothing was inserted), False when
        an exception occurred (after rollback).
    """
    comment = " /* DBProxy.insertDataset */"
    tmp_log = self.create_tagged_logger(comment, f"dataset={dataset.name}")
    tmp_log.debug("start")
    sql0 = f"SELECT COUNT(*) FROM {tablename} WHERE vuid=:vuid "
    sql1 = f"INSERT INTO {tablename} "
    sql1 += f"({DatasetSpec.columnNames()}) "
    sql1 += DatasetSpec.bindValuesExpression()
    sql2 = f"SELECT name FROM {tablename} WHERE vuid=:vuid "
    # time information
    dataset.creationdate = naive_utcnow()
    dataset.modificationdate = dataset.creationdate
    try:
        # subtype
        if dataset.subType in ["", "NULL", None]:
            # define using name; raw strings keep "\d" a regex token rather than
            # an invalid Python escape sequence
            if re.search(r"_dis\d+$", dataset.name) is not None:
                dataset.subType = "dis"
            elif re.search(r"_sub\d+$", dataset.name) is not None:
                dataset.subType = "sub"
            else:
                dataset.subType = "top"
        # start transaction
        self.conn.begin()
        # check if it already exists
        varMap = {":vuid": dataset.vuid}
        self.cur.execute(sql0 + comment, varMap)
        (nDS,) = self.cur.fetchone()
        tmp_log.debug(f"nDS={nDS} with {dataset.vuid}")
        if nDS == 0:
            # insert
            tmp_log.debug(sql1 + comment + str(dataset.valuesMap()))
            self.cur.execute(sql1 + comment, dataset.valuesMap())
            # read the name back and log whether it matches
            varMap = {":vuid": dataset.vuid}
            self.cur.execute(sql2 + comment, varMap)
            (nameInDB,) = self.cur.fetchone()
            tmp_log.debug(f"inDB -> {nameInDB} {dataset.name == nameInDB}")
        # commit
        if not self._commit():
            raise RuntimeError("Commit error")
        return True
    except Exception:
        # roll back
        self._rollback()
        # error
        self.dump_error_message(tmp_log)
        return False
1052
1053
def getLockDatasets(self, sqlQuery, varMapGet, modTimeOffset="", getVersion=False):
    """Select datasets with a caller-supplied condition and lock them one by one.

    Candidates are read from ATLAS_PANDA.Datasets using ``sqlQuery`` as the
    WHERE clause. Each candidate is then updated in its own transaction:
    modificationdate is set to CURRENT_DATE (plus ``modTimeOffset`` if given),
    transferStatus is advanced with ``MOD(transferStatus+1,10)`` and, when
    ``getVersion`` is set, version is incremented. The update is conditional
    on the transferStatus value read before, so a row changed in between is
    not reported.

    Args:
        sqlQuery: SQL condition appended after ``WHERE``.
        varMapGet: Bind variables for ``sqlQuery``.
        modTimeOffset: Text appended as ``+<offset>`` to CURRENT_DATE.
        getVersion: Also bump the version and include the previous version in the result.

    Returns:
        List of (vuid, name, modificationdate) tuples, or
        (vuid, name, modificationdate, version) tuples when ``getVersion`` is
        set, for rows that were updated. ``[]`` when an exception occurred.
    """
    comment = " /* DBProxy.getLockDatasets */"
    tmp_log = self.create_tagged_logger(comment)
    tmp_log.debug(f"{sqlQuery},{str(varMapGet)},{modTimeOffset}")
    select_sql = (
        "SELECT /*+ INDEX_RS_ASC(tab(STATUS,TYPE,MODIFICATIONDATE)) */ vuid,name,modificationdate,version,transferStatus FROM ATLAS_PANDA.Datasets tab WHERE "
        + sqlQuery
    )
    offset_part = f"+{modTimeOffset}" if modTimeOffset != "" else ""
    version_part = ",version=:version" if getVersion else ""
    lock_sql = (
        f"UPDATE ATLAS_PANDA.Datasets SET modificationdate=CURRENT_DATE{offset_part}"
        f",transferStatus=MOD(transferStatus+1,10){version_part}"
        " WHERE vuid=:vuid AND transferStatus=:transferStatus"
    )
    locked = []
    try:
        # read candidates in a transaction of their own
        self.conn.begin()
        self.cur.arraysize = 1000000
        self.cur.execute(select_sql + comment, varMapGet)
        candidates = self.cur.fetchall()
        if not self._commit():
            raise RuntimeError("Commit error")
        for vuid, name, modificationdate, version, transfer_status in candidates or ():
            lock_vars = {":vuid": vuid, ":transferStatus": transfer_status}
            if getVersion:
                try:
                    lock_vars[":version"] = str(int(version) + 1)
                except Exception:
                    # version is not usable as an integer: restart from 1
                    lock_vars[":version"] = str(1)
            # one transaction per dataset
            self.conn.begin()
            self.cur.execute(lock_sql + comment, lock_vars)
            n_locked = self.cur.rowcount
            if not self._commit():
                raise RuntimeError("Commit error")
            if n_locked > 0:
                if getVersion:
                    locked.append((vuid, name, modificationdate, version))
                else:
                    locked.append((vuid, name, modificationdate))
        # final commit
        if not self._commit():
            raise RuntimeError("Commit error")
        return locked
    except Exception:
        # roll back
        self._rollback()
        # error
        self.dump_error_message(tmp_log)
        return []
1117
1118
def queryDatasetWithMap(self, map):
    """Fetch one dataset from ATLAS_PANDA.Datasets matching all key/value pairs.

    Every key of ``map`` becomes a ``<key>=:<key>`` condition joined with AND.
    When ``name`` is one of the keys, an optimizer hint block is added to the
    SELECT.

    Args:
        map: Mapping of column name to required value.

    Returns:
        DatasetSpec packed from the first matching row, or None when nothing
        matches or an exception occurred.
    """
    comment = " /* DBProxy.queryDatasetWithMap */"
    tmp_log = self.create_tagged_logger(comment)
    tmp_log.debug(f"{map}")
    if "name" in map:
        select_sql = (
            "SELECT /*+ BEGIN_OUTLINE_DATA "
            'INDEX_RS_ASC(@"SEL$1" "TAB"@"SEL$1" ("DATASETS"."NAME")) '
            'OUTLINE_LEAF(@"SEL$1") ALL_ROWS '
            "IGNORE_OPTIM_EMBEDDED_HINTS "
            "END_OUTLINE_DATA */ "
            f"{DatasetSpec.columnNames()} FROM ATLAS_PANDA.Datasets tab"
        )
    else:
        select_sql = f"SELECT {DatasetSpec.columnNames()} FROM ATLAS_PANDA.Datasets"
    # one equality condition and one bind variable per key
    conditions = [f"{key}=:{key}" for key in map]
    bind_vars = {f":{key}": map[key] for key in map}
    if conditions:
        select_sql += " WHERE " + " AND ".join(conditions)
    try:
        # start transaction
        self.conn.begin()
        # select
        self.cur.arraysize = 100
        tmp_log.debug(select_sql + comment + str(bind_vars))
        self.cur.execute(select_sql + comment, bind_vars)
        rows = self.cur.fetchall()
        # commit
        if not self._commit():
            raise RuntimeError("Commit error")
        if rows is None or len(rows) == 0:
            tmp_log.error(f"dataset not found")
            return None
        # only the first row is used
        dataset = DatasetSpec()
        dataset.pack(rows[0])
        return dataset
    except Exception:
        # roll back
        self._rollback()
        self.dump_error_message(tmp_log)
        return None
1162
1163
def updateDataset(self, datasets, withLock, withCriteria, criteriaMap):
    """Update rows of ATLAS_PANDA.Datasets from dataset objects in one transaction.

    Args:
        datasets: Iterable of dataset objects providing ``name``, ``status``,
            ``vuid`` and ``valuesMap()``. ``modificationdate`` of each object
            is overwritten with ``naive_utcnow()``.
        withLock: Not used by this method.
        withCriteria: Extra SQL condition, appended with AND unless it is an empty string.
        criteriaMap: Bind variables copied into the variable map of every update.

    Returns:
        list: Affected row count (0 or 1) per dataset, or ``[]`` when an
        exception occurred, including the case where an update touched more
        than one row (after rollback).
    """
    comment = " /* DBProxy.updateDataset */"
    tmp_log = self.create_tagged_logger(comment)
    tmp_log.debug("start")
    update_sql = f"UPDATE ATLAS_PANDA.Datasets SET {DatasetSpec.bindUpdateExpression()} WHERE vuid=:vuid"
    if withCriteria != "":
        update_sql += f" AND {withCriteria}"
    row_counts = []
    try:
        # start transaction
        self.conn.begin()
        for dataset in datasets:
            tmp_log.debug(f"dataset={dataset.name} status={dataset.status})")
            # time information
            dataset.modificationdate = naive_utcnow()
            # update
            bind_vars = dataset.valuesMap()
            bind_vars[":vuid"] = dataset.vuid
            for criteria_key in criteriaMap:
                bind_vars[criteria_key] = criteriaMap[criteria_key]
            tmp_log.debug(update_sql + comment + str(bind_vars))
            self.cur.execute(update_sql + comment, bind_vars)
            n_updated = self.cur.rowcount
            # vuid is expected to identify at most one row
            if n_updated not in (0, 1):
                raise RuntimeError(f"Invalid rerun {n_updated}")
            row_counts.append(n_updated)
        # commit
        if not self._commit():
            raise RuntimeError("Commit error")
        tmp_log.debug(f"ret:{row_counts}")
        return row_counts
    except Exception:
        # roll back
        self._rollback()
        self.dump_error_message(tmp_log)
        return []
1201
1202
def trigger_cleanup_internal_datasets(self, task_id: int) -> bool:
    """
    Flag the dispatch datasets of a task for deletion.

    Rows of the Datasets table with type ``dispatch``, MoverID equal to the task ID
    and status ``defined`` or ``completed`` get status ``deleting`` and a fresh
    modificationdate. According to the original note this triggers deletion in
    datasetManager.

    :param task_id: JEDI task ID, matched against the MoverID column

    :return: True when the update was committed, False when an exception occurred
    """
    comment = " /* DBProxy.trigger_cleanup_internal_datasets */"
    tmp_log = self.create_tagged_logger(comment, f"jediTaskID={task_id}")
    tmp_log.debug("start")
    flag_sql = (
        f"UPDATE {panda_config.schemaPANDA}.Datasets SET status=:newStatus,modificationdate=CURRENT_DATE "
        "WHERE type=:type AND MoverID=:taskID AND status IN (:status_d,:status_c) "
    )
    bind_vars = {}
    bind_vars[":type"] = "dispatch"
    bind_vars[":newStatus"] = "deleting"
    bind_vars[":taskID"] = task_id
    bind_vars[":status_d"] = "defined"
    bind_vars[":status_c"] = "completed"
    try:
        # start transaction
        self.conn.begin()
        # update
        self.cur.execute(flag_sql + comment, bind_vars)
        n_flagged = self.cur.rowcount
        # commit
        if not self._commit():
            raise RuntimeError("Commit error")
        tmp_log.debug(f"set flag to {n_flagged} dispatch datasets")
        return True
    except Exception:
        # roll back
        self._rollback()
        self.dump_error_message(tmp_log)
        return False
1237
1238
def getSerialNumber(self, datasetname, definedFreshFlag=None):
    """Get the next serial number from SUBCOUNTER_SUBID_SEQ and a freshness flag.

    When ``definedFreshFlag`` is None, the flag is computed by counting rows of
    ATLAS_PANDA.Datasets with type ``output`` and the given name: it is False
    when at least one row exists and True otherwise. Otherwise the given flag is
    returned unchanged. The serial number is taken with ``nextval`` for the
    ``oracle`` and ``postgres`` backends and through INSERT plus
    ``LAST_INSERT_ID()`` for any other backend.

    Args:
        datasetname: Dataset name. A ``str`` is converted with
            ``encode("ascii", "ignore")`` before it is bound.
        definedFreshFlag: Pre-defined freshness flag, or None to look it up.

    Returns:
        tuple: ``(serial_number, fresh_flag)``, or ``(-1, False)`` when an
        exception occurred (after rollback).
    """
    comment = " /* DBProxy.getSerialNumber */"
    tmp_log = self.create_tagged_logger(comment, f"datasetname={datasetname}")
    try:
        tmp_log.debug(f"fresh={definedFreshFlag}")
        if isinstance(datasetname, str):
            # NOTE(review): this makes the bound value a bytes object - confirm
            # that the DB driver compares it against the name column as intended
            datasetname = datasetname.encode("ascii", "ignore")
            tmp_log.debug(f"converted unicode for {datasetname}")
        # start transaction
        self.conn.begin()
        if definedFreshFlag is not None:
            # use the predefined flag
            freshFlag = definedFreshFlag
        else:
            # check whether an output dataset with this name already exists
            count_sql = "SELECT /*+ INDEX_RS_ASC(TAB (DATASETS.NAME)) */ COUNT(*) FROM ATLAS_PANDA.Datasets tab WHERE type=:type AND name=:name"
            self.cur.arraysize = 100
            self.cur.execute(count_sql + comment, {":name": datasetname, ":type": "output"})
            count_row = self.cur.fetchone()
            already_exists = count_row is not None and len(count_row) != 0 and count_row[0] > 0
            freshFlag = not already_exists
        # get serial number
        if self.backend == "oracle":
            seq_sql = "SELECT ATLAS_PANDA.SUBCOUNTER_SUBID_SEQ.nextval FROM dual"
            self.cur.arraysize = 100
            self.cur.execute(seq_sql + comment, {})
        elif self.backend == "postgres":
            seq_sql = f"SELECT {panda_config.schemaPANDA}.SUBCOUNTER_SUBID_SEQ.nextval"
            self.cur.arraysize = 100
            self.cur.execute(seq_sql + comment, {})
        else:
            # no sequence object: insert a row and read back the generated ID
            insert_sql = " INSERT INTO ATLAS_PANDA.SUBCOUNTER_SUBID_SEQ (col) VALUES (NULL) "
            self.cur.arraysize = 100
            self.cur.execute(insert_sql + comment, {})
            last_id_sql = """ SELECT LAST_INSERT_ID() """
            self.cur.execute(last_id_sql + comment, {})
        (sn,) = self.cur.fetchone()
        # commit
        if not self._commit():
            raise RuntimeError("Commit error")
        tmp_log.debug(f"SN={sn} {freshFlag}")
        return (sn, freshFlag)
    except Exception:
        # roll back
        self._rollback()
        # error
        self.dump_error_message(tmp_log)
        return (-1, False)
1298
1299
def countFilesWithMap(self, map):
    """Count rows of ATLAS_PANDA.filesTable4 matching all key/value pairs.

    Every key of ``map`` becomes a ``<key>=:<key>`` condition joined with AND.
    The query is tried up to three times, sleeping 10-20 seconds between
    attempts.

    Args:
        map: Mapping of column name to required value.

    Returns:
        int: Number of matching rows (0 when the query returns no row), or -1
        when the last attempt failed.
    """
    comment = " /* DBProxy.countFilesWithMap */"
    tmp_log = self.create_tagged_logger(comment)
    count_sql = "SELECT /*+ index(tab FILESTABLE4_DESTDBLOCK_IDX) */ COUNT(*) FROM ATLAS_PANDA.filesTable4 tab"
    # one equality condition and one bind variable per key
    conditions = [f"{key}=:{key}" for key in map]
    bind_vars = {f":{key}": map[key] for key in map}
    if conditions:
        count_sql += " WHERE " + " AND ".join(conditions)
    max_attempts = 3
    for attempt in range(max_attempts):
        try:
            # start transaction
            self.conn.begin()
            # select
            tmp_log.debug(f"{count_sql} {str(map)}")
            self.cur.arraysize = 10
            exec_result = self.cur.execute(count_sql + comment, bind_vars)
            count_row = self.cur.fetchone()
            tmp_log.debug(f"{exec_result} {str(count_row)}")
            # commit
            if not self._commit():
                raise RuntimeError("Commit error")
            return count_row[0] if count_row is not None else 0
        except Exception:
            # roll back
            self._rollback()
            if attempt + 1 < max_attempts:
                tmp_log.debug(f"retry : {attempt}")
                time.sleep(random.randint(10, 20))
                continue
            self.dump_error_message(tmp_log)
            return -1
1338
1339
def updateInFilesReturnPandaIDs(self, dataset, status, fileLFN=""):
    """Set the status of files in a dispatch block and return the affected PandaIDs.

    Files of ATLAS_PANDA.filesTable4 whose dispatchDBlock equals ``dataset``
    and whose status differs from ``status`` are first selected and then
    updated to ``status`` in the same transaction. The operation is tried up
    to ``self.nTry`` times, sleeping 10-20 seconds between attempts.

    Args:
        dataset: Name of the dispatch block.
        status: New file status.
        fileLFN: When not empty, restrict both statements to this LFN.

    Returns:
        list: Distinct PandaIDs of the selected files in the order they were
        first seen, or ``[]`` when the last attempt failed.
    """
    comment = " /* DBProxy.updateInFilesReturnPandaIDs */"
    tmp_log = self.create_tagged_logger(comment, f"dataset={dataset}")
    tmp_log.debug(f"{fileLFN})")
    sql0 = "SELECT /*+ index(tab FILESTABLE4_DISPDBLOCK_IDX) */ row_ID,PandaID FROM ATLAS_PANDA.filesTable4 tab WHERE status<>:status AND dispatchDBlock=:dispatchDBlock"
    sql1 = "UPDATE /*+ index(tab FILESTABLE4_DISPDBLOCK_IDX) */ ATLAS_PANDA.filesTable4 tab SET status=:status WHERE status<>:status AND dispatchDBlock=:dispatchDBlock"
    varMap = {":status": status, ":dispatchDBlock": dataset}
    if fileLFN != "":
        sql0 += " AND lfn=:lfn"
        sql1 += " AND lfn=:lfn"
        varMap[":lfn"] = fileLFN
    for iTry in range(self.nTry):
        try:
            # start transaction
            self.conn.begin()
            # select the files to be updated
            self.cur.arraysize = 10000
            self.cur.execute(sql0 + comment, varMap)
            resS = self.cur.fetchall()
            # update
            self.cur.execute(sql1 + comment, varMap)
            # commit
            if not self._commit():
                raise RuntimeError("Commit error")
            # collect distinct PandaIDs; dict keys keep first-seen order and give
            # constant-time membership instead of scanning a growing list
            retList = list(dict.fromkeys(tmpPandaID for _, tmpPandaID in resS))
            tmp_log.debug(f"ret={str(retList)}")
            return retList
        except Exception:
            # roll back
            self._rollback()
            # retry after a random pause unless this was the last attempt
            if iTry + 1 < self.nTry:
                tmp_log.debug(f"retry : {iTry}")
                time.sleep(random.randint(10, 20))
                continue
            self.dump_error_message(tmp_log)
            return []
1385
1386
def update_input_files_at_sites_and_get_panda_ids(self, filename: str, sites: list) -> list:
    """
    Update input files with a LFN for jobs at certain sites and return corresponding PandaIDs

    Jobs are looked up in jobsDefined4 joined with filesTable4 for input files with the LFN
    in status 'defined', 'unknown' or 'pending'. Each matching file is then set to 'ready'
    in a transaction of its own. Nothing is queried when the site list is empty.

    :param filename: LFN of the input file to be updated
    :param sites: List of site names where the jobs are running

    :return: List of PandaIDs of the jobs whose input files were updated, or an empty list
             when there is no site or an exception occurred
    """
    comment = " /* DBProxy.update_input_files_at_sites_and_get_panda_ids */"
    tmp_log = self.create_tagged_logger(comment, f"lfn={filename}")
    tmp_log.debug("start at sites: " + ",".join(sites))
    if not sites:
        # an empty IN () list is invalid SQL, and no job can match anyway
        tmp_log.debug("done since no site was given")
        return []
    # one bind variable per site
    site_keys = [f":site_{idx}" for idx in range(len(sites))]
    var_map = {":lfn": filename}
    var_map.update(zip(site_keys, sites))
    sql_to_get_ids = (
        "SELECT tabJ.PandaID FROM ATLAS_PANDA.jobsDefined4 tabJ, ATLAS_PANDA.filesTable4 tabF WHERE tabF.PandaID=tabJ.PandaID AND tabF.lfn=:lfn "
        "AND tabF.type='input' AND tabF.status IN ('defined', 'unknown', 'pending') AND tabJ.computingSite IN ("
    )
    sql_to_get_ids += ",".join(site_keys) + ") "
    return_list = []
    try:
        # start transaction
        self.conn.begin()
        # get PandaIDs of candidate jobs
        self.cur.arraysize = 10000
        self.cur.execute(sql_to_get_ids + comment, var_map)
        res = self.cur.fetchall()
        # commit
        if not self._commit():
            raise RuntimeError("Commit error")
        # update files
        sql_to_update_files = (
            "UPDATE ATLAS_PANDA.filesTable4 tab SET status='ready' WHERE PandaID=:PandaID AND lfn=:lfn AND type='input' AND status<>'ready' "
        )
        for (tmp_pandaID,) in res:
            var_map = {":PandaID": tmp_pandaID, ":lfn": filename}
            # one transaction per job
            self.conn.begin()
            self.cur.execute(sql_to_update_files + comment, var_map)
            if self.cur.rowcount > 0:
                tmp_log.debug(f"updated input file status to 'ready' for PandaID={tmp_pandaID}")
                return_list.append(tmp_pandaID)
            # commit
            if not self._commit():
                raise RuntimeError("Commit error")
        tmp_log.debug(f"done")
        return return_list
    except Exception:
        # roll back
        self._rollback()
        self.dump_error_message(tmp_log)
        return []
1443
1444
def updateOutFilesReturnPandaIDs(self, dataset, fileLFN=""):
    """Set transferring files of a destination block to 'ready' and return the PandaIDs.

    Files of ATLAS_PANDA.filesTable4 whose destinationDBlock equals ``dataset``
    and whose status is ``transferring`` are first selected and then updated to
    ``ready`` in the same transaction. The operation is tried up to
    ``self.nTry`` times, sleeping 10-20 seconds between attempts.

    Args:
        dataset: Name of the destination block.
        fileLFN: When not empty, restrict both statements to this LFN.

    Returns:
        list: Distinct PandaIDs of the selected files in the order they were
        first seen, or ``[]`` when the last attempt failed.
    """
    comment = " /* DBProxy.updateOutFilesReturnPandaIDs */"
    tmp_log = self.create_tagged_logger(comment, f"dataset={dataset}")
    tmp_log.debug(f"{fileLFN}")
    sql0 = "SELECT /*+ index(tab FILESTABLE4_DESTDBLOCK_IDX) */ row_ID,PandaID FROM ATLAS_PANDA.filesTable4 tab WHERE destinationDBlock=:destinationDBlock AND status=:status"
    sql1 = "UPDATE /*+ index(tab FILESTABLE4_DESTDBLOCK_IDX) */ ATLAS_PANDA.filesTable4 tab SET status='ready' WHERE destinationDBlock=:destinationDBlock AND status=:status"
    varMap = {":status": "transferring", ":destinationDBlock": dataset}
    if fileLFN != "":
        sql0 += " AND lfn=:lfn"
        sql1 += " AND lfn=:lfn"
        varMap[":lfn"] = fileLFN
    for iTry in range(self.nTry):
        try:
            # start transaction
            self.conn.begin()
            # select the files to be updated
            self.cur.arraysize = 10000
            self.cur.execute(sql0 + comment, varMap)
            resS = self.cur.fetchall()
            # update
            self.cur.execute(sql1 + comment, varMap)
            # commit
            if not self._commit():
                raise RuntimeError("Commit error")
            # collect distinct PandaIDs; dict keys keep first-seen order and give
            # constant-time membership instead of scanning a growing list
            retList = list(dict.fromkeys(tmpPandaID for _, tmpPandaID in resS))
            tmp_log.debug(f"ret={str(retList)}")
            return retList
        except Exception:
            # roll back
            self._rollback()
            # retry after a random pause unless this was the last attempt
            if iTry + 1 < self.nTry:
                tmp_log.debug(f"retry : {iTry}")
                time.sleep(random.randint(10, 20))
                continue
            self.dump_error_message(tmp_log)
            return []
1491
1492
def getAssociatedDisDatasets(self, subDsName):
    """Get the dispatch blocks used by the jobs that write to a destination block.

    PandaIDs with files whose destinationDBlock equals ``subDsName`` are read
    from ATLAS_PANDA.filesTable4; for each of them the distinct dispatchDBlock
    values of its ``input`` files are collected, one query and transaction
    per PandaID.

    Args:
        subDsName: Name of the destination block.

    Returns:
        list: Distinct non-null dispatch block names in the order they were
        first seen, or ``[]`` when an exception occurred.
    """
    comment = " /* DBProxy.getAssociatedDisDatasets */"
    tmp_log = self.create_tagged_logger(comment, f"subDsName={subDsName}")
    tmp_log.debug(f"start")
    sqlF = (
        "SELECT /*+ index(tab FILESTABLE4_DESTDBLOCK_IDX) */ distinct PandaID FROM ATLAS_PANDA.filesTable4 tab WHERE destinationDBlock=:destinationDBlock"
    )
    sqlJ = "SELECT distinct dispatchDBlock FROM ATLAS_PANDA.filesTable4 WHERE PandaID=:PandaID AND type=:type"
    try:
        # start transaction
        self.conn.begin()
        # get PandaIDs
        self.cur.arraysize = 10000
        self.cur.execute(sqlF + comment, {":destinationDBlock": subDsName})
        resS = self.cur.fetchall()
        # commit
        if not self._commit():
            raise RuntimeError("Commit error")
        # dict keys keep first-seen order and give constant-time membership
        # instead of scanning a growing list
        disNames = {}
        for (pandaID,) in resS:
            # start transaction
            self.conn.begin()
            # get dispatch blocks of the job
            self.cur.arraysize = 1000
            self.cur.execute(sqlJ + comment, {":type": "input", ":PandaID": pandaID})
            resD = self.cur.fetchall()
            # commit
            if not self._commit():
                raise RuntimeError("Commit error")
            for (disName,) in resD:
                if disName is not None:
                    disNames.setdefault(disName, None)
        retList = list(disNames)
        tmp_log.debug(f"ret={str(retList)}")
        return retList
    except Exception:
        # roll back
        self._rollback()
        self.dump_error_message(tmp_log)
        return []
1540
1541
def setGUIDs(self, files):
    """Set GUID, size, checksum and scope of files in ATLAS_PANDA.filesTable4 by LFN.

    All files are updated in a single transaction. The operation is tried up
    to ``self.nTry`` times, sleeping 10-20 seconds between attempts.

    Args:
        files: Iterable of dicts with the keys ``guid``, ``lfn``, ``checksum``
            and ``fsize`` and optionally ``scope``. A checksum or scope of
            ``""`` or ``"NULL"`` and a missing scope are stored as NULL.

    Returns:
        bool: True when the transaction was committed, False when the last
        attempt failed.
    """
    comment = " /* DBProxy.setGUIDs */"
    tmp_log = self.create_tagged_logger(comment)
    tmp_log.debug(f"{files}")
    update_sql = "UPDATE ATLAS_PANDA.filesTable4 SET GUID=:GUID,fsize=:fsize,checksum=:checksum,scope=:scope WHERE lfn=:lfn"
    null_like = ["", "NULL"]
    for attempt in range(self.nTry):
        try:
            # start transaction
            self.conn.begin()
            self.cur.arraysize = 1000000
            # update one file at a time
            for file in files:
                bind_vars = {":GUID": file["guid"], ":lfn": file["lfn"]}
                bind_vars[":checksum"] = None if file["checksum"] in null_like else file["checksum"]
                bind_vars[":fsize"] = file["fsize"]
                has_scope = "scope" in file and file["scope"] not in null_like
                bind_vars[":scope"] = file["scope"] if has_scope else None
                self.cur.execute(update_sql + comment, bind_vars)
                n_updated = self.cur.rowcount
                tmp_log.debug(f"retU {n_updated}")
                if n_updated < 0:
                    raise RuntimeError("SQL error")
            # commit
            if not self._commit():
                raise RuntimeError("Commit error")
            return True
        except Exception:
            # roll back
            self._rollback()
            # retry after a random pause unless this was the last attempt
            if attempt + 1 < self.nTry:
                tmp_log.debug(f"retry : {attempt}")
                time.sleep(random.randint(10, 20))
                continue
            self.dump_error_message(tmp_log)
            return False
1585
1586
def get_special_dispatch_params(self):
    """
    Get the following special parameters for dispatcher.
      Authorized name lists for proxy, key-pair, and token-key retrieval
      Key pairs
      Token keys

    :return: dictionary with the keys "tokenKeys" (per client name, the full list of
             non-expired token keys and the latest one), "allowKeyPair", "allowProxy",
             "allowTokenKey" (lists of user names) and "keyPair" (file name to file
             content for the files in panda_config.keyDir), or an empty dictionary
             when an exception occurred
    """
    comment = " /* DBProxy.get_special_dispatch_params */"
    tmp_log = self.create_tagged_logger(comment)
    tmp_log.debug("start")
    try:
        return_map = {}
        # start transaction
        self.conn.begin()
        self.cur.arraysize = 100000
        # get token keys; rows are ordered by expiry time, newest first, so the first
        # key seen for a client is recorded as its latest one
        token_keys = {}
        sql = f"SELECT dn, credname FROM {panda_config.schemaMETA}.proxykey WHERE expires>:limit ORDER BY expires DESC "
        var_map = {":limit": naive_utcnow()}
        self.cur.execute(sql + comment, var_map)
        res_list = self.cur.fetchall()
        for client_name, token_key in res_list:
            token_keys.setdefault(client_name, {"fullList": [], "latest": token_key})
            token_keys[client_name]["fullList"].append(token_key)
        return_map["tokenKeys"] = token_keys
        tmp_list = [f"""{k}:{len(token_keys[k]["fullList"])}""" for k in token_keys]
        tmp_log.debug(f"""got token keys {",".join(tmp_list)}""")
        # get authorized users
        allow_key = []
        allow_proxy = []
        allow_token = []
        sql = "SELECT DISTINCT name, gridpref FROM ATLAS_PANDAMETA.users " "WHERE (status IS NULL OR status<>:ngStatus) AND gridpref IS NOT NULL "
        var_map = {":ngStatus": "disabled"}
        self.cur.execute(sql + comment, var_map)
        res_list = self.cur.fetchall()
        # commit
        if not self._commit():
            raise RuntimeError("Commit error")
        for compactDN, gridpref in res_list:
            # users authorized for proxy retrieval
            if PrioUtil.PERMISSION_PROXY in gridpref:
                if compactDN not in allow_proxy:
                    allow_proxy.append(compactDN)
            # users authorized for key-pair retrieval
            if PrioUtil.PERMISSION_KEY in gridpref:
                if compactDN not in allow_key:
                    allow_key.append(compactDN)
            # users authorized for token-key retrieval
            if PrioUtil.PERMISSION_TOKEN_KEY in gridpref:
                if compactDN not in allow_token:
                    allow_token.append(compactDN)
        return_map["allowKeyPair"] = allow_key
        return_map["allowProxy"] = allow_proxy
        return_map["allowTokenKey"] = allow_token
        tmp_log.debug(
            f"got authed users key-pair:{len(return_map['allowKeyPair'])}, proxy:{len(return_map['allowProxy'])}, token-key:{len(return_map['allowTokenKey'])}"
        )
        # read key pairs; a failure here is logged and the files read so far are kept
        keyPair = {}
        try:
            keyFileNames = glob.glob(panda_config.keyDir + "/*")
            for keyName in keyFileNames:
                # the context manager closes the file even when read() raises
                with open(keyName) as tmpF:
                    keyPair[os.path.basename(keyName)] = tmpF.read()
        except Exception as e:
            tmp_log.error(f"failed read key-pairs with {str(e)}")
        return_map["keyPair"] = keyPair
        tmp_log.debug(f"got {len(return_map['keyPair'])} key-pair files")
        tmp_log.debug("done")
        return return_map
    except Exception:
        # roll back
        self._rollback()
        # error
        self.dump_error_message(tmp_log)
        return {}
1664
1665
def getOriginalConsumers(self, jediTaskID, jobsetID, pandaID):
    """Get archived jobs of a jobset at sites that have no active or defined job.

    Sites of jobs in jobsActive4 and jobsDefined4 with the given task and
    jobset are collected first. Old PandaIDs are then taken from
    JEDI_Job_Retry_History (newPandaID equal to ``jobsetID``, relation type
    ``EventServiceUtils.relationTypeJS_Map``). Each old job is looked up in
    ATLAS_PANDA.jobsArchived4 and, if missing there, in
    ATLAS_PANDAARCH.jobsArchived; jobs at a site that is already covered are
    skipped. For the others the job, its files and its job parameters are
    read, and the site is marked as covered so that only one job per site is
    returned. No transaction is started or committed here.

    Args:
        jediTaskID: JEDI task ID.
        jobsetID: Jobset ID.
        pandaID: Only used for the logger tag.

    Returns:
        list: JobSpec objects with files and jobParameters attached, or ``[]``
        when an exception occurred.
    """
    comment = " /* DBProxy.getOriginalConsumers */"
    tmp_log = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID} jobsetID={jobsetID} PandaID={pandaID}")
    tmp_log.debug("start")
    try:
        # sql to get sites where consumers are active
        sql_active_sites = (
            "SELECT computingSite FROM ATLAS_PANDA.jobsActive4 WHERE jediTaskID=:jediTaskID AND jobsetID=:jobsetID "
            "UNION "
            "SELECT computingSite FROM ATLAS_PANDA.jobsDefined4 WHERE jediTaskID=:jediTaskID AND jobsetID=:jobsetID "
        )
        # sql to get original IDs
        sql_old_ids = (
            f"SELECT oldPandaID FROM {panda_config.schemaJEDI}.JEDI_Job_Retry_History "
            "WHERE jediTaskID=:jediTaskID AND newPandaID=:jobsetID AND relationType=:relationType "
        )
        # sql to check the site of an archived job
        sql_site_recent = "SELECT computingSite FROM ATLAS_PANDA.jobsArchived4 WHERE PandaID=:PandaID "
        sql_site_arch = "SELECT computingSite FROM ATLAS_PANDAARCH.jobsArchived WHERE PandaID=:PandaID AND modificationTime>(CURRENT_DATE-30) "
        # sql templates to get job, files and job parameters; {0} is the table name
        sql_job = f"SELECT {JobSpec.columnNames()} "
        sql_job += "FROM {0} "
        sql_job += "WHERE PandaID=:PandaID AND modificationTime>(CURRENT_DATE-30) "
        sql_files = f"SELECT {FileSpec.columnNames()} FROM ATLAS_PANDA.filesTable4 " + "WHERE PandaID=:PandaID "
        sql_params = "SELECT jobParameters FROM {0} WHERE PandaID=:PandaID "
        # get sites
        covered_sites = set()
        self.cur.execute(sql_active_sites + comment, {":jediTaskID": jediTaskID, ":jobsetID": jobsetID})
        for (active_site,) in self.cur.fetchall():
            covered_sites.add(active_site)
        # get original IDs
        history_vars = {
            ":jediTaskID": jediTaskID,
            ":jobsetID": jobsetID,
            ":relationType": EventServiceUtils.relationTypeJS_Map,
        }
        self.cur.execute(sql_old_ids + comment, history_vars)
        consumers = []
        for (old_panda_id,) in self.cur.fetchall():
            id_vars = {":PandaID": old_panda_id}
            # check computingSite, falling back to the ARCH table
            self.cur.execute(sql_site_recent + comment, id_vars)
            site_row = self.cur.fetchone()
            in_arch = site_row is None
            if in_arch:
                self.cur.execute(sql_site_arch + comment, id_vars)
                site_row = self.cur.fetchone()
            # not found
            if site_row is None:
                continue
            (computing_site,) = site_row
            # a consumer is already known at the site
            if computing_site in covered_sites:
                continue
            if in_arch:
                job_table = "ATLAS_PANDAARCH.jobsArchived"
                params_table = "ATLAS_PANDAARCH.jobParamsTable_ARCH"
            else:
                job_table = "ATLAS_PANDA.jobsArchived4"
                params_table = "ATLAS_PANDA.jobParamsTable"
            # get job
            self.cur.execute(sql_job.format(job_table) + comment, id_vars)
            job_row = self.cur.fetchone()
            if job_row is None:
                continue
            job_spec = JobSpec()
            job_spec.pack(job_row)
            # get files
            self.cur.execute(sql_files + comment, id_vars)
            file_rows = self.cur.fetchall()
            if len(file_rows) == 0:
                continue
            for file_row in file_rows:
                file_spec = FileSpec()
                file_spec.pack(file_row)
                job_spec.addFile(file_spec)
            # get job parameters from the first row only
            self.cur.execute(sql_params.format(params_table) + comment, id_vars)
            for (clob_params,) in self.cur:
                if clob_params is not None:
                    try:
                        jobSpecParams = clob_params.read()
                    except AttributeError:
                        # the value has no read(); use its string form
                        jobSpecParams = str(clob_params)
                    job_spec.jobParameters = jobSpecParams
                break
            # add
            consumers.append(job_spec)
            covered_sites.add(computing_site)
        tmp_log.debug(f"got {len(consumers)} consumers")
        return consumers
    except Exception:
        # error
        self.dump_error_message(tmp_log)
        return []
1767
1768
def updateUnmergedDatasets(self, job, finalStatusDS, updateCompleted=False):
    """Close the given datasets and refresh file counters of unmerged output datasets.

    Each dataset in ``finalStatusDS`` is set to ``tobeclosed`` in
    ATLAS_PANDA.Datasets unless its status is already ``tobeclosed`` (or
    ``completed`` when ``updateCompleted`` is False). If any of those updates
    does not touch exactly one row, the rest is skipped. Otherwise, for each
    distinct dataset of the job's unmerged output files, nFiles,
    nFilesTobeUsed and nFilesUsed in JEDI_Datasets are recomputed from
    JEDI_Dataset_Contents and the dataset status is set to ``ready`` when
    files remain to be used or ``done`` otherwise.

    Args:
        job: Job object providing ``PandaID`` and ``Files``; files provide
            ``isUnMergedOutput()``, ``jediTaskID`` and ``datasetID``.
        finalStatusDS: Iterable of dataset objects providing ``vuid`` and ``name``.
        updateCompleted: When True, datasets in ``completed`` status are updated too.

    Returns:
        bool: True when the transaction was committed, False when an
        exception occurred (after rollback).
    """
    comment = " /* JediDBProxy.updateUnmergedDatasets */"
    tmp_log = self.create_tagged_logger(comment, f"PandaID={job.PandaID}")
    # file statuses that count towards nFiles regardless of the producing job
    counted_statuses = (
        "finished",
        "failed",
        "cancelled",
        "notmerged",
        "ready",
        "lost",
        "broken",
        "picked",
        "nooutput",
    )
    # sql to get file status in a dataset
    sql_contents = (
        "SELECT status,PandaID,outPandaID FROM ATLAS_PANDA.JEDI_Dataset_Contents "
        "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND PandaID IS NOT NULL "
    )
    # sql to update file counters
    sql_counters = (
        "UPDATE ATLAS_PANDA.JEDI_Datasets "
        "SET nFilesOnHold=0,nFiles=:nFiles,"
        "nFilesUsed=:nFilesUsed,nFilesTobeUsed=:nFilesTobeUsed "
        "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID "
    )
    # sql to read the counters back
    sql_read_counters = "SELECT nFilesTobeUsed,nFilesUsed FROM ATLAS_PANDA.JEDI_Datasets " "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID "
    # sql to update dataset status in JEDI
    sql_jedi_status = (
        "UPDATE ATLAS_PANDA.JEDI_Datasets " "SET status=:status,modificationTime=CURRENT_DATE " "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID "
    )
    # sql to update dataset status in PanDA
    sql_panda_status = "UPDATE ATLAS_PANDA.Datasets " "SET status=:status " "WHERE vuid=:vuid AND NOT status IN (:statusR,:statusD) "
    try:
        tmp_log.debug(f"start")
        # begin transaction
        self.conn.begin()
        # update datasets in PanDA
        lock_failed = False
        for dataset_spec in finalStatusDS:
            bind_vars = {
                ":vuid": dataset_spec.vuid,
                ":status": "tobeclosed",
                ":statusR": "tobeclosed",
                ":statusD": "dummy" if updateCompleted else "completed",
            }
            tmp_log.debug(sql_panda_status + comment + str(bind_vars))
            self.cur.execute(sql_panda_status + comment, bind_vars)
            if self.cur.rowcount != 1:
                lock_failed = True
                tmp_log.debug(f"failed to lock {dataset_spec.name}")
        # look for unmerged files
        if not lock_failed:
            handled_datasets = set()
            for tmp_file in job.Files:
                if not tmp_file.isUnMergedOutput():
                    continue
                # each dataset is processed once
                if tmp_file.datasetID in handled_datasets:
                    continue
                handled_datasets.add(tmp_file.datasetID)
                # get file status
                bind_vars = {":jediTaskID": tmp_file.jediTaskID, ":datasetID": tmp_file.datasetID}
                self.cur.arraysize = 100000
                tmp_log.debug(sql_contents + comment + str(bind_vars))
                self.cur.execute(sql_contents + comment, bind_vars)
                n_files = 0
                n_ready = 0
                for file_status, in_panda_id, out_panda_id in self.cur.fetchall():
                    # running files count only when handled by a different output job
                    countable = file_status in counted_statuses or (file_status == "running" and in_panda_id != out_panda_id)
                    if not countable:
                        continue
                    n_files += 1
                    if file_status == "ready":
                        n_ready += 1
                # update counters
                bind_vars = {
                    ":jediTaskID": tmp_file.jediTaskID,
                    ":datasetID": tmp_file.datasetID,
                    ":nFiles": n_files,
                    ":nFilesTobeUsed": n_files,
                    ":nFilesUsed": n_files - n_ready,
                }
                self.cur.arraysize = 10
                tmp_log.debug(sql_counters + comment + str(bind_vars))
                self.cur.execute(sql_counters + comment, bind_vars)
                if self.cur.rowcount != 1:
                    tmp_log.debug(f"skip jediTaskID={tmp_file.jediTaskID} datasetID={tmp_file.datasetID}")
                    continue
                # read the counters back
                bind_vars = {":jediTaskID": tmp_file.jediTaskID, ":datasetID": tmp_file.datasetID}
                self.cur.execute(sql_read_counters + comment, bind_vars)
                counters = self.cur.fetchone()
                if counters is None:
                    continue
                n_to_be_used, n_used = counters
                # update dataset status
                bind_vars = {
                    ":jediTaskID": tmp_file.jediTaskID,
                    ":datasetID": tmp_file.datasetID,
                    ":status": "ready" if n_to_be_used - n_used > 0 else "done",
                }
                tmp_log.debug(sql_jedi_status + comment + str(bind_vars))
                self.cur.execute(sql_jedi_status + comment, bind_vars)
        # commit
        if not self._commit():
            raise RuntimeError("Commit error")
        tmp_log.debug(f"done")
        return True
    except Exception:
        # roll back
        self._rollback()
        # error
        self.dump_error_message(tmp_log)
        return False
1897
1898
def getThrottledUsers(self):
    """Get users that have throttled jobs.

    Distinct (prodUserName, workingGroup) pairs are read from
    ATLAS_PANDA.jobsActive4 for jobs with prodSourceLabel ``user``, jobStatus
    ``throttled`` and relocationFlag 3.

    Returns:
        set: (prodUserName, workingGroup) tuples. An empty set is returned
        when an exception occurred, so that the return type is the same on
        success and failure.
    """
    comment = " /* DBProxy.getThrottledUsers */"
    tmp_log = self.create_tagged_logger(comment)
    tmp_log.debug("start")
    retVal = set()
    try:
        # sql to get users
        sqlT = "SELECT distinct prodUserName,workingGroup FROM ATLAS_PANDA.jobsActive4 "
        sqlT += "WHERE prodSourceLabel=:prodSourceLabel AND jobStatus=:jobStatus AND relocationFlag=:relocationFlag "
        # begin transaction
        self.conn.begin()
        # select
        self.cur.arraysize = 10
        varMap = {
            ":prodSourceLabel": "user",
            ":relocationFlag": 3,
            ":jobStatus": "throttled",
        }
        # get datasets
        self.cur.execute(sqlT + comment, varMap)
        resPs = self.cur.fetchall()
        for prodUserName, workingGroup in resPs:
            retVal.add((prodUserName, workingGroup))
        # commit
        if not self._commit():
            raise RuntimeError("Commit error")
        tmp_log.debug(f"done with {str(retVal)}")
        return retVal
    except Exception:
        # roll back
        self._rollback()
        # error
        self.dump_error_message(tmp_log)
        # same container type as on success
        return set()
1932
1933
def resetFileStatusInJEDI(
    self, dn: str, is_prod_manager: bool, dataset_name: str, lost_files: List[str], recover_parent: bool, simul: bool
) -> Tuple[bool, int | None, Dict[str, List[str]] | None, str | None]:
    """
    Reset file status in JEDI for lost files
    1) resolve the task and dataset from the dataset name and check ownership
    2) set the lost files, and the other finished output files of the jobs that
       produced them, from 'finished' to 'lost' in JEDI_Dataset_Contents
    3) decrement nFilesFinished and recompute nEvents of the affected output datasets
    4) reset the 'finished' inputs of those jobs: to 'lost' for files of the master
       input dataset when recover_parent is True (those files are also collected
       for the return value), to 'ready' otherwise
    5) for tasks with useJumbo set, cancel the events of the inputs that were reset
    6) update the counters of the touched input datasets and move a 'done' task
       back to 'finished'
    7) if simul is True, the UPDATE statements are only logged, not executed

    :param dn: DN of the requester
    :param is_prod_manager: whether the requester is a production manager
    :param dataset_name: name of the dataset
    :param lost_files: list of lost LFNs
    :param recover_parent: whether to recover parent files
    :param simul: whether to simulate without executing the updates
    :return: tuple of
             success flag (True also when the task was not found or the requester
               is not the owner; nothing is changed in those cases),
             jediTaskID (None if failed or not found),
             dictionary mapping input dataset name to the list of input LFNs that
               produced the lost files (None if failed),
             error message (None if succeeded, "database error" on exception)
    """
    comment = " /* DBProxy.resetFileStatusInJEDI */"
    tmp_log = self.create_tagged_logger(comment, f"datasetName={dataset_name}")
    tmp_log.debug("start")
    error_message = None
    try:
        # list of lost input files, keyed by input dataset name
        lostInputFiles = {}
        # get compact DN
        compactDN = CoreUtils.clean_user_id(dn)
        if compactDN in ["", "NULL", None]:
            compactDN = dn
        tmp_log.debug(f"userName={compactDN}")
        toSkip = False
        # begin transaction
        self.conn.begin()
        # get jediTaskID and datasetID, matching the name with and without scope
        varMap = {}
        varMap[":type1"] = "log"
        varMap[":type2"] = "output"
        varMap[":name1"] = dataset_name
        varMap[":name2"] = dataset_name.split(":")[-1]
        sqlGI = f"SELECT jediTaskID,datasetID FROM {panda_config.schemaJEDI}.JEDI_Datasets "
        sqlGI += "WHERE type IN (:type1,:type2) AND datasetName IN (:name1,:name2) "
        self.cur.execute(sqlGI + comment, varMap)
        resGI = self.cur.fetchall()
        # use the largest datasetID of the largest jediTaskID. The pair is taken
        # from a single row so that the datasetID always belongs to the chosen
        # task (the original could keep a larger datasetID of an older task)
        jediTaskID = None
        datasetID = None
        if resGI:
            jediTaskID, datasetID = max((tmpJediTaskID, tmpDatasetID) for tmpJediTaskID, tmpDatasetID in resGI)
        if jediTaskID is None:
            error_message = "jediTaskID not found"
            tmp_log.debug(error_message)
            toSkip = True
        if not toSkip:
            # get task status and owner
            tmp_log.debug(f"jediTaskID={jediTaskID} datasetID={datasetID}")
            varMap = {}
            varMap[":jediTaskID"] = jediTaskID
            sqlOW = f"SELECT status,userName,useJumbo FROM {panda_config.schemaJEDI}.JEDI_Tasks "
            sqlOW += "WHERE jediTaskID=:jediTaskID "
            self.cur.execute(sqlOW + comment, varMap)
            resOW = self.cur.fetchone()
            taskStatus, ownerName, useJumbo = resOW
            # check ownership
            if not is_prod_manager and ownerName != compactDN:
                error_message = f"Not the owner: {ownerName} != {compactDN}"
                tmp_log.debug(error_message)
                toSkip = True
        if not toSkip:
            # sql to get the PandaID that produced a lost file
            sqlLP = f"SELECT pandaID FROM {panda_config.schemaJEDI}.JEDI_Dataset_Contents "
            sqlLP += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND lfn=:lfn "
            # sql to get the finished output files of a job
            sql_get_files = (
                f"SELECT datasetID,fileID FROM {panda_config.schemaJEDI}.JEDI_Dataset_Contents "
                "WHERE jediTaskID=:jediTaskID AND type=:type AND status=:oldStatus AND PandaID=:PandaID "
            )
            # sql to update output file status
            sqlUFO = f"UPDATE {panda_config.schemaJEDI}.JEDI_Dataset_Contents "
            sqlUFO += "SET status=:newStatus "
            sqlUFO += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID=:fileID AND status=:oldStatus AND PandaID=:PandaID "
            # sql to cancel events
            sqlCE = "UPDATE /*+ INDEX_RS_ASC(tab JEDI_EVENTS_FILEID_IDX) NO_INDEX_FFS(tab JEDI_EVENTS_PK) NO_INDEX_SS(tab JEDI_EVENTS_PK) */ "
            sqlCE += f"{panda_config.schemaJEDI}.JEDI_Events tab "
            sqlCE += "SET status=:status "
            sqlCE += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID=:fileID "
            sqlCE += "AND status IN (:esFinished,:esDone,:esMerged) "
            # get affected PandaIDs and flag their output files as lost
            lostPandaIDs = set()
            # number of files set to lost per output datasetID
            nLostPerDataset = {}
            for lostFile in lost_files:
                varMap = {}
                varMap[":jediTaskID"] = jediTaskID
                varMap[":datasetID"] = datasetID
                varMap[":lfn"] = lostFile
                self.cur.execute(sqlLP + comment, varMap)
                resLP = self.cur.fetchone()
                if resLP is not None:
                    (pandaID,) = resLP
                    lostPandaIDs.add(pandaID)
                    # get the finished output files of the job
                    varMap = {}
                    varMap[":jediTaskID"] = jediTaskID
                    varMap[":PandaID"] = pandaID
                    varMap[":type"] = "output"
                    varMap[":oldStatus"] = "finished"
                    self.cur.execute(sql_get_files + comment, varMap)
                    res_files = self.cur.fetchall()
                    if res_files:
                        for tmpDatasetID, tmpFileID in res_files:
                            # update file status to lost
                            varMap = {}
                            varMap[":jediTaskID"] = jediTaskID
                            varMap[":datasetID"] = tmpDatasetID
                            varMap[":fileID"] = tmpFileID
                            varMap[":PandaID"] = pandaID
                            varMap[":newStatus"] = "lost"
                            varMap[":oldStatus"] = "finished"
                            tmp_log.debug(sqlUFO + comment + str(varMap))
                            if not simul:
                                self.cur.execute(sqlUFO + comment, varMap)
                                nRow = self.cur.rowcount
                                if nRow > 0:
                                    nLostPerDataset[tmpDatasetID] = nLostPerDataset.get(tmpDatasetID, 0) + 1
                            else:
                                # in simulation every candidate is counted as updated
                                nLostPerDataset[tmpDatasetID] = nLostPerDataset.get(tmpDatasetID, 0) + 1
            tmp_log.debug(f"PandaIDs produced lost files = {str(lostPandaIDs)}")
            tmp_log.debug("update output datasets")
            # sql to get nEvents of the files that are still finished
            sqlGNE = "SELECT SUM(c.nEvents) "
            sqlGNE += "FROM {0}.JEDI_Datasets d,{0}.JEDI_Dataset_Contents c ".format(panda_config.schemaJEDI)
            sqlGNE += "WHERE c.jediTaskID=d.jediTaskID AND c.datasetID=d.datasetID "
            sqlGNE += "AND d.jediTaskID=:jediTaskID AND d.datasetID=:datasetID AND c.status=:status "
            # sql to update output dataset statistics
            sqlUDO = f"UPDATE {panda_config.schemaJEDI}.JEDI_Datasets "
            sqlUDO += "SET nFilesFinished=nFilesFinished-:nDiff,state=NULL,nEvents=:nEvents "
            sqlUDO += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID "
            for tmpDatasetID, n_diff_value in nLostPerDataset.items():
                # get nEvents
                varMap = {}
                varMap[":jediTaskID"] = jediTaskID
                varMap[":datasetID"] = tmpDatasetID
                varMap[":status"] = "finished"
                self.cur.execute(sqlGNE + comment, varMap)
                (tmp_counts,) = self.cur.fetchone()
                # update nFilesFinished and nEvents
                varMap = {}
                varMap[":jediTaskID"] = jediTaskID
                varMap[":datasetID"] = tmpDatasetID
                varMap[":nDiff"] = n_diff_value
                varMap[":nEvents"] = tmp_counts
                tmp_log.debug(sqlUDO + comment + str(varMap))
                if not simul:
                    self.cur.execute(sqlUDO + comment, varMap)
            # get input datasets
            tmp_log.debug("update input datasets")
            sqlID = f"SELECT datasetID,datasetName,masterID FROM {panda_config.schemaJEDI}.JEDI_Datasets "
            sqlID += "WHERE jediTaskID=:jediTaskID AND type=:type "
            varMap = {}
            varMap[":jediTaskID"] = jediTaskID
            varMap[":type"] = "input"
            self.cur.execute(sqlID + comment, varMap)
            resID = self.cur.fetchall()
            inputDatasets = {}
            masterID = None
            for tmpDatasetID, tmpDatasetName, tmpMasterID in resID:
                inputDatasets[tmpDatasetID] = tmpDatasetName
                # the input dataset without a masterID is the master
                if tmpMasterID is None:
                    masterID = tmpDatasetID
            # sql to get affected inputs
            if useJumbo is None:
                sqlAI = f"SELECT fileID,datasetID,lfn,outPandaID FROM {panda_config.schemaJEDI}.JEDI_Dataset_Contents "
                sqlAI += "WHERE jediTaskID=:jediTaskID AND type IN (:type1,:type2,:type3) AND PandaID=:PandaID "
            else:
                sqlAI = f"SELECT fileID,datasetID,lfn,NULL FROM {panda_config.schemaPANDA}.filesTable4 "
                sqlAI += "WHERE PandaID=:PandaID AND type IN (:type1,:type2) "
                sqlAI += "UNION "
                # the second SELECT is appended; the original re-assigned sqlAI here,
                # which silently discarded the first SELECT and the UNION
                # NOTE(review): both arms read the same table, so the second arm is
                # redundant as written - presumably it was meant to read an archive
                # table; confirm the intended schema/table
                sqlAI += f"SELECT fileID,datasetID,lfn,NULL FROM {panda_config.schemaPANDA}.filesTable4 "
                sqlAI += "WHERE PandaID=:PandaID AND type IN (:type1,:type2) AND modificationTime>CURRENT_TIMESTAMP-365 "
            # sql to update input file status
            sqlUFI = f"UPDATE {panda_config.schemaJEDI}.JEDI_Dataset_Contents "
            sqlUFI += "SET status=:newStatus,attemptNr=attemptNr+1 "
            sqlUFI += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID=:fileID AND status=:oldStatus "
            # get affected inputs; number of reset files per input datasetID
            datasetCountMap = {}
            for lostPandaID in lostPandaIDs:
                varMap = {}
                if useJumbo is None:
                    varMap[":jediTaskID"] = jediTaskID
                    varMap[":PandaID"] = lostPandaID
                    varMap[":type1"] = "input"
                    varMap[":type2"] = "pseudo_input"
                    varMap[":type3"] = "output"
                else:
                    varMap[":PandaID"] = lostPandaID
                    varMap[":type1"] = "input"
                    varMap[":type2"] = "pseudo_input"
                self.cur.execute(sqlAI + comment, varMap)
                resAI = self.cur.fetchall()
                newResAI = []
                for tmpItem in resAI:
                    tmpFileID, tmpDatasetID, tmpLFN, tmpOutPandaID = tmpItem
                    # skip rows whose outPandaID is the lost job itself
                    if lostPandaID == tmpOutPandaID:
                        continue
                    if tmpOutPandaID is not None:
                        # the file came from another job: use the inputs of that job
                        varMap = {}
                        varMap[":jediTaskID"] = jediTaskID
                        varMap[":PandaID"] = tmpOutPandaID
                        varMap[":type1"] = "input"
                        varMap[":type2"] = "pseudo_input"
                        # placeholder so that the third bind matches no file type
                        varMap[":type3"] = "dummy"
                        self.cur.execute(sqlAI + comment, varMap)
                        resAI2 = self.cur.fetchall()
                        newResAI.extend(resAI2)
                    else:
                        newResAI.append(tmpItem)
                for tmpFileID, tmpDatasetID, tmpLFN, tmpOutPandaID in newResAI:
                    # collect master inputs when the parent is to be recovered
                    is_lost = False
                    if recover_parent and tmpDatasetID == masterID:
                        lostInputFiles.setdefault(inputDatasets[tmpDatasetID], [])
                        lostInputFiles[inputDatasets[tmpDatasetID]].append(tmpLFN)
                        is_lost = True
                    # reset file status
                    if tmpDatasetID not in datasetCountMap:
                        datasetCountMap[tmpDatasetID] = 0
                    varMap = {}
                    varMap[":jediTaskID"] = jediTaskID
                    varMap[":datasetID"] = tmpDatasetID
                    varMap[":fileID"] = tmpFileID
                    if is_lost:
                        varMap[":newStatus"] = "lost"
                    else:
                        varMap[":newStatus"] = "ready"
                    varMap[":oldStatus"] = "finished"
                    if not simul:
                        self.cur.execute(sqlUFI + comment, varMap)
                        nRow = self.cur.rowcount
                    else:
                        tmp_log.debug(sqlUFI + comment + str(varMap))
                        nRow = 1
                    if nRow > 0:
                        datasetCountMap[tmpDatasetID] += 1
                        if useJumbo is not None:
                            # cancel events
                            varMap = {}
                            varMap[":jediTaskID"] = jediTaskID
                            varMap[":datasetID"] = tmpDatasetID
                            varMap[":fileID"] = tmpFileID
                            varMap[":status"] = EventServiceUtils.ST_cancelled
                            varMap[":esFinished"] = EventServiceUtils.ST_finished
                            varMap[":esDone"] = EventServiceUtils.ST_done
                            varMap[":esMerged"] = EventServiceUtils.ST_merged
                            if not simul:
                                self.cur.execute(sqlCE + comment, varMap)
                            else:
                                tmp_log.debug(sqlCE + comment + str(varMap))
            # update input dataset statistics
            sqlUDI = f"UPDATE {panda_config.schemaJEDI}.JEDI_Datasets "
            sqlUDI += "SET nFilesUsed=nFilesUsed-:nDiff,nFilesFinished=nFilesFinished-:nDiff,"
            sqlUDI += "nEventsUsed=(SELECT SUM(CASE WHEN startEvent IS NULL THEN nEvents ELSE endEvent-startEvent+1 END) "
            sqlUDI += f"FROM {panda_config.schemaJEDI}.JEDI_Dataset_Contents "
            sqlUDI += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND status=:status) "
            sqlUDI += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID "
            for tmpDatasetID, nReset in datasetCountMap.items():
                varMap = {}
                varMap[":jediTaskID"] = jediTaskID
                varMap[":datasetID"] = tmpDatasetID
                varMap[":nDiff"] = nReset
                varMap[":status"] = "finished"
                tmp_log.debug(sqlUDI + comment + str(varMap))
                if not simul:
                    self.cur.execute(sqlUDI + comment, varMap)
            # update task status
            if taskStatus == "done":
                sqlUT = f"UPDATE {panda_config.schemaJEDI}.JEDI_Tasks SET status=:newStatus WHERE jediTaskID=:jediTaskID "
                varMap = {}
                varMap[":jediTaskID"] = jediTaskID
                varMap[":newStatus"] = "finished"
                if not simul:
                    self.cur.execute(sqlUT + comment, varMap)
        # commit
        if not self._commit():
            raise RuntimeError("Commit error")
        tmp_log.debug("done")
        return True, jediTaskID, lostInputFiles, error_message
    except Exception:
        # roll back
        self._rollback()
        # error
        self.dump_error_message(tmp_log)
        return False, None, None, "database error"
2239
2240
def copy_file_records(self, new_lfns, file_spec):
    """Rename a file record to the first new LFN and clone it for the others.

    The first entry of ``new_lfns`` replaces the LFN of the existing
    filesTable4 row (matched by ``row_ID``). Every further entry gets a new
    filesTable4 row and, when the file is known to JEDI, a copy of its
    JEDI_Dataset_Contents row under a freshly drawn fileID. Whenever the
    resulting spec has a fileID, the LFN of the matching JEDI_Dataset_Contents
    row is updated as well.

    Args:
        new_lfns: sequence of LFNs that replace ``file_spec.lfn``.
        file_spec: file spec used as the template; it is shallow-copied per
            LFN and not modified itself.

    Returns:
        bool: True on success, False if anything failed (after rollback).
    """
    comment = " /* DBProxy.copy_file_records */"
    tmp_log = self.create_tagged_logger(comment, f"PandaID={file_spec.PandaID} oldLFN={file_spec.lfn}")
    tmp_log.debug(f"start with {len(new_lfns)} files")
    try:
        # begin transaction
        self.conn.begin()
        for position, lfn in enumerate(new_lfns):
            is_additional = position > 0
            # work on a copy so that the template stays untouched
            spec = copy.copy(file_spec)
            spec.lfn = lfn
            if is_additional:
                # additional files are inserted as new rows
                spec.row_ID = None
            known_to_jedi = spec.jediTaskID not in [None, "NULL"] and spec.fileID not in ["", "NULL", None]
            if is_additional and known_to_jedi:
                # draw a new fileID
                self.cur.execute("SELECT ATLAS_PANDA.JEDI_DATASET_CONT_FILEID_SEQ.nextval FROM dual " + comment)
                (fresh_file_id,) = self.cur.fetchone()
                # read the JEDI record of the template file
                select_binds = {
                    ":jediTaskID": spec.jediTaskID,
                    ":datasetID": spec.datasetID,
                    ":fileID": spec.fileID,
                }
                select_sql = f"SELECT * FROM {panda_config.schemaJEDI}.JEDI_Dataset_Contents "
                select_sql += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID=:fileID "
                self.cur.execute(select_sql + comment, select_binds)
                template_row = self.cur.fetchone()
                spec.fileID = fresh_file_id
                if template_row is not None:
                    # re-insert the record column by column with the new fileID
                    placeholders = []
                    insert_binds = {}
                    for column_desc, column_value in zip(self.cur.description, template_row):
                        column_name = column_desc[0]
                        bind_key = f":{column_name}"
                        insert_binds[bind_key] = spec.fileID if column_name.upper() == "FILEID" else column_value
                        placeholders.append(bind_key)
                    insert_sql = f"INSERT INTO {panda_config.schemaJEDI}.JEDI_Dataset_Contents "
                    insert_sql += "VALUES (" + ",".join(placeholders) + ") "
                    self.cur.execute(insert_sql + comment, insert_binds)
            if not is_additional:
                # the first LFN only renames the existing row
                rename_sql = "UPDATE ATLAS_PANDA.filesTable4 SET lfn=:lfn " + "WHERE row_ID=:row_ID "
                self.cur.execute(rename_sql + comment, {":lfn": spec.lfn, ":row_ID": spec.row_ID})
            else:
                # insert a new row in filesTable4
                file_sql = f"INSERT INTO ATLAS_PANDA.filesTable4 ({FileSpec.columnNames()}) "
                file_sql += FileSpec.bindValuesExpression(useSeq=True)
                self.cur.execute(file_sql + comment, spec.valuesMap(useSeq=True))
            # update LFN in JEDI
            if spec.fileID not in ["", "NULL", None]:
                jedi_sql = f"UPDATE {panda_config.schemaJEDI}.JEDI_Dataset_Contents "
                jedi_sql += "SET lfn=:lfn "
                jedi_sql += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID=:fileID "
                jedi_binds = {
                    ":lfn": spec.lfn,
                    ":jediTaskID": spec.jediTaskID,
                    ":datasetID": spec.datasetID,
                    ":fileID": spec.fileID,
                }
                self.cur.execute(jedi_sql + comment, jedi_binds)
        # commit
        if not self._commit():
            raise RuntimeError("Commit error")
        tmp_log.debug("done")
        return True
    except Exception:
        # roll back
        self._rollback()
        # error
        self.dump_error_message(tmp_log)
        return False
2323
2324
@memoize
def getRetrialRules(self):
    """Load the non-expired retry rules, grouped by error source and error code.

    Joins ATLAS_PANDA.RETRYERRORS with ATLAS_PANDA.RETRYACTIONS and keeps the
    rows whose expiration date is unset or still in the future.

    Returns:
        dict: ``{error_source: {error_code: [rule, ...]}}`` where each rule is a
        dict with the keys ``error_id``, ``error_diag``, ``action``, ``params``,
        ``architecture``, ``release``, ``wqid`` and ``active``. ``params`` is
        parsed from a ``key=value&key=value`` string and is empty when the
        string is missing or malformed. ``active`` is True only when both the
        error definition and the action are flagged ``Y``.

    Raises:
        RuntimeError: if the commit fails. Database errors are not caught here.
    """
    comment = " /* DBProxy.getRetrialRules */"
    tmp_log = self.create_tagged_logger(comment)
    tmp_log.debug("start")

    # SQL to extract the error definitions
    sql = """
    SELECT re.retryerror_id, re.errorsource, re.errorcode, re.errorDiag, re.parameters, re.architecture, re.release, re.workqueue_id, ra.retry_action, re.active, ra.active
    FROM ATLAS_PANDA.RETRYERRORS re, ATLAS_PANDA.RETRYACTIONS ra
    WHERE re.retryaction=ra.retryaction_id
    AND (CURRENT_TIMESTAMP < re.expiration_date or re.expiration_date IS NULL)
    """
    self.cur.execute(sql + comment, {})
    rows = self.cur.fetchall()

    # commit
    if not self._commit():
        raise RuntimeError("Commit error")

    # organize the rows as source -> code -> list of rules
    retrial_rules = {}
    for row in rows:
        (
            retryerror_id,
            error_source,
            error_code,
            error_diag,
            parameters,
            architecture,
            release,
            wqid,
            action,
            e_active,
            a_active,
        ) = row

        # convert "key=value&key=value" into a dictionary; a missing string
        # (AttributeError) or a malformed pair (ValueError) yields no parameters
        try:
            params_dict = dict(pair.split("=") for pair in parameters.split("&"))
        except (AttributeError, ValueError):
            params_dict = {}

        # a rule is active only when both the error and the action are enabled
        active = e_active == "Y" and a_active == "Y"

        rule = {
            "error_id": retryerror_id,
            "error_diag": error_diag,
            "action": action,
            "params": params_dict,
            "architecture": architecture,
            "release": release,
            "wqid": wqid,
            "active": active,
        }
        retrial_rules.setdefault(error_source, {}).setdefault(error_code, []).append(rule)

    return retrial_rules
2400
def setMaxAttempt(self, jobID, taskID, files, maxAttempt):
    """Lower maxAttempt of a job's input files in JEDI_Dataset_Contents.

    Only files of an input-like type whose LFN does not contain ``DBRelease``
    are considered. The value is written only when it is lower than the
    smallest maxAttempt currently stored for those files, i.e. the limit can
    only be tightened.

    Args:
        jobID: PandaID of the job that used the files.
        taskID: JEDI task ID.
        files: file specs of the job (``type``, ``lfn``, ``fileID`` and
            ``datasetID`` are read).
        maxAttempt: new upper limit for the number of attempts.

    Returns:
        bool: True on success or when there is nothing to update, False if the
        database operation failed (after rollback).
    """
    comment = " /* DBProxy.setMaxAttempt */"
    tmp_log = self.create_tagged_logger(comment, f"jobID={jobID} taskID={taskID}")
    tmp_log.debug("start")

    # select the input files
    input_types = ("input", "pseudo_input", "pp_input", "trn_log", "trn_output")
    selected = [spec for spec in files if spec.type in input_types and re.search("DBRelease", spec.lfn) is None]
    file_ids = [spec.fileID for spec in selected]
    dataset_ids = [spec.datasetID for spec in selected]

    if not file_ids:
        # nothing to update
        tmp_log.debug("done")
        return True

    try:
        # start transaction
        self.conn.begin()

        binds = {":taskID": taskID, ":pandaID": jobID}

        # bind the files
        file_var_names_str, file_binds = get_sql_IN_bind_variables(file_ids, prefix=":file")
        binds.update(file_binds)

        # bind the datasets
        ds_var_names_str, ds_binds = get_sql_IN_bind_variables(dataset_ids, prefix=":dataset")
        binds.update(ds_binds)

        # get the minimum maxAttempt value of the files
        sql_select = f"""
        select min(maxattempt) from ATLAS_PANDA.JEDI_Dataset_Contents
        WHERE JEDITaskID = :taskID
        AND datasetID IN ({ds_var_names_str})
        AND fileID IN ({file_var_names_str})
        AND pandaID = :pandaID
        """
        self.cur.execute(sql_select + comment, binds)
        try:
            current_min = self.cur.fetchone()[0]
        except (TypeError, IndexError):
            current_min = None

        # the constraint can only be tightened
        if current_min and current_min > maxAttempt:
            # the stored minimum is larger, so the requested value is the smaller one
            binds[":maxAttempt"] = maxAttempt

            sql_update = f"""
            UPDATE ATLAS_PANDA.JEDI_Dataset_Contents
            SET maxAttempt=:maxAttempt
            WHERE JEDITaskID = :taskID
            AND datasetID IN ({ds_var_names_str})
            AND fileID IN ({file_var_names_str})
            AND pandaID = :pandaID
            """
            self.cur.execute(sql_update + comment, binds)

        # commit
        if not self._commit():
            raise RuntimeError("Commit error")
    except Exception:
        # roll back
        self._rollback()
        # error
        self.dump_error_message(tmp_log)
        return False

    tmp_log.debug("done")
    return True
2476
def increase_max_failure(self, job_id, task_id, files):
    """Increase the max failure number by one for specific files.

    Only files of an input-like type whose LFN does not contain ``DBRelease``
    are updated, and only the JEDI_Dataset_Contents rows that belong to the
    given task and job.

    Args:
        job_id: PandaID of the job that used the files.
        task_id: JEDI task ID.
        files: file specs of the job (``type``, ``lfn``, ``fileID`` and
            ``datasetID`` are read).

    Returns:
        bool: True on success or when there is nothing to update, False if the
        database operation failed (after rollback).
    """
    comment = " /* DBProxy.increase_max_failure */"
    tmp_log = self.create_tagged_logger(comment, f"PandaID={job_id} jediTaskID={task_id}")
    tmp_log.debug("start")

    # select the input files
    input_types = ("input", "pseudo_input", "pp_input", "trn_log", "trn_output")
    file_ids = []
    dataset_ids = []
    for spec in files:
        if spec.type in input_types and re.search("DBRelease", spec.lfn) is None:
            file_ids.append(spec.fileID)
            dataset_ids.append(spec.datasetID)

    if not file_ids:
        # nothing to update
        tmp_log.debug("done")
        return True

    try:
        # start transaction
        self.conn.begin()

        # bind the files
        file_var_names_str, file_binds = get_sql_IN_bind_variables(file_ids, prefix=":file")
        # bind the datasets
        ds_var_names_str, ds_binds = get_sql_IN_bind_variables(dataset_ids, prefix=":dataset")

        binds = {":taskID": task_id, ":pandaID": job_id}
        binds.update(file_binds)
        binds.update(ds_binds)

        sql_update = f"""
        UPDATE ATLAS_PANDA.JEDI_Dataset_Contents
        SET maxFailure = maxFailure + 1
        WHERE JEDITaskID = :taskID
        AND datasetID IN ({ds_var_names_str})
        AND fileID IN ({file_var_names_str})
        AND pandaID = :pandaID
        """
        self.cur.execute(sql_update + comment, binds)

        # commit
        if not self._commit():
            raise RuntimeError("Commit error")
    except Exception:
        # roll back
        self._rollback()
        # error
        self.dump_error_message(tmp_log)
        return False

    tmp_log.debug("done")
    return True
2531
def setNoRetry(self, jobID, taskID, files):
    """Prevent further retries of a job's input files.

    For every dataset of the selected input files, maxAttempt is set to the
    current attemptNr for the files that could still be retried and are in
    ``running`` or ``ready`` status. Files that were in ``ready`` status are
    additionally added to nFilesUsed and nFilesFailed of their dataset; files
    in ``running`` status are not counted.

    Only files of an input-like type whose LFN does not contain ``DBRelease``
    are considered.

    Args:
        jobID: PandaID of the job (used for logging only).
        taskID: JEDI task ID.
        files: file specs of the job (``type``, ``lfn``, ``fileID`` and
            ``datasetID`` are read).

    Returns:
        bool: True on success or when there is nothing to update, False if the
        database operation failed (after rollback).
    """
    comment = " /* DBProxy.setNoRetry */"
    tmp_log = self.create_tagged_logger(comment, f"PandaID={jobID} jediTaskID={taskID}")
    tmp_log.debug("start")

    # select the input files
    input_types = ("input", "pseudo_input", "pp_input", "trn_log", "trn_output")
    input_files = [pandafile for pandafile in files if pandafile.type in input_types and re.search("DBRelease", pandafile.lfn) is None]
    input_fileIDs = [input_file.fileID for input_file in input_files]
    input_datasetIDs = [input_file.datasetID for input_file in input_files]

    if input_fileIDs:
        try:
            # start transaction
            self.conn.begin()

            # the file binds and both statements are the same for every dataset,
            # so they are built once instead of once per iteration
            file_var_names_str, file_var_map = get_sql_IN_bind_variables(input_fileIDs, prefix=":file")

            sql_update = f"""
            UPDATE ATLAS_PANDA.JEDI_Dataset_Contents
            SET maxAttempt=attemptNr
            WHERE JEDITaskID = :taskID
            AND datasetID=:datasetID
            AND fileID IN ({file_var_names_str})
            AND maxAttempt IS NOT NULL AND attemptNr IS NOT NULL
            AND maxAttempt > attemptNr
            AND (maxFailure IS NULL OR failedAttempt IS NULL OR maxFailure > failedAttempt)
            AND keepTrack=:keepTrack
            AND status=:status
            """

            sql_dataset = "UPDATE ATLAS_PANDA.JEDI_Datasets "
            sql_dataset += "SET nFilesUsed=nFilesUsed+:nDiff,nFilesFailed=nFilesFailed+:nDiff "
            sql_dataset += "WHERE jediTaskID=:taskID AND datasetID=:datasetID "

            # input_datasetIDs holds one entry per file; visit each dataset only
            # once (first-seen order) since repeating the update matches no rows
            for datasetID in dict.fromkeys(input_datasetIDs):
                varMap = {}
                varMap[":taskID"] = taskID
                varMap[":datasetID"] = datasetID
                varMap[":keepTrack"] = 1
                varMap.update(file_var_map)

                # files in 'running' status are frozen but not counted
                varMap[":status"] = "running"
                self.cur.execute(sql_update + comment, varMap)

                # files in 'ready' status are frozen and counted below
                varMap[":status"] = "ready"
                self.cur.execute(sql_update + comment, varMap)
                rowcount = self.cur.rowcount

                # update the dataset counters when 'ready' files were changed
                if rowcount > 0:
                    varMap = {}
                    varMap[":taskID"] = taskID
                    varMap[":datasetID"] = datasetID
                    varMap[":nDiff"] = rowcount
                    self.cur.execute(sql_dataset + comment, varMap)

            # commit
            if not self._commit():
                raise RuntimeError("Commit error")
        except Exception:
            # roll back
            self._rollback()
            # error
            self.dump_error_message(tmp_log)
            return False

    tmp_log.debug("done")
    return True
2610
2611
def getDestDBlocksWithSingleConsumer(self, jediTaskID, PandaID, ngDatasets):
    """Map merging jobs to the destination blocks of jobs sharing inputs with a job.

    Steps, as implemented:
      1. read the (datasetID, fileID) pairs recorded in JEDI_Events for the
         given task and PandaID;
      2. for each pair, find the other PandaIDs that have the same file in
         filesTable4;
      3. for each of those jobs, collect the destinationDBlock values of its
         output and log files, skipping names in ``ngDatasets`` and names that
         were already collected;
      4. look up one PandaID with files in ``merging`` status in the dataset of
         the last collected block and map it to the collected block names.

    Args:
        jediTaskID: JEDI task ID.
        PandaID: PandaID of the job whose events are inspected.
        ngDatasets: container of destinationDBlock names to ignore.

    Returns:
        dict: ``{merging PandaID: [destinationDBlock, ...]}``; an empty dict
        when nothing is found or when the database operation failed.
    """
    comment = " /* DBProxy.getDestDBlocksWithSingleConsumer */"
    tmp_log = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID} PandaID={PandaID}")
    tmp_log.debug("start")
    try:
        retMap = {}
        checkedDS = set()
        # sql to get files
        sqlF = "SELECT datasetID,fileID FROM ATLAS_PANDA.JEDI_Events "
        sqlF += "WHERE jediTaskID=:jediTaskID AND PandaID=:PandaID "
        # sql to get PandaIDs
        sqlP = "SELECT distinct PandaID FROM ATLAS_PANDA.filesTable4 "
        sqlP += "WHERE jediTaskID=:jediTaskID ANd datasetID=:datasetID AND fileID=:fileID "
        # sql to get destinationDBlocks
        sqlD = "SELECT destinationDBlock,datasetID FROM ATLAS_PANDA.filesTable4 "
        sqlD += "WHERE PandaID=:PandaID AND type IN (:type1,:type2) "
        # sql to get merging PandaIDs
        sqlM = "SELECT distinct PandaID FROM ATLAS_PANDA.filesTable4 "
        sqlM += "WHERE jediTaskID=:jediTaskID ANd datasetID=:datasetID AND status=:status "
        varMap = {}
        varMap[":jediTaskID"] = jediTaskID
        varMap[":PandaID"] = PandaID
        # begin transaction
        self.conn.begin()
        # get files
        self.cur.execute(sqlF + comment, varMap)
        resF = self.cur.fetchall()
        for datasetID, fileID in resF:
            # get PandaIDs that use the same file
            varMap = {}
            varMap[":jediTaskID"] = jediTaskID
            varMap[":datasetID"] = datasetID
            varMap[":fileID"] = fileID
            self.cur.execute(sqlP + comment, varMap)
            resP = self.cur.fetchall()
            for (sPandaID,) in resP:
                if sPandaID == PandaID:
                    continue
                # get destination blocks of the other job
                varMap = {}
                varMap[":PandaID"] = sPandaID
                varMap[":type1"] = "output"
                varMap[":type2"] = "log"
                self.cur.execute(sqlD + comment, varMap)
                resD = self.cur.fetchall()
                subDatasets = []
                subDatasetID = None
                # a separate loop variable is used so that the outer datasetID
                # is not overwritten
                for destinationDBlock, destDatasetID in resD:
                    if destinationDBlock in ngDatasets:
                        continue
                    if destinationDBlock in checkedDS:
                        continue
                    checkedDS.add(destinationDBlock)
                    subDatasets.append(destinationDBlock)
                    subDatasetID = destDatasetID
                if not subDatasets:
                    continue
                # get the merging PandaID for the dataset of the last accepted
                # block. The original bound the datasetID of the last row of
                # resD here, which could belong to a skipped block, while
                # subDatasetID was computed and never used.
                varMap = {}
                varMap[":jediTaskID"] = jediTaskID
                varMap[":datasetID"] = subDatasetID
                varMap[":status"] = "merging"
                self.cur.execute(sqlM + comment, varMap)
                resM = self.cur.fetchone()
                if resM is not None:
                    (mPandaID,) = resM
                    retMap[mPandaID] = subDatasets
        # commit
        if not self._commit():
            raise RuntimeError("Commit error")
        tmp_log.debug(f"got {len(retMap)} jobs")
        return retMap
    except Exception:
        # roll back
        self._rollback()
        # error
        self.dump_error_message(tmp_log)
        return {}
2690
2691
def getDispatchDatasetsPerUser(self, vo, prodSourceLabel, onlyActive, withSize):
    """Summarise the dispatch datasets of each user's jobs.

    jobsDefined4 is always scanned for jobs in ``defined`` or ``assigned``
    status; when ``onlyActive`` is false, jobsActive4 and jobsArchived4 are
    scanned as well, without a status filter. Jobs are joined with the
    ``dispatch``-type rows of the Datasets table modified within the last 14
    days.

    Args:
        vo: virtual organisation to select.
        prodSourceLabel: prodSourceLabel to select.
        onlyActive: restrict the scan to jobsDefined4 when true.
        withSize: not used by this implementation.

    Returns:
        dict: ``{prodUserName: {transferType: {"datasets": set, "size": number,
        "tasks": set}}}`` where transferType is ``prestaging`` when the fifth
        dot-separated field of the dataset name says so and ``transfer``
        otherwise, and ``size`` accumulates the ``currentFiles`` column once
        per dataset. An empty dict is returned on failure.
    """
    comment = " /* DBProxy.getDispatchDatasetsPerUser */"
    tmp_log = self.create_tagged_logger(comment, f"vo={vo} label={prodSourceLabel}")
    tmp_log.debug("start")
    # job tables to scan, each with an optional list of job statuses
    tableStatMap = {"jobsDefined4": ["defined", "assigned"]}
    if not onlyActive:
        tableStatMap["jobsActive4"] = None
        tableStatMap["jobsArchived4"] = None
    try:
        userDispMap = {}
        for tableName, statusList in tableStatMap.items():
            # make sql to get dispatch datasets
            binds = {":vo": vo, ":label": prodSourceLabel, ":dType": "dispatch"}
            sql_parts = [
                "SELECT distinct prodUserName,dispatchDBlock,jediTaskID,currentFiles ",
                "FROM {0}.{1} j, {0}.Datasets d ".format(panda_config.schemaPANDA, tableName),
                "WHERE vo=:vo AND prodSourceLabel=:label ",
            ]
            if statusList is not None:
                jobstat_var_names_str, jobstat_binds = get_sql_IN_bind_variables(statusList, prefix=":jobStat_", value_as_suffix=True)
                sql_parts.append(f"AND jobStatus IN ({jobstat_var_names_str}) ")
                binds.update(jobstat_binds)
            sql_parts.append("AND dispatchDBlock IS NOT NULL ")
            sql_parts.append("AND d.name=j.dispatchDBlock AND d.modificationDate>CURRENT_DATE-14 ")
            sql_parts.append("AND d.type=:dType ")
            sqlJ = "".join(sql_parts)
            # begin transaction
            self.conn.begin()
            # get dispatch datasets
            self.cur.execute(sqlJ + comment, binds)
            rows = self.cur.fetchall()
            if not self._commit():
                raise RuntimeError("Commit error")
            # make map
            for prodUserName, dispatchDBlock, jediTaskID, dsSize in rows:
                transferType = "transfer"
                try:
                    if dispatchDBlock.split(".")[4] == "prestaging":
                        transferType = "prestaging"
                except Exception:
                    # names with fewer fields are treated as plain transfers
                    pass
                summary = userDispMap.setdefault(prodUserName, {}).setdefault(
                    transferType, {"datasets": set(), "size": 0, "tasks": set()}
                )
                if dispatchDBlock in summary["datasets"]:
                    # count each dataset only once
                    continue
                summary["datasets"].add(dispatchDBlock)
                summary["tasks"].add(jediTaskID)
                summary["size"] += dsSize
        tmp_log.debug("done")
        return userDispMap
    except Exception:
        # roll back
        self._rollback()
        # error
        self.dump_error_message(tmp_log)
        return {}
2749
2750
def bulk_fetch_panda_ids(self, num_ids):
    """Draw a batch of new PandaIDs from the jobsDefined4 sequence.

    Args:
        num_ids: number of IDs to draw from
            ATLAS_PANDA.JOBSDEFINED4_PANDAID_SEQ.

    Returns:
        list: the new IDs in ascending order, or an empty list if the database
        operation failed.
    """
    comment = " /* JediDBProxy.bulk_fetch_panda_ids */"
    tmp_log = self.create_tagged_logger(comment, f"num_ids={num_ids}")
    tmp_log.debug("start")
    try:
        # sql to get PandaIDs: one nextval per generated row
        sql_fetch = (
            "SELECT ATLAS_PANDA.JOBSDEFINED4_PANDAID_SEQ.nextval FROM "
            "(SELECT level FROM dual CONNECT BY level<=:nIDs) "
        )
        binds = {":nIDs": num_ids}
        # start transaction
        self.conn.begin()
        self.cur.arraysize = 10000
        self.cur.execute(sql_fetch + comment, binds)
        fetched = [panda_id for (panda_id,) in self.cur.fetchall()]
        # commit
        if not self._commit():
            raise RuntimeError("Commit error")
        tmp_log.debug(f"got {len(fetched)} IDs")
        fetched.sort()
        return fetched
    except Exception:
        # roll back
        self._rollback()
        # error
        self.dump_error_message(tmp_log)
        return []
2780
2781
def bulkFetchFileIDsPanda(self, nIDs):
    """Draw a batch of new row IDs from the filesTable4 sequence.

    Args:
        nIDs: number of IDs to draw from ATLAS_PANDA.FILESTABLE4_ROW_ID_SEQ.

    Returns:
        list: the new IDs in the order returned by the database, or an empty
        list if the database operation failed.
    """
    comment = " /* JediDBProxy.bulkFetchFileIDsPanda */"
    tmp_log = self.create_tagged_logger(comment, f"nIDs={nIDs}")
    tmp_log.debug("start")
    try:
        # sql to get fileIDs: one nextval per generated row
        sql_fetch = (
            "SELECT ATLAS_PANDA.FILESTABLE4_ROW_ID_SEQ.nextval FROM "
            "(SELECT level FROM dual CONNECT BY level<=:nIDs) "
        )
        binds = {":nIDs": nIDs}
        # start transaction
        self.conn.begin()
        self.cur.arraysize = 10000
        self.cur.execute(sql_fetch + comment, binds)
        fetched = [row_id for (row_id,) in self.cur.fetchall()]
        # commit
        if not self._commit():
            raise RuntimeError("Commit error")
        tmp_log.debug(f"got {len(fetched)} IDs")
        return fetched
    except Exception:
        # roll back
        self._rollback()
        # error
        self.dump_error_message(tmp_log)
        return []
2811
2812
def getLFNsForJumbo(self, jediTaskID):
    """Return the scoped names of the master input files of a task.

    Reads the files of the ``input`` and ``pseudo_input`` datasets without a
    masterID from JEDI_Datasets/JEDI_Dataset_Contents.

    Args:
        jediTaskID: JEDI task ID.

    Returns:
        set: ``"<scope>:<lfn>"`` strings. An empty set is returned both when
        the task has no such files and when the query fails, so callers always
        receive the same container type.
    """
    comment = " /* DBProxy.getLFNsForJumbo */"
    tmp_log = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID}")
    tmp_log.debug("start")
    try:
        sqlS = "SELECT lfn,scope FROM {0}.JEDI_Datasets d, {0}.JEDI_Dataset_Contents c ".format(panda_config.schemaJEDI)
        sqlS += "WHERE d.jediTaskID=c.jediTaskID AND d.datasetID=c.datasetID AND d.jediTaskID=:jediTaskID "
        sqlS += "AND d.type IN (:type1,:type2) AND d.masterID IS NULL "
        # begin transaction
        self.conn.begin()
        varMap = {
            ":jediTaskID": jediTaskID,
            ":type1": "input",
            ":type2": "pseudo_input",
        }
        self.cur.execute(sqlS + comment, varMap)
        retSet = {f"{tmpScope}:{tmpLFN}" for tmpLFN, tmpScope in self.cur.fetchall()}
        # commit
        if not self._commit():
            raise RuntimeError("Commit error")
        tmp_log.debug(f"has {len(retSet)} LFNs")
        return retSet
    except Exception:
        # roll back
        self._rollback()
        # error
        self.dump_error_message(tmp_log)
        # same type as the success path (the original returned a list here)
        return set()
2844
2845
def getNumStartedEvents(self, jobSpec):
    """Count the events of a job's input files that have been started.

    For every file of type ``input`` in ``jobSpec.Files``, the JEDI_Events rows
    in sent, running, finished or done status are counted and summed up.

    Args:
        jobSpec: job spec; ``PandaID`` and ``Files`` are read.

    Returns:
        int | None: total number of such events, or None if the database
        operation failed.
    """
    comment = " /* DBProxy.getNumStartedEvents */"
    tmp_log = self.create_tagged_logger(comment, f"PandaID={jobSpec.PandaID}")
    tmp_log.debug("start")
    try:
        # count the number of started ranges
        sqlCDO = (
            "SELECT /*+ INDEX_RS_ASC(tab JEDI_EVENTS_FILEID_IDX) NO_INDEX_FFS(tab JEDI_EVENTS_PK) NO_INDEX_SS(tab JEDI_EVENTS_PK) */ "
            f"COUNT(*) FROM {panda_config.schemaJEDI}.JEDI_Events tab "
            "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID=:fileID "
            "AND status IN (:esSent,:esRunning,:esFinished,:esDone) "
        )
        # start transaction
        self.conn.begin()
        total = 0
        for input_file in (spec for spec in jobSpec.Files if spec.type == "input"):
            binds = {
                ":jediTaskID": input_file.jediTaskID,
                ":datasetID": input_file.datasetID,
                ":fileID": input_file.fileID,
                ":esSent": EventServiceUtils.ST_sent,
                ":esRunning": EventServiceUtils.ST_running,
                ":esFinished": EventServiceUtils.ST_finished,
                ":esDone": EventServiceUtils.ST_done,
            }
            self.cur.execute(sqlCDO + comment, binds)
            row = self.cur.fetchone()
            if row is not None:
                total += row[0]
        # commit
        if not self._commit():
            raise RuntimeError("Commit error")
        tmp_log.debug(f"{total} events started")
        return total
    except Exception:
        # roll back
        self._rollback()
        # error
        self.dump_error_message(tmp_log)
        return None
2885
2886
def getJediFileAttributes(self, PandaID, jediTaskID, datasetID, fileID, attrs):
    """Read selected columns of one JEDI_Dataset_Contents row.

    Args:
        PandaID: Only used to tag log messages.
        jediTaskID: Task ID of the row.
        datasetID: Dataset ID of the row.
        fileID: File ID of the row.
        attrs: Column names to select. They are interpolated into the SQL
            text as-is, so they must come from trusted code.

    Returns:
        A dict mapping each name in ``attrs`` to the fetched value. The dict
        is empty when no row matches or when the query/commit failed.
    """
    comment = " /* DBProxy.getJediFileAttributes */"
    tmp_log = self.create_tagged_logger(comment, f"PandaID={PandaID}")
    tmp_log.debug(f"start for jediTaskID={jediTaskID} datasetId={datasetID} fileID={fileID}")
    try:
        # "col1,col2," with the final character stripped, exactly as before
        select_part = ("SELECT " + "".join(f"{attr}," for attr in attrs))[:-1]
        query = (
            select_part
            + f" FROM {panda_config.schemaJEDI}.JEDI_Dataset_Contents "
            + "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID=:fileID "
        )
        bind_vars = {
            ":jediTaskID": jediTaskID,
            ":datasetID": datasetID,
            ":fileID": fileID,
        }
        self.conn.begin()
        self.cur.execute(query + comment, bind_vars)
        row = self.cur.fetchone()
        if not self._commit():
            raise RuntimeError("Commit error")
        if row is None:
            values = {}
        else:
            # the columns come back in the order they were requested
            values = {attr: row[position] for position, attr in enumerate(attrs)}
        tmp_log.debug(f"done {str(values)}")
        return values
    except Exception:
        self._rollback()
        self.dump_error_message(tmp_log)
        return {}
2922
2923
def getJumboJobDatasets(self, n_days, grace_period):
    """List output datasets of finished/done managed tasks that used jumbo jobs.

    Tasks are selected when ``useJumbo`` is set and their modification time
    lies between ``CURRENT_DATE - n_days`` and ``CURRENT_DATE - grace_period``.

    Args:
        n_days: Bound as ``:days`` in the lower modification-time limit.
        grace_period: Bound as ``:grace_period`` in the upper limit.

    Returns:
        A dict keyed by jediTaskID whose values are
        ``{"status": <task status>, "datasets": [<dataset names>]}``.
        An empty dict is returned when the query or the commit failed.
    """
    comment = " /* DBProxy.getJumboJobDatasets */"
    tmp_log = self.create_tagged_logger(comment, f"nDays={n_days}")
    tmp_log.debug("start")
    try:
        query = (
            "SELECT t.jediTaskID,d.datasetName,t.status FROM ATLAS_PANDA.JEDI_Tasks t,ATLAS_PANDA.JEDI_Datasets d "
            "WHERE t.prodSourceLabel='managed' AND t.useJumbo IS NOT NULL "
            "AND t.modificationTime>CURRENT_DATE-:days AND t.modificationTime<CURRENT_DATE-:grace_period "
            "AND t.status IN ('finished','done') "
            "AND d.jediTaskID=t.jediTaskID AND d.type='output' "
        )
        bind_vars = {":days": n_days, ":grace_period": grace_period}
        self.conn.begin()
        self.cur.execute(query + comment, bind_vars)
        datasets_by_task = {}
        for task_id, dataset_name, task_status in self.cur.fetchall():
            # the status of the first row seen for a task is the one kept
            if task_id not in datasets_by_task:
                datasets_by_task[task_id] = {"status": task_status, "datasets": []}
            datasets_by_task[task_id]["datasets"].append(dataset_name)
        if not self._commit():
            raise RuntimeError("Commit error")
        n_datasets = sum(len(entry["datasets"]) for entry in datasets_by_task.values())
        tmp_log.debug(f"got {n_datasets} datasets")
        return datasets_by_task
    except Exception:
        self._rollback()
        self.dump_error_message(tmp_log)
        return {}
2959
2960
def getOutputDatasetsJEDI(self, panda_id):
    """Map dataset IDs to dataset names for a job's output and log files.

    The job's ``output`` and ``log`` rows in filesTable4 are joined with
    JEDI_Datasets on jediTaskID and datasetID.

    Args:
        panda_id: PandaID of the job.

    Returns:
        A dict ``{datasetID: datasetName}``; empty when nothing matches or
        when the query/commit failed.
    """
    comment = " /* DBProxy.getOutputDatasetsJEDI */"
    tmp_log = self.create_tagged_logger(comment, f"PandaID={panda_id}")
    tmp_log.debug("start")
    try:
        query = (
            "SELECT d.datasetID,d.datasetName FROM ATLAS_PANDA.filesTable4 f,ATLAS_PANDA.JEDI_Datasets d "
            "WHERE f.PandaID=:PandaID AND f.type IN (:type1,:type2) AND d.jediTaskID=f.jediTaskID AND d.datasetID=f.datasetID "
        )
        bind_vars = {":PandaID": panda_id, ":type1": "output", ":type2": "log"}
        self.conn.begin()
        self.cur.execute(query + comment, bind_vars)
        # a later row for the same datasetID overwrites an earlier one
        name_by_id = {dataset_id: dataset_name for dataset_id, dataset_name in self.cur.fetchall()}
        if not self._commit():
            raise RuntimeError("Commit error")
        tmp_log.debug(f"got {len(name_by_id)}")
        return name_by_id
    except Exception:
        self._rollback()
        self.dump_error_message(tmp_log)
        return {}
2991
2992
def lockProcess_PANDA(self, component, pid, time_limit, force=False):
    """Try to take the process lock of a component on behalf of ``pid``.

    The lock is a row in JEDI_Process_Lock keyed by the default
    vo/prodSourceLabel/cloud/workqueue_id/resource_type values plus
    ``component``. Unless ``force`` is set, the lock is only taken when no
    row younger than ``time_limit`` minutes exists. Taking the lock deletes
    any row for the key and inserts a fresh one with ``lockedBy=pid``.

    Args:
        component: Name of the component to lock.
        pid: Identifier stored in ``lockedBy``.
        time_limit: Age in minutes after which an existing lock is ignored.
        force: When True, skip the check and take the lock unconditionally.

    Returns:
        True when the lock row was written and committed. False when a
        valid lock is already present or when anything failed, including
        a failed commit, in which case the transaction is rolled back and
        the lock is not held.
    """
    comment = " /* DBProxy.lockProcess_PANDA */"
    tmp_log = self.create_tagged_logger(comment, f"component={component} pid={pid}")
    # bind values shared by all three statements
    key_map = {
        ":vo": "default",
        ":prodSourceLabel": "default",
        ":cloud": "default",
        ":workqueue_id": 0,
        ":resource_name": "default",
        ":component": component,
    }
    tmp_log.debug("start")
    try:
        # look for a lock that is still within the time limit
        sqlCT = (
            "SELECT lockedBy "
            "FROM {0}.JEDI_Process_Lock "
            "WHERE vo=:vo AND prodSourceLabel=:prodSourceLabel "
            "AND cloud=:cloud AND workqueue_id=:workqueue_id "
            "AND resource_type=:resource_name AND component=:component "
            "AND lockedTime>:lockedTime "
            "FOR UPDATE"
        ).format(panda_config.schemaJEDI)
        # remove any (expired) lock row for the key
        sqlCD = (
            "DELETE FROM {0}.JEDI_Process_Lock "
            "WHERE vo=:vo AND prodSourceLabel=:prodSourceLabel "
            "AND cloud=:cloud AND workqueue_id=:workqueue_id "
            "AND resource_type=:resource_name AND component=:component "
        ).format(panda_config.schemaJEDI)
        # write the new lock row
        sqlFR = (
            "INSERT INTO {0}.JEDI_Process_Lock "
            "(vo, prodSourceLabel, cloud, workqueue_id, resource_type, component, lockedBy, lockedTime) "
            "VALUES(:vo, :prodSourceLabel, :cloud, :workqueue_id, :resource_name, :component, :lockedBy, CURRENT_DATE) "
        ).format(panda_config.schemaJEDI)

        self.conn.begin()
        if force:
            resCT = None
        else:
            varMap = dict(key_map)
            varMap[":lockedTime"] = naive_utcnow() - datetime.timedelta(minutes=time_limit)
            self.cur.execute(sqlCT + comment, varMap)
            resCT = self.cur.fetchone()

        if resCT is not None:
            tmp_log.debug(f"skipped, locked by {resCT[0]}")
            retVal = False
        else:
            self.cur.execute(sqlCD + comment, dict(key_map))
            varMap = dict(key_map)
            varMap[":lockedBy"] = pid
            self.cur.execute(sqlFR + comment, varMap)
            tmp_log.debug("successfully locked")
            retVal = True

        if not self._commit():
            raise RuntimeError("Commit error")
        return retVal
    except Exception:
        self._rollback()
        self.dump_error_message(tmp_log)
        # The transaction was rolled back, so the lock is not held even if
        # the INSERT itself had succeeded; never report success from here.
        return False
3078
3079
def unlockProcess_PANDA(self, component, pid):
    """Release the process lock that ``pid`` holds on a component.

    Deletes the JEDI_Process_Lock row for the default
    vo/prodSourceLabel/cloud/workqueue_id/resource_type key, the given
    ``component`` and ``lockedBy=pid``.

    Args:
        component: Name of the locked component.
        pid: Identifier that was stored in ``lockedBy``.

    Returns:
        True when the DELETE was committed (whether or not a row matched),
        False when the statement or the commit failed.
    """
    comment = " /* DBProxy.unlockProcess_PANDA */"
    tmp_log = self.create_tagged_logger(comment, f"component={component} pid={pid}")
    tmp_log.debug("start")
    try:
        delete_sql = (
            f"DELETE FROM {panda_config.schemaJEDI}.JEDI_Process_Lock "
            "WHERE vo=:vo AND prodSourceLabel=:prodSourceLabel AND cloud=:cloud "
            "AND workqueue_id=:workqueue_id AND lockedBy=:lockedBy "
            "AND resource_type=:resource_name AND component=:component "
        )
        bind_vars = {
            ":vo": "default",
            ":prodSourceLabel": "default",
            ":cloud": "default",
            ":workqueue_id": 0,
            ":resource_name": "default",
            ":component": component,
            ":lockedBy": pid,
        }
        self.conn.begin()
        self.cur.execute(delete_sql + comment, bind_vars)
        if not self._commit():
            raise RuntimeError("Commit error")
        tmp_log.debug("done")
        return True
    except Exception:
        self._rollback()
        self.dump_error_message(tmp_log)
        return False
3123
3124
def checkProcessLock_PANDA(self, component, pid, time_limit, check_base=False):
    """Check whether a component is locked by somebody other than ``pid``.

    Only lock rows younger than ``time_limit`` minutes are considered.

    Args:
        component: Name of the component.
        pid: Identifier of the caller.
        time_limit: Age in minutes after which a lock row is ignored.
        check_base: When True, a lock counts as the caller's own if
            ``lockedBy`` merely starts with ``pid``; otherwise the two
            strings must be equal.

    Returns:
        ``(True, lockedTime)`` when a foreign lock was found, otherwise
        ``(False, None)``. On an error the value determined so far is
        returned after rolling back.
    """
    comment = " /* DBProxy.checkProcessLock_PANDA */"
    tmp_log = self.create_tagged_logger(comment, f"component={component} pid={pid}")
    tmp_log.debug("start")
    try:
        outcome = (False, None)
        check_sql = (
            "SELECT lockedBy, lockedTime "
            f"FROM {panda_config.schemaJEDI}.JEDI_Process_Lock "
            "WHERE vo=:vo AND prodSourceLabel=:prodSourceLabel "
            "AND cloud=:cloud AND workqueue_id=:workqueue_id "
            "AND resource_type=:resource_name AND component=:component "
            "AND lockedTime>:lockedTime "
        )
        self.conn.begin()
        bind_vars = {
            ":vo": "default",
            ":prodSourceLabel": "default",
            ":cloud": "default",
            ":workqueue_id": 0,
            ":resource_name": "default",
            ":component": component,
            ":lockedTime": naive_utcnow() - datetime.timedelta(minutes=time_limit),
        }
        self.cur.execute(check_sql + comment, bind_vars)
        row = self.cur.fetchone()
        if row is not None:
            lockedBy, lockedTime = row
            # prefix comparison or exact comparison, depending on check_base
            if check_base:
                foreign = not lockedBy.startswith(pid)
            else:
                foreign = lockedBy != pid
            if foreign:
                outcome = (True, lockedTime)
        if outcome[0]:
            tmp_log.debug(f"found locked by {lockedBy} at {lockedTime.strftime('%Y-%m-%d_%H:%M:%S')}")
        else:
            tmp_log.debug("found unlocked")
        if not self._commit():
            raise RuntimeError("Commit error")
        tmp_log.debug("done")
        return outcome
    except Exception:
        self._rollback()
        self.dump_error_message(tmp_log)
        return outcome
3184
3185
def update_problematic_resource_info(self, user_name, jedi_task_id, resource, problem_type):
    """Record a problematic resource for a user's task in the user's pagecache.

    The ``pagecache`` column of ATLAS_PANDAMETA.users holds a JSON object of
    the form ``{problem_type: {jediTaskID: {resource: unix_timestamp}}}``.
    When ``problem_type`` is given, the timestamp of the resource is
    (re)set if it is missing or older than one day. Entries older than
    seven days are always purged, and the cleaned object is written back.

    Content that is not valid JSON, or valid JSON that is not an object,
    is treated as an empty record instead of failing the call.

    Args:
        user_name: Value matched against ``users.name``.
        jedi_task_id: Task ID; stored as a string key.
        resource: Name of the problematic resource.
        problem_type: ``"dest"`` to record a problem, or None to only purge
            outdated entries.

    Returns:
        True when a new timestamp was recorded, False when nothing new was
        recorded (recent entry present, purge-only call, or unknown user),
        None when ``problem_type`` is unknown or a database error occurred.
    """
    comment = " /* DBProxy.update_problematic_resource_info */"
    tmp_log = self.create_tagged_logger(comment, f"user={user_name} jediTaskID={jedi_task_id}")
    tmp_log.debug("start")
    retVal = False
    try:
        if problem_type not in ["dest", None]:
            tmp_log.debug(f"unknown problem type: {problem_type}")
            return None
        sqlR = "SELECT pagecache FROM ATLAS_PANDAMETA.users " "WHERE name=:name "
        sqlW = "UPDATE ATLAS_PANDAMETA.users SET pagecache=:data " "WHERE name=:name "
        # JSON object keys are strings, so use the string form consistently
        jedi_task_id = str(jedi_task_id)

        self.conn.begin()
        self.cur.execute(sqlR + comment, {":name": user_name})
        data = self.cur.fetchone()
        if data is None:
            tmp_log.debug("user not found")
        else:
            try:
                data = json.loads(data[0])
            except Exception:
                data = {}
            # valid JSON that is not an object (e.g. "[]" or "null") cannot
            # hold the record; start over like for unparsable content
            if not isinstance(data, dict):
                data = {}

            now = datetime.datetime.now(datetime.timezone.utc)
            if problem_type is not None:
                per_resource = data.setdefault(problem_type, {}).setdefault(jedi_task_id, {})
                old = per_resource.get(resource)
                # report at most once per day for the same resource
                if old is None or now - datetime.datetime.fromtimestamp(old, datetime.timezone.utc) > datetime.timedelta(days=1):
                    retVal = True
                    per_resource[resource] = time.time()

            # purge entries older than a week and drop containers left empty
            max_age = datetime.timedelta(days=7)
            for p in list(data):
                for t in list(data[p]):
                    for r in list(data[p][t]):
                        ts = data[p][t][r]
                        if now - datetime.datetime.fromtimestamp(ts, datetime.timezone.utc) > max_age:
                            del data[p][t][r]
                    if not data[p][t]:
                        del data[p][t]
                if not data[p]:
                    del data[p]

            # write the cleaned record back
            self.cur.execute(sqlW + comment, {":name": user_name, ":data": json.dumps(data)})

        if not self._commit():
            raise RuntimeError("Commit error")
        tmp_log.debug(f"done with {retVal} : {str(data)}")
        return retVal
    except Exception:
        self._rollback()
        self.dump_error_message(tmp_log)
        return None
3252
3253
def get_files_in_datasets(self, task_id, dataset_types):
    """List the files of a task's datasets of the given types.

    Args:
        task_id: JEDI task ID.
        dataset_types: Dataset types to include, either as a sequence of
            strings or as one comma-separated string.

    Returns:
        A list with one entry per matching dataset, each of the form
        ``{"dataset": {"name": ..., "id": ...}, "files": [{"lfn": ...,
        "scope": ..., "id": ..., "status": ...}, ...]}`` with files ordered
        by fileID, or None when a query or the commit failed.
    """
    # NOTE: the SQL tag still carries the name get_lfns_in_datasets; it is
    # left untouched because it appears in the SQL text and log tags.
    comment = " /* DBProxy.get_lfns_in_datasets */"
    tmp_log = self.create_tagged_logger(comment, f"jediTaskID={task_id}")
    tmp_log.debug("start")
    try:
        varMap = {}
        varMap[":jediTaskID"] = task_id
        sqlD = f"SELECT datasetName,datasetID FROM {panda_config.schemaJEDI}.JEDI_Datasets WHERE jediTaskID=:jediTaskID "
        # accept a comma-separated string as well as a sequence
        if isinstance(dataset_types, str):
            dataset_types = dataset_types.split(",")
        dstype_var_names_str, dstype_var_map = get_sql_IN_bind_variables(dataset_types, prefix=":", value_as_suffix=True)
        sqlD += f"AND type IN ({dstype_var_names_str}) "
        varMap.update(dstype_var_map)
        # files of one dataset, in fileID order
        sqlS = f"SELECT lfn,scope,fileID,status FROM {panda_config.schemaJEDI}.JEDI_Dataset_Contents "
        sqlS += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID ORDER BY fileID "
        retVal = []

        self.conn.begin()
        self.cur.execute(sqlD + comment, varMap)
        # fetch all datasets first since the cursor is reused per dataset below
        res = self.cur.fetchall()
        for datasetName, datasetID in res:
            self.cur.execute(sqlS + comment, {":jediTaskID": task_id, ":datasetID": datasetID})
            fileList = [
                {"lfn": lfn, "scope": fileScope, "id": fileID, "status": status}
                for lfn, fileScope, fileID, status in self.cur.fetchall()
            ]
            retVal.append({"dataset": {"name": datasetName, "id": datasetID}, "files": fileList})

        if not self._commit():
            raise RuntimeError("Commit error")
        tmp_log.debug("done")
        return retVal
    except Exception:
        self._rollback()
        self.dump_error_message(tmp_log)
        return None
3306
3307
def async_update_datasets(self, panda_id):
    """Replay dataset-update SQL statements that were queued in SQL_QUEUE.

    Rows of SQL_QUEUE with topic ``SQL_QUEUE_TOPIC_async_dataset_update``
    carry a JSON-encoded ``[sql, var_map]`` pair in their ``data`` column.
    For each PandaID the rows are locked (``FOR UPDATE NOWAIT``), executed
    in ``execution_order``, and then the SQL_QUEUE rows of that PandaID are
    deleted, all within one transaction. Up to five attempts are made per
    PandaID, sleeping one second between failed attempts.

    Args:
        panda_id: PandaID to process. When None, every PandaID that has a
            queued entry of this topic older than one minute is processed.

    Returns:
        None when there is no PandaID to process, True after all IDs were
        visited (IDs whose attempts all failed are only logged), or False
        when an error escaped the per-ID handling, e.g. a failed commit.
    """
    comment = " /* DBProxy.async_update_datasets */"
    g_tmp_log = self.create_tagged_logger(comment)
    g_tmp_log.debug("start")
    try:
        if panda_id is not None:
            panda_id_list = [panda_id]
        else:
            # collect the PandaIDs whose entries have been queued for more than a minute
            sql = f"SELECT DISTINCT PandaID FROM {panda_config.schemaPANDA}.SQL_QUEUE WHERE topic=:topic AND creationTime<:timeLimit"
            varMap = {
                ":topic": SQL_QUEUE_TOPIC_async_dataset_update,
                ":timeLimit": naive_utcnow() - datetime.timedelta(minutes=1),
            }
            # start transaction
            self.conn.begin()
            self.cur.arraysize = 10000
            self.cur.execute(sql + comment, varMap)
            panda_id_list = [i[0] for i in self.cur.fetchall()]
            # commit
            if not self._commit():
                raise RuntimeError("Commit error")
        if not panda_id_list:
            g_tmp_log.debug("done since no IDs are available")
            return None
        # process one PandaID at a time, each in its own transaction
        for tmp_id in panda_id_list:
            tmp_log = self.create_tagged_logger(comment, f"PandaID={tmp_id}")
            sqlL = "SELECT data FROM {0}.SQL_QUEUE WHERE topic=:topic AND PandaID=:PandaID ORDER BY " "execution_order FOR UPDATE NOWAIT ".format(
                panda_config.schemaPANDA
            )
            # note: the cleanup is not restricted to the topic
            sqlD = f"DELETE FROM {panda_config.schemaPANDA}.SQL_QUEUE WHERE PandaID=:PandaID "
            n_try = 5
            all_ok = True
            for i_try in range(n_try):
                all_ok = True
                query_list = []
                tmp_log.debug(f"Trying PandaID={tmp_id} {i_try+1}/{n_try}")
                tmp_data_list = None
                # start transaction
                self.conn.begin()
                # lock the queued rows; NOWAIT makes this fail immediately if they are locked elsewhere
                try:
                    var_map = {
                        ":topic": SQL_QUEUE_TOPIC_async_dataset_update,
                        ":PandaID": tmp_id,
                    }
                    self.cur.execute(sqlL + comment, var_map)
                    tmp_data_list = self.cur.fetchall()
                except Exception:
                    tmp_log.debug("cannot lock queries")
                    all_ok = False
                if tmp_data_list:
                    # execute the queued statements in order, stopping at the first failure
                    if all_ok:
                        for (tmp_data,) in tmp_data_list:
                            # NOTE(review): the SQL text comes from the queue table and is executed as-is
                            sql, var_map = json.loads(tmp_data)
                            query_list.append((sql, var_map))
                            try:
                                self.cur.execute(sql + comment, var_map)
                            except Exception:
                                tmp_log.error(f'failed to execute "{sql}" var={str(var_map)}')
                                self.dump_error_message(tmp_log)
                                all_ok = False
                                break
                    # remove the queued rows once every statement went through
                    if all_ok:
                        var_map = {":PandaID": tmp_id}
                        self.cur.execute(sqlD + comment, var_map)
                # commit on success, otherwise roll back and retry
                if all_ok:
                    if not self._commit():
                        # propagates to the outer handler, which ends the whole call with False
                        raise RuntimeError("Commit error")
                    for sql, var_map in query_list:
                        tmp_log.debug(sql + str(var_map))
                    tmp_log.debug("done")
                    break
                else:
                    self._rollback()
                    # no pause after the last attempt
                    if i_try + 1 < n_try:
                        time.sleep(1)
            if not all_ok:
                tmp_log.error("all attempts failed")
        g_tmp_log.debug(f"processed {len(panda_id_list)} IDs")
        return True
    except Exception:
        # roll back
        self._rollback()
        # error
        self.dump_error_message(g_tmp_log)
        return False
3399
3400
def get_tasks_inputdatasets_JEDI(self, vo):
    """List the master input datasets of active tasks of a VO.

    Tasks in status running, ready, scouting or pending are selected via
    the JEDI_AUX_Status_MinTaskID table, and their datasets of type
    ``input`` without a masterID are returned.

    Args:
        vo: Virtual organization the tasks belong to.

    Returns:
        The fetched rows as ``(jediTaskID, datasetID, datasetName)``
        tuples, or None when the query or the commit failed.
    """
    comment = " /* JediDBProxy.get_tasks_inputdatasets_JEDI */"
    tmpLog = self.create_tagged_logger(comment, f"vo={vo}")
    tmpLog.debug("start")
    try:
        sql = (
            "SELECT tabT.jediTaskID,datasetID, tabD.datasetName "
            "FROM {0}.JEDI_Tasks tabT,{0}.JEDI_Datasets tabD,{0}.JEDI_AUX_Status_MinTaskID tabA "
            "WHERE tabT.status=tabA.status AND tabT.jediTaskID>=tabA.min_jediTaskID AND tabT.jediTaskID=tabD.jediTaskID "
            "AND tabT.vo=:vo AND tabT.status IN ('running', 'ready', 'scouting', 'pending') "
            "AND tabD.type IN ('input') AND tabD.masterID IS NULL "
        ).format(panda_config.schemaJEDI)

        self.conn.begin()
        self.cur.execute(sql + comment, {":vo": vo})
        res = self.cur.fetchall()
        # read the row count before the commit ends the transaction
        nRows = self.cur.rowcount
        if not self._commit():
            raise RuntimeError("Commit error")
        tmpLog.debug(f"done with {nRows} rows")
        return res
    except Exception:
        self._rollback()
        self.dump_error_message(tmpLog)
        return None
3438
3439
def updateDatasetLocality_JEDI(self, jedi_taskid, datasetid, rse):
    """Insert or refresh the locality record of a dataset at an RSE.

    The record in JEDI_Dataset_Locality is stamped with the current naive
    UTC time. A row is inserted when none exists for the
    (jediTaskID, datasetID, rse) key, otherwise its timestamp is updated.

    Args:
        jedi_taskid: JEDI task ID.
        datasetid: Dataset ID.
        rse: Name of the storage element.

    Returns:
        True when the change was committed, False otherwise.
    """
    comment = " /* JediDBProxy.updateDatasetLocality_JEDI */"
    timestamp = naive_utcnow()
    timestamp_str = timestamp.strftime("%Y-%m-%d_%H:%M:%S")
    tmpLog = self.create_tagged_logger(comment, f"taskID={jedi_taskid} datasetID={datasetid} rse={rse} timestamp={timestamp_str}")
    try:
        schema = panda_config.schemaJEDI
        exists_sql = f"SELECT timestamp FROM {schema}.JEDI_Dataset_Locality WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND rse=:rse "
        insert_sql = f"INSERT INTO {schema}.JEDI_Dataset_Locality (jediTaskID, datasetID, rse, timestamp) VALUES (:jediTaskID, :datasetID, :rse, :timestamp)"
        update_sql = f"UPDATE {schema}.JEDI_Dataset_Locality SET timestamp=:timestamp WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND rse=:rse "

        self.conn.begin()
        key_vars = {
            ":jediTaskID": jedi_taskid,
            ":datasetID": datasetid,
            ":rse": rse,
        }
        self.cur.execute(exists_sql + comment, key_vars)
        existing = self.cur.fetchone()
        # the write statements need the key plus the new timestamp
        key_vars[":timestamp"] = timestamp
        if existing is not None:
            tmpLog.debug("update")
            self.cur.execute(update_sql + comment, key_vars)
        else:
            tmpLog.debug("insert")
            self.cur.execute(insert_sql + comment, key_vars)
        if not self._commit():
            raise RuntimeError("Commit error")
        return True
    except Exception:
        self._rollback()
        self.dump_error_message(tmpLog)
        return False
3490
3491
def deleteOutdatedDatasetLocality_JEDI(self, before_timestamp):
    """Delete dataset-locality records that are not newer than a timestamp.

    Args:
        before_timestamp: ``datetime`` limit; rows whose timestamp is less
            than or equal to it are deleted.

    Returns:
        The number of deleted rows as reported by the cursor, or 0 when the
        statement or the commit failed (the transaction is then rolled
        back, so nothing was deleted).
    """
    comment = " /* JediDBProxy.deleteOutdatedDatasetLocality_JEDI */"
    before_timestamp_str = before_timestamp.strftime("%Y-%m-%d_%H:%M:%S")
    tmpLog = self.create_tagged_logger(comment, f"before_timestamp={before_timestamp_str}")
    tmpLog.debug("start")
    try:
        sqlD = f"DELETE FROM {panda_config.schemaJEDI}.Jedi_Dataset_Locality WHERE timestamp<=:timestamp "

        self.conn.begin()
        self.cur.execute(sqlD + comment, {":timestamp": before_timestamp})
        retVal = self.cur.rowcount

        if not self._commit():
            raise RuntimeError("Commit error")

        tmpLog.debug(f"done, deleted {retVal} records")
        return retVal
    except Exception:
        self._rollback()
        self.dump_error_message(tmpLog)
        # a row count obtained before a failed commit refers to rows that
        # were restored by the rollback, so it must not be reported
        return 0
3522
3523
def appendDatasets_JEDI(self, jediTaskID, inMasterDatasetSpecList, inSecDatasetSpecList):
    """Append input datasets to a task and move the task to its next status.

    The task row is locked and the work is only done when the task status
    equals ``JediTaskSpec.commandStatusMap()["incexec"]["done"]``. Existing
    master input datasets are switched between ``removed`` and ``defined``
    depending on whether they appear in ``inMasterDatasetSpecList``; master
    datasets not yet known to the task are inserted together with the
    secondary datasets. Finally the task status is set to ``ready``,
    ``defined`` or ``prepared`` and the status change is recorded.

    Args:
        jediTaskID: JEDI task ID.
        inMasterDatasetSpecList: Master dataset specs to append. The
            ``datasetID`` of every inserted spec is filled in.
        inSecDatasetSpecList: Secondary dataset specs inserted for each
            newly inserted master dataset; their ``masterID`` and
            ``datasetID`` are overwritten on every insertion.

    Returns:
        True when the transaction was committed. This includes the cases
        where the task was not found or had an unexpected status, which are
        only logged. False when an error occurred and was rolled back.
    """
    comment = " /* JediDBProxy.appendDatasets_JEDI */"
    tmpLog = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID}")
    tmpLog.debug("start")
    # goDefined: there is something left to process for the task
    goDefined = False
    # refreshContents: an existing dataset is in a status listed by statusToUpdateContents()
    refreshContents = False
    commandStr = "incexec"
    try:
        # begin transaction
        self.conn.begin()
        self.cur.arraysize = 100000
        # lock the task row and read its status
        varMap = {}
        varMap[":jediTaskID"] = jediTaskID
        sqlTK = f"SELECT status FROM {panda_config.schemaJEDI}.JEDI_Tasks WHERE jediTaskID=:jediTaskID FOR UPDATE "
        self.cur.execute(sqlTK + comment, varMap)
        resTK = self.cur.fetchone()
        if resTK is None:
            # task not found
            msgStr = "task not found"
            tmpLog.debug(msgStr)
        else:
            (taskStatus,) = resTK
            # the task must be in the status that the incexec command leaves behind
            if taskStatus != JediTaskSpec.commandStatusMap()[commandStr]["done"]:
                msgStr = f"invalid status={taskStatus} for dataset appending"
                tmpLog.debug(msgStr)
            else:
                timeNow = naive_utcnow()
                # names of the master datasets requested by the caller
                master_dataset_names = [datasetSpec.datasetName for datasetSpec in inMasterDatasetSpecList]
                # get the input datasets the task already has
                varMap = {}
                varMap[":jediTaskID"] = jediTaskID
                sqlDS = "SELECT datasetName,datasetID,status,nFilesTobeUsed,nFilesUsed,masterID "
                sqlDS += f"FROM {panda_config.schemaJEDI}.JEDI_Datasets "
                sqlDS += f"WHERE jediTaskID=:jediTaskID AND type IN ({INPUT_TYPES_var_str}) "
                varMap.update(INPUT_TYPES_var_map)
                self.cur.execute(sqlDS + comment, varMap)
                resDS = self.cur.fetchall()
                # reconcile the existing datasets with the requested list
                sql_ex = f"UPDATE {panda_config.schemaJEDI}.JEDI_Datasets SET status=:status WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID "
                existingDatasets = {}
                for datasetName, dataset_id, datasetStatus, nFilesTobeUsed, nFilesUsed, masterID in resDS:
                    # only master datasets with unused files or in a status that needs a contents update
                    try:
                        if masterID is None and (nFilesTobeUsed - nFilesUsed > 0 or datasetStatus in JediDatasetSpec.statusToUpdateContents()):
                            # a dataset that was removed before is requested again
                            to_update_status = False
                            if datasetStatus == "removed" and datasetName in master_dataset_names:
                                to_update_status = True
                                var_map = {":status": "defined", ":jediTaskID": jediTaskID, ":datasetID": dataset_id}
                            # a dataset that is no longer requested
                            elif datasetStatus != "removed" and datasetName not in master_dataset_names:
                                to_update_status = True
                                var_map = {":status": "removed", ":jediTaskID": jediTaskID, ":datasetID": dataset_id}
                            # persist the status flip and continue with the new status
                            if to_update_status:
                                tmpLog.debug(f"""set status={var_map[":status"]} from {datasetStatus} for {datasetName}""")
                                self.cur.execute(sql_ex + comment, var_map)
                                datasetStatus = var_map[":status"]
                            if datasetStatus != "removed":
                                goDefined = True
                                if datasetStatus in JediDatasetSpec.statusToUpdateContents():
                                    refreshContents = True
                    except Exception:
                        # NOTE(review): every error for this row is silently ignored, including
                        # failures of the UPDATE above; presumably meant for NULL file counters - confirm
                        pass
                    existingDatasets[datasetName] = datasetStatus
                # insert datasets that are not known to the task yet
                sqlID = f"INSERT INTO {panda_config.schemaJEDI}.JEDI_Datasets ({JediDatasetSpec.columnNames()}) "
                sqlID += JediDatasetSpec.bindValuesExpression()
                sqlID += " RETURNING datasetID INTO :newDatasetID"
                for datasetSpec in inMasterDatasetSpecList:
                    # skip existing datasets
                    if datasetSpec.datasetName in existingDatasets:
                        # an existing dataset waiting for a contents update still needs processing
                        if existingDatasets[datasetSpec.datasetName] in JediDatasetSpec.statusToUpdateContents():
                            goDefined = True
                        continue
                    datasetSpec.creationTime = timeNow
                    datasetSpec.modificationTime = timeNow
                    varMap = datasetSpec.valuesMap(useSeq=True)
                    varMap[":newDatasetID"] = self.cur.var(varNUMBER)
                    # insert the master dataset and read back the generated ID
                    tmpLog.debug(f"append {datasetSpec.datasetName}")
                    self.cur.execute(sqlID + comment, varMap)
                    val = self.getvalue_corrector(self.cur.getvalue(varMap[":newDatasetID"]))
                    datasetID = int(val)
                    masterID = datasetID
                    datasetSpec.datasetID = datasetID
                    # insert the secondary datasets for this master; the loop reuses the name datasetSpec
                    for datasetSpec in inSecDatasetSpecList:
                        datasetSpec.creationTime = timeNow
                        datasetSpec.modificationTime = timeNow
                        datasetSpec.masterID = masterID
                        varMap = datasetSpec.valuesMap(useSeq=True)
                        varMap[":newDatasetID"] = self.cur.var(varNUMBER)
                        # insert the secondary dataset
                        self.cur.execute(sqlID + comment, varMap)
                        val = self.getvalue_corrector(self.cur.getvalue(varMap[":newDatasetID"]))
                        datasetID = int(val)
                        datasetSpec.datasetID = datasetID
                    goDefined = True
                # update the task; deft_staus is the status propagated via setDeftStatus_JEDI/setSuperStatus_JEDI
                deft_staus = None
                sqlUT = f"UPDATE {panda_config.schemaJEDI}.JEDI_Tasks "
                sqlUT += "SET status=:status,lockedBy=NULL,lockedTime=NULL,modificationtime=:updateTime,stateChangeTime=CURRENT_DATE "
                sqlUT += "WHERE jediTaskID=:jediTaskID "
                varMap = {}
                varMap[":jediTaskID"] = jediTaskID
                if goDefined:
                    # no master datasets were given and no contents refresh is needed
                    if inMasterDatasetSpecList == [] and not refreshContents:
                        varMap[":status"] = "ready"
                        deft_staus = "ready"
                    else:
                        varMap[":status"] = "defined"
                        deft_staus = "registered"
                else:
                    # nothing to process
                    varMap[":status"] = "prepared"
                # the modification time is deliberately set six hours into the past
                # NOTE(review): presumably so that the task is picked up without delay - confirm
                varMap[":updateTime"] = naive_utcnow() - datetime.timedelta(hours=6)
                tmpLog.debug(f"set taskStatus={varMap[':status']}")
                self.cur.execute(sqlUT + comment, varMap)
                # propagate the status when one was chosen
                if deft_staus is not None:
                    self.setDeftStatus_JEDI(jediTaskID, deft_staus)
                    self.setSuperStatus_JEDI(jediTaskID, deft_staus)
                # record and publish the task status change
                self.record_task_status_change(jediTaskID)
                self.push_task_status_message(None, jediTaskID, varMap[":status"])
        # commit
        if not self._commit():
            raise RuntimeError("Commit error")
        tmpLog.debug("done")
        return True
    except Exception:
        # roll back
        self._rollback()
        # error
        self.dump_error_message(tmpLog)
        return False
3671
3672
def insertDataset_JEDI(self, datasetSpec):
    """Insert a dataset into JEDI_Datasets.

    The creation and modification times of ``datasetSpec`` are set to the
    current naive UTC time before the row is written.

    Args:
        datasetSpec: Dataset specification to insert.

    Returns:
        ``(True, datasetID)`` with the ID returned by the INSERT, or
        ``(False, None)`` when the insertion failed.
    """
    comment = " /* JediDBProxy.insertDataset_JEDI */"
    tmpLog = self.create_tagged_logger(comment)
    tmpLog.debug("start")
    try:
        # stamp the spec before its values are collected
        now = naive_utcnow()
        datasetSpec.creationTime = now
        datasetSpec.modificationTime = now
        insert_sql = (
            f"INSERT INTO {panda_config.schemaJEDI}.JEDI_Datasets ({JediDatasetSpec.columnNames()}) "
            + JediDatasetSpec.bindValuesExpression()
            + " RETURNING datasetID INTO :newDatasetID"
        )
        bind_vars = datasetSpec.valuesMap(useSeq=True)
        # output variable receiving the generated dataset ID
        new_id_var = self.cur.var(varNUMBER)
        bind_vars[":newDatasetID"] = new_id_var
        self.conn.begin()
        self.cur.execute(insert_sql + comment, bind_vars)
        if not self._commit():
            raise RuntimeError("Commit error")
        tmpLog.debug("done")
        new_id = self.getvalue_corrector(self.cur.getvalue(new_id_var))
        return True, int(new_id)
    except Exception:
        self._rollback()
        self.dump_error_message(tmpLog)
        return False, None
3704
3705
def updateDataset_JEDI(self, datasetSpec, criteria, lockTask):
    """Write the changed attributes of a dataset spec to JEDI_Datasets.

    Args:
        datasetSpec: Dataset specification whose changed attributes are
            written; its modification time is set to the current time.
        criteria: Dict of ``{attribute: value}`` pairs combined with AND to
            select the rows to update. It must not be empty, and every key
            must be an attribute of ``datasetSpec``.
        lockTask: When true, the task row is locked with ``FOR UPDATE``
            before the update.

    Returns:
        ``(True, number_of_updated_rows)`` on success, ``(False, 0)`` when
        the criteria are rejected or the update failed.
    """
    comment = " /* JediDBProxy.updateDataset_JEDI */"
    tmpLog = self.create_tagged_logger(comment, f"datasetID={datasetSpec.datasetID}")
    tmpLog.debug("start")
    on_failure = (False, 0)
    # an update without a WHERE condition is refused
    if criteria == {}:
        tmpLog.error("no selection criteria")
        return on_failure
    # every criterion must name an attribute of the spec
    for key in criteria:
        if not hasattr(datasetSpec, key):
            tmpLog.error(f"unknown attribute {key} is used in criteria")
            return on_failure
    try:
        datasetSpec.modificationTime = naive_utcnow()
        bind_vars = datasetSpec.valuesMap(useSeq=False, onlyChanged=True)
        set_clause = datasetSpec.bindUpdateChangesExpression()
        # criteria bind names get a cr_ prefix to keep them apart from the SET values
        where_clause = " AND".join(f" {key}=:cr_{key}" for key in criteria)
        update_sql = f"UPDATE {panda_config.schemaJEDI}.JEDI_Datasets SET {set_clause} WHERE " + where_clause
        for key, value in criteria.items():
            bind_vars[f":cr_{key}"] = value
        lock_sql = f"SELECT 1 FROM {panda_config.schemaJEDI}.JEDI_Tasks WHERE jediTaskID=:jediTaskID FOR UPDATE"
        lock_vars = {":jediTaskID": datasetSpec.jediTaskID}

        self.conn.begin()
        if lockTask:
            self.cur.execute(lock_sql + comment, lock_vars)
        tmpLog.debug(update_sql + comment + str(bind_vars))
        self.cur.execute(update_sql + comment, bind_vars)
        n_updated = self.cur.rowcount
        if not self._commit():
            raise RuntimeError("Commit error")
        tmpLog.debug(f"updated {n_updated} rows")
        return True, n_updated
    except Exception:
        self._rollback()
        self.dump_error_message(tmpLog)
        return on_failure
3764
3765
def updateDatasetAttributes_JEDI(self, jediTaskID, datasetID, attributes):
    """Set the given columns of one dataset row in JEDI_Datasets.

    Args:
        jediTaskID: JEDI task ID of the row.
        datasetID: Dataset ID of the row.
        attributes: Dict of ``{column: value}`` pairs. The column names are
            interpolated into the SQL text and also used as bind names.

    Returns:
        ``(True, number_of_updated_rows)`` on success, or the bare value
        False when the update failed. Note the different shapes.
    """
    comment = " /* JediDBProxy.updateDatasetAttributes_JEDI */"
    tmpLog = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID} datasetID={datasetID}")
    tmpLog.debug("start")
    try:
        # "col=:col," pairs with the final character stripped
        assignments = "".join(f"{name}=:{name}," for name in attributes)
        update_sql = (f"UPDATE {panda_config.schemaJEDI}.JEDI_Datasets SET " + assignments)[:-1]
        update_sql += " WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID "
        bind_vars = {":jediTaskID": jediTaskID, ":datasetID": datasetID}
        for name, value in attributes.items():
            bind_vars[f":{name}"] = value

        self.conn.begin()
        tmpLog.debug(update_sql + comment + str(bind_vars))
        self.cur.execute(update_sql + comment, bind_vars)
        n_updated = self.cur.rowcount
        if not self._commit():
            raise RuntimeError("Commit error")
        tmpLog.debug(f"updated {n_updated} rows")
        return True, n_updated
    except Exception:
        self._rollback()
        self.dump_error_message(tmpLog)
        return False
3804
3805
def getDatasetAttributes_JEDI(self, jediTaskID, datasetID, attributes):
    """Read selected columns of one dataset row in JEDI_Datasets.

    Args:
        jediTaskID: JEDI task ID of the row.
        datasetID: Dataset ID of the row.
        attributes: Column names to select; interpolated into the SQL text.

    Returns:
        A dict mapping each requested name to its value. The dict is empty
        when no row matches or when the query/commit failed.
    """
    comment = " /* JediDBProxy.getDatasetAttributes_JEDI */"
    tmpLog = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID} datasetID={datasetID}")
    tmpLog.debug("start")
    try:
        # "col1,col2," with the final character stripped
        select_part = ("SELECT " + "".join(f"{name}," for name in attributes))[:-1] + " "
        query = (
            select_part
            + f"FROM {panda_config.schemaJEDI}.JEDI_Datasets "
            + "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID "
        )
        bind_vars = {":jediTaskID": jediTaskID, ":datasetID": datasetID}

        self.conn.begin()
        self.cur.execute(query + comment, bind_vars)
        row = self.cur.fetchone()
        if not self._commit():
            raise RuntimeError("Commit error")
        if row is None:
            found = {}
        else:
            found = {name: row[position] for position, name in enumerate(attributes)}
        tmpLog.debug(f"got {str(found)}")
        return found
    except Exception:
        self._rollback()
        self.dump_error_message(tmpLog)
        return {}
3845
3846
def getDatasetAttributesWithMap_JEDI(self, jediTaskID, criteria, attributes):
    """Read selected columns of the first dataset row matching some criteria.

    Args:
        jediTaskID: JEDI task ID of the dataset.
        criteria: Dict of ``{column: value}`` equality conditions added to
            the WHERE clause with AND.
        attributes: Column names to select; interpolated into the SQL text.

    Returns:
        A dict mapping each requested name to the value in the first
        fetched row. The dict is empty when no row matches or when the
        query/commit failed.
    """
    comment = " /* JediDBProxy.getDatasetAttributesWithMap_JEDI */"
    tmpLog = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID} criteria={str(criteria)}")
    tmpLog.debug("start")
    try:
        bind_vars = {":jediTaskID": jediTaskID}
        # "col1,col2," with the final character stripped
        query = ("SELECT " + "".join(f"{name}," for name in attributes))[:-1] + " "
        query += f"FROM {panda_config.schemaJEDI}.JEDI_Datasets "
        query += "WHERE jediTaskID=:jediTaskID "
        for column, wanted in criteria.items():
            query += f"AND {column}=:{column} "
            bind_vars[f":{column}"] = wanted

        self.conn.begin()
        self.cur.execute(query + comment, bind_vars)
        row = self.cur.fetchone()
        if not self._commit():
            raise RuntimeError("Commit error")
        if row is None:
            found = {}
        else:
            found = {name: row[position] for position, name in enumerate(attributes)}
        tmpLog.debug(f"got {str(found)}")
        return found
    except Exception:
        self._rollback()
        self.dump_error_message(tmpLog)
        return {}
3887
3888
def getDatasetWithID_JEDI(self, jediTaskID, datasetID):
    """Load one JEDI dataset row as a ``JediDatasetSpec``.

    Args:
        jediTaskID: Value bound to the ``jediTaskID`` filter.
        datasetID: Value bound to the ``datasetID`` filter.

    Returns:
        tuple: ``(True, spec)`` where ``spec`` is a ``JediDatasetSpec`` packed
        from the fetched row, or ``None`` when no row is fetched;
        ``(False, None)`` on any error (after rollback and error dump).
    """
    comment = " /* JediDBProxy.getDatasetWithID_JEDI */"
    tmpLog = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID} datasetID={datasetID}")
    tmpLog.debug("start")
    try:
        sql = (
            f"SELECT {JediDatasetSpec.columnNames()} "
            f"FROM {panda_config.schemaJEDI}.JEDI_Datasets WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID "
        )
        bind_values = {":jediTaskID": jediTaskID, ":datasetID": datasetID}

        # run the query inside its own transaction
        self.conn.begin()
        self.cur.execute(sql + comment, bind_values)
        row = self.cur.fetchone()
        if not self._commit():
            raise RuntimeError("Commit error")

        # a spec object is built only when a row came back
        datasetSpec = None
        if row is not None:
            datasetSpec = JediDatasetSpec()
            datasetSpec.pack(row)
        tmpLog.debug("done")
        return True, datasetSpec
    except Exception:
        # undo the transaction and report the failure through the logger
        self._rollback()
        self.dump_error_message(tmpLog)
        return False, None
3923
3924
def getDatasetsWithJediTaskID_JEDI(self, jediTaskID, datasetTypes=None):
    """Load the JEDI dataset rows of a task as ``JediDatasetSpec`` objects.

    Args:
        jediTaskID: Value bound to the ``jediTaskID`` filter.
        datasetTypes: Optional collection of values for an additional
            ``type IN (...)`` filter. ``None`` disables the filter.

    Returns:
        tuple: ``(True, specs)`` with one ``JediDatasetSpec`` per fetched row
        (possibly an empty list); ``(False, None)`` on any error (after
        rollback and error dump).
    """
    comment = " /* JediDBProxy.getDatasetsWithJediTaskID_JEDI */"
    tmpLog = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID} datasetTypes={datasetTypes}")
    tmpLog.debug("start")
    try:
        bind_values = {":jediTaskID": jediTaskID}
        sql = (
            f"SELECT {JediDatasetSpec.columnNames()} "
            f"FROM {panda_config.schemaJEDI}.JEDI_Datasets WHERE jediTaskID=:jediTaskID "
        )
        if datasetTypes is not None:
            # one bind variable per requested dataset type
            type_placeholders, type_binds = get_sql_IN_bind_variables(datasetTypes, prefix=":type_", value_as_suffix=True)
            sql += f"AND type IN ({type_placeholders}) "
            bind_values.update(type_binds)

        # run the query inside its own transaction
        self.conn.begin()
        self.cur.arraysize = 10000
        self.cur.execute(sql + comment, bind_values)
        rows = self.cur.fetchall()
        if not self._commit():
            raise RuntimeError("Commit error")

        # convert every row into a spec object
        datasetSpecList = []
        for row in rows:
            spec = JediDatasetSpec()
            spec.pack(row)
            datasetSpecList.append(spec)
        tmpLog.debug(f"done with {len(datasetSpecList)} datasets")
        return True, datasetSpecList
    except Exception:
        # undo the transaction and report the failure through the logger
        self._rollback()
        self.dump_error_message(tmpLog)
        return False, None
3964
3965
def get_task_ids_with_dataset_attributes(self, dataset_attributes: dict, only_active_tasks: bool = True) -> tuple[bool, list[int] | None]:
    """Get jediTaskIDs of tasks that own a dataset matching the given attributes.

    The query joins JEDI_Tasks, JEDI_AUX_Status_MinTaskID and JEDI_Datasets and
    adds one ``tabD.<name>=:<name>`` condition per entry of ``dataset_attributes``.

    Args:
        dataset_attributes (dict): Dataset column name -> required value. The
            names are interpolated into the SQL text (only the values are
            bound), so they must come from trusted code.
        only_active_tasks (bool): If True, tasks whose status is listed by
            ``JediTaskSpec.statusToRejectExtChange()`` are excluded.

    Returns:
        tuple: ``(True, task_ids)`` on success, where ``task_ids`` is the list of
        distinct jediTaskIDs (possibly empty); ``(False, None)`` on any error
        (after rollback and error dump).
    """
    comment = " /* JediDBProxy.get_task_ids_with_dataset_attributes */"
    tmp_log = self.create_tagged_logger(comment, f"dataset_attributes={dataset_attributes} only_active_tasks={only_active_tasks}")
    tmp_log.debug("start")
    try:
        # sql to get the task IDs
        sql = (
            f"SELECT DISTINCT tabT.jediTaskID FROM {panda_config.schemaJEDI}.JEDI_Tasks tabT,{panda_config.schemaJEDI}.JEDI_AUX_Status_MinTaskID tabA,{panda_config.schemaJEDI}.JEDI_Datasets tabD "
            "WHERE tabT.status=tabA.status AND tabT.jediTaskID>=tabA.min_jediTaskID AND tabT.jediTaskID=tabD.jediTaskID "
        )
        var_map = {}
        for k, v in dataset_attributes.items():
            sql += f"AND tabD.{k}=:{k} "
            # bind keys carry the ":" prefix like the status binds merged below
            # and like the other bind maps built in this file
            var_map[f":{k}"] = v
        if only_active_tasks:
            task_var_names_str, task_var_map = get_sql_IN_bind_variables(
                JediTaskSpec.statusToRejectExtChange(), prefix=":task_status_", value_as_suffix=True
            )
            sql += f"AND tabT.status NOT IN ({task_var_names_str}) "
            var_map.update(task_var_map)

        # run the query inside its own transaction
        self.conn.begin()
        self.cur.arraysize = 10000
        # trace the statement through the tagged logger rather than stdout
        tmp_log.debug(f"sql={sql} var_map={var_map}")
        self.cur.execute(sql + comment, var_map)
        tmp_res = self.cur.fetchall()
        if not self._commit():
            raise RuntimeError("Commit error")

        task_id_list = [row[0] for row in tmp_res]
        tmp_log.debug(f"done with {len(task_id_list)} tasks")
        return True, task_id_list
    except Exception:
        # undo the transaction and report the failure through the logger
        self._rollback()
        self.dump_error_message(tmp_log)
        return False, None
4011
4012
def extendSandboxLifetime_JEDI(self, jedi_taskid, file_name):
    """Set ``creationTime`` to ``CURRENT_DATE`` for a file in ``userCacheUsage``.

    Args:
        jedi_taskid: Used only to tag the logger.
        file_name: Value bound to the ``fileName`` filter of the UPDATE.

    Returns:
        The cursor's ``rowcount`` after the UPDATE, or ``None`` on any error
        (after rollback and error dump).
    """
    comment = " /* JediDBProxy.extendSandboxLifetime_JEDI */"
    tmpLog = self.create_tagged_logger(comment, f"jediTaskID={jedi_taskid}")
    try:
        self.conn.begin()
        # refresh the timestamp of the matching cache entry
        sql_update = f"UPDATE {panda_config.schemaMETA}.userCacheUsage SET creationTime=CURRENT_DATE WHERE fileName=:fileName "
        self.cur.execute(sql_update + comment, {":fileName": file_name})
        nRows = self.cur.rowcount
        if not self._commit():
            raise RuntimeError("Commit error")
        tmpLog.debug(f"done {file_name} with {nRows}")
        return nRows
    except Exception:
        # undo the transaction and report the failure through the logger
        self._rollback()
        self.dump_error_message(tmpLog)
        return None
4036
4037
def lockProcess_JEDI(self, vo, prodSourceLabel, cloud, workqueue_id, resource_name, component, pid, forceOption, timeLimit):
    """Try to take the JEDI process lock for one (vo, label, cloud, queue, resource, component) key.

    Unless ``forceOption`` is set, the lock is skipped when a row for the key
    exists whose ``lockedTime`` is newer than ``timeLimit`` minutes before
    ``naive_utcnow()``. Otherwise any row for the key is deleted and a new row
    owned by ``pid`` is inserted with ``lockedTime=CURRENT_DATE``.

    Args:
        vo: Value for the ``vo`` column.
        prodSourceLabel: Value for the ``prodSourceLabel`` column.
        cloud: Value for the ``cloud`` column; ``None`` is stored as ``"default"``.
        workqueue_id: Value for the ``workqueue_id`` column; ``None`` is stored as ``0``.
        resource_name: Value for the ``resource_type`` column; ``None`` is stored as ``"default"``.
        component: Value for the ``component`` column; ``None`` is stored as ``"default"``.
        pid: Identifier written to ``lockedBy``.
        forceOption: When true, the existing-lock check is bypassed.
        timeLimit: Age in minutes below which an existing lock is respected.

    Returns:
        bool: True only when the lock row was inserted and the transaction
        committed. False when the key is held by a recent lock or on any
        error (after rollback and error dump).
    """
    comment = " /* JediDBProxy.lockProcess_JEDI */"
    # defaults
    if cloud is None:
        cloud = "default"
    if workqueue_id is None:
        workqueue_id = 0
    if resource_name is None:
        resource_name = "default"
    if component is None:
        component = "default"
    tmpLog = self.create_tagged_logger(
        comment, f"vo={vo} label={prodSourceLabel} cloud={cloud} queue={workqueue_id} resource_type={resource_name} component={component} pid={pid}"
    )
    tmpLog.debug("start")
    try:
        # sql to check for a recent lock; FOR UPDATE is part of the statement
        sqlCT = "SELECT lockedBy "
        sqlCT += f"FROM {panda_config.schemaJEDI}.JEDI_Process_Lock "
        sqlCT += "WHERE vo=:vo AND prodSourceLabel=:prodSourceLabel AND cloud=:cloud AND workqueue_id=:workqueue_id "
        sqlCT += "AND resource_type=:resource_name AND component=:component "
        sqlCT += "AND lockedTime>:timeLimit "
        sqlCT += "FOR UPDATE"
        # sql to delete any existing row for the key
        sqlCD = f"DELETE FROM {panda_config.schemaJEDI}.JEDI_Process_Lock "
        sqlCD += "WHERE vo=:vo AND prodSourceLabel=:prodSourceLabel AND cloud=:cloud AND workqueue_id=:workqueue_id "
        sqlCD += "AND resource_type=:resource_name AND component=:component "
        # sql to insert the new lock row
        sqlFR = f"INSERT INTO {panda_config.schemaJEDI}.JEDI_Process_Lock "
        sqlFR += "(vo, prodSourceLabel, cloud, workqueue_id, resource_type, component, lockedBy, lockedTime) "
        sqlFR += "VALUES(:vo, :prodSourceLabel, :cloud, :workqueue_id, :resource_name, :component, :lockedBy, CURRENT_DATE) "

        # bind values shared by all three statements
        keyMap = {
            ":vo": vo,
            ":prodSourceLabel": prodSourceLabel,
            ":cloud": cloud,
            ":workqueue_id": workqueue_id,
            ":resource_name": resource_name,
            ":component": component,
        }

        self.conn.begin()
        # look for a recent lock unless forced
        if not forceOption:
            varMap = dict(keyMap)
            varMap[":timeLimit"] = naive_utcnow() - datetime.timedelta(minutes=timeLimit)
            self.cur.execute(sqlCT + comment, varMap)
            resCT = self.cur.fetchone()
        else:
            resCT = None

        inserted = False
        if resCT is not None:
            tmpLog.debug(f"skipped, locked by {resCT[0]}")
        else:
            # replace whatever row exists for the key with a fresh lock
            self.cur.execute(sqlCD + comment, dict(keyMap))
            varMap = dict(keyMap)
            varMap[":lockedBy"] = pid
            self.cur.execute(sqlFR + comment, varMap)
            inserted = True

        if not self._commit():
            raise RuntimeError("Commit error")
        # success is reported only once the insert has been committed
        if inserted:
            tmpLog.debug("successfully locked")
        return inserted
    except Exception:
        # the transaction is rolled back, so the lock is never reported as held
        self._rollback()
        self.dump_error_message(tmpLog)
        return False
4120
4121
def unlockProcess_JEDI(self, vo, prodSourceLabel, cloud, workqueue_id, resource_name, component, pid):
    """Delete the JEDI process lock row held by ``pid`` for one key.

    Args:
        vo: Value matched against the ``vo`` column.
        prodSourceLabel: Value matched against the ``prodSourceLabel`` column.
        cloud: Value matched against ``cloud``; ``None`` is treated as ``"default"``.
        workqueue_id: Value matched against ``workqueue_id``; ``None`` is treated as ``0``.
        resource_name: Value matched against ``resource_type``; ``None`` is treated as ``"default"``.
        component: Value matched against ``component``; ``None`` is treated as ``"default"``.
        pid: Value matched against ``lockedBy``.

    Returns:
        bool: True when the DELETE was executed and committed (regardless of
        how many rows it removed), False on any error (after rollback and
        error dump).
    """
    comment = " /* JediDBProxy.unlockProcess_JEDI */"
    # substitute defaults for missing key parts
    cloud = "default" if cloud is None else cloud
    workqueue_id = 0 if workqueue_id is None else workqueue_id
    resource_name = "default" if resource_name is None else resource_name
    component = "default" if component is None else component
    tmpLog = self.create_tagged_logger(
        comment, f"vo={vo} label={prodSourceLabel} cloud={cloud} queue={workqueue_id} resource_type={resource_name} component={component} pid={pid}"
    )
    tmpLog.debug("start")
    try:
        sql_delete = (
            f"DELETE FROM {panda_config.schemaJEDI}.JEDI_Process_Lock "
            "WHERE vo=:vo AND prodSourceLabel=:prodSourceLabel AND cloud=:cloud "
            "AND workqueue_id=:workqueue_id AND lockedBy=:lockedBy "
            "AND resource_type=:resource_name AND component=:component "
        )
        bind_values = {
            ":vo": vo,
            ":prodSourceLabel": prodSourceLabel,
            ":cloud": cloud,
            ":workqueue_id": workqueue_id,
            ":resource_name": resource_name,
            ":component": component,
            ":lockedBy": pid,
        }

        # remove the lock inside its own transaction
        self.conn.begin()
        self.cur.execute(sql_delete + comment, bind_values)
        if not self._commit():
            raise RuntimeError("Commit error")
        tmpLog.debug("done")
        return True
    except Exception:
        # undo the transaction and report the failure through the logger
        self._rollback()
        self.dump_error_message(tmpLog)
        return False
4168
4169
def unlockProcessWithPID_JEDI(self, vo, prodSourceLabel, workqueue_id, resource_name, pid, useBase):
    """Delete JEDI process lock rows owned by a pid, or by any pid with a given prefix.

    The filter covers ``vo``, ``prodSourceLabel``, ``workqueue_id``,
    ``resource_type`` and ``lockedBy``. ``cloud`` and ``component`` are not
    part of the filter.

    Args:
        vo: Value matched against the ``vo`` column.
        prodSourceLabel: Value matched against the ``prodSourceLabel`` column.
        workqueue_id: Value matched against the ``workqueue_id`` column.
        resource_name: Value matched against the ``resource_type`` column.
        pid: Owner identifier matched against ``lockedBy``.
        useBase: When true, ``pid`` is used as a prefix (``LIKE pid%``)
            instead of an exact match.

    Returns:
        bool: True when the DELETE was executed and committed (regardless of
        how many rows it removed), False on any error (after rollback and
        error dump).
    """
    comment = " /* JediDBProxy.unlockProcessWithPID_JEDI */"
    tmpLog = self.create_tagged_logger(
        comment, f"vo={vo} label={prodSourceLabel} queue={workqueue_id} resource_type={resource_name} pid={pid} useBase={useBase}"
    )
    tmpLog.debug("start")
    try:
        # sql to delete
        sqlCD = f"DELETE FROM {panda_config.schemaJEDI}.JEDI_Process_Lock "
        sqlCD += "WHERE vo=:vo AND prodSourceLabel=:prodSourceLabel "
        sqlCD += "AND workqueue_id=:workqueue_id "
        # the table column is resource_type (as in the lock INSERT and the
        # other lock statements); only the bind variable is called resource_name
        sqlCD += "AND resource_type=:resource_name "
        if useBase:
            sqlCD += "AND lockedBy LIKE :lockedBy "
        else:
            sqlCD += "AND lockedBy=:lockedBy "

        varMap = {
            ":vo": vo,
            ":prodSourceLabel": prodSourceLabel,
            ":workqueue_id": workqueue_id,
            ":resource_name": resource_name,
            # prefix match needs the trailing wildcard
            ":lockedBy": pid + "%" if useBase else pid,
        }

        # remove the lock(s) inside a dedicated transaction
        self.conn.begin()
        self.cur.execute(sqlCD + comment, varMap)
        if not self._commit():
            raise RuntimeError("Commit error")
        tmpLog.debug("done")
        return True
    except Exception:
        # undo the transaction and report the failure through the logger
        self._rollback()
        self.dump_error_message(tmpLog)
        return False
4212
4213
def checkProcessLock_JEDI(self, vo, prodSourceLabel, cloud, workqueue_id, resource_name, component, pid, checkBase):
    """Tell whether a recent JEDI process lock for the key is held by someone else.

    Only rows whose ``lockedTime`` is newer than five minutes before
    ``naive_utcnow()`` are considered, and only the first fetched row is
    inspected.

    Args:
        vo: Value matched against the ``vo`` column.
        prodSourceLabel: Value matched against the ``prodSourceLabel`` column.
        cloud: Value matched against ``cloud``; ``None`` is treated as ``"default"``.
        workqueue_id: Value matched against ``workqueue_id``; ``None`` is treated as ``0``.
        resource_name: Value matched against ``resource_type``; ``None`` is treated as ``"default"``.
        component: Value matched against ``component``; ``None`` is treated as ``"default"``.
        pid: Identifier of the caller.
        checkBase: When true, the lock counts as the caller's own if
            ``lockedBy`` starts with ``pid``; otherwise it must equal ``pid``.

    Returns:
        bool: True when the fetched lock belongs to a different owner, False
        when there is no recent lock or it is the caller's own. On an error
        the value determined so far is returned (after rollback and error
        dump).
    """
    comment = " /* JediDBProxy.checkProcessLock_JEDI */"
    # substitute defaults for missing key parts
    cloud = "default" if cloud is None else cloud
    workqueue_id = 0 if workqueue_id is None else workqueue_id
    resource_name = "default" if resource_name is None else resource_name
    component = "default" if component is None else component
    tmpLog = self.create_tagged_logger(
        comment, f"vo={vo} label={prodSourceLabel} cloud={cloud} queue={workqueue_id} resource_type={resource_name} component={component} pid={pid}"
    )
    tmpLog.debug("start")
    retVal = False
    try:
        sql_check = (
            "SELECT lockedBy "
            f"FROM {panda_config.schemaJEDI}.JEDI_Process_Lock "
            "WHERE vo=:vo AND prodSourceLabel=:prodSourceLabel AND cloud=:cloud AND workqueue_id=:workqueue_id "
            "AND resource_type=:resource_name AND component=:component "
            "AND lockedTime>:timeLimit "
        )

        self.conn.begin()
        bind_values = {
            ":vo": vo,
            ":prodSourceLabel": prodSourceLabel,
            ":cloud": cloud,
            ":workqueue_id": workqueue_id,
            ":resource_name": resource_name,
            ":component": component,
            ":timeLimit": naive_utcnow() - datetime.timedelta(minutes=5),
        }
        self.cur.execute(sql_check + comment, bind_values)
        row = self.cur.fetchone()
        if row is not None:
            (lockedBy,) = row
            # prefix comparison when checkBase is set, exact comparison otherwise
            if checkBase:
                held_by_other = not lockedBy.startswith(pid)
            else:
                held_by_other = lockedBy != pid
            if held_by_other:
                retVal = True
                tmpLog.debug(f"skipped locked by {lockedBy}")

        if not self._commit():
            raise RuntimeError("Commit error")
        tmpLog.debug(f"done with {retVal}")
        return retVal
    except Exception:
        # undo the transaction and report the failure through the logger
        self._rollback()
        self.dump_error_message(tmpLog)
        return retVal