# File indexing completed on 2026-04-10 08:39:02
import atexit
import datetime
import functools
import json
import socket
import sys
import time
import traceback
from contextlib import contextmanager

from pandacommon.pandalogger.LogWrapper import LogWrapper
from pandacommon.pandautils.PandaUtils import naive_utcnow

from pandaserver.config import panda_config
from pandaserver.taskbuffer.JediTaskSpec import (
    push_status_changes as task_push_status_changes,
)
from pandaserver.taskbuffer.JobSpec import (
    push_status_changes as job_push_status_changes,
)
0020
# Pick the DB-API type used for numeric output bind variables: Oracle needs
# oracledb.NUMBER, all other backends can bind plain Python integers.
if panda_config.backend == "oracle":
    import oracledb

    varNUMBER = oracledb.NUMBER
else:
    varNUMBER = int


# Topic name used in the SQL_QUEUE table for asynchronous dataset updates
# (see insert_to_query_pool).
SQL_QUEUE_TOPIC_async_dataset_update = "async_dataset_update"
0030
0031
0032
def memoize(f):
    """
    Decorator that caches a method's return value for one hour.

    The cache key is built from the positional and keyword arguments but
    excludes ``self``, so the cache is shared across all instances. Entries
    older than one hour are recomputed on the next call.

    :param f: method to wrap (first parameter must be ``self``)
    :return: wrapped method with a one-hour result cache
    """
    memo = {}
    # sentinel separating positional args from keyword items in the cache key
    kwd_mark = object()

    @functools.wraps(f)  # preserve the wrapped method's name and docstring
    def helper(self, *args, **kwargs):
        now = datetime.datetime.now()
        key = args + (kwd_mark,) + tuple(sorted(kwargs.items()))
        # recompute when the key is missing or the cached entry is stale
        if key not in memo or memo[key]["timestamp"] < now - datetime.timedelta(hours=1):
            memo[key] = {"value": f(self, *args, **kwargs), "timestamp": now}
        return memo[key]["value"]

    return helper
0046
0047
0048
def convert_dict_to_bind_vars(item):
    """
    Prefix every key of *item* with ':' to form a bind-variable map.

    :param item: dictionary of plain keys to values
    :return: new dictionary with ':'-prefixed keys and unchanged values
    """
    return {f":{key}": value for key, value in item.items()}
0054
0055
0056
0057 class BaseModule:
0058
0059 def __init__(self, log_stream: LogWrapper):
0060 self._log_stream = log_stream
0061 self.conn = None
0062 self.cur = None
0063 self.mb_proxy_dict = None
0064 self.useOtherError = False
0065 self.backend = panda_config.backend
0066
0067 self.nTry = 5
0068
0069 self.myHostName = socket.getfqdn()
0070 self.backend = panda_config.backend
0071
0072 self.hostname = None
0073
0074 self.composite_modules = {}
0075
0076
0077 self.typical_input_cache = {}
0078
0079
0080 self.workQueueMap = None
0081
0082 self.updateTimeForWorkQueue = None
0083
0084
0085 self.jedi_mb_proxy_dict = None
0086 self.jedi_mb_proxy_dict_setter = None
0087
0088
0089 self.jedi_config = None
0090
0091
0092 def set_jedi_attributes(self, jedi_config, jedi_mb_proxy_dict_setter):
0093 self.jedi_config = jedi_config
0094 self.jedi_mb_proxy_dict_setter = jedi_mb_proxy_dict_setter
0095
0096
    def connect(self, **kwargs):
        """
        Open the database connection (implemented by backend-specific subclasses).

        NOTE: the previous docstring ("Commit the transaction") was a copy-paste
        error; this method establishes the connection.

        :param kwargs: backend-specific options; callers in this module pass a
            ``reconnect`` flag (see _rollback and wakeUp)
        :raises NotImplementedError: always, in this base class
        """
        raise NotImplementedError("connect is not implemented")
