File indexing completed on 2026-04-10 08:39:06
0001 """
0002 proxy for database connection
0003
0004 """
0005
0006 import atexit
0007 import logging
0008 import warnings
0009
0010 from pandacommon.pandalogger.PandaLogger import PandaLogger
0011
0012 from pandaserver.config import panda_config
0013 from pandaserver.taskbuffer.db_proxy_mods import (
0014 data_carousel_module,
0015 entity_module,
0016 job_complex_module,
0017 job_standalone_module,
0018 metrics_module,
0019 misc_standalone_module,
0020 task_complex_module,
0021 task_event_module,
0022 task_standalone_module,
0023 task_utils_module,
0024 worker_module,
0025 workflow_module,
0026 )
0027 from pandaserver.taskbuffer.WrappedCursor import WrappedCursor
0028
0029 try:
0030 import idds.common.constants
0031 import idds.common.utils
0032 from idds.client.client import Client as iDDS_Client
0033 except ImportError:
0034 pass
0035
0036 if panda_config.backend == "oracle":
0037 import oracledb
0038
0039 from . import wrapped_oracle_conn
0040
0041 oracledb.init_oracle_client()
0042
0043 elif panda_config.backend == "postgres":
0044 import psycopg2 as psycopg
0045
0046 from . import WrappedPostgresConn
0047
0048 else:
0049 import MySQLdb
0050
0051 warnings.filterwarnings("ignore")
0052
0053
0054 _logger = PandaLogger().getLogger("DBProxy")
0055 _loggerFiltered = PandaLogger().getLogger("DBProxyFiltered")
0056
0057
0058 for handler in _loggerFiltered.handlers:
0059 handler.setLevel(logging.INFO)
0060 _logger.addHandler(handler)
0061 _loggerFiltered.removeHandler(handler)
0062
0063
0064
0065 class DBProxy(
0066 entity_module.EntityModule,
0067 metrics_module.MetricsModule,
0068 worker_module.WorkerModule,
0069 data_carousel_module.DataCarouselModule,
0070 task_event_module.TaskEventModule,
0071 job_complex_module.JobComplexModule,
0072 job_standalone_module.JobStandaloneModule,
0073 misc_standalone_module.MiscStandaloneModule,
0074 task_complex_module.TaskComplexModule,
0075 task_standalone_module.TaskStandaloneModule,
0076 task_utils_module.TaskUtilsModule,
0077 workflow_module.WorkflowModule,
0078 ):
0079
0080 def __init__(self, useOtherError=False):
0081
0082 super().__init__(_logger)
0083
0084 self.conn = None
0085
0086 self.cur = None
0087
0088
0089 self.useOtherError = useOtherError
0090
0091
0092 self.add_composite_module("entity", self)
0093 self.add_composite_module("metrics", self)
0094 self.add_composite_module("worker", self)
0095 self.add_composite_module("task_event", self)
0096 self.add_composite_module("task_utils", self)
0097 self.add_composite_module("job_complex", self)
0098
0099
0100 def connect(
0101 self,
0102 dbhost=panda_config.dbhost,
0103 dbpasswd=panda_config.dbpasswd,
0104 dbuser=panda_config.dbuser,
0105 dbname=panda_config.dbname,
0106 dbtimeout=panda_config.dbtimeout,
0107 reconnect=False,
0108 dbport=panda_config.dbport,
0109 ):
0110 _logger.debug(f"connect : re={reconnect}")
0111
0112 if not reconnect:
0113 self.dbhost = dbhost
0114 self.dbpasswd = dbpasswd
0115 self.dbuser = dbuser
0116 self.dbname = dbname
0117 self.dbtimeout = dbtimeout
0118 self.dbport = dbport
0119
0120 if reconnect:
0121 _logger.debug("closing old connection")
0122 try:
0123 self.conn.close()
0124 except Exception:
0125 _logger.debug("failed to close old connection")
0126
0127 try:
0128 if self.backend == "oracle":
0129 conn = oracledb.connect(dsn=self.dbhost, user=self.dbuser, password=self.dbpasswd)
0130
0131 def OutputTypeHandler(cursor, name, defaultType, size, precision, scale):
0132 if defaultType == oracledb.CLOB:
0133 return cursor.var(oracledb.LONG_STRING, arraysize=cursor.arraysize)
0134
0135 conn.outputtypehandler = OutputTypeHandler
0136 self.conn = wrapped_oracle_conn.WrappedOracleConn(conn)
0137
0138 elif self.backend == "postgres":
0139 dsn = {"dbname": self.dbname, "user": self.dbuser, "keepalives_idle": 30, "keepalives_interval": 30, "keepalives": 1}
0140 if self.dbpasswd:
0141 dsn["password"] = self.dbpasswd
0142 if self.dbhost:
0143 dsn["host"] = self.dbhost
0144 if self.dbport:
0145 dsn["port"] = self.dbport
0146 conn = psycopg.connect(**dsn)
0147 self.conn = WrappedPostgresConn.WrappedPostgresConn(conn)
0148 else:
0149 self.conn = MySQLdb.connect(
0150 host=self.dbhost,
0151 db=self.dbname,
0152 port=self.dbport,
0153 connect_timeout=self.dbtimeout,
0154 user=self.dbuser,
0155 passwd=self.dbpasswd,
0156 charset="utf8",
0157 )
0158 self.cur = WrappedCursor(self.conn)
0159 try:
0160
0161 if panda_config.dump_sql:
0162 from pandaserver.taskbuffer.SQLDumper import SQLDumper
0163
0164 self.cur = SQLDumper(self.cur)
0165 except Exception:
0166 pass
0167 self.hostname = self.cur.initialize()
0168 if not reconnect:
0169 atexit.register(self.close_connection)
0170 _logger.debug(f"connect : re={reconnect} ready")
0171 return True
0172 except Exception as e:
0173 _logger.error(f"connect : {str(e)}")
0174 return False