Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:39:02

0001 import atexit
0002 import datetime
0003 import json
0004 import socket
0005 import sys
0006 import time
0007 import traceback
0008 from contextlib import contextmanager
0009 
0010 from pandacommon.pandalogger.LogWrapper import LogWrapper
0011 from pandacommon.pandautils.PandaUtils import naive_utcnow
0012 
0013 from pandaserver.config import panda_config
0014 from pandaserver.taskbuffer.JediTaskSpec import (
0015     push_status_changes as task_push_status_changes,
0016 )
0017 from pandaserver.taskbuffer.JobSpec import (
0018     push_status_changes as job_push_status_changes,
0019 )
0020 
# Select the object exposed as varNUMBER according to the configured backend.
# oracledb is imported only inside the oracle branch, so the other backends do
# not require the Oracle driver to be importable.
# NOTE(review): presumably used as the type of numeric bind/output variables — confirm at call sites
if panda_config.backend == "oracle":
    import oracledb

    # numeric type object provided by the Oracle driver
    varNUMBER = oracledb.NUMBER
else:
    # every other backend falls back to the builtin int
    varNUMBER = int

# topics in SQL_QUEUE
# NOTE(review): presumably the value stored in the topic column of SQL_QUEUE rows — confirm against users of this constant
SQL_QUEUE_TOPIC_async_dataset_update = "async_dataset_update"
0030 
0031 
0032 # Internal caching of a result. Use only for information with low update frequency and low memory footprint
def memoize(f):
    """
    Cache the return value of a method for one hour per argument combination

    The cache key is built from the positional and keyword arguments only. ``self`` is
    not part of the key, so every instance shares the same cached values. All
    arguments must be hashable, and keyword arguments are ordered by name so that
    the call order of keywords does not matter.

    :param f: method to wrap. It is invoked as f(self, *args, **kwargs)
    :return: wrapper that serves cached values and keeps the name and docstring of f
    """
    # imported locally so that the block does not depend on a new module-level import
    import functools

    memo = {}
    # sentinel separating positional arguments from keyword arguments in the key
    kwd_mark = object()
    lifetime = datetime.timedelta(hours=1)

    @functools.wraps(f)
    def helper(self, *args, **kwargs):
        now = datetime.datetime.now()
        key = args + (kwd_mark,) + tuple(sorted(kwargs.items()))
        cached = memo.get(key)
        # call the wrapped method on the first use or once the entry is older than the lifetime
        if cached is None or cached["timestamp"] < now - lifetime:
            cached = {"value": f(self, *args, **kwargs), "timestamp": now}
            memo[key] = cached
        return cached["value"]

    return helper
0046 
0047 
0048 # convert dict to bind variable dict
def convert_dict_to_bind_vars(item):
    """
    Build a bind-variable dictionary from a plain dictionary

    Every key is prefixed with a colon and the values are carried over untouched.

    :param item: mapping of names to values
    :return: new dictionary keyed by ":<name>"
    """
    return {f":{name}": item[name] for name in item}
