File indexing completed on 2026-04-10 08:38:58
0001 import gc
0002 import json
0003 import time
0004
0005 from pandacommon.pandalogger import logger_utils
0006 from pandacommon.pandautils.PandaUtils import try_malloc_trim
0007
0008 from pandajedi.jedicore.ThreadUtils import ListWithLock
0009 from pandajedi.jediddm.DDMInterface import DDMInterface
0010 from pandajedi.jedimsgprocessor.base_msg_processor import BaseMsgProcPlugin
0011 from pandajedi.jediorder.JobGenerator import JobGeneratorThread, get_params_to_get_tasks
0012 from pandajedi.jediorder.TaskSetupper import TaskSetupper
0013 from pandaserver.srvcore import CoreUtils
0014
0015 base_logger = logger_utils.setup_logger(__name__.split(".")[-1])
0016
0017
0018
class JediJobGeneratorMsgProcPlugin(BaseMsgProcPlugin):
    """
    Message-driven Job Generator

    Handles messages whose ``msg_type`` is ``jedi_job_generator``: looks up the
    task named by ``taskid`` in the message and runs a ``JobGeneratorThread``
    for it from within ``process()``.
    """

    def initialize(self):
        """
        Initialize the plugin.

        Runs the base-class initialization, sets up the DDM interface and
        prepares the per-instance caches, the memory threshold and the pid.
        """
        BaseMsgProcPlugin.initialize(self)
        # DDM interface
        self.ddmIF = DDMInterface()
        self.ddmIF.setupInterface()
        # TaskSetupper instances keyed by (vo, prodSourceLabel); filled lazily in process()
        self.task_setupper_map = {}
        # cached lookups: each one is reloaded once it is older than _cache_ttl_sec seconds
        self._cache_ttl_sec = 300
        self._site_mapper = None
        self._site_mapper_ts = 0
        self._work_queue_mapper = None
        self._work_queue_mapper_ts = 0
        self._resource_types = None
        self._resource_types_ts = 0
        # dict passed to get_params_to_get_tasks() on every call
        # (presumably filled there as a cache -- confirm against JobGenerator)
        self._params_to_get_tasks = {}
        # memory usage (MB) above which process() runs gc and malloc trim
        self._mem_usage_threshold_mb = 1500
        # identifier passed to getTasksToBeProcessed_JEDI() and JobGeneratorThread
        self.pid = self.get_pid()

    def _is_cache_valid(self, ts):
        """Return True if timestamp ``ts`` (from ``time.time()``) is younger than the cache TTL."""
        return (time.time() - ts) < self._cache_ttl_sec

    def _get_site_mapper(self):
        """Return the site mapper, reloading it via ``tbIF`` when missing or expired."""
        if self._site_mapper is None or not self._is_cache_valid(self._site_mapper_ts):
            self._site_mapper = self.tbIF.get_site_mapper()
            self._site_mapper_ts = time.time()
        return self._site_mapper

    def _get_work_queue_mapper(self):
        """Return the work queue mapper, reloading it via ``tbIF`` when missing or expired."""
        if self._work_queue_mapper is None or not self._is_cache_valid(self._work_queue_mapper_ts):
            self._work_queue_mapper = self.tbIF.getWorkQueueMap()
            self._work_queue_mapper_ts = time.time()
        return self._work_queue_mapper

    def _get_resource_types(self):
        """Return the resource types, reloading them via ``tbIF`` when missing or expired."""
        if self._resource_types is None or not self._is_cache_valid(self._resource_types_ts):
            self._resource_types = self.tbIF.load_resource_types()
            self._resource_types_ts = time.time()
        return self._resource_types

    def process(self, msg_obj):
        """
        Generate jobs for the task referenced by one message.

        Args:
            msg_obj: message object. ``data`` is parsed as JSON and must provide
                ``msg_type`` equal to ``"jedi_job_generator"`` and ``taskid``;
                ``sub_id`` and ``msg_id`` are only written to the log.

        Raises:
            RuntimeError: if ``msg_type`` is not ``"jedi_job_generator"`` or the
                resource types cannot be loaded.
            Exception: any error raised while parsing the message or generating
                jobs is logged and re-raised unchanged.
        """
        tmp_log = logger_utils.make_logger(base_logger, token=self.get_pid(), method_name="process")
        # pre-declare everything the finally clause resets, so cleanup is safe
        # no matter where the run fails
        input_list = None
        gen_thr = None
        tmp_list = None
        taskSpec = None
        taskSetupper = None
        # start
        tmp_log.info("start")
        tmp_log.debug(f"sub_id={msg_obj.sub_id} ; msg_id={msg_obj.msg_id}")
        # parse json
        try:
            msg_dict = json.loads(msg_obj.data)
        except Exception as e:
            err_str = f"failed to parse message json {msg_obj.data} , skipped. {e.__class__.__name__} : {e}"
            tmp_log.error(err_str)
            raise
        # sanity check
        try:
            msg_type = msg_dict["msg_type"]
        except Exception as e:
            err_str = f"failed to parse message object dict {msg_dict} , skipped. {e.__class__.__name__} : {e}"
            tmp_log.error(err_str)
            raise
        if msg_type != "jedi_job_generator":
            err_str = f"got unknown msg_type {msg_type} , skipped"
            tmp_log.error(err_str)
            # there is no active exception here, so a bare "raise" would only
            # produce "RuntimeError: No active exception to reraise"; raise the
            # same exception type explicitly with the real reason attached
            raise RuntimeError(err_str)
        # run
        try:
            # get task to generate jobs
            task_id = int(msg_dict["taskid"])
            _, taskSpec = self.tbIF.getTaskWithID_JEDI(task_id)
            if not taskSpec:
                tmp_log.debug(f"unknown task_id={task_id}, skipped")
            else:
                tmp_log.debug(f"processing task_id={task_id} workqueue_id={taskSpec.workQueue_ID} status={taskSpec.status}")
                # get work queue
                vo = taskSpec.vo
                prodSourceLabel = taskSpec.prodSourceLabel
                workQueue = self._get_work_queue_mapper().getQueueWithIDGshare(taskSpec.workQueue_ID, taskSpec.gshare)
                # get resource types
                resource_types = self._get_resource_types()
                if not resource_types:
                    raise RuntimeError("failed to get resource types")
                # get parameters; an empty queue name is used when no work queue was found
                tmp_params = get_params_to_get_tasks(
                    self.tbIF, self._params_to_get_tasks, vo, prodSourceLabel, workQueue.queue_name if workQueue else "", taskSpec.cloud
                )
                nFiles = tmp_params["nFiles"]
                # get inputs, restricted to this task only
                tmp_list = self.tbIF.getTasksToBeProcessed_JEDI(self.pid, None, workQueue, None, None, nFiles=nFiles, target_tasks=[task_id])
                if tmp_list:
                    input_list = ListWithLock(tmp_list)
                    # reuse one TaskSetupper per (vo, prodSourceLabel)
                    siteMapper = self._get_site_mapper()
                    setupper_key = (vo, prodSourceLabel)
                    taskSetupper = self.task_setupper_map.get(setupper_key)
                    if taskSetupper is None:
                        taskSetupper = TaskSetupper(vo, prodSourceLabel)
                        taskSetupper.initializeMods(self.tbIF, self.ddmIF)
                        self.task_setupper_map[setupper_key] = taskSetupper
                    # create the generator; run() is invoked directly rather than start()
                    gen_thr = JobGeneratorThread(
                        input_list,
                        None,
                        self.tbIF,
                        self.ddmIF,
                        siteMapper,
                        True,
                        taskSetupper,
                        self.pid,
                        workQueue,
                        "pjmsg",
                        None,
                        None,
                        None,
                        False,
                        resource_types,
                    )
                    gen_thr.run()
                    tmp_log.info(f"task_id={task_id} generated jobs")
                else:
                    tmp_log.debug(f"task_id={task_id} is not considered to be processed, skipped")
        except Exception as e:
            err_str = f"failed to run, skipped. {e.__class__.__name__} : {e}"
            tmp_log.error(err_str)
            raise
        finally:
            # drop references held by the generator and by this frame so the
            # per-message objects can be collected even when an exception propagates
            if gen_thr is not None:
                try:
                    gen_thr.buildSpecMap.clear()
                    gen_thr.finished_lib_specs_map.clear()
                    gen_thr.active_lib_specs_map.clear()
                    gen_thr.inputList = None
                except Exception as e:
                    # best-effort cleanup: report it, but never let it mask the outcome of the run
                    tmp_log.debug(f"failed to release job generator references. {e.__class__.__name__} : {e}")
            gen_thr = None
            input_list = None
            tmp_list = None
            taskSpec = None
            taskSetupper = None
            # collect garbage and trim the heap only when usage exceeds the threshold
            mem_usage = CoreUtils.getMemoryUsage()
            if mem_usage is None:
                tmp_log.warning("failed to get memory usage, skipped memory cleanup")
            elif mem_usage > self._mem_usage_threshold_mb:
                gc.collect()
                try_malloc_trim(tmp_log)
                new_mem_usage = CoreUtils.getMemoryUsage()
                tmp_log.debug(f"trimmed memory usage from {mem_usage} MB to {new_mem_usage} MB")
        # done
        tmp_log.info("done")