File indexing completed on 2026-04-19 08:00:01
0001 """
0002 interface to give limited database access to plugins
0003 """
0004
0005 import logging
0006 import os
0007 import threading
0008
0009 from pandaharvester.harvesterconfig import harvester_config
0010
0011 from .db_proxy_pool import DBProxyPool
0012
0013
0014 class DBInterface(object):
0015
0016 def __init__(self):
0017 self.dbProxy = DBProxyPool()
0018
0019
0020 def get_cache(self, data_name):
0021 return self.dbProxy.get_cache(data_name)
0022
0023
0024 def get_files_with_group_id(self, group_id):
0025 return self.dbProxy.get_files_with_group_id(group_id)
0026
0027
0028 def update_file_group_status(self, group_id, status_string):
0029 return self.dbProxy.update_file_group_status(group_id, status_string)
0030
0031
0032 def get_file_group_status(self, group_id):
0033 return self.dbProxy.get_file_group_status(group_id)
0034
0035
0036 def get_locked_by(self):
0037 currentThr = threading.current_thread()
0038 if currentThr is not None:
0039 thrName = currentThr.ident
0040 else:
0041 thrName = None
0042 return f"plugin-{os.getpid()}-{thrName}"
0043
0044
0045 def get_object_lock(self, object_name, lock_interval):
0046 lockedBy = self.get_locked_by()
0047 return self.dbProxy.get_process_lock(object_name, lockedBy, lock_interval)
0048
0049
0050 def release_object_lock(self, object_name):
0051 lockedBy = self.get_locked_by()
0052 return self.dbProxy.release_process_lock(object_name, lockedBy)
0053
0054
0055 def refresh_file_group_info(self, job_spec):
0056 return self.dbProxy.refresh_file_group_info(job_spec)
0057
0058
0059 def set_file_group(self, file_specs, group_id, status_string):
0060 return self.dbProxy.set_file_group(file_specs, group_id, status_string)
0061
0062
0063 def get_worker_limits(self, site_name, queue_config):
0064 return self.dbProxy.get_worker_limits(site_name, queue_config)
0065
0066
0067 def get_worker_ce_stats(self, site_name):
0068 return self.dbProxy.get_worker_ce_stats(site_name)
0069
0070
0071 def get_worker_ce_backend_throughput(self, site_name, time_window):
0072 return self.dbProxy.get_worker_ce_backend_throughput(site_name, time_window)
0073
0074
0075 def add_dialog_message(self, message, level, module_name, identifier=None):
0076
0077 validLevels = ["DEBUG", "INFO", "ERROR", "WARNING"]
0078 if level not in validLevels:
0079 level = "INFO"
0080 levelNum = getattr(logging, level)
0081
0082 try:
0083 minLevel = harvester_config.propagator.minMessageLevel
0084 except Exception:
0085 minLevel = None
0086 if minLevel not in validLevels:
0087 minLevel = "WARNING"
0088 minLevelNum = getattr(logging, minLevel)
0089
0090 if levelNum < minLevelNum:
0091 return True
0092 return self.dbProxy.add_dialog_message(message, level, module_name, identifier)