File indexing completed on 2026-04-10 08:38:58
0001 import json
0002 import os
0003 import socket
0004
0005 from pandacommon.pandalogger import logger_utils
0006
0007 from pandajedi.jedicore.ThreadUtils import ListWithLock, ThreadPool
0008 from pandajedi.jediddm.DDMInterface import DDMInterface
0009 from pandajedi.jedimsgprocessor.base_msg_processor import BaseMsgProcPlugin
0010 from pandajedi.jediorder.JobGenerator import JobGeneratorThread
0011 from pandajedi.jediorder.TaskSetupper import TaskSetupper
0012
0013 base_logger = logger_utils.setup_logger(__name__.split(".")[-1])
0014
0015
0016
0017 class PandaToJediMsgProcPlugin(BaseMsgProcPlugin):
0018 def initialize(self):
0019 BaseMsgProcPlugin.initialize(self)
0020 self.ddmIF = DDMInterface()
0021 self.ddmIF.setupInterface()
0022 self.pid = f"{socket.getfqdn().split('.')[0]}-{os.getpid()}_{os.getpgrp()}-pjmsg"
0023
0024 def process(self, msg_obj, decoded_data=None):
0025 tmp_log = logger_utils.make_logger(base_logger, token=self.get_pid(), method_name="process")
0026
0027 tmp_log.info("start")
0028
0029 if decoded_data is None:
0030
0031 try:
0032 msg_dict = json.loads(msg_obj.data)
0033 except Exception as e:
0034 err_str = f"failed to parse message json {msg_obj.data} , skipped. {e.__class__.__name__} : {e}"
0035 tmp_log.error(err_str)
0036 raise
0037 else:
0038 msg_dict = decoded_data
0039
0040 try:
0041 tmp_log.debug(f"got message {msg_dict}")
0042 if msg_dict["msg_type"] == "generate_job":
0043
0044 jediTaskID = int(msg_dict["taskid"])
0045 s, taskSpec = self.tbIF.getTaskWithID_JEDI(jediTaskID)
0046 if not taskSpec:
0047 tmp_log.debug(f"unknown task {jediTaskID}")
0048 else:
0049
0050 vo = taskSpec.vo
0051 prodSourceLabel = taskSpec.prodSourceLabel
0052 workQueue = self.tbIF.getWorkQueueMap().getQueueWithIDGshare(taskSpec.workQueue_ID, taskSpec.gshare)
0053
0054 tmpList = self.tbIF.getTasksToBeProcessed_JEDI(self.pid, None, workQueue, None, None, nFiles=1000, target_tasks=[jediTaskID])
0055 if tmpList:
0056 inputList = ListWithLock(tmpList)
0057
0058 threadPool = ThreadPool()
0059 siteMapper = self.tbIF.get_site_mapper()
0060 taskSetupper = TaskSetupper(vo, prodSourceLabel)
0061 taskSetupper.initializeMods(self.tbIF, self.ddmIF)
0062 gen = JobGeneratorThread(
0063 inputList, threadPool, self.tbIF, self.ddmIF, siteMapper, True, taskSetupper, self.pid, workQueue, "pjmsg", None, None, None, False
0064 )
0065 gen.start()
0066 gen.join()
0067 else:
0068 tmp_log.debug(f"unknown message type : {msg_dict['msg_type']}")
0069 except Exception as e:
0070 err_str = f"failed to run, skipped. {e.__class__.__name__} : {e}"
0071 tmp_log.error(err_str)
0072 raise
0073
0074 tmp_log.info("done")