Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-19 08:00:01

0001 """
0002 interface to give limited database access to plugins
0003 """
0004 
0005 import logging
0006 import os
0007 import threading
0008 
0009 from pandaharvester.harvesterconfig import harvester_config
0010 
0011 from .db_proxy_pool import DBProxyPool
0012 
0013 
class DBInterface(object):
    """Interface giving plugins limited database access.

    Every method forwards to the ``DBProxyPool`` instance created in the
    constructor. Only the lock helpers and ``add_dialog_message`` add logic
    of their own before delegating.
    """

    def __init__(self):
        """Create the proxy pool that all other methods delegate to."""
        self.dbProxy = DBProxyPool()

    def get_cache(self, data_name):
        """Return the proxy's cache data for ``data_name``."""
        return self.dbProxy.get_cache(data_name)

    def get_files_with_group_id(self, group_id):
        """Return the proxy's result for files with the given group ID."""
        return self.dbProxy.get_files_with_group_id(group_id)

    def update_file_group_status(self, group_id, status_string):
        """Forward a status update for the file group to the proxy."""
        return self.dbProxy.update_file_group_status(group_id, status_string)

    def get_file_group_status(self, group_id):
        """Return the proxy's status result for the file group."""
        return self.dbProxy.get_file_group_status(group_id)

    def get_locked_by(self):
        """Return the locker identifier ``plugin-<pid>-<thread ident>``."""
        # current_thread() always returns a thread object (a dummy one for
        # threads not started through threading), so no None check is needed
        return f"plugin-{os.getpid()}-{threading.current_thread().ident}"

    def get_object_lock(self, object_name, lock_interval):
        """Request a process lock on ``object_name`` for the calling thread.

        The lock owner is the identifier from ``get_locked_by``; the return
        value is whatever the proxy's ``get_process_lock`` returns.
        """
        lockedBy = self.get_locked_by()
        return self.dbProxy.get_process_lock(object_name, lockedBy, lock_interval)

    def release_object_lock(self, object_name):
        """Release the process lock on ``object_name`` for the calling thread.

        Uses the same ``get_locked_by`` identifier as ``get_object_lock``.
        """
        lockedBy = self.get_locked_by()
        return self.dbProxy.release_process_lock(object_name, lockedBy)

    def refresh_file_group_info(self, job_spec):
        """Forward a file group info refresh for ``job_spec`` to the proxy."""
        return self.dbProxy.refresh_file_group_info(job_spec)

    def set_file_group(self, file_specs, group_id, status_string):
        """Forward a file group assignment for ``file_specs`` to the proxy."""
        return self.dbProxy.set_file_group(file_specs, group_id, status_string)

    def get_worker_limits(self, site_name, queue_config):
        """Return the proxy's worker limits for the site and queue config."""
        return self.dbProxy.get_worker_limits(site_name, queue_config)

    def get_worker_ce_stats(self, site_name):
        """Return the proxy's worker CE stats for ``site_name``."""
        return self.dbProxy.get_worker_ce_stats(site_name)

    def get_worker_ce_backend_throughput(self, site_name, time_window):
        """Return the proxy's worker CE backend throughput for ``site_name``."""
        return self.dbProxy.get_worker_ce_backend_throughput(site_name, time_window)

    def add_dialog_message(self, message, level, module_name, identifier=None):
        """Add a dialog message unless its level is below the configured minimum.

        ``level`` values other than DEBUG, INFO, ERROR or WARNING are treated
        as INFO. The minimum level is read from
        ``harvester_config.propagator.minMessageLevel``; WARNING is used when
        that cannot be read or is not one of the four names.

        Returns True without calling the proxy when the message is below the
        minimum, otherwise the result of the proxy's ``add_dialog_message``.
        """
        validLevels = ("DEBUG", "INFO", "ERROR", "WARNING")
        # set level
        if level not in validLevels:
            level = "INFO"
        levelNum = getattr(logging, level)
        # get minimum level; best-effort, any failure falls back to the default
        try:
            minLevel = harvester_config.propagator.minMessageLevel
        except Exception:
            minLevel = None
        if minLevel not in validLevels:
            minLevel = "WARNING"
        minLevelNum = getattr(logging, minLevel)
        # check level to avoid redundant db lock
        if levelNum < minLevelNum:
            return True
        return self.dbProxy.add_dialog_message(message, level, module_name, identifier)