Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:38:58

0001 import gc
0002 import json
0003 import time
0004 
0005 from pandacommon.pandalogger import logger_utils
0006 from pandacommon.pandautils.PandaUtils import try_malloc_trim
0007 
0008 from pandajedi.jedicore.ThreadUtils import ListWithLock
0009 from pandajedi.jediddm.DDMInterface import DDMInterface
0010 from pandajedi.jedimsgprocessor.base_msg_processor import BaseMsgProcPlugin
0011 from pandajedi.jediorder.JobGenerator import JobGeneratorThread, get_params_to_get_tasks
0012 from pandajedi.jediorder.TaskSetupper import TaskSetupper
0013 from pandaserver.srvcore import CoreUtils
0014 
# module logger, named after the last component of this module's dotted path
base_logger = logger_utils.setup_logger(__name__.rsplit(".", 1)[-1])
0016 
0017 
# Jedi Job Generator message processor plugin
class JediJobGeneratorMsgProcPlugin(BaseMsgProcPlugin):
    """
    Message-driven Job Generator

    Each message names a single JEDI task; jobs for that task are generated
    by running JobGeneratorThread inline in the calling thread. The site
    mapper, work queue mapper and resource types are cached for a short TTL
    so they are not re-fetched for every message.
    """

    def initialize(self):
        """
        Set up the DDM interface, the TaskSetupper cache and the metadata caches.

        NOTE(review): self.tbIF and self.get_pid() are used by this class but not
        defined here; presumably provided by BaseMsgProcPlugin -- confirm.
        """
        BaseMsgProcPlugin.initialize(self)
        # DDM interface
        self.ddmIF = DDMInterface()
        self.ddmIF.setupInterface()
        # TaskSetupper instances keyed by (vo, prodSourceLabel), built lazily and reused
        self.task_setupper_map = {}
        # cache heavy metadata objects with short TTL to reduce allocation churn;
        # timestamps are time.monotonic() values, 0 meaning "never fetched"
        self._cache_ttl_sec = 300
        self._site_mapper = None
        self._site_mapper_ts = 0
        self._work_queue_mapper = None
        self._work_queue_mapper_ts = 0
        self._resource_types = None
        self._resource_types_ts = 0
        # dict passed to get_params_to_get_tasks on every message (presumably its cache)
        self._params_to_get_tasks = {}
        # soft memory threshold in MB: above it, gc + malloc_trim run after each message
        # to avoid the OOM killer. This is not a hard limit.
        self._mem_usage_threshold_mb = 1500
        self.pid = self.get_pid()

    def _is_cache_valid(self, ts):
        """Return True if an entry stamped at monotonic time ``ts`` is younger than the cache TTL."""
        # monotonic clock: wall-clock steps (e.g. NTP moving the clock back) must not
        # stretch or shrink the TTL
        return (time.monotonic() - ts) < self._cache_ttl_sec

    def _get_site_mapper(self):
        """Return the cached site mapper, re-fetching it from tbIF once the TTL has expired."""
        if self._site_mapper is None or not self._is_cache_valid(self._site_mapper_ts):
            self._site_mapper = self.tbIF.get_site_mapper()
            self._site_mapper_ts = time.monotonic()
        return self._site_mapper

    def _get_work_queue_mapper(self):
        """Return the cached work queue mapper, re-fetching it from tbIF once the TTL has expired."""
        if self._work_queue_mapper is None or not self._is_cache_valid(self._work_queue_mapper_ts):
            self._work_queue_mapper = self.tbIF.getWorkQueueMap()
            self._work_queue_mapper_ts = time.monotonic()
        return self._work_queue_mapper

    def _get_resource_types(self):
        """Return the cached resource types, re-fetching when expired or when the cached value is empty."""
        # an empty/None result is never kept: caching it would make process() fail with
        # "failed to get resource types" for a full TTL after a single bad fetch
        if not self._resource_types or not self._is_cache_valid(self._resource_types_ts):
            self._resource_types = self.tbIF.load_resource_types()
            self._resource_types_ts = time.monotonic()
        return self._resource_types

    def _get_task_setupper(self, vo, prodSourceLabel):
        """Return the TaskSetupper for (vo, prodSourceLabel), creating and caching it on first use."""
        setupper_key = (vo, prodSourceLabel)
        taskSetupper = self.task_setupper_map.get(setupper_key)
        if taskSetupper is None:
            taskSetupper = TaskSetupper(vo, prodSourceLabel)
            taskSetupper.initializeMods(self.tbIF, self.ddmIF)
            self.task_setupper_map[setupper_key] = taskSetupper
        return taskSetupper

    @staticmethod
    def _clear_generator_state(gen_thr, tmp_log):
        """Best-effort release of the large per-run maps and input list held by a JobGeneratorThread."""
        # each map is cleared independently so one failure does not skip the others
        for attr_name in ("buildSpecMap", "finished_lib_specs_map", "active_lib_specs_map"):
            try:
                getattr(gen_thr, attr_name).clear()
            except Exception as e:
                tmp_log.debug(f"failed to clear {attr_name} : {e.__class__.__name__} : {e}")
        try:
            gen_thr.inputList = None
        except Exception as e:
            tmp_log.debug(f"failed to reset inputList : {e.__class__.__name__} : {e}")

    def _release_memory_if_needed(self, tmp_log):
        """
        Run gc.collect() and malloc_trim when memory usage exceeds the soft threshold.

        Best-effort: any failure is logged as a warning and swallowed, because this
        runs from a ``finally`` clause and must not mask an in-flight exception.
        """
        try:
            mem_usage = CoreUtils.getMemoryUsage()
            if mem_usage is None:
                tmp_log.warning("failed to get memory usage, skipped memory cleanup")
            elif mem_usage > self._mem_usage_threshold_mb:
                gc.collect()
                try_malloc_trim(tmp_log)
                new_mem_usage = CoreUtils.getMemoryUsage()
                tmp_log.debug(f"trimmed memory usage from {mem_usage} MB to {new_mem_usage} MB")
        except Exception as e:
            tmp_log.warning(f"memory cleanup failed, skipped. {e.__class__.__name__} : {e}")

    def process(self, msg_obj):
        """
        Generate jobs for the task named in one message.

        ``msg_obj.data`` must be a JSON object with ``msg_type`` equal to
        "jedi_job_generator" and a ``taskid`` field. Unknown tasks, and tasks
        for which no inputs are returned, are logged and skipped.

        :param msg_obj: message exposing ``sub_id``, ``msg_id`` and ``data``
        :raises ValueError: if ``msg_type`` is not "jedi_job_generator"
        :raises Exception: JSON/field parsing and job-generation errors are logged and re-raised
        """
        tmp_log = logger_utils.make_logger(base_logger, token=self.get_pid(), method_name="process")
        input_list = None
        gen_thr = None
        tmp_list = None
        taskSpec = None
        taskSetupper = None
        # start
        tmp_log.info("start")
        tmp_log.debug(f"sub_id={msg_obj.sub_id} ; msg_id={msg_obj.msg_id}")
        # parse json
        try:
            msg_dict = json.loads(msg_obj.data)
        except Exception as e:
            err_str = f"failed to parse message json {msg_obj.data} , skipped. {e.__class__.__name__} : {e}"
            tmp_log.error(err_str)
            raise
        # sanity check
        try:
            msg_type = msg_dict["msg_type"]
        except Exception as e:
            err_str = f"failed to parse message object dict {msg_dict} , skipped. {e.__class__.__name__} : {e}"
            tmp_log.error(err_str)
            raise
        if msg_type != "jedi_job_generator":
            err_str = f"got unknown msg_type {msg_type} , skipped"
            tmp_log.error(err_str)
            # a bare ``raise`` has no active exception here and would surface as an
            # unrelated "No active exception to reraise" RuntimeError
            raise ValueError(err_str)
        # run
        try:
            # get task to generate jobs
            task_id = int(msg_dict["taskid"])
            _, taskSpec = self.tbIF.getTaskWithID_JEDI(task_id)
            if not taskSpec:
                tmp_log.debug(f"unknown task_id={task_id}, skipped")
            else:
                tmp_log.debug(f"processing task_id={task_id} workqueue_id={taskSpec.workQueue_ID} status={taskSpec.status}")
                vo = taskSpec.vo
                prodSourceLabel = taskSpec.prodSourceLabel
                # get WQ
                workQueue = self._get_work_queue_mapper().getQueueWithIDGshare(taskSpec.workQueue_ID, taskSpec.gshare)
                # get resource types
                resource_types = self._get_resource_types()
                if not resource_types:
                    raise RuntimeError("failed to get resource types")
                # nFiles from shared JobGenerator config resolver
                tmp_params = get_params_to_get_tasks(
                    self.tbIF, self._params_to_get_tasks, vo, prodSourceLabel, workQueue.queue_name if workQueue else "", taskSpec.cloud
                )
                nFiles = tmp_params["nFiles"]
                # get inputs
                tmp_list = self.tbIF.getTasksToBeProcessed_JEDI(self.pid, None, workQueue, None, None, nFiles=nFiles, target_tasks=[task_id])
                if tmp_list:
                    input_list = ListWithLock(tmp_list)
                    siteMapper = self._get_site_mapper()
                    taskSetupper = self._get_task_setupper(vo, prodSourceLabel)
                    # run generator inline (run(), not start()) to avoid creating an extra worker thread
                    gen_thr = JobGeneratorThread(
                        input_list,
                        None,
                        self.tbIF,
                        self.ddmIF,
                        siteMapper,
                        True,
                        taskSetupper,
                        self.pid,
                        workQueue,
                        "pjmsg",
                        None,
                        None,
                        None,
                        False,
                        resource_types,
                    )
                    gen_thr.run()
                    tmp_log.info(f"task_id={task_id} generated jobs")
                else:
                    tmp_log.debug(f"task_id={task_id} is not considered to be processed, skipped")
        except Exception as e:
            err_str = f"failed to run, skipped. {e.__class__.__name__} : {e}"
            tmp_log.error(err_str)
            raise
        finally:
            # Explicitly drop references to large per-run objects (also on the error path,
            # where the traceback keeps this frame alive) so the memory check below can
            # actually reclaim them.
            if gen_thr is not None:
                self._clear_generator_state(gen_thr, tmp_log)
            gen_thr = None
            input_list = None
            tmp_list = None
            taskSpec = None
            taskSetupper = None
            # If memory usage is above the threshold, trigger early cleanup to avoid OOM killer.
            self._release_memory_if_needed(tmp_log)
        # done
        tmp_log.info("done")