0102
0103
0104 def _commit(self):
0105 try:
0106 self.conn.commit()
0107 return True
0108 except Exception:
0109 self._log_stream.error("commit error")
0110 return False
0111
0112
    def _rollback(self, useOtherError=False):
        """
        Roll back the current transaction and, for connection-level errors,
        try to re-establish the database connection.

        Intended to be called from inside an ``except`` block: the error code
        that drives the reconnect decision is read from sys.exc_info().

        :param useOtherError: also reconnect on an extended, backend-specific
            list of error codes
        :return: True when the rollback itself succeeded, False otherwise
        """
        return_value = True

        err_code = None
        self._log_stream.debug("rollback")
        try:
            self.conn.rollback()
        except Exception:
            self._log_stream.error("rollback error")
            return_value = False

        # Best-effort reconnect logic: inspect the active exception and
        # reconnect when its error code indicates a broken connection.
        # This part must never raise, hence the blanket except at the end.
        try:
            err_type, err_value = sys.exc_info()[:2]

            # psycopg2 exceptions expose the SQLSTATE code via .pgcode
            if self.backend == "postgres":
                try:
                    err_code = err_value.pgcode
                except Exception:
                    pass

            # fall back to parsing the first token of the message,
            # e.g. "ORA-01012:" -> "ORA-01012" (trailing character stripped)
            if err_code is None:
                err_code = str(err_value).split()[0]
                err_code = err_code[:-1]
            err_msg = f"rollback EC:{err_code} {err_value}"
            self._log_stream.debug(err_msg)

            # build the backend-specific list of codes that warrant a reconnect
            if self.backend == "oracle":
                error_list_for_reconnect = [
                    "ORA-01012",
                    "ORA-01033",
                    "ORA-01034",
                    "ORA-01089",
                    "ORA-03113",
                    "ORA-03114",
                    "ORA-12203",
                    "ORA-12500",
                    "ORA-12571",
                    "ORA-03135",
                    "ORA-25402",
                ]

                if useOtherError:
                    error_list_for_reconnect += ["ORA-01861", "ORA-01008"]
            elif self.backend == "postgres":
                # delayed import: only needed for the postgres backend
                import psycopg2.errorcodes as psycopg_errorcodes

                error_list_for_reconnect = [
                    psycopg_errorcodes.CONNECTION_EXCEPTION,
                    psycopg_errorcodes.SQLSERVER_REJECTED_ESTABLISHMENT_OF_SQLCONNECTION,
                    psycopg_errorcodes.CONNECTION_DOES_NOT_EXIST,
                    psycopg_errorcodes.SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION,
                    psycopg_errorcodes.CONNECTION_FAILURE,
                    psycopg_errorcodes.READ_ONLY_SQL_TRANSACTION,
                ]
            else:
                # MySQL backend; delayed imports as above
                import MySQLdb
                from MySQLdb.constants.CR import (
                    CONN_HOST_ERROR,
                    CONNECTION_ERROR,
                    LOCALHOST_CONNECTION,
                    SERVER_LOST,
                )
                from MySQLdb.constants.ER import (
                    ACCESS_DENIED_ERROR,
                    DBACCESS_DENIED_ERROR,
                    ILLEGAL_VALUE_FOR_TYPE,
                    SERVER_SHUTDOWN,
                )

                error_list_for_reconnect = [
                    ACCESS_DENIED_ERROR,
                    DBACCESS_DENIED_ERROR,
                    SERVER_SHUTDOWN,
                    CONNECTION_ERROR,
                    CONN_HOST_ERROR,
                    LOCALHOST_CONNECTION,
                    SERVER_LOST,
                ]

                if useOtherError:
                    error_list_for_reconnect += [ILLEGAL_VALUE_FOR_TYPE]
            if err_code in error_list_for_reconnect:
                # the error indicates a dead connection: reconnect
                reconnect_stat = self.connect(reconnect=True)
                self._log_stream.debug(f"rollback reconnected {reconnect_stat}")
        except Exception:
            pass

        return return_value