0054 
0055 
0056 # Base class for DB proxy modules
0057 class BaseModule:
0058     # constructor
0059     def __init__(self, log_stream: LogWrapper):
0060         self._log_stream = log_stream
0061         self.conn = None
0062         self.cur = None
0063         self.mb_proxy_dict = None
0064         self.useOtherError = False
0065         self.backend = panda_config.backend
0066         # retry count
0067         self.nTry = 5
0068         # hostname
0069         self.myHostName = socket.getfqdn()
0070         self.backend = panda_config.backend
0071         # host name
0072         self.hostname = None
0073         # composite modules
0074         self.composite_modules = {}
0075 
0076         # typical input cache
0077         self.typical_input_cache = {}
0078 
0079         # list of work queues
0080         self.workQueueMap = None
0081         # update time for work queue map
0082         self.updateTimeForWorkQueue = None
0083 
0084         # mb proxy for JEDI
0085         self.jedi_mb_proxy_dict = None
0086         self.jedi_mb_proxy_dict_setter = None
0087 
0088         # JEDI config
0089         self.jedi_config = None
0090 
0091     # set JEDI attributes
0092     def set_jedi_attributes(self, jedi_config, jedi_mb_proxy_dict_setter):
0093         self.jedi_config = jedi_config
0094         self.jedi_mb_proxy_dict_setter = jedi_mb_proxy_dict_setter
0095 
0096     # abstract method to commit
0097     def connect(self, **kwargs):
0098         """
0099         Commit the transaction
0100         """
0101         raise NotImplementedError("connect is not implemented")
0102 
0103     # commit
0104     def _commit(self):
0105         try:
0106             self.conn.commit()
0107             return True
0108         except Exception:
0109             self._log_stream.error("commit error")
0110             return False
0111 
0112     # rollback
    def _rollback(self, useOtherError=False):
        """
        Roll back the current transaction and reconnect if the error looks like a lost connection

        The error that is inspected comes from sys.exc_info(), so the reconnect logic is
        only meaningful when this method is called while an exception is being handled.

        :param useOtherError: when True, additional error codes are added to the reconnect list (oracle and mysql branches)
        :return: False if conn.rollback() raised, True otherwise
        """
        return_value = True
        # rollback
        err_code = None
        self._log_stream.debug("rollback")
        try:
            self.conn.rollback()
        except Exception:
            self._log_stream.error("rollback error")
            return_value = False
        # reconnect if needed
        try:
            err_type, err_value = sys.exc_info()[:2]
            # get error code for postgres
            if self.backend == "postgres":
                try:
                    err_code = err_value.pgcode
                except Exception:
                    # the exception carries no pgcode; fall through to the message parsing below
                    pass
            # get ORA ErrorCode
            if err_code is None:
                # first whitespace-separated token of the message, minus its last character
                err_code = str(err_value).split()[0]
                err_code = err_code[:-1]
            err_msg = f"rollback EC:{err_code} {err_value}"
            self._log_stream.debug(err_msg)
            # error codes for connection error
            if self.backend == "oracle":
                error_list_for_reconnect = [
                    "ORA-01012",
                    "ORA-01033",
                    "ORA-01034",
                    "ORA-01089",
                    "ORA-03113",
                    "ORA-03114",
                    "ORA-12203",
                    "ORA-12500",
                    "ORA-12571",
                    "ORA-03135",
                    "ORA-25402",
                ]
                # other errors are apparently given when connection lost contact
                if useOtherError:
                    error_list_for_reconnect += ["ORA-01861", "ORA-01008"]
            elif self.backend == "postgres":
                import psycopg2.errorcodes as psycopg_errorcodes

                error_list_for_reconnect = [
                    psycopg_errorcodes.CONNECTION_EXCEPTION,
                    psycopg_errorcodes.SQLSERVER_REJECTED_ESTABLISHMENT_OF_SQLCONNECTION,
                    psycopg_errorcodes.CONNECTION_DOES_NOT_EXIST,
                    psycopg_errorcodes.SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION,
                    psycopg_errorcodes.CONNECTION_FAILURE,
                    psycopg_errorcodes.READ_ONLY_SQL_TRANSACTION,
                ]
            else:
                # mysql error codes for connection error
                import MySQLdb
                from MySQLdb.constants.CR import (
                    CONN_HOST_ERROR,
                    CONNECTION_ERROR,
                    LOCALHOST_CONNECTION,
                    SERVER_LOST,
                )
                from MySQLdb.constants.ER import (
                    ACCESS_DENIED_ERROR,
                    DBACCESS_DENIED_ERROR,
                    ILLEGAL_VALUE_FOR_TYPE,
                    SERVER_SHUTDOWN,
                )

                error_list_for_reconnect = [
                    ACCESS_DENIED_ERROR,
                    DBACCESS_DENIED_ERROR,
                    SERVER_SHUTDOWN,
                    CONNECTION_ERROR,
                    CONN_HOST_ERROR,
                    LOCALHOST_CONNECTION,
                    SERVER_LOST,
                ]
                # other errors are apparently given when connection lost contact
                if useOtherError:
                    error_list_for_reconnect += [ILLEGAL_VALUE_FOR_TYPE]
            # NOTE(review): in this branch err_code is a string cut from the message, while the
            # MySQLdb constants are presumably integers — confirm the test below can match for mysql
            if err_code in error_list_for_reconnect:
                # reconnect
                reconnect_stat = self.connect(reconnect=True)
                self._log_stream.debug(f"rollback reconnected {reconnect_stat}")
        except Exception:
            # any failure while classifying the error or reconnecting is ignored
            pass
        # return
        return return_value
