Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:39:06

0001 """
0002 proxy for database connection
0003 
0004 """
0005 
0006 import atexit
0007 import logging
0008 import warnings
0009 
0010 from pandacommon.pandalogger.PandaLogger import PandaLogger
0011 
0012 from pandaserver.config import panda_config
0013 from pandaserver.taskbuffer.db_proxy_mods import (
0014     data_carousel_module,
0015     entity_module,
0016     job_complex_module,
0017     job_standalone_module,
0018     metrics_module,
0019     misc_standalone_module,
0020     task_complex_module,
0021     task_event_module,
0022     task_standalone_module,
0023     task_utils_module,
0024     worker_module,
0025     workflow_module,
0026 )
0027 from pandaserver.taskbuffer.WrappedCursor import WrappedCursor
0028 
0029 try:
0030     import idds.common.constants
0031     import idds.common.utils
0032     from idds.client.client import Client as iDDS_Client
0033 except ImportError:
0034     pass
0035 
0036 if panda_config.backend == "oracle":
0037     import oracledb
0038 
0039     from . import wrapped_oracle_conn
0040 
0041     oracledb.init_oracle_client()
0042 
0043 elif panda_config.backend == "postgres":
0044     import psycopg2 as psycopg
0045 
0046     from . import WrappedPostgresConn
0047 
0048 else:
0049     import MySQLdb
0050 
# NOTE(review): this installs a catch-all "ignore" filter, i.e. it silences every
# warning category for the whole process, not only for this module
warnings.filterwarnings("ignore")

# logger
_logger = PandaLogger().getLogger("DBProxy")
_loggerFiltered = PandaLogger().getLogger("DBProxyFiltered")

# move all handlers of the filtered logger to the main logger, capped at INFO level.
# iterate over a snapshot: removeHandler() mutates _loggerFiltered.handlers, and looping
# over the live list while removing from it would skip every second handler
for handler in list(_loggerFiltered.handlers):
    handler.setLevel(logging.INFO)
    _logger.addHandler(handler)
    _loggerFiltered.removeHandler(handler)
0062 
0063 
0064 # proxy
0065 class DBProxy(
0066     entity_module.EntityModule,
0067     metrics_module.MetricsModule,
0068     worker_module.WorkerModule,
0069     data_carousel_module.DataCarouselModule,
0070     task_event_module.TaskEventModule,
0071     job_complex_module.JobComplexModule,
0072     job_standalone_module.JobStandaloneModule,
0073     misc_standalone_module.MiscStandaloneModule,
0074     task_complex_module.TaskComplexModule,
0075     task_standalone_module.TaskStandaloneModule,
0076     task_utils_module.TaskUtilsModule,
0077     workflow_module.WorkflowModule,
0078 ):
0079     # constructor
0080     def __init__(self, useOtherError=False):
0081         # init modules
0082         super().__init__(_logger)
0083         # connection object
0084         self.conn = None
0085         # cursor object
0086         self.cur = None
0087 
0088         # use special error codes for reconnection in querySQL
0089         self.useOtherError = useOtherError
0090 
0091         # set composite modules. use self to share a single database connection
0092         self.add_composite_module("entity", self)
0093         self.add_composite_module("metrics", self)
0094         self.add_composite_module("worker", self)
0095         self.add_composite_module("task_event", self)
0096         self.add_composite_module("task_utils", self)
0097         self.add_composite_module("job_complex", self)
0098 
0099     # connect to DB
0100     def connect(
0101         self,
0102         dbhost=panda_config.dbhost,
0103         dbpasswd=panda_config.dbpasswd,
0104         dbuser=panda_config.dbuser,
0105         dbname=panda_config.dbname,
0106         dbtimeout=panda_config.dbtimeout,
0107         reconnect=False,
0108         dbport=panda_config.dbport,
0109     ):
0110         _logger.debug(f"connect : re={reconnect}")
0111         # keep parameters for reconnect
0112         if not reconnect:
0113             self.dbhost = dbhost
0114             self.dbpasswd = dbpasswd
0115             self.dbuser = dbuser
0116             self.dbname = dbname
0117             self.dbtimeout = dbtimeout
0118             self.dbport = dbport
0119         # close old connection
0120         if reconnect:
0121             _logger.debug("closing old connection")
0122             try:
0123                 self.conn.close()
0124             except Exception:
0125                 _logger.debug("failed to close old connection")
0126         # connect
0127         try:
0128             if self.backend == "oracle":
0129                 conn = oracledb.connect(dsn=self.dbhost, user=self.dbuser, password=self.dbpasswd)
0130 
0131                 def OutputTypeHandler(cursor, name, defaultType, size, precision, scale):
0132                     if defaultType == oracledb.CLOB:
0133                         return cursor.var(oracledb.LONG_STRING, arraysize=cursor.arraysize)
0134 
0135                 conn.outputtypehandler = OutputTypeHandler
0136                 self.conn = wrapped_oracle_conn.WrappedOracleConn(conn)
0137 
0138             elif self.backend == "postgres":
0139                 dsn = {"dbname": self.dbname, "user": self.dbuser, "keepalives_idle": 30, "keepalives_interval": 30, "keepalives": 1}
0140                 if self.dbpasswd:
0141                     dsn["password"] = self.dbpasswd
0142                 if self.dbhost:
0143                     dsn["host"] = self.dbhost
0144                 if self.dbport:
0145                     dsn["port"] = self.dbport
0146                 conn = psycopg.connect(**dsn)
0147                 self.conn = WrappedPostgresConn.WrappedPostgresConn(conn)
0148             else:
0149                 self.conn = MySQLdb.connect(
0150                     host=self.dbhost,
0151                     db=self.dbname,
0152                     port=self.dbport,
0153                     connect_timeout=self.dbtimeout,
0154                     user=self.dbuser,
0155                     passwd=self.dbpasswd,
0156                     charset="utf8",
0157                 )
0158             self.cur = WrappedCursor(self.conn)
0159             try:
0160                 # use SQL dumper
0161                 if panda_config.dump_sql:
0162                     from pandaserver.taskbuffer.SQLDumper import SQLDumper
0163 
0164                     self.cur = SQLDumper(self.cur)
0165             except Exception:
0166                 pass
0167             self.hostname = self.cur.initialize()
0168             if not reconnect:
0169                 atexit.register(self.close_connection)
0170             _logger.debug(f"connect : re={reconnect} ready")
0171             return True
0172         except Exception as e:
0173             _logger.error(f"connect : {str(e)}")
0174             return False