0203
0204
0205 def add_composite_module(self, module_name, module):
0206 self.composite_modules[module_name] = module
0207
0208
0209 def get_composite_module(self, module_name):
0210 return self.composite_modules.get(module_name, None)
0211
0212
0213 def dump_error_message(self, tmp_log: LogWrapper):
0214 """
0215 Dump error message to the log
0216
0217 :param tmp_log: log wrapper
0218 """
0219
0220 err_type, err_value = sys.exc_info()[:2]
0221 err_str = f"{err_type.__name__} {err_value}"
0222 err_str.strip()
0223 err_str += " "
0224 err_str += traceback.format_exc()
0225 tmp_log.error(err_str)
0226
0227
0228 def create_tagged_logger(self, comment: str, tag: str = None) -> LogWrapper:
0229 """
0230 Create logger from function comment and tag
0231
0232 param comment: comment of the function
0233 param tag: tag to add to the method name
0234 return: logger
0235 """
0236 method_name = comment.split(" ")[-2].split(".")[-1]
0237 if tag is None:
0238 tag = naive_utcnow().strftime("%Y-%m-%d/%H:%M:%S.%f")
0239 method_name += f" < {tag} >"
0240 tmp_log = LogWrapper(self._log_stream, method_name)
0241 return tmp_log
0242
0243
0244 @memoize
0245 def getConfigValue(self, component, key, app="pandaserver", vo=None):
0246 comment = " /* DBProxy.getConfigValue */"
0247 tmp_log = self.create_tagged_logger(comment)
0248 varMap = {":component": component, ":key": key, ":app": app}
0249 sql = """
0250 SELECT value, value_json, type FROM ATLAS_PANDA.CONFIG
0251 WHERE component=:component
0252 AND key=:key
0253 AND app=:app
0254 """
0255
0256
0257 if vo:
0258 varMap[":vo"] = vo
0259 sql += "AND (vo=:vo or vo IS NULL)"
0260
0261 self.cur.execute(sql + comment, varMap)
0262
0263 try:
0264 value_str, value_json_str, type = self.cur.fetchone()
0265 except TypeError:
0266 error_message = f"Specified key={key} not found for component={component} app={app}"
0267 tmp_log.debug(error_message)
0268 return None
0269
0270 try:
0271 if type.lower() in ("str", "string"):
0272 return value_str
0273 elif type.lower() in ("int", "integer"):
0274 return int(value_str)
0275 elif type.lower() == "float":
0276 return float(value_str)
0277 elif type.lower() in ("bool", "boolean"):
0278 if value_str.lower() == "true":
0279 return True
0280 else:
0281 return False
0282 elif type.lower() == "json":
0283 return json.loads(value_json_str)
0284 else:
0285 raise ValueError
0286 except json.decoder.JSONDecodeError:
0287 tmp_log.debug(f"Could not decode. Value_json: {value_json_str}, Type: {type}")
0288 return None
0289 except ValueError:
0290 tmp_log.debug(f"Wrong value/type pair. Value: {value_str}, Type: {type}")
0291 return None
0292 except Exception as e:
0293 tmp_log.debug(f"Unexpected error: {str(e)}")
0294 raise e
0295
0296 def getvalue_corrector(self, value):
0297 """
0298 Needed to support old and new versions of cx_Oracle
0299 :return:
0300 """
0301 if isinstance(value, list):
0302 return value[0]
0303 else:
0304 return value
0305
0306
    def get_mb_proxy(self, channel):
        """
        Return the message-broker proxy for an outgoing channel, or None.

        The proxy dict is created lazily on first use (passive mode, output
        queues only) and cached on the instance. On setup failure an empty
        dict is cached so the setup is not retried on every call.

        :param channel: name of the output queue, e.g. "panda_jobstatus"
        :return: the proxy for *channel*, or None when messaging is not
            configured or the channel is unknown
        """
        if self.mb_proxy_dict is None:
            try:
                # messaging is only set up when a MQ config file is configured
                if hasattr(panda_config, "mq_configFile") and panda_config.mq_configFile:
                    # delayed import to avoid a hard dependency when MQ is unused
                    from pandaserver.taskbuffer.PanDAMsgProcessor import MsgProcAgent

                    out_q_list = [
                        "panda_jobstatus",
                        "panda_jedi",
                        "panda_pilot_topic",
                        "panda_pilot_queue",
                    ]
                    mp_agent = MsgProcAgent(config_file=panda_config.mq_configFile)
                    mb_proxy_dict = mp_agent.start_passive_mode(in_q_list=[], out_q_list=out_q_list)
                    # make sure the agent is stopped at interpreter exit
                    atexit.register(mp_agent.stop_passive_mode)

                    self.mb_proxy_dict = mb_proxy_dict
            except Exception:
                comment = " /* DBProxy.get_mb_proxy */"
                tmp_log = self.create_tagged_logger(comment)
                self.dump_error_message(tmp_log)
                # cache the failure so setup is not retried on every call
                self.mb_proxy_dict = {}
        if not self.mb_proxy_dict or channel not in self.mb_proxy_dict["out"]:
            return None
        return self.mb_proxy_dict["out"][channel]