0203 
0204     # add composite module
0205     def add_composite_module(self, module_name, module):
0206         self.composite_modules[module_name] = module
0207 
0208     # get composite module
0209     def get_composite_module(self, module_name):
0210         return self.composite_modules.get(module_name, None)
0211 
0212     # dump error message
0213     def dump_error_message(self, tmp_log: LogWrapper):
0214         """
0215         Dump error message to the log
0216 
0217         :param tmp_log: log wrapper
0218         """
0219         # error
0220         err_type, err_value = sys.exc_info()[:2]
0221         err_str = f"{err_type.__name__} {err_value}"
0222         err_str.strip()
0223         err_str += " "
0224         err_str += traceback.format_exc()
0225         tmp_log.error(err_str)
0226 
0227     # create logger with tag
0228     def create_tagged_logger(self, comment: str, tag: str = None) -> LogWrapper:
0229         """
0230         Create logger from function comment and tag
0231 
0232         param comment: comment of the function
0233         param tag: tag to add to the method name
0234         return: logger
0235         """
0236         method_name = comment.split(" ")[-2].split(".")[-1]
0237         if tag is None:
0238             tag = naive_utcnow().strftime("%Y-%m-%d/%H:%M:%S.%f")
0239         method_name += f" < {tag} >"
0240         tmp_log = LogWrapper(self._log_stream, method_name)
0241         return tmp_log
0242 
0243     # get configuration value. cached for an hour
0244     @memoize
0245     def getConfigValue(self, component, key, app="pandaserver", vo=None):
0246         comment = " /* DBProxy.getConfigValue */"
0247         tmp_log = self.create_tagged_logger(comment)
0248         varMap = {":component": component, ":key": key, ":app": app}
0249         sql = """
0250         SELECT value, value_json, type FROM ATLAS_PANDA.CONFIG
0251         WHERE component=:component
0252         AND key=:key
0253         AND app=:app
0254         """
0255 
0256         # If VO is specified, select only the config values for this VO or VO independent values
0257         if vo:
0258             varMap[":vo"] = vo
0259             sql += "AND (vo=:vo or vo IS NULL)"
0260 
0261         self.cur.execute(sql + comment, varMap)
0262 
0263         try:
0264             value_str, value_json_str, type = self.cur.fetchone()
0265         except TypeError:
0266             error_message = f"Specified key={key} not found for component={component} app={app}"
0267             tmp_log.debug(error_message)
0268             return None
0269 
0270         try:
0271             if type.lower() in ("str", "string"):
0272                 return value_str
0273             elif type.lower() in ("int", "integer"):
0274                 return int(value_str)
0275             elif type.lower() == "float":
0276                 return float(value_str)
0277             elif type.lower() in ("bool", "boolean"):
0278                 if value_str.lower() == "true":
0279                     return True
0280                 else:
0281                     return False
0282             elif type.lower() == "json":
0283                 return json.loads(value_json_str)
0284             else:
0285                 raise ValueError
0286         except json.decoder.JSONDecodeError:
0287             tmp_log.debug(f"Could not decode. Value_json: {value_json_str}, Type: {type}")
0288             return None
0289         except ValueError:
0290             tmp_log.debug(f"Wrong value/type pair. Value: {value_str}, Type: {type}")
0291             return None
0292         except Exception as e:
0293             tmp_log.debug(f"Unexpected error: {str(e)}")
0294             raise e
0295 
0296     def getvalue_corrector(self, value):
0297         """
0298         Needed to support old and new versions of cx_Oracle
0299         :return:
0300         """
0301         if isinstance(value, list):  # cx_Oracle version >= 6.3
0302             return value[0]
0303         else:  # cx_Oracle version < 6.3
0304             return value
0305 
0306     # get mb proxy
0307     def get_mb_proxy(self, channel):
0308         if self.mb_proxy_dict is None:
0309             try:
0310                 if hasattr(panda_config, "mq_configFile") and panda_config.mq_configFile:
0311                     # delay import to open logger file inside python daemon
0312                     from pandaserver.taskbuffer.PanDAMsgProcessor import MsgProcAgent
0313 
0314                     out_q_list = [
0315                         "panda_jobstatus",
0316                         "panda_jedi",
0317                         "panda_pilot_topic",
0318                         "panda_pilot_queue",
0319                     ]
0320                     mp_agent = MsgProcAgent(config_file=panda_config.mq_configFile)
0321                     mb_proxy_dict = mp_agent.start_passive_mode(in_q_list=[], out_q_list=out_q_list)
0322                     # stop with atexit
0323                     atexit.register(mp_agent.stop_passive_mode)
0324                     # return
0325                     self.mb_proxy_dict = mb_proxy_dict
0326             except Exception:
0327                 comment = " /* DBProxy.get_mb_proxy */"
0328                 tmp_log = self.create_tagged_logger(comment)
0329                 self.dump_error_message(tmp_log)
0330                 self.mb_proxy_dict = {}
0331         if not self.mb_proxy_dict or channel not in self.mb_proxy_dict["out"]:
0332             return None
0333         return self.mb_proxy_dict["out"][channel]
0334 
0335     # close connection
0336     def close_connection(self):
0337         if self.conn:
0338             try:
0339                 self.conn.close()
0340             except Exception:
0341                 pass
0342         return
0343 
0344     # cleanup
0345     def cleanup(self):
0346         comment = " /* DBProxy.cleanup */"
0347         tmp_log = self.create_tagged_logger(comment)
0348         tmp_log.debug("start")
0349         self.close_connection()
0350         atexit.unregister(self.close_connection)
0351         tmp_log.debug("done")
0352 
0353     # query an SQL
0354     def querySQL(self, sql, arraySize=1000):
0355         comment = " /* DBProxy.querySQL */"
0356         tmp_log = self.create_tagged_logger(comment)
0357         try:
0358             tmp_log.debug(f"SQL={sql} ")
0359             # begin transaction
0360             self.conn.begin()
0361             self.cur.arraysize = arraySize
0362             self.cur.execute(sql + comment)
0363             res = self.cur.fetchall()
0364             # commit
0365             if not self._commit():
0366                 raise RuntimeError("Commit error")
0367             return res
0368         except Exception:
0369             # roll back
0370             self._rollback(self.useOtherError)
0371             self.dump_error_message(tmp_log)
0372             return None
0373 
0374     # query an SQL return Status
0375     def querySQLS(self, sql, varMap, arraySize=1000):
0376         comment = " /* DBProxy.querySQLS */"
0377         tmp_log = self.create_tagged_logger(comment)
0378         try:
0379             tmp_log.debug(f"SQL={sql} vapMap={varMap} ")
0380             # begin transaction
0381             self.conn.begin()
0382             self.cur.arraysize = arraySize
0383             ret = self.cur.execute(sql + comment, varMap)
0384             if ret:
0385                 ret = True
0386             if sql.startswith("INSERT") or sql.startswith("UPDATE") or sql.startswith("DELETE"):
0387                 res = self.cur.rowcount
0388             else:
0389                 res = self.cur.fetchall()
0390             # commit
0391             if not self._commit():
0392                 raise RuntimeError("Commit error")
0393             return ret, res
0394         except Exception as e:
0395             # roll back
0396             self._rollback(self.useOtherError)
0397             self.dump_error_message(tmp_log)
0398             return -1, None
0399 
0400     # execute an SQL return with executemany
0401     def executemanySQL(self, sql, varMaps, arraySize=1000):
0402         comment = " /* DBProxy.executemanySQL */"
0403         try:
0404             # begin transaction
0405             self.conn.begin()
0406             self.cur.arraysize = arraySize
0407             ret = self.cur.executemany(sql + comment, varMaps)
0408             if sql.startswith("INSERT") or sql.startswith("UPDATE") or sql.startswith("DELETE"):
0409                 res = self.cur.rowcount
0410             else:
0411                 raise RuntimeError("Operation unsupported. Only INSERT, UPDATE, DELETE are allowed")
0412             # commit
0413             if not self._commit():
0414                 raise RuntimeError("Commit error")
0415             return res
0416         except Exception as e:
0417             # roll back
0418             self._rollback(self.useOtherError)
0419             tmp_log = self.create_tagged_logger(comment)
0420             tmp_log.error(f"{sql} {str(varMaps)}")
0421             self.dump_error_message(tmp_log)
0422             return None
0423 
0424     # get CLOB
0425     def getClobObj(self, sql, varMap, arraySize=10000, use_commit=True):
0426         comment = " /* DBProxy.getClobObj */"
0427         try:
0428             # begin transaction
0429             if use_commit:
0430                 self.conn.begin()
0431                 self.cur.arraysize = arraySize
0432             ret = self.cur.execute(sql + comment, varMap)
0433             if ret:
0434                 ret = True
0435             res = []
0436             for items in self.cur:
0437                 resItem = []
0438                 for item in items:
0439                     # read CLOB
0440                     try:
0441                         itemRead = item.read()
0442                     except AttributeError:
0443                         itemRead = item
0444                     resItem.append(itemRead)
0445                 # append
0446                 res.append(resItem)
0447             # commit
0448             if use_commit:
0449                 if not self._commit():
0450                     raise RuntimeError("Commit error")
0451             return ret, res
0452         except Exception as e:
0453             # roll back
0454             if use_commit:
0455                 self._rollback()
0456             tmp_log = self.create_tagged_logger(comment)
0457             tmp_log.error(f"{sql} {str(varMap)}")
0458             self.dump_error_message(tmp_log)
0459             return -1, None
0460 
0461     # wake up connection
0462     def wakeUp(self):
0463         comment = " /* DBProxy.wakeUp */"
0464         tmp_log = self.create_tagged_logger(comment)
0465         for iTry in range(5):
0466             try:
0467                 # check if the connection is working
0468                 self.conn.ping()
0469                 return
0470             except Exception:
0471                 tmp_log.error(f"{iTry} : connection is dead")
0472                 self.dump_error_message(tmp_log)
0473                 # wait for reconnection
0474                 time.sleep(1)
0475                 self.connect(reconnect=True)
0476 
0477     # transaction as a context manager
    @contextmanager
    def transaction(self, name: str | None = None, tmp_log=None):
        """
        Context manager for transaction

        Begins a transaction, hands the cursor and a logger to the with block as a
        tuple, and commits when the block finishes. Any exception, raised either by
        the block or by the commit, triggers a rollback, is logged, and is re-raised.

        Args:
            name (str, optional): name of the transaction to be shown in the log
            tmp_log (LogWrapper, optional): logger to use. If None, a new logger will be created

        Yields:
            Any: the cursor object for executing SQL commands
            Any: the logger object for logging in DBProxy

        Raises:
            RuntimeError: if the commit fails
            Exception: whatever the with block raised
        """
        comment = " /* DBProxy.transaction */"
        try:
            if tmp_log is None:
                # name is used as the tag of the logger
                tmp_log = self.create_tagged_logger(comment, tag=name)
            tmp_log.debug("transaction start")
            # begin transaction
            self.conn.begin()
            # cursor and logger for the with block
            # exceptions raised inside the with block surface here and are handled below
            yield (self.cur, tmp_log)
            # commit transaction
            if not self._commit():
                raise RuntimeError("commit error")
            tmp_log.debug("transaction done")
        except Exception as e:
            # roll back
            self._rollback()
            # NOTE(review): tmp_log is still None here if create_tagged_logger itself raised
            self.dump_error_message(tmp_log)
            raise e
