File indexing completed on 2026-04-11 08:41:04
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010 import io
0011 import json
0012 import logging
0013 import os
0014 import re
0015 import subprocess
0016 import time
0017 import threading
0018 import traceback
0019
0020 try:
0021 import Queue as queue
0022 except Exception:
0023 import queue
0024
0025 from pilot.common.exception import PilotException, MessageFailure, SetupFailure, RunPayloadFailure, UnknownException
0026 from pilot.eventservice.esprocess.esmessage import MessageThread
0027 from pilot.util.container import containerise_executable
0028 from pilot.util.processes import kill_child_processes
0029
0030 logger = logging.getLogger(__name__)
0031
0032 """
0033 Main process to handle the event service.
0034 It uses two hooks, get_event_ranges_hook and handle_out_message_hook, to communicate with other processes while
0035 it is running. The process handles the Event Service logic independently.
0036 """
0037
0038
class ESProcess(threading.Thread):
    """
    Main EventService Process.

    A thread that starts a message thread and the payload subprocess and then loops: it monitors both,
    answers "Ready for events" requests from the payload with event ranges obtained through
    get_event_ranges_hook, and passes every other (parsed) payload message to handle_out_message_hook.
    """

    def __init__(self, payload, waiting_time=30 * 60):
        """
        Init ESProcess.

        :param payload: a dict of {'executable': <cmd string>, 'output_file': <filename or without it>, 'error_file': <filename or without it>}.
                        The optional keys 'workdir' and 'job' are read as well when present.
        :param waiting_time: seconds the payload may keep running after "No more events" was sent before
                             monitor() raises an exception.
        """
        threading.Thread.__init__(self, name='esprocess')

        self.__message_queue = queue.Queue()
        self.__payload = payload

        self.__message_thread = None
        self.__process = None

        self.get_event_ranges_hook = None
        self.handle_out_message_hook = None

        self.__monitor_log_time = None
        self.is_no_more_events = False
        self.__no_more_event_time = None
        self.__waiting_time = waiting_time
        self.__stop = threading.Event()
        self.__stop_time = 180
        # filled in by stop(); created here so that monitor() can never hit a missing attribute
        self.__stop_set_time = None
        self.__stop_delay = None
        self.pid = None
        self.__is_payload_started = False

        self.__ret_code = None
        # final thread name (replaces the 'esprocess' name given to Thread.__init__ above)
        self.name = "ESProcess"
        self.corecount = 1

        self.event_ranges_cache = []

    def __del__(self):
        """
        Stop the message thread, if one was created, when this object is collected.
        """
        if self.__message_thread:
            self.__message_thread.stop()

    def is_payload_started(self):
        """
        Check whether the payload subprocess has been launched.

        :return: True once init_payload_process() has started the subprocess, otherwise False.
        """
        return self.__is_payload_started

    def stop(self, delay=1800):
        """
        Request a stop: raise the stop flag and tell the payload that there are no more events.
        Only the first call has an effect.

        :param delay: seconds after which monitor() will call terminate() once the stop was requested.
        """
        if not self.__stop.is_set():
            # store the bookkeeping before raising the flag: monitor() reads both values as soon as the flag is set
            self.__stop_set_time = time.time()
            self.__stop_delay = delay
            self.__stop.set()
            event_ranges = "No more events"
            self.send_event_ranges_to_payload(event_ranges)

    def init_message_thread(self, socketname=None, context='local'):
        """
        init message thread.

        :param socketname: name of the socket between current process and payload.
        :param context: name of the context between current process and payload, default is 'local'.

        :raises MessageFailure: when a non-Pilot exception is raised while starting the message thread.
                                A PilotException is only logged and recorded as return code -1.
        """

        logger.info("start to init message thread")
        try:
            self.__message_thread = MessageThread(self.__message_queue, socketname, context)
            self.__message_thread.start()
        except PilotException as e:
            # existing behaviour: logged and flagged through the return code, not re-raised
            logger.error("Failed to start message thread: %s" % e.get_detail())
            self.__ret_code = -1
        except Exception as e:
            logger.error("Failed to start message thread: %s" % str(e))
            self.__ret_code = -1
            raise MessageFailure(e)
        logger.info("finished to init message thread")

    def stop_message_thread(self):
        """
        Stop message thread and wait until it is no longer alive.
        """
        logger.info("Stopping message thread")
        if self.__message_thread:
            while self.__message_thread.is_alive():
                if not self.__message_thread.is_stopped():
                    self.__message_thread.stop()
                # give the thread time to wind down instead of spinning at full speed
                time.sleep(0.01)
        logger.info("Message thread stopped")

    def init_yampl_socket(self, executable):
        """
        Add the yampl socket name of the message thread to the payload command.

        Depending on the command, the socket name is exported as PILOT_EVENTRANGECHANNEL or injected into
        a (new or existing) --preExec option as jps.AthenaMPFlags.EventRangeChannel.

        :param executable: payload command (string).
        :return: updated payload command (string); unchanged if the --preExec format is not recognised.
        """
        socket_name = self.__message_thread.get_yampl_socket_name()
        if "PILOT_EVENTRANGECHANNEL" in executable:
            executable = "export PILOT_EVENTRANGECHANNEL=\"%s\"; " % (socket_name) + executable
        elif "--preExec" not in executable:
            executable += " --preExec \'from AthenaMP.AthenaMPFlags import jobproperties as jps;jps.AthenaMPFlags.EventRangeChannel=\"%s\"\'" % (socket_name)
        elif "import jobproperties as jps" in executable:
            # the flags module is already imported in the preExec: only add the channel assignment
            executable = executable.replace("import jobproperties as jps;",
                                            "import jobproperties as jps;jps.AthenaMPFlags.EventRangeChannel=\"%s\";" % (socket_name))
        elif "--preExec " in executable:
            new_str = "--preExec \'from AthenaMP.AthenaMPFlags import jobproperties as jps;jps.AthenaMPFlags.EventRangeChannel=\"%s\"\' " % socket_name
            executable = executable.replace("--preExec ", new_str)
        else:
            logger.warning("--preExec has an unknown format - expected \'--preExec \"\' or \"--preExec \'\", got: %s" % (executable))

        return executable

    def init_payload_process(self):
        """
        init payload process.

        Builds the payload command, opens the stdout/stderr files, containerises the command when a job
        object is available and launches the subprocess.

        :raise SetupFailure: when a non-Pilot exception is raised while starting the payload.
                             A PilotException (including container failures) is only logged and recorded as
                             return code -1.
        """

        logger.info("start to init payload process")
        try:
            workdir = self.get_workdir()
            executable = self.get_executable(workdir)
            output_file_fd = self.get_file(workdir, file_label='output_file', file_name='ES_payload_output.txt')
            error_file_fd = self.get_file(workdir, file_label='error_file', file_name='ES_payload_error.txt')

            # the job object is optional; everything that needs it is guarded below
            job = self.__payload.get('job')
            if job:
                try:
                    executable, diagnostics = containerise_executable(executable, job=job, workdir=workdir)
                except Exception as e:
                    msg = 'exception caught while preparing container command: %s' % e
                    logger.warning(msg)
                    raise SetupFailure(msg)
                # checked outside the try block so that this failure is not wrapped a second time
                if diagnostics:
                    msg = 'containerisation of executable failed: %s' % diagnostics
                    logger.warning(msg)
                    raise SetupFailure(msg)
            else:
                logger.warning('could not containerise executable')

            self.__process = subprocess.Popen(executable, stdout=output_file_fd, stderr=error_file_fd, shell=True)
            self.pid = self.__process.pid
            if job:
                job.pid = self.pid
            self.__is_payload_started = True
            logger.debug("Started new processs (executable: %s, stdout: %s, stderr: %s, pid: %s)" % (executable,
                                                                                                     output_file_fd,
                                                                                                     error_file_fd,
                                                                                                     self.__process.pid))
            if job and job.corecount:
                self.corecount = int(job.corecount)
        except PilotException as e:
            # existing behaviour: logged and flagged through the return code, not re-raised
            logger.error("Failed to start payload process: %s, %s" % (e.get_detail(), traceback.format_exc()))
            self.__ret_code = -1
        except Exception as e:
            logger.error("Failed to start payload process: %s, %s" % (str(e), traceback.format_exc()))
            self.__ret_code = -1
            raise SetupFailure(e)
        logger.info("finished initializing payload process")

    def get_file(self, workdir, file_label='output_file', file_name='ES_payload_output.txt'):
        """
        Return the open file object to be used for the given payload stream.

        If the payload dict holds an open file object under file_label it is returned as is; if it holds a
        file name, that file is opened for writing (names without '/' are placed in workdir). Without an
        entry, file_name is opened for writing in workdir.

        :param workdir: work directory (string).
        :param file_label: key in the payload dict ('output_file' or 'error_file').
        :param file_name: default file name used when the payload dict has no file_label entry.
        :return: open file object.
        """

        try:
            file_type = file  # noqa: F821 - only defined on Python 2
        except NameError:
            file_type = io.IOBase

        if file_label in self.__payload:
            requested = self.__payload[file_label]
            if isinstance(requested, file_type):
                return requested
            _file = requested if '/' in requested else os.path.join(workdir, requested)
        else:
            _file = os.path.join(workdir, file_name)

        return open(_file, 'w')

    def get_workdir(self):
        """
        Return the workdir.
        The directory is created if the payload names a workdir that does not exist yet.

        :return: workdir (string), empty string if the payload dict has no 'workdir' entry.
        :raises SetupFailure: in case workdir exists but is not a directory.
        """

        workdir = ''
        if 'workdir' in self.__payload:
            workdir = self.__payload['workdir']
            if not os.path.exists(workdir):
                os.makedirs(workdir)
            elif not os.path.isdir(workdir):
                raise SetupFailure('workdir exists but is not a directory')
        return workdir

    def get_executable(self, workdir):
        """
        Return the executable string.

        :param workdir: work directory (string).
        :return: executable (string), prefixed with a 'cd' into workdir when workdir is set.
        """
        executable = self.__payload['executable']
        executable = self.init_yampl_socket(executable)
        if not workdir:
            # a bare 'cd ;' would move the payload to $HOME; stay in the current directory instead
            return executable
        return 'cd %s; %s' % (workdir, executable)

    def set_get_event_ranges_hook(self, hook):
        """
        set get_event_ranges hook.

        :param hook: a hook method to get event ranges.
        """

        self.get_event_ranges_hook = hook

    def get_get_event_ranges_hook(self):
        """
        get get_event_ranges hook.

        :returns: The hook method to get event ranges.
        """

        return self.get_event_ranges_hook

    def set_handle_out_message_hook(self, hook):
        """
        set handle_out_message hook.

        :param hook: a hook method to handle payload output and error messages.
        """

        self.handle_out_message_hook = hook

    def get_handle_out_message_hook(self):
        """
        get handle_out_message hook.

        :returns: The hook method to handle payload output and error messages.
        """

        return self.handle_out_message_hook

    def init(self):
        """
        initialize message thread and payload process.

        On failure the return code is set to -1, a stop is requested and the exception is re-raised.
        """

        try:
            self.init_message_thread()
            self.init_payload_process()
        except Exception:
            self.__ret_code = -1
            self.stop()
            raise

    def monitor(self):
        """
        Monitor whether a process is dead.

        raises: MessageFailure: when the message thread is dead or exited.
                RunPayloadFailure: when the payload process is dead or exited.
                Exception: when the payload is still running too long after "No more events" was sent.
        """

        # deliberately a plain Exception: run() only leaves its main loop for non-Pilot exceptions
        if self.__no_more_event_time and time.time() - self.__no_more_event_time > self.__waiting_time:
            self.__ret_code = -1
            raise Exception('Too long time (%s seconds) since "No more events" is injected' %
                            (time.time() - self.__no_more_event_time))

        # heartbeat in the log, at most once every ten minutes
        if self.__monitor_log_time is None or self.__monitor_log_time < time.time() - 10 * 60:
            self.__monitor_log_time = time.time()
            logger.info('monitor is checking dead process.')

        if self.__message_thread is None:
            raise MessageFailure("Message thread has not started.")
        if not self.__message_thread.is_alive():
            raise MessageFailure("Message thread is not alive.")

        if self.__process is None:
            raise RunPayloadFailure("Payload process has not started.")
        if self.__process.poll() is not None:
            if self.is_no_more_events:
                logger.info("Payload finished with no more events")
            else:
                self.__ret_code = self.__process.poll()
                raise RunPayloadFailure("Payload process is not alive: %s" % self.__process.poll())

        if self.__stop.is_set() and time.time() > self.__stop_set_time + self.__stop_delay:
            logger.info("Stop has been set for %s seconds, which is more than the stop wait time. Will terminate" % self.__stop_delay)
            self.terminate()

    def has_running_children(self):
        """
        Check whether it has running children

        :return: True if the message thread or the payload process is alive, otherwise False
        """
        if self.__message_thread and self.__message_thread.is_alive():
            return True
        return self.is_payload_running()

    def is_payload_running(self):
        """
        Check whether the payload is still running

        :return: True if the payload is running, otherwise False
        """
        if self.__process and self.__process.poll() is None:
            return True
        return False

    def get_event_range_to_payload(self):
        """
        Get one event range to be sent to payload.
        The local cache is refilled through get_event_ranges() when it is empty.

        :return: one event range, or an empty list when no event range is available.
        """
        logger.debug("Number of cached event ranges: %s" % len(self.event_ranges_cache))
        if not self.event_ranges_cache:
            event_ranges = self.get_event_ranges()
            if event_ranges:
                self.event_ranges_cache.extend(event_ranges)

        if self.event_ranges_cache:
            return self.event_ranges_cache.pop(0)
        return []

    def get_event_ranges(self, num_ranges=None):
        """
        Calling get_event_ranges hook to get event ranges.

        :param num_ranges: number of event ranges to get, defaults to the core count.

        :returns: whatever the hook returns.
        :raises: SetupFailure: If get_event_ranges_hook is not set.
                 MessageFailure: when failed to get event ranges.
        """
        if not num_ranges:
            num_ranges = self.corecount

        logger.debug('getting event ranges(num_ranges=%s)' % num_ranges)
        if not self.get_event_ranges_hook:
            raise SetupFailure("get_event_ranges_hook is not set")

        try:
            logger.debug('calling get_event_ranges hook(%s) to get event ranges.' % self.get_event_ranges_hook)
            event_ranges = self.get_event_ranges_hook(num_ranges)
            logger.debug('got event ranges: %s' % event_ranges)
            return event_ranges
        except Exception as e:
            raise MessageFailure("Failed to get event ranges: %s" % e)

    def send_event_ranges_to_payload(self, event_ranges):
        """
        Send event ranges to payload through message thread.

        "No more events" is sent as is and recorded (flag and time stamp); anything else is sent as a JSON
        list. Nothing is sent when the message thread was never created.

        :param event_ranges: list of event ranges, a single event range, or the string "No more events".
        """

        if "No more events" in event_ranges:
            msg = event_ranges
            self.is_no_more_events = True
            self.__no_more_event_time = time.time()
        else:
            if not isinstance(event_ranges, list):
                event_ranges = [event_ranges]
            msg = json.dumps(event_ranges)
        logger.debug('send event ranges to payload: %s' % msg)
        if self.__message_thread is None:
            # happens when stop() is called after a failed initialisation; do not mask the original error
            logger.warning('message thread is not available, cannot send to payload: %s' % msg)
            return
        self.__message_thread.send(msg)

    def parse_out_message(self, message):
        """
        Parse output or error messages from payload.

        Output messages start with '/': '<output>,<name>:<value>,...'; the names are lower-cased.
        Error messages start with 'ERR' and carry the event range id.

        :param message: The message string received from payload.

        :returns: a dict {'id': <id>, 'status': <status>, 'output': <output if produced>, 'cpu': <cpu>, 'wall': <wall>, 'message': <full message>}
        :raises: PilotExecption: when a PilotException is caught.
                 UnknownException: when other unknown exception is caught.
        """

        logger.debug('parsing message: %s' % message)
        try:
            if message.startswith("/"):
                parts = message.split(",")
                ret = {'output': parts[0]}
                for part in parts[1:]:
                    # split on the first colon only, the value itself may contain colons
                    name, value = part.split(":", 1)
                    ret[name.lower()] = value
                ret['status'] = 'finished'
                return ret

            if not message.startswith('ERR'):
                raise UnknownException("Unknown message %s" % message)

            if "ERR_ATHENAMP_PARSE" in message:
                pattern = re.compile(r"(ERR\_[A-Z\_]+)\ (.+)\:\ ?(.+)")
                found = re.findall(pattern, message)
                if not found or "eventRangeID" not in found[0][1]:
                    raise Exception("Failed to parse %s" % message)
                pattern = re.compile(r"eventRangeID\'\:\ ?.?\'([0-9\-]+)")
                found = re.findall(pattern, found[0][1])
                if not found:
                    raise Exception("Failed to parse %s" % message)
                event_range_id = found[0]
            else:
                pattern = re.compile(r"(ERR\_[A-Z\_]+)\ ([0-9\-]+)\:\ ?(.+)")
                found = re.findall(pattern, message)
                if not found:
                    raise Exception("Failed to parse %s" % message)
                event_range_id = found[0][1]
            return {'id': event_range_id, 'status': 'failed', 'message': message}
        except PilotException:
            raise
        except Exception as e:
            raise UnknownException(e)

    def handle_out_message(self, message):
        """
        Handle output or error messages from payload.
        Messages from payload will be parsed and the handle_out_message hook is called.

        :param message: The message string received from payload.

        :raises: SetupFailure: when handle_out_message_hook is not set.
                 RunPayloadFailure: when failed to handle an output or error message.
        """

        logger.debug('handling out message: %s' % message)
        if not self.handle_out_message_hook:
            raise SetupFailure("handle_out_message_hook is not set")

        try:
            message_status = self.parse_out_message(message)
            logger.debug('parsed out message: %s' % message_status)
            logger.debug('calling handle_out_message hook(%s) to handle parsed message.' % self.handle_out_message_hook)
            self.handle_out_message_hook(message_status)
        except Exception as e:
            raise RunPayloadFailure("Failed to handle out message: %s" % e)

    def handle_messages(self):
        """
        Monitor the message queue to get output or error messages from payload and response to different messages.

        At most one message is taken from the queue per call, without blocking. "Ready for events" is
        answered with one event range (or "No more events"); any other message goes to handle_out_message().
        """

        try:
            message = self.__message_queue.get(False)
        except queue.Empty:
            return

        logger.debug('received message from payload: %s' % message)
        if "Ready for events" in message:
            event_ranges = self.get_event_range_to_payload()
            if not event_ranges:
                event_ranges = "No more events"
            self.send_event_ranges_to_payload(event_ranges)
        else:
            self.handle_out_message(message)

    def poll(self):
        """
        poll whether the process is still running.

        :returns: None: still running.
                  0: finished successfully.
                  others: failed.
        """
        return self.__ret_code

    def _log_payload_exit_status(self):
        """
        Log the exit code of the finished payload process (info for 0, error otherwise).
        """
        exit_code = self.__process.poll()
        if exit_code == 0:
            logger.info("payload finished successfully.")
        else:
            logger.error("payload finished with error code: %s" % exit_code)

    def _kill_payload(self, action, signal_name):
        """
        Log the process group and kill the payload process through kill_child_processes().

        :param action: verb used in the log message ('terminating' or 'killing').
        :param signal_name: signal name used in the log message only.
        """
        logger.info('%s payload process.' % action)
        pgid = os.getpgid(self.__process.pid)
        logger.info('got process group id for pid %s: %s' % (self.__process.pid, pgid))
        logger.info('send %s to process: %s' % (signal_name, self.__process.pid))
        kill_child_processes(self.__process.pid)

    def terminate(self, time_to_wait=1):
        """
        Terminate running threads and processes.

        Requests a stop, gives a still running payload some time to finish and kills it afterwards.
        The return code is set to the payload exit code, or to -1 if there is no payload process.

        :param time_to_wait: integer, controls how long to wait before the payload process is killed;
                             the wait loop runs time_to_wait * 10 iterations of one second each.
                             NOTE(review): that is ten times the nominal value - confirm whether 0.1 s
                             ticks were intended before changing the timing.

        :raises: PilotExecption: when a PilotException is caught.
                 UnknownException: when other unknown exception is caught.
        """
        logger.info('terminate running threads and processes.')
        try:
            self.stop()
            if self.__process:
                if self.__process.poll() is None:
                    for _ in range(time_to_wait * 10):
                        if self.__process.poll() is not None:
                            break
                        time.sleep(1)

                if self.__process.poll() is not None:
                    self._log_payload_exit_status()
                else:
                    self._kill_payload('terminating', 'SIGTERM')
                self.__ret_code = self.__process.poll()
            else:
                self.__ret_code = -1
        except Exception as e:
            logger.error('Exception caught when terminating ESProcess: %s' % e)
            self.__ret_code = -1
            self.stop()
            raise UnknownException(e)

    def kill(self):
        """
        Kill the payload process immediately, without the waiting period used by terminate().

        Requests a stop first; a payload that has already finished is only reported.

        :raises: PilotException: when a PilotException is caught.
                 UnknownException: when other unknown exception is caught.
        """
        logger.info('terminate running threads and processes.')
        try:
            self.stop()
            if self.__process:
                if self.__process.poll() is not None:
                    self._log_payload_exit_status()
                else:
                    self._kill_payload('killing', 'SIGKILL')
        except Exception as e:
            logger.error('Exception caught when terminating ESProcess: %s' % e)
            self.stop()
            raise UnknownException(e)

    def clean(self):
        """
        Clean left resources
        """
        self.terminate()

    def run(self):
        """
        Main run loops: monitor message thread and payload process.
        handle messages from payload and response messages with injecting new event ranges or process outputs.

        A PilotException in the loop requests a stop and keeps looping until the payload has finished; any
        other exception requests a stop and leaves the loop at once.

        :raises: PilotExecption: when a PilotException is caught.
                 UnknownException: when other unknown exception is caught.
        """

        logger.info('start esprocess with thread ident: %s' % (self.ident))
        logger.debug('initializing')
        self.init()
        logger.debug('initialization finished.')

        logger.info('starts to main loop')
        while self.is_payload_running():
            try:
                self.monitor()
                self.handle_messages()
            except PilotException as e:
                logger.error('PilotException caught in the main loop: %s, %s' % (e.get_detail(), traceback.format_exc()))
                self.stop()
            except Exception as e:
                logger.error('Exception caught in the main loop: %s, %s' % (e, traceback.format_exc()))
                self.stop()
                break
            # also reached after a PilotException, so a persistent failure cannot turn into a busy loop
            time.sleep(0.01)
        self.clean()
        self.stop_message_thread()
        logger.debug('main loop finished')