0334
0335
0336 def close_connection(self):
0337 if self.conn:
0338 try:
0339 self.conn.close()
0340 except Exception:
0341 pass
0342 return
0343
0344
    def cleanup(self):
        """
        Close the database connection and remove the atexit close hook,
        so the connection is not closed twice at interpreter exit.
        """
        comment = " /* DBProxy.cleanup */"
        tmp_log = self.create_tagged_logger(comment)
        tmp_log.debug("start")
        self.close_connection()
        atexit.unregister(self.close_connection)
        tmp_log.debug("done")
0352
0353
0354 def querySQL(self, sql, arraySize=1000):
0355 comment = " /* DBProxy.querySQL */"
0356 tmp_log = self.create_tagged_logger(comment)
0357 try:
0358 tmp_log.debug(f"SQL={sql} ")
0359
0360 self.conn.begin()
0361 self.cur.arraysize = arraySize
0362 self.cur.execute(sql + comment)
0363 res = self.cur.fetchall()
0364
0365 if not self._commit():
0366 raise RuntimeError("Commit error")
0367 return res
0368 except Exception:
0369
0370 self._rollback(self.useOtherError)
0371 self.dump_error_message(tmp_log)
0372 return None
0373
0374
0375 def querySQLS(self, sql, varMap, arraySize=1000):
0376 comment = " /* DBProxy.querySQLS */"
0377 tmp_log = self.create_tagged_logger(comment)
0378 try:
0379 tmp_log.debug(f"SQL={sql} vapMap={varMap} ")
0380
0381 self.conn.begin()
0382 self.cur.arraysize = arraySize
0383 ret = self.cur.execute(sql + comment, varMap)
0384 if ret:
0385 ret = True
0386 if sql.startswith("INSERT") or sql.startswith("UPDATE") or sql.startswith("DELETE"):
0387 res = self.cur.rowcount
0388 else:
0389 res = self.cur.fetchall()
0390
0391 if not self._commit():
0392 raise RuntimeError("Commit error")
0393 return ret, res
0394 except Exception as e:
0395
0396 self._rollback(self.useOtherError)
0397 self.dump_error_message(tmp_log)
0398 return -1, None
0399
0400
0401 def executemanySQL(self, sql, varMaps, arraySize=1000):
0402 comment = " /* DBProxy.executemanySQL */"
0403 try:
0404
0405 self.conn.begin()
0406 self.cur.arraysize = arraySize
0407 ret = self.cur.executemany(sql + comment, varMaps)
0408 if sql.startswith("INSERT") or sql.startswith("UPDATE") or sql.startswith("DELETE"):
0409 res = self.cur.rowcount
0410 else:
0411 raise RuntimeError("Operation unsupported. Only INSERT, UPDATE, DELETE are allowed")
0412
0413 if not self._commit():
0414 raise RuntimeError("Commit error")
0415 return res
0416 except Exception as e:
0417
0418 self._rollback(self.useOtherError)
0419 tmp_log = self.create_tagged_logger(comment)
0420 tmp_log.error(f"{sql} {str(varMaps)}")
0421 self.dump_error_message(tmp_log)
0422 return None
0423
0424
0425 def getClobObj(self, sql, varMap, arraySize=10000, use_commit=True):
0426 comment = " /* DBProxy.getClobObj */"
0427 try:
0428
0429 if use_commit:
0430 self.conn.begin()
0431 self.cur.arraysize = arraySize
0432 ret = self.cur.execute(sql + comment, varMap)
0433 if ret:
0434 ret = True
0435 res = []
0436 for items in self.cur:
0437 resItem = []
0438 for item in items:
0439
0440 try:
0441 itemRead = item.read()
0442 except AttributeError:
0443 itemRead = item
0444 resItem.append(itemRead)
0445
0446 res.append(resItem)
0447
0448 if use_commit:
0449 if not self._commit():
0450 raise RuntimeError("Commit error")
0451 return ret, res
0452 except Exception as e:
0453
0454 if use_commit:
0455 self._rollback()
0456 tmp_log = self.create_tagged_logger(comment)
0457 tmp_log.error(f"{sql} {str(varMap)}")
0458 self.dump_error_message(tmp_log)
0459 return -1, None
0460
0461
0462 def wakeUp(self):
0463 comment = " /* DBProxy.wakeUp */"
0464 tmp_log = self.create_tagged_logger(comment)
0465 for iTry in range(5):
0466 try:
0467
0468 self.conn.ping()
0469 return
0470 except Exception:
0471 tmp_log.error(f"{iTry} : connection is dead")
0472 self.dump_error_message(tmp_log)
0473
0474 time.sleep(1)
0475 self.connect(reconnect=True)
0476
0477
    @contextmanager
    def transaction(self, name: str | None = None, tmp_log=None):
        """
        Context manager for transaction

        Args:
            name (str, optional): name of the transaction to be shown in the log
            tmp_log (LogWrapper, optional): logger to use. If None, a new logger will be created

        Yields:
            Any: the cursor object for executing SQL commands
            Any: the logger object for logging in DBProxy

        Raises:
            Exception: re-raised after rollback when the body or the commit fails
        """
        comment = " /* DBProxy.transaction */"
        try:
            if tmp_log is None:
                tmp_log = self.create_tagged_logger(comment, tag=name)
            tmp_log.debug("transaction start")

            self.conn.begin()

            # hand the cursor and logger to the with-block body
            yield (self.cur, tmp_log)

            # commit only when the body finished without raising
            if not self._commit():
                raise RuntimeError("commit error")
            tmp_log.debug("transaction done")
        except Exception as e:
            # roll back on any failure and propagate the original error
            self._rollback()
            self.dump_error_message(tmp_log)
            raise e