0509 
0510     # record status change
0511     def recordStatusChange(self, pandaID, jobStatus, jobInfo=None, infoMap={}, useCommit=True, no_late_bulk_exec=True, extracted_sqls=None):
0512         comment = " /* DBProxy.recordStatusChange */"
0513         tmp_log = self.create_tagged_logger(comment)
0514         # check config
0515         if not hasattr(panda_config, "record_statuschange") or panda_config.record_statuschange is not True:
0516             return
0517         # get job info
0518         varMap = {}
0519         varMap[":PandaID"] = pandaID
0520         varMap[":jobStatus"] = jobStatus
0521         varMap[":modificationHost"] = self.myHostName
0522         if jobInfo is not None:
0523             varMap[":computingSite"] = jobInfo.computingSite
0524             varMap[":cloud"] = jobInfo.cloud
0525             varMap[":prodSourceLabel"] = jobInfo.prodSourceLabel
0526         elif infoMap is not None:
0527             varMap[":computingSite"] = infoMap["computingSite"]
0528             varMap[":cloud"] = infoMap["cloud"]
0529             varMap[":prodSourceLabel"] = infoMap["prodSourceLabel"]
0530         else:
0531             # no info
0532             return
0533         # convert NULL to None
0534         for tmpKey in varMap:
0535             if varMap[tmpKey] == "NULL":
0536                 varMap[tmpKey] = None
0537         # insert
0538         sql = "INSERT INTO ATLAS_PANDA.jobs_StatusLog "
0539         sql += "(PandaID,modificationTime,jobStatus,prodSourceLabel,cloud,computingSite,modificationHost,modiftime_extended) "
0540         sql += "VALUES (:PandaID,CURRENT_DATE,:jobStatus,:prodSourceLabel,:cloud,:computingSite,:modificationHost,CURRENT_TIMESTAMP) "
0541         try:
0542             # start transaction
0543             if no_late_bulk_exec:
0544                 if useCommit:
0545                     self.conn.begin()
0546                 self.cur.execute(sql + comment, varMap)
0547                 # commit
0548                 if useCommit:
0549                     if not self._commit():
0550                         raise RuntimeError("Commit error")
0551             else:
0552                 extracted_sqls.setdefault("state_change", {"sql": sql + comment, "vars": []})
0553                 extracted_sqls["state_change"]["vars"].append(varMap)
0554         except Exception:
0555             # roll back
0556             if useCommit and no_late_bulk_exec:
0557                 self._rollback()
0558             self.dump_error_message(tmp_log)
0559             if not useCommit:
0560                 raise RuntimeError("recordStatusChange failed")
0561         return
0562 
0563     def push_job_status_message(
0564         self,
0565         job_spec,
0566         panda_id,
0567         status,
0568         jedi_task_id=None,
0569         special_handling=None,
0570         extra_data=None,
0571     ):
0572         comment = " /* DBProxy.push_job_status_message */"
0573         if not (hasattr(panda_config, "mq_configFile") and panda_config.mq_configFile):
0574             # skip if not configured
0575             return
0576         to_push = False
0577         if special_handling is not None:
0578             to_push = job_push_status_changes(special_handling)
0579         elif job_spec is not None:
0580             to_push = job_spec.push_status_changes()
0581         # only run if to push status change
0582         if not to_push:
0583             return
0584         # skip statuses unnecessary to push
0585         if status in ["sent", "holding", "merging"]:
0586             return
0587         # skip if no mb to push to
0588         mb_proxy = self.get_mb_proxy("panda_jobstatus")
0589         if not mb_proxy:
0590             return
0591         if to_push:
0592             tmp_log = self.create_tagged_logger(comment)
0593             # push job status change
0594             try:
0595                 now_time = naive_utcnow()
0596                 now_ts = int(now_time.timestamp())
0597                 # init
0598                 inputs = []
0599                 computingsite = None
0600                 error_tmp_dict = {}
0601                 # info from job spec
0602                 if job_spec is not None:
0603                     # task id
0604                     if jedi_task_id is None:
0605                         jedi_task_id = job_spec.jediTaskID
0606                     # inputs
0607                     if job_spec.Files is not None:
0608                         for file_spec in job_spec.Files:
0609                             if file_spec.type in ["input", "pseudo_input"]:
0610                                 inputs.append(file_spec.lfn)
0611                     # computing site
0612                     if job_spec.computingSite is not None:
0613                         computingsite = job_spec.computingSite
0614                     # error codes and diags
0615                     error_tmp_dict["piloterrorcode"] = job_spec.pilotErrorCode
0616                     error_tmp_dict["exeerrorcode"] = job_spec.exeErrorCode
0617                     error_tmp_dict["superrorcode"] = job_spec.supErrorCode
0618                     error_tmp_dict["ddmerrorcode"] = job_spec.ddmErrorCode
0619                     error_tmp_dict["brokerageerrorcode"] = job_spec.brokerageErrorCode
0620                     error_tmp_dict["jobdispatchererrorcode"] = job_spec.jobDispatcherErrorCode
0621                     error_tmp_dict["taskbuffererrorcode"] = job_spec.taskBufferErrorCode
0622                     error_tmp_dict["piloterrordiag"] = job_spec.pilotErrorDiag
0623                     error_tmp_dict["exeerrordiag"] = job_spec.exeErrorDiag
0624                     error_tmp_dict["superrordiag"] = job_spec.supErrorDiag
0625                     error_tmp_dict["ddmerrordiag"] = job_spec.ddmErrorDiag
0626                     error_tmp_dict["brokerageerrordiag"] = job_spec.brokerageErrorDiag
0627                     error_tmp_dict["jobdispatchererrordiag"] = job_spec.jobDispatcherErrorDiag
0628                     error_tmp_dict["taskbuffererrordiag"] = job_spec.taskBufferErrorDiag
0629                 # message
0630                 orig_msg_dict = {
0631                     "msg_type": "job_status",
0632                     "jobid": panda_id,
0633                     "taskid": jedi_task_id,
0634                     "status": status,
0635                     "timestamp": now_ts,
0636                 }
0637                 update_msg_dict = {
0638                     "computingsite": computingsite,
0639                     "inputs": inputs if inputs else None,
0640                 }
0641                 update_msg_dict.update(error_tmp_dict)
0642                 msg_dict = update_msg_dict.copy()
0643                 if extra_data:
0644                     msg_dict.update(extra_data)
0645                 msg_dict.update(orig_msg_dict)
0646                 msg = json.dumps(msg_dict)
0647                 if mb_proxy.got_disconnected:
0648                     mb_proxy.restart()
0649                 mb_proxy.send(msg)
0650                 tmp_log.debug(f"sent message: {msg}")
0651             except Exception:
0652                 self.dump_error_message(tmp_log)
0653 
0654     def insert_to_query_pool(self, topic, panda_id, task_id, sql, var_map, exec_order):
0655         comment = " /* DBProxy.insert_to_query_pool */"
0656         sqlI = (
0657             "INSERT INTO {}.SQL_QUEUE (topic,PandaID,jediTaskID,creationTime,data,execution_order) "
0658             "VALUES(:topic,:PandaID,:taskID,:creationTime,:data,:execution_order) ".format(panda_config.schemaPANDA)
0659         )
0660         varMap = {
0661             ":topic": topic,
0662             ":PandaID": panda_id,
0663             ":taskID": task_id,
0664             ":creationTime": naive_utcnow(),
0665             ":execution_order": exec_order,
0666             ":data": json.dumps((sql, var_map)),
0667         }
0668         self.cur.execute(sqlI + comment, varMap)
0669 
0670     # check if exception is from NOWAIT
0671     def isNoWaitException(self, errValue):
0672         # for oracle
0673         ora_err_code = str(errValue).split()[0]
0674         ora_err_code = ora_err_code[:-1]
0675         if ora_err_code == "ORA-00054":
0676             return True
0677         # for postgres
0678         if type(errValue).__name__ == "LockNotAvailable":
0679             return True
0680         return False
0681 
0682     # set super status
0683     def setSuperStatus_JEDI(self, jediTaskID, superStatus):
0684         comment = " /* JediDBProxy.setSuperStatus_JEDI */"
0685         tmpLog = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID}")
0686         retTasks = []
0687         try:
0688             # sql to set super status
0689             sqlCT = f"UPDATE {panda_config.schemaJEDI}.JEDI_Tasks "
0690             sqlCT += "SET superStatus=:superStatus "
0691             sqlCT += "WHERE jediTaskID=:jediTaskID "
0692             # set super status
0693             varMap = {}
0694             varMap[":jediTaskID"] = jediTaskID
0695             varMap[":superStatus"] = superStatus
0696             self.cur.execute(sqlCT + comment, varMap)
0697             return True
0698         except Exception:
0699             # error
0700             self.dump_error_message(tmpLog)
0701             return False
0702 
0703     # set DEFT status
0704     def setDeftStatus_JEDI(self, jediTaskID, taskStatus):
0705         comment = " /* JediDBProxy.setDeftStatus_JEDI */"
0706         tmpLog = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID}")
0707         try:
0708             sqlD = f"UPDATE {panda_config.schemaDEFT}.T_TASK "
0709             sqlD += "SET status=:status,timeStamp=CURRENT_DATE "
0710             sqlD += "WHERE taskID=:jediTaskID "
0711             varMap = {}
0712             varMap[":status"] = taskStatus
0713             varMap[":jediTaskID"] = jediTaskID
0714             tmpLog.debug(sqlD + comment + str(varMap))
0715             self.cur.execute(sqlD + comment, varMap)
0716             return True
0717         except Exception:
0718             # error
0719             self.dump_error_message(tmpLog)
0720             return False
0721 
0722     # task status logging
0723     def record_task_status_change(self, jedi_task_id):
0724         comment = " /* JediDBProxy.record_task_status_change */"
0725         tmpLog = self.create_tagged_logger(comment, f"jediTaskID={jedi_task_id}")
0726         tmpLog.debug("start")
0727         varMap = dict()
0728         varMap[":jediTaskID"] = jedi_task_id
0729         varMap[":modificationHost"] = socket.getfqdn()
0730         # sql
0731         sqlNS = (
0732             "INSERT INTO {0}.TASKS_STATUSLOG "
0733             "(jediTaskID,modificationTime,status,modificationHost,attemptNr,reason) "
0734             "SELECT jediTaskID,CURRENT_TIMESTAMP,status,:modificationHost,attemptNr,"
0735             "SUBSTR(errorDialog,0,255) "
0736             "FROM {0}.JEDI_Tasks WHERE jediTaskID=:jediTaskID "
0737         ).format(panda_config.schemaJEDI)
0738         self.cur.execute(sqlNS + comment, varMap)
0739         tmpLog.debug("done")
0740 
0741     # push task status message
0742     def push_task_status_message(self, task_spec, jedi_task_id, status, split_rule=None):
0743         to_push = False
0744         if task_spec is not None:
0745             to_push = task_spec.push_status_changes()
0746         elif split_rule is not None:
0747             to_push = task_push_status_changes(split_rule)
0748         # only run if to push status change
0749         if not to_push:
0750             return
0751         # skip statuses unnecessary to push
0752         # if status in ['pending']:
0753         #     return
0754         comment = " /* JediDBProxy.push_task_status_message */"
0755         tmpLog = self.create_tagged_logger(comment, f"jediTaskID={jedi_task_id}")
0756         tmpLog.debug("start")
0757         # send task status messages to mq
0758         try:
0759             now_time = naive_utcnow()
0760             now_ts = int(now_time.timestamp())
0761             msg_dict = {
0762                 "msg_type": "task_status",
0763                 "taskid": jedi_task_id,
0764                 "status": status,
0765                 "timestamp": now_ts,
0766             }
0767             msg = json.dumps(msg_dict)
0768             if self.jedi_mb_proxy_dict is None:
0769                 self.jedi_mb_proxy_dict = self.jedi_mb_proxy_dict_setter()
0770                 if self.jedi_mb_proxy_dict is None:
0771                     tmpLog.debug("Failed to get mb_proxy of internal MQs. Skipped ")
0772                     return
0773             try:
0774                 mb_proxy = self.jedi_mb_proxy_dict["out"]["jedi_jobtaskstatus"]
0775             except KeyError as e:
0776                 tmpLog.warning(f"Skipped due to {e} ; jedi_mb_proxy_dict is {self.jedi_mb_proxy_dict}")
0777                 return
0778             if mb_proxy.got_disconnected:
0779                 mb_proxy.restart()
0780             mb_proxy.send(msg)
0781         except Exception:
0782             self.dump_error_message(tmpLog)
0783         tmpLog.debug("done")
0784 
0785     # push message to message processors which triggers functions of agents
0786     def push_task_trigger_message(self, msg_type, jedi_task_id, data_dict=None, priority=None, task_spec=None):
0787         comment = " /* JediDBProxy.push_task_trigger_message */"
0788         tmpLog = self.create_tagged_logger(comment, f"msg_type={msg_type} jediTaskID={jedi_task_id}")
0789         tmpLog.debug("start")
0790         # send task status messages to mq
0791         try:
0792             now_time = naive_utcnow()
0793             now_ts = int(now_time.timestamp())
0794             # get mbproxy
0795             msg_dict = {}
0796             if data_dict:
0797                 msg_dict.update(data_dict)
0798             msg_dict.update(
0799                 {
0800                     "msg_type": msg_type,
0801                     "taskid": jedi_task_id,
0802                     "timestamp": now_ts,
0803                 }
0804             )
0805             msg = json.dumps(msg_dict)
0806             if self.jedi_mb_proxy_dict is None:
0807                 self.jedi_mb_proxy_dict = self.jedi_mb_proxy_dict_setter()
0808                 if self.jedi_mb_proxy_dict is None:
0809                     tmpLog.debug("Failed to get mb_proxy of internal MQs. Skipped ")
0810                     return
0811             try:
0812                 mq_name = msg_type
0813                 mb_proxy = self.jedi_mb_proxy_dict["out"][mq_name]
0814             except KeyError as e:
0815                 tmpLog.warning(f"Skipped due to {e} ; jedi_mb_proxy_dict is {self.jedi_mb_proxy_dict}")
0816                 return
0817             if mb_proxy.got_disconnected:
0818                 mb_proxy.restart()
0819             # message priority
0820             msg_priority = None
0821             if priority is not None:
0822                 msg_priority = priority
0823             elif task_spec is not None:
0824                 try:
0825                     if task_spec.prodSourceLabel == "user":
0826                         if task_spec.gshare in ["User Analysis", "Express Analysis"]:
0827                             msg_priority = 2
0828                         else:
0829                             msg_priority = 1
0830                 except AttributeError:
0831                     pass
0832             # send message
0833             if msg_priority is not None:
0834                 mb_proxy.send(msg, priority=msg_priority)
0835             else:
0836                 mb_proxy.send(msg)
0837         except Exception:
0838             self.dump_error_message(tmpLog)
0839             return
0840         tmpLog.debug("done")
0841         return True