Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:38:58

0001 import json
0002 import os
0003 import socket
0004 
0005 from pandacommon.pandalogger import logger_utils
0006 
0007 from pandajedi.jedicore.ThreadUtils import ListWithLock, ThreadPool
0008 from pandajedi.jediddm.DDMInterface import DDMInterface
0009 from pandajedi.jedimsgprocessor.base_msg_processor import BaseMsgProcPlugin
0010 from pandajedi.jediorder.JobGenerator import JobGeneratorThread
0011 from pandajedi.jediorder.TaskSetupper import TaskSetupper
0012 
0013 base_logger = logger_utils.setup_logger(__name__.split(".")[-1])
0014 
0015 
0016 # plugin to process messages from Panda to JEDI
0017 class PandaToJediMsgProcPlugin(BaseMsgProcPlugin):
0018     def initialize(self):
0019         BaseMsgProcPlugin.initialize(self)
0020         self.ddmIF = DDMInterface()
0021         self.ddmIF.setupInterface()
0022         self.pid = f"{socket.getfqdn().split('.')[0]}-{os.getpid()}_{os.getpgrp()}-pjmsg"
0023 
0024     def process(self, msg_obj, decoded_data=None):
0025         tmp_log = logger_utils.make_logger(base_logger, token=self.get_pid(), method_name="process")
0026         # start
0027         tmp_log.info("start")
0028         # parse
0029         if decoded_data is None:
0030             # json decode
0031             try:
0032                 msg_dict = json.loads(msg_obj.data)
0033             except Exception as e:
0034                 err_str = f"failed to parse message json {msg_obj.data} , skipped. {e.__class__.__name__} : {e}"
0035                 tmp_log.error(err_str)
0036                 raise
0037         else:
0038             msg_dict = decoded_data
0039         # run
0040         try:
0041             tmp_log.debug(f"got message {msg_dict}")
0042             if msg_dict["msg_type"] == "generate_job":
0043                 # get task to generate jobs
0044                 jediTaskID = int(msg_dict["taskid"])
0045                 s, taskSpec = self.tbIF.getTaskWithID_JEDI(jediTaskID)
0046                 if not taskSpec:
0047                     tmp_log.debug(f"unknown task {jediTaskID}")
0048                 else:
0049                     # get WQ
0050                     vo = taskSpec.vo
0051                     prodSourceLabel = taskSpec.prodSourceLabel
0052                     workQueue = self.tbIF.getWorkQueueMap().getQueueWithIDGshare(taskSpec.workQueue_ID, taskSpec.gshare)
0053                     # get inputs
0054                     tmpList = self.tbIF.getTasksToBeProcessed_JEDI(self.pid, None, workQueue, None, None, nFiles=1000, target_tasks=[jediTaskID])
0055                     if tmpList:
0056                         inputList = ListWithLock(tmpList)
0057                         # create thread
0058                         threadPool = ThreadPool()
0059                         siteMapper = self.tbIF.get_site_mapper()
0060                         taskSetupper = TaskSetupper(vo, prodSourceLabel)
0061                         taskSetupper.initializeMods(self.tbIF, self.ddmIF)
0062                         gen = JobGeneratorThread(
0063                             inputList, threadPool, self.tbIF, self.ddmIF, siteMapper, True, taskSetupper, self.pid, workQueue, "pjmsg", None, None, None, False
0064                         )
0065                         gen.start()
0066                         gen.join()
0067             else:
0068                 tmp_log.debug(f"unknown message type : {msg_dict['msg_type']}")
0069         except Exception as e:
0070             err_str = f"failed to run, skipped. {e.__class__.__name__} : {e}"
0071             tmp_log.error(err_str)
0072             raise
0073         # done
0074         tmp_log.info("done")