0509
0510
0511 def recordStatusChange(self, pandaID, jobStatus, jobInfo=None, infoMap={}, useCommit=True, no_late_bulk_exec=True, extracted_sqls=None):
0512 comment = " /* DBProxy.recordStatusChange */"
0513 tmp_log = self.create_tagged_logger(comment)
0514
0515 if not hasattr(panda_config, "record_statuschange") or panda_config.record_statuschange is not True:
0516 return
0517
0518 varMap = {}
0519 varMap[":PandaID"] = pandaID
0520 varMap[":jobStatus"] = jobStatus
0521 varMap[":modificationHost"] = self.myHostName
0522 if jobInfo is not None:
0523 varMap[":computingSite"] = jobInfo.computingSite
0524 varMap[":cloud"] = jobInfo.cloud
0525 varMap[":prodSourceLabel"] = jobInfo.prodSourceLabel
0526 elif infoMap is not None:
0527 varMap[":computingSite"] = infoMap["computingSite"]
0528 varMap[":cloud"] = infoMap["cloud"]
0529 varMap[":prodSourceLabel"] = infoMap["prodSourceLabel"]
0530 else:
0531
0532 return
0533
0534 for tmpKey in varMap:
0535 if varMap[tmpKey] == "NULL":
0536 varMap[tmpKey] = None
0537
0538 sql = "INSERT INTO ATLAS_PANDA.jobs_StatusLog "
0539 sql += "(PandaID,modificationTime,jobStatus,prodSourceLabel,cloud,computingSite,modificationHost,modiftime_extended) "
0540 sql += "VALUES (:PandaID,CURRENT_DATE,:jobStatus,:prodSourceLabel,:cloud,:computingSite,:modificationHost,CURRENT_TIMESTAMP) "
0541 try:
0542
0543 if no_late_bulk_exec:
0544 if useCommit:
0545 self.conn.begin()
0546 self.cur.execute(sql + comment, varMap)
0547
0548 if useCommit:
0549 if not self._commit():
0550 raise RuntimeError("Commit error")
0551 else:
0552 extracted_sqls.setdefault("state_change", {"sql": sql + comment, "vars": []})
0553 extracted_sqls["state_change"]["vars"].append(varMap)
0554 except Exception:
0555
0556 if useCommit and no_late_bulk_exec:
0557 self._rollback()
0558 self.dump_error_message(tmp_log)
0559 if not useCommit:
0560 raise RuntimeError("recordStatusChange failed")
0561 return
0562
    def push_job_status_message(
        self,
        job_spec,
        panda_id,
        status,
        jedi_task_id=None,
        special_handling=None,
        extra_data=None,
    ):
        """
        Push a job status-change message to the "panda_jobstatus" broker channel.

        A message is sent only when a MQ config file is configured, the job
        opted in to push notifications (via special_handling or its job spec),
        and the status is not one of the transient ones (sent/holding/merging).

        :param job_spec: job specification; may be None if special_handling is given
        :param panda_id: PandaID of the job
        :param status: new job status
        :param jedi_task_id: task ID; taken from job_spec when None
        :param special_handling: special-handling string used to decide whether to push
        :param extra_data: extra key/value pairs merged into the message
            (core fields override colliding keys)
        """
        comment = " /* DBProxy.push_job_status_message */"
        if not (hasattr(panda_config, "mq_configFile") and panda_config.mq_configFile):
            # messaging is not configured
            return
        # decide whether this job opted in to push notifications
        to_push = False
        if special_handling is not None:
            to_push = job_push_status_changes(special_handling)
        elif job_spec is not None:
            to_push = job_spec.push_status_changes()

        if not to_push:
            return

        # skip transient statuses
        if status in ["sent", "holding", "merging"]:
            return

        mb_proxy = self.get_mb_proxy("panda_jobstatus")
        if not mb_proxy:
            return
        if to_push:
            tmp_log = self.create_tagged_logger(comment)

            try:
                now_time = naive_utcnow()
                now_ts = int(now_time.timestamp())

                inputs = []
                computingsite = None
                error_tmp_dict = {}

                if job_spec is not None:
                    # fill task ID, input LFNs, site and error fields from the spec
                    if jedi_task_id is None:
                        jedi_task_id = job_spec.jediTaskID

                    if job_spec.Files is not None:
                        for file_spec in job_spec.Files:
                            if file_spec.type in ["input", "pseudo_input"]:
                                inputs.append(file_spec.lfn)

                    if job_spec.computingSite is not None:
                        computingsite = job_spec.computingSite

                    error_tmp_dict["piloterrorcode"] = job_spec.pilotErrorCode
                    error_tmp_dict["exeerrorcode"] = job_spec.exeErrorCode
                    error_tmp_dict["superrorcode"] = job_spec.supErrorCode
                    error_tmp_dict["ddmerrorcode"] = job_spec.ddmErrorCode
                    error_tmp_dict["brokerageerrorcode"] = job_spec.brokerageErrorCode
                    error_tmp_dict["jobdispatchererrorcode"] = job_spec.jobDispatcherErrorCode
                    error_tmp_dict["taskbuffererrorcode"] = job_spec.taskBufferErrorCode
                    error_tmp_dict["piloterrordiag"] = job_spec.pilotErrorDiag
                    error_tmp_dict["exeerrordiag"] = job_spec.exeErrorDiag
                    error_tmp_dict["superrordiag"] = job_spec.supErrorDiag
                    error_tmp_dict["ddmerrordiag"] = job_spec.ddmErrorDiag
                    error_tmp_dict["brokerageerrordiag"] = job_spec.brokerageErrorDiag
                    error_tmp_dict["jobdispatchererrordiag"] = job_spec.jobDispatcherErrorDiag
                    error_tmp_dict["taskbuffererrordiag"] = job_spec.taskBufferErrorDiag

                # core fields are applied last so they win over extra_data
                orig_msg_dict = {
                    "msg_type": "job_status",
                    "jobid": panda_id,
                    "taskid": jedi_task_id,
                    "status": status,
                    "timestamp": now_ts,
                }
                update_msg_dict = {
                    "computingsite": computingsite,
                    "inputs": inputs if inputs else None,
                }
                update_msg_dict.update(error_tmp_dict)
                msg_dict = update_msg_dict.copy()
                if extra_data:
                    msg_dict.update(extra_data)
                msg_dict.update(orig_msg_dict)
                msg = json.dumps(msg_dict)
                if mb_proxy.got_disconnected:
                    mb_proxy.restart()
                mb_proxy.send(msg)
                tmp_log.debug(f"sent message: {msg}")
            except Exception:
                self.dump_error_message(tmp_log)
0653
0654 def insert_to_query_pool(self, topic, panda_id, task_id, sql, var_map, exec_order):
0655 comment = " /* DBProxy.insert_to_query_pool */"
0656 sqlI = (
0657 "INSERT INTO {}.SQL_QUEUE (topic,PandaID,jediTaskID,creationTime,data,execution_order) "
0658 "VALUES(:topic,:PandaID,:taskID,:creationTime,:data,:execution_order) ".format(panda_config.schemaPANDA)
0659 )
0660 varMap = {
0661 ":topic": topic,
0662 ":PandaID": panda_id,
0663 ":taskID": task_id,
0664 ":creationTime": naive_utcnow(),
0665 ":execution_order": exec_order,
0666 ":data": json.dumps((sql, var_map)),
0667 }
0668 self.cur.execute(sqlI + comment, varMap)
0669
0670
0671 def isNoWaitException(self, errValue):
0672
0673 ora_err_code = str(errValue).split()[0]
0674 ora_err_code = ora_err_code[:-1]
0675 if ora_err_code == "ORA-00054":
0676 return True
0677
0678 if type(errValue).__name__ == "LockNotAvailable":
0679 return True
0680 return False
0681
0682
0683 def setSuperStatus_JEDI(self, jediTaskID, superStatus):
0684 comment = " /* JediDBProxy.setSuperStatus_JEDI */"
0685 tmpLog = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID}")
0686 retTasks = []
0687 try:
0688
0689 sqlCT = f"UPDATE {panda_config.schemaJEDI}.JEDI_Tasks "
0690 sqlCT += "SET superStatus=:superStatus "
0691 sqlCT += "WHERE jediTaskID=:jediTaskID "
0692
0693 varMap = {}
0694 varMap[":jediTaskID"] = jediTaskID
0695 varMap[":superStatus"] = superStatus
0696 self.cur.execute(sqlCT + comment, varMap)
0697 return True
0698 except Exception:
0699
0700 self.dump_error_message(tmpLog)
0701 return False
0702
0703
0704 def setDeftStatus_JEDI(self, jediTaskID, taskStatus):
0705 comment = " /* JediDBProxy.setDeftStatus_JEDI */"
0706 tmpLog = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID}")
0707 try:
0708 sqlD = f"UPDATE {panda_config.schemaDEFT}.T_TASK "
0709 sqlD += "SET status=:status,timeStamp=CURRENT_DATE "
0710 sqlD += "WHERE taskID=:jediTaskID "
0711 varMap = {}
0712 varMap[":status"] = taskStatus
0713 varMap[":jediTaskID"] = jediTaskID
0714 tmpLog.debug(sqlD + comment + str(varMap))
0715 self.cur.execute(sqlD + comment, varMap)
0716 return True
0717 except Exception:
0718
0719 self.dump_error_message(tmpLog)
0720 return False
0721
0722
0723 def record_task_status_change(self, jedi_task_id):
0724 comment = " /* JediDBProxy.record_task_status_change */"
0725 tmpLog = self.create_tagged_logger(comment, f"jediTaskID={jedi_task_id}")
0726 tmpLog.debug("start")
0727 varMap = dict()
0728 varMap[":jediTaskID"] = jedi_task_id
0729 varMap[":modificationHost"] = socket.getfqdn()
0730
0731 sqlNS = (
0732 "INSERT INTO {0}.TASKS_STATUSLOG "
0733 "(jediTaskID,modificationTime,status,modificationHost,attemptNr,reason) "
0734 "SELECT jediTaskID,CURRENT_TIMESTAMP,status,:modificationHost,attemptNr,"
0735 "SUBSTR(errorDialog,0,255) "
0736 "FROM {0}.JEDI_Tasks WHERE jediTaskID=:jediTaskID "
0737 ).format(panda_config.schemaJEDI)
0738 self.cur.execute(sqlNS + comment, varMap)
0739 tmpLog.debug("done")
0740
0741
    def push_task_status_message(self, task_spec, jedi_task_id, status, split_rule=None):
        """
        Push a task status-change message to the internal "jedi_jobtaskstatus" queue.

        Sent only when the task opted in to push notifications, decided from
        the task spec or, failing that, from the split rule.

        :param task_spec: task specification; may be None when split_rule is given
        :param jedi_task_id: JEDI task ID
        :param status: new task status
        :param split_rule: split-rule string used to decide whether to push
        """
        to_push = False
        if task_spec is not None:
            to_push = task_spec.push_status_changes()
        elif split_rule is not None:
            to_push = task_push_status_changes(split_rule)

        if not to_push:
            return

        comment = " /* JediDBProxy.push_task_status_message */"
        tmpLog = self.create_tagged_logger(comment, f"jediTaskID={jedi_task_id}")
        tmpLog.debug("start")

        try:
            now_time = naive_utcnow()
            now_ts = int(now_time.timestamp())
            msg_dict = {
                "msg_type": "task_status",
                "taskid": jedi_task_id,
                "status": status,
                "timestamp": now_ts,
            }
            msg = json.dumps(msg_dict)
            # lazily obtain the JEDI MQ proxy dict via the injected setter
            if self.jedi_mb_proxy_dict is None:
                self.jedi_mb_proxy_dict = self.jedi_mb_proxy_dict_setter()
                if self.jedi_mb_proxy_dict is None:
                    tmpLog.debug("Failed to get mb_proxy of internal MQs. Skipped ")
                    return
            try:
                mb_proxy = self.jedi_mb_proxy_dict["out"]["jedi_jobtaskstatus"]
            except KeyError as e:
                tmpLog.warning(f"Skipped due to {e} ; jedi_mb_proxy_dict is {self.jedi_mb_proxy_dict}")
                return
            if mb_proxy.got_disconnected:
                mb_proxy.restart()
            mb_proxy.send(msg)
        except Exception:
            self.dump_error_message(tmpLog)
        tmpLog.debug("done")
0784
0785
    def push_task_trigger_message(self, msg_type, jedi_task_id, data_dict=None, priority=None, task_spec=None):
        """
        Push a task trigger message to the internal queue named after msg_type.

        :param msg_type: message type; also used as the output-queue name
        :param jedi_task_id: JEDI task ID
        :param data_dict: extra payload merged into the message (the core
            msg_type/taskid/timestamp fields override colliding keys)
        :param priority: explicit message priority; when None it is derived
            from task_spec for user tasks
        :param task_spec: task specification used to derive the priority
        :return: True on success, None when the message was not sent
        """
        comment = " /* JediDBProxy.push_task_trigger_message */"
        tmpLog = self.create_tagged_logger(comment, f"msg_type={msg_type} jediTaskID={jedi_task_id}")
        tmpLog.debug("start")

        try:
            now_time = naive_utcnow()
            now_ts = int(now_time.timestamp())

            msg_dict = {}
            if data_dict:
                msg_dict.update(data_dict)
            # core fields are applied last so they win over data_dict
            msg_dict.update(
                {
                    "msg_type": msg_type,
                    "taskid": jedi_task_id,
                    "timestamp": now_ts,
                }
            )
            msg = json.dumps(msg_dict)
            # lazily obtain the JEDI MQ proxy dict via the injected setter
            if self.jedi_mb_proxy_dict is None:
                self.jedi_mb_proxy_dict = self.jedi_mb_proxy_dict_setter()
                if self.jedi_mb_proxy_dict is None:
                    tmpLog.debug("Failed to get mb_proxy of internal MQs. Skipped ")
                    return
            try:
                mq_name = msg_type
                mb_proxy = self.jedi_mb_proxy_dict["out"][mq_name]
            except KeyError as e:
                tmpLog.warning(f"Skipped due to {e} ; jedi_mb_proxy_dict is {self.jedi_mb_proxy_dict}")
                return
            if mb_proxy.got_disconnected:
                mb_proxy.restart()

            # priority: explicit value wins; user-analysis tasks get 2, other
            # user tasks 1; everything else is sent without a priority
            msg_priority = None
            if priority is not None:
                msg_priority = priority
            elif task_spec is not None:
                try:
                    if task_spec.prodSourceLabel == "user":
                        if task_spec.gshare in ["User Analysis", "Express Analysis"]:
                            msg_priority = 2
                        else:
                            msg_priority = 1
                except AttributeError:
                    pass

            if msg_priority is not None:
                mb_proxy.send(msg, priority=msg_priority)
            else:
                mb_proxy.send(msg)
        except Exception:
            self.dump_error_message(tmpLog)
            return
        tmpLog.debug("done")
        return True