File indexing completed on 2026-04-09 07:58:32
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011 import atexit
0012 import logging
0013 import os
0014 import random
0015 import socket
0016 import sys
0017 import threading
0018 import time
0019 import traceback
0020 import uuid
0021
0022 from queue import Queue
0023
0024 try:
0025 import stomp
0026 with_stomp = True
0027 except Exception as ex:
0028 print(f"Failed to import stomp, with_stomp is False: {ex}")
0029 with_stomp = False
0030
0031 from idds.common.constants import WorkflowType, GracefulEvent
0032 from idds.common.utils import json_dumps, json_loads, setup_logging, get_unique_id_for_dict, timeout_wrapper, is_panda_client_verbose
0033 from .base import Base
0034
0035
# Initialise logging through the project helper, passing this module's name.
setup_logging(__name__)

# Raise the threshold of the "stomp" logger to CRITICAL so that only
# critical records from that logger are emitted.
_stomp_logger = logging.getLogger("stomp")
_stomp_logger.setLevel(logging.CRITICAL)
0038
0039
# The module guards ``import stomp`` and records the outcome in ``with_stomp``.
# Fall back to a plain ``object`` base when the import failed, otherwise the
# class statement below would raise NameError at import time and the
# ``with_stomp = False`` code paths could never be reached.
_ListenerBase = stomp.ConnectionListener if with_stomp else object


class MessagingListener(_ListenerBase):
    '''
    Messaging Listener

    Connection listener that logs broker errors and pushes the decoded body
    of every received frame onto an output queue.
    '''

    def __init__(self, broker, output_queue, logger=None):
        '''
        __init__

        :param broker: broker description, used only as a tag in log lines.
        :param output_queue: object exposing ``put``; receives each decoded
                             message body.
        :param logger: optional logger; when omitted a logger named after
                       this class is used.
        '''
        self.name = "MessagingListener"
        self.__broker = broker
        self.__output_queue = output_queue
        self.logger = logger if logger else logging.getLogger(self.__class__.__name__)

    def on_error(self, frame):
        '''
        Error handler: log the body of the error frame.
        '''
        self.logger.error('[broker] [%s]: %s', self.__broker, frame.body)

    def on_message(self, frame):
        '''
        Message handler: log the frame, decode its body with ``json_loads``
        and put the decoded object onto the output queue.
        '''
        self.logger.debug('[broker] [%s]: headers: %s, body: %s', self.__broker, frame.headers, frame.body)
        self.__output_queue.put(json_loads(frame.body))
0066
0067
class MapResult(object):
    """
    Container of results indexed in two ways.

    Every stored result is reachable through its plain ``key`` and through a
    name-qualified key of the form ``"<name>:<key>"``.  An optional
    ``details`` object is stored next to each result under the same two keys.
    When no ``key`` is supplied it is derived from ``args`` with
    ``get_unique_id_for_dict``.
    """

    def __init__(self):
        self._name_results = {}    # "<name>:<key>" -> result
        self._results = {}         # key -> result
        self._details = {}         # key -> details
        self._name_details = {}    # "<name>:<key>" -> details

    def __str__(self):
        return str(self._name_results)

    @staticmethod
    def _resolve_keys(name, args, key):
        """Return ``(key, name_key)``, deriving ``key`` from ``args`` when it is None."""
        if key is None:
            key = get_unique_id_for_dict(args)
        return key, '%s:%s' % (name, key)

    def _store(self, key, name_key, value, details):
        """Record ``value`` and ``details`` under both the plain and the name-qualified key."""
        self._name_results[name_key] = value
        self._results[key] = value
        self._details[key] = details
        self._name_details[name_key] = details

    def add_result(self, name=None, args=None, key=None, result=None, details=None):
        """Store ``result`` (and ``details``) for the given name and key/args."""
        key, name_key = self._resolve_keys(name, args, key)
        self._store(key, name_key, result, details)

    def has_result(self, name=None, args=None, key=None):
        """
        Return True when a result is stored under the name-qualified key or,
        failing that, under the plain key; False otherwise.
        """
        key, name_key = self._resolve_keys(name, args, key)
        # The fallback lookup previously read ``self._result`` (an attribute
        # that does not exist) and raised AttributeError.
        return name_key in self._name_results or key in self._results

    def get_result(self, name=None, args=None, key=None, verbose=False, with_details=False):
        """
        Look up a result, preferring the name-qualified key over the plain key.

        :param verbose: when True, log the lookup and the stored content.
        :param with_details: when True, return ``(result, details)``.
        :returns: the stored result (``None`` when absent), or the
                  ``(result, details)`` tuple when ``with_details`` is set.
        """
        if verbose:
            logging.info("get_result: key %s, name: %s, args: %s" % (key, name, args))
            logging.info("get_result: results: %s, name_results: %s" % (self._results, self._name_results))
            logging.info(f"get_result: details: {self._details}, name_details: {self._name_details}")

        key, name_key = self._resolve_keys(name, args, key)
        if name_key in self._name_results:
            ret = self._name_results.get(name_key, None)
            details = self._name_details.get(name_key, None)
        else:
            ret = self._results.get(key, None)
            details = self._details.get(key, None)

        if verbose:
            logging.info("get_result: name key %s, args key %s, ret: %s, details: %s" % (name_key, key, ret, details))
        if with_details:
            return ret, details
        return ret

    def set_result(self, name=None, args=None, key=None, value=None, verbose=False, details=None):
        """Store ``value`` (and ``details``) for the given name and key/args, optionally logging."""
        if verbose:
            logging.info("set_result: key %s, name: %s, args: %s, value: %s, details: %s" % (key, name, args, value, details))
            logging.info("set_result: results: %s, name_results: %s" % (self._results, self._name_results))
            # This line used to be labelled "get_result", which made the log misleading.
            logging.info(f"set_result: details: {self._details}, name_details: {self._name_details}")

        key, name_key = self._resolve_keys(name, args, key)
        self._store(key, name_key, value, details)

        if verbose:
            logging.info("set_result: name key %s, args key %s, value: %s, details: %s" % (name_key, key, value, details))

    def get_all_results(self):
        """Return the dict of results indexed by plain key (the internal dict, not a copy)."""
        return self._results

    def get_dict_results(self):
        """Return the four internal dicts in one dictionary."""
        return {'results': self._results, 'name_results': self._name_results, 'details': self._details, 'name_details': self._name_details}

    def set_from_dict_results(self, results):
        """Restore state from a dict shaped like the output of ``get_dict_results``."""
        self._results = results.get('results', {})
        self._name_results = results.get('name_results', {})
        self._details = results.get('details', {})
        self._name_details = results.get('name_details', {})
0156
0157
class AsyncResult(Base):
    """
    Publish and collect asynchronous results of a workflow/work.

    Results are exchanged either through STOMP messaging brokers (when the
    ``stomp`` package was imported and the work context initialised its
    brokers) or through the REST API of the iDDS/PanDA server.  A background
    thread started by :meth:`subscribe` feeds received message bodies into an
    internal queue; the :attr:`results` property drains that queue and
    aggregates the results.

    Each result ``ret`` carried in a message is a 4-tuple
    ``(status, output, error, metrics)`` as built by :meth:`publish`.
    """

    def __init__(self, work_context, name=None, wait_num=1, wait_keys=None, multi_jobs_kwargs_list=None,
                 current_job_kwargs=None, map_results=False, wait_percent=1, internal_id=None, timeout=None):
        """
        Init an AsyncResult.

        :param work_context: context object providing the broker settings,
                             workflow type, request/transform ids and server address.
        :param name: name combined with a key to form ``"<name>:<key>"``.
        :param wait_num: number of results to wait for (falsy values become 1).
        :param wait_keys: iterable of ``"<name>:<key>"`` strings to wait for.
        :param multi_jobs_kwargs_list: list of kwargs dicts; used to derive
                                       the wait keys when none are given.
        :param current_job_kwargs: kwargs of the current job; used to derive
                                   the key of published messages.
        :param map_results: when True, :attr:`results` returns a ``MapResult``.
        :param wait_percent: fraction of results regarded as "all available".
        :param internal_id: overrides ``self.internal_id`` when truthy.
        :param timeout: overall waiting timeout in seconds, or None.
        """
        super(AsyncResult, self).__init__()
        if internal_id:
            self.internal_id = internal_id
        self._work_context = work_context
        try:
            self._broker_initialized = bool(self._work_context.init_brokers())
        except Exception as ex:
            # NOTE(review): relies on ``self.internal_id`` being available here
            # even when no ``internal_id`` argument was given -- presumably set
            # by the base class; confirm.
            logging.warning(f"{self.internal_id} Failed to initialize messaging broker, will use Rest: {ex}")
            self._broker_initialized = False

        self._name = name
        self._queue = Queue()

        self._connections = []
        self._subscribe_connections = []
        self._graceful_stop = False
        self._is_stop = False
        self._subscribe_thread = None
        self._subscribed = False

        self._results = []
        self._bad_results = []
        self._results_percentage = 0
        self._map_results = map_results
        self.waiting_result_terminated = False

        self._metrics = {}

        self._wait_num = wait_num if wait_num else 1
        # None defaults replace the former mutable ``[]`` defaults.
        self._wait_keys = set(wait_keys) if wait_keys else set()
        self._multi_jobs_kwargs_list = multi_jobs_kwargs_list if multi_jobs_kwargs_list is not None else []
        self._current_job_kwargs = current_job_kwargs

        self._wait_percent = wait_percent
        self._num_wrong_keys = 0

        self._timeout = timeout

        self._nologs = False

        self._num_stomp_failures = 0
        # The environment variable names below are spelled "AYNC_..." in the
        # original code; the spelling is kept so existing deployments still work.
        self._max_stomp_failures = self._int_from_env("AYNC_RESULT_MAX_STOMP_FAILURES", 5)
        self._poll_period = self._int_from_env("AYNC_RESULT_POLL_PERIOD", 300)

        self._is_messaging_ok = True
        self._is_polling_ok = True

    @staticmethod
    def _int_from_env(var_name, default):
        """Return the integer value of environment variable ``var_name``, or ``default`` when unset, empty or invalid."""
        raw = os.environ.get(var_name, None)
        if not raw:
            return default
        try:
            return int(raw)
        except ValueError:
            return default

    @property
    def logger(self):
        return logging.getLogger(self.__class__.__name__)

    @logger.setter
    def logger(self, value):
        # Assignments are deliberately ignored; the getter always returns the
        # class-named logger.
        pass

    @property
    def wait_keys(self):
        """
        Set of ``"<name>:<key>"`` strings to wait for.

        When no keys were given they are derived once from
        ``multi_jobs_kwargs_list``.  ``_wait_num`` is synchronised with the
        number of keys whenever keys are available.
        """
        if len(self._wait_keys) > 0:
            self._wait_num = len(self._wait_keys)
            return self._wait_keys
        if self._multi_jobs_kwargs_list:
            request_id, transform_id, internal_id = self.get_request_id_internal_id()
            for kwargs in self._multi_jobs_kwargs_list:
                k = "%s:%s" % (self._name, get_unique_id_for_dict(kwargs))
                self.logger.info(f"request_id {request_id} transform_id {transform_id} internal_id {internal_id} args ({kwargs}) to key: {k}")
                self._wait_keys.add(k)
            self._wait_num = len(self._wait_keys)
        return self._wait_keys

    @wait_keys.setter
    def wait_keys(self, value):
        self._wait_keys = set(value)

    @property
    def is_all_results_available(self):
        """True when the collected percentage reached ``wait_percent``."""
        return self.get_results_percentage() >= self._wait_percent

    @is_all_results_available.setter
    def is_all_results_available(self, value):
        raise Exception(f"{self.internal_id} Not allowd to set is_all_results_available")

    @property
    def is_stop(self):
        return self._is_stop

    @is_stop.setter
    def is_stop(self, value):
        raise Exception(f"{self.internal_id} Not allowd to set is_stop")

    @property
    def is_finished(self):
        """True when stopped and the collected percentage reached ``wait_percent``."""
        if self._graceful_stop and self._graceful_stop.is_set():
            if self.get_results_percentage() >= self._wait_percent:
                return True
        return False

    @is_finished.setter
    def is_finished(self, value):
        raise Exception(f"{self.internal_id} Not allowd to set is_finished")

    @property
    def is_subfinished(self):
        """True when stopped with some, but not enough, results."""
        if self._graceful_stop and self._graceful_stop.is_set():
            percent = self.get_results_percentage()
            if percent > 0 and percent < self._wait_percent:
                return True
        return False

    @is_subfinished.setter
    def is_subfinished(self, value):
        raise Exception(f"{self.internal_id} Not allowd to set is_subfinished")

    @property
    def is_failed(self):
        """True when stopped without any result."""
        if self._graceful_stop and self._graceful_stop.is_set():
            if self.get_results_percentage() <= 0:
                return True
        return False

    @is_failed.setter
    def is_failed(self, value):
        raise Exception(f"{self.internal_id} Not allowd to set is_failed")

    @property
    def is_terminated(self):
        return self._graceful_stop and self._graceful_stop.is_set()

    @is_terminated.setter
    def is_terminated(self, value):
        raise Exception(f"{self.internal_id} Not allowd to set is_terminated")

    def _drain_queue(self):
        """
        Move every queued message into ``_results`` (matching ``internal_id``)
        or ``_bad_results`` (other ``internal_id``).

        :returns: True when at least one message was taken from the queue.
        """
        has_new_data = False
        while not self._queue.empty():
            ret = self._queue.get()
            has_new_data = True
            try:
                if ret['internal_id'] == self.internal_id:
                    self._results.append(ret)
                else:
                    self._bad_results.append(ret)
            except Exception as ex:
                self.logger.error(f"{self.internal_id} Received bad result: {ret}: {ex}")
        return has_new_data

    @property
    def results(self):
        """
        Drain the message queue and return the successful outputs so far.

        Also updates the result percentage and the metrics.

        :returns: a ``MapResult`` when ``map_results`` is set; otherwise a
                  list of outputs, or the single output itself when
                  ``wait_num`` is 1 and an output is available.
        """
        has_new_data = self._drain_queue()
        if self._bad_results:
            self.logger.error(f"{self.internal_id} Received bad results: {self._bad_results}")

        if not self._nologs:
            self.logger.debug(f"{self.internal_id} _results: {self._results}, bad_results: {self._bad_results}")
        wait_keys = self.wait_keys
        if not self._nologs:
            self.logger.debug(f"{self.internal_id} wait_keys: {wait_keys}, wait_num: {self._wait_num}")

        # Split by status: only ``status is True`` counts as a success.
        rets_list = []
        failure_list = []
        for result in self._results:
            status, _output, _error, _metrics = result['ret']
            if status is True:
                rets_list.append(result)
            else:
                failure_list.append(result)
        if not self._nologs:
            self.logger.debug(f"{self.internal_id} rets_list: {rets_list}, failure_list: {failure_list}")

        # With wait keys only matching results count, measured against the
        # number of keys; otherwise every success counts against wait_num.
        if len(wait_keys) > 0:
            selected = [r for r in rets_list if f"{r['name']}:{r['key']}" in wait_keys]
            denominator = len(wait_keys)
        else:
            selected = rets_list
            denominator = self._wait_num

        if self._map_results:
            rets = {}
            for result in selected:
                _status, output, _error, _metrics = result['ret']
                rets[f"{result['name']}:{result['key']}"] = output
            self._results_percentage = len(rets) * 1.0 / denominator

            ret_map = MapResult()
            for result in rets_list:
                # Read from the loop item itself; the previous code read a
                # stale variable and stored the last result repeatedly.
                name = result['name']
                key = result['key']
                status, output, error, metrics = result['ret']
                self._metrics[f"{name}:{key}"] = metrics
                details = {"status": status, "error": error, "metrics": metrics}
                ret_map.add_result(name=name, key=key, result=output, details=details)

            if has_new_data:
                self.logger.debug(f'{self.internal_id} percent {self._results_percentage}, results: {ret_map}')
            return ret_map

        rets = []
        for result in selected:
            name_key = f"{result['name']}:{result['key']}"
            _status, output, _error, metrics = result['ret']
            self._metrics[name_key] = metrics
            rets.append(output)
        self._results_percentage = len(rets) * 1.0 / denominator

        if has_new_data:
            self.logger.debug(f'{self.internal_id} percent {self._results_percentage}, results: {rets}')

        if self._wait_num == 1 and rets:
            return rets[0]
        return rets

    @results.setter
    def results(self, value):
        raise Exception(f"{self.internal_id} Not allowed to set results.")

    @property
    def metrics(self):
        return self._metrics

    @metrics.setter
    def metrics(self, value):
        raise Exception(f"{self.internal_id} Not allowed to set metrics.")

    @staticmethod
    def _close_connections(conns):
        """Disconnect every still-connected connection in ``conns``, ignoring errors (best effort)."""
        for con in conns:
            try:
                if con.is_connected():
                    con.disconnect()
            except Exception:
                # Deliberately best effort: a failing disconnect must not
                # prevent the remaining connections from being closed.
                pass

    def disconnect(self):
        """Close and forget the publishing connections."""
        self._close_connections(self._connections)
        self._connections = []

    def disconnect_subscribe(self):
        """Close and forget the subscription connections."""
        self._close_connections(self._subscribe_connections)
        self._subscribe_connections = []

    def has_connections(self, conns):
        """Return True when at least one connection in ``conns`` is connected."""
        return self.get_connections(conns) is not None

    def get_connections(self, conns):
        """Return the first connected connection in ``conns``, or None."""
        if conns:
            for con in conns:
                try:
                    if con.is_connected():
                        return con
                except Exception:
                    pass
        return None

    def connect_to_messaging_broker(self):
        """
        Return a connected publishing connection, creating one when needed.

        A broker is picked at random from the comma separated
        ``host:port`` list of the work context.
        """
        conn = self.get_connections(self._connections)
        if conn:
            return conn

        workflow_context = self._work_context
        brokers = workflow_context.brokers.split(",")
        broker = random.sample(brokers, k=1)[0]

        self.logger.info("Got broker: %s" % (broker))

        timeout = workflow_context.broker_timeout
        self.disconnect()

        broker, port = broker.split(":")
        conn = stomp.Connection12(host_and_ports=[(broker, port)],
                                  keepalive=True,
                                  heartbeats=(30000, 30000),
                                  timeout=timeout)
        conn.connect(workflow_context.broker_username, workflow_context.broker_password, wait=True)
        self._connections = [conn]
        return conn

    def _subscription_params(self):
        """Return ``(subscribe_id, headers)`` for the workflow type of the work context."""
        workflow_type = self._work_context.workflow_type
        if workflow_type in [WorkflowType.iWorkflow, WorkflowType.iWorkflowLocal]:
            return 'idds-workflow_%s' % self.internal_id, {'selector': "internal_id = '%s'" % self.internal_id}
        if workflow_type == WorkflowType.iWork:
            return 'idds-work_%s' % self.internal_id, {'selector': "internal_id = '%s'" % self.internal_id}
        return 'idds-workflow_%s' % self.internal_id, None

    def subscribe_to_messaging_brokers(self, force=False):
        """
        Subscribe to every resolved broker address and return the connections.

        Existing subscriptions are reused unless ``force`` is True.

        :raises Exception: when no brokers are defined in the work context.
        """
        if self._subscribed and not force and self._subscribe_connections:
            return self._subscribe_connections

        workflow_context = self._work_context
        brokers = workflow_context.brokers
        if not brokers:
            raise Exception(f"brokers <{brokers}> not defined")

        # Resolve every broker host name to its IPv4 addresses.
        broker_addresses = []
        for entry in brokers.split(","):
            try:
                host, port = entry.split(":")
                addrinfos = socket.getaddrinfo(host, 0, socket.AF_INET, 0, socket.IPPROTO_TCP)
                for addrinfo in addrinfos:
                    broker_addresses.append((addrinfo[4][0], port))
            except socket.gaierror as error:
                self.logger.error(f'{self.internal_id} Cannot resolve hostname {host}: {error}')

        self.logger.info(f"{self.internal_id} Resolved broker addresses: {broker_addresses}")

        timeout = workflow_context.broker_timeout

        self.disconnect_subscribe()

        listener = MessagingListener(brokers, self._queue, logger=self.logger)
        subscribe_id, subscribe_selector = self._subscription_params()
        conns = []
        for broker, port in broker_addresses:
            conn = stomp.Connection12(host_and_ports=[(broker, port)],
                                      keepalive=True,
                                      heartbeats=(30000, 30000),
                                      timeout=timeout)
            conn.set_listener("messag-subscriber", listener)
            conn.connect(workflow_context.broker_username, workflow_context.broker_password, wait=True)
            conn.subscribe(destination=workflow_context.broker_destination, id=subscribe_id,
                           ack='auto', headers=subscribe_selector)
            self.logger.info(f"{self.internal_id} subscribe to {broker}:{port} with selector: {subscribe_selector}")
            conns.append(conn)
        self._subscribe_connections = conns
        return conns

    def get_message(self, ret, name=None, key=None):
        """
        Build the ``{"headers": ..., "body": ...}`` message carrying ``ret``.

        ``key`` defaults to the unique id of ``current_job_kwargs`` (when
        set) and ``name`` to the name given at construction.

        :returns: the message dict, or an empty dict for workflow types other
                  than iWorkflow, iWorkflowLocal and iWork.
        """
        workflow_context = self._work_context
        if key is None and self._current_job_kwargs:
            key = get_unique_id_for_dict(self._current_job_kwargs)
        if name is None:
            name = self._name

        self.logger.info(f"{self.internal_id} publish args ({self._current_job_kwargs}) to key: {key}")

        workflow_type = workflow_context.workflow_type
        if workflow_type in [WorkflowType.iWorkflow, WorkflowType.iWorkflowLocal]:
            msg_type = 'iworkflow'
        elif workflow_type == WorkflowType.iWork:
            msg_type = 'iwork'
        else:
            return {}

        headers = {'persistent': 'true',
                   'channel': 'asyncresult',
                   'type': msg_type,
                   'internal_id': str(self.internal_id),
                   'request_id': workflow_context.request_id}
        body = {'ret': ret, 'name': name, 'key': key, 'internal_id': self.internal_id, 'type': msg_type,
                'request_id': workflow_context.request_id}
        if msg_type == 'iwork':
            # Only iWork messages carry the transform id.
            headers['transform_id'] = workflow_context.transform_id
            body['transform_id'] = workflow_context.transform_id
        return {"headers": headers, "body": body}

    def publish_message(self, ret, name=None, key=None):
        """Send the message for ``ret`` through a messaging broker."""
        message = self.get_message(ret=ret, name=name, key=key)
        headers = message['headers']
        body = message['body']
        conn = self.connect_to_messaging_broker()
        workflow_context = self._work_context
        if workflow_context.workflow_type in [WorkflowType.iWorkflow, WorkflowType.iWorkflowLocal]:
            send_id = 'idds-iworkflow_%s' % self.internal_id
        elif workflow_context.workflow_type == WorkflowType.iWork:
            send_id = 'idds-iwork_%s' % self.internal_id
        else:
            return
        conn.send(body=json_dumps(body),
                  destination=workflow_context.broker_destination,
                  id=send_id,
                  ack='auto',
                  headers=headers)
        self.logger.info(f"{self.internal_id} published header: {headers}, body: {body}")

    def get_request_id_internal_id(self):
        """
        Return ``(request_id, transform_id, internal_id)``.

        ``transform_id`` comes from the work context for iWork and is 0 for
        every other workflow type.
        """
        workflow_context = self._work_context
        request_id = workflow_context.request_id
        if workflow_context.workflow_type == WorkflowType.iWork:
            transform_id = workflow_context.transform_id
        else:
            transform_id = 0
        return request_id, transform_id, self.internal_id

    def _get_panda_idds_client(self):
        """Create the iDDS API client exposed by the PanDA client package."""
        import idds.common.utils as idds_utils
        import pandaclient.idds_api as idds_api
        idds_server = self._work_context.get_idds_server()

        return idds_api.get_api(idds_utils.json_dumps,
                                idds_host=idds_server,
                                compress=True,
                                verbose=is_panda_client_verbose(),
                                manager=True)

    @staticmethod
    def _is_panda_call_ok(status, ret):
        """Return True when a PanDA client reply ``(status, ret)`` has the success shape checked by this class."""
        return (status == 0 and isinstance(ret, (list, tuple)) and len(ret) > 1 and ret[0] is True
                and isinstance(ret[1], (list, tuple)) and ret[1][0] is True)

    def publish_through_panda_server(self, request_id, transform_id, internal_id, message):
        """Publish ``message`` through the PanDA server and log the outcome."""
        client = self._get_panda_idds_client()
        status, ret = client.send_messages(request_id=request_id, transform_id=transform_id, internal_id=internal_id, msgs=[message])
        if self._is_panda_call_ok(status, ret):
            self.logger.info(f"{self.internal_id} published message through panda server: {message}")
        else:
            self.logger.error(f"{self.internal_id} failed to publish message through panda server, status: {status}, ret: {ret}")

    def publish_through_idds_server(self, request_id, transform_id, internal_id, message):
        """Publish ``message`` through the iDDS server and log the outcome."""
        from idds.client.clientmanager import ClientManager
        client = ClientManager(host=self._work_context.get_idds_server(), timeout=60)
        status, ret = client.send_messages(request_id=request_id, transform_id=transform_id, internal_id=internal_id, msgs=[message])
        if status:
            self.logger.info(f"{self.internal_id} published message through idds server: {message}")
        else:
            self.logger.error(f"{self.internal_id} failed to publish message through idds server, status: {status}, ret: {ret}")

    def publish_through_api(self, ret, name=None, key=None, force=False):
        """
        Publish ``ret`` through the REST API (PanDA or iDDS server).

        Nothing is published when the request id is None unless ``force`` is
        True, in which case request id 0 is used.  Failures are logged, not raised.
        """
        message = self.get_message(ret=ret, name=name, key=key)
        message['msg_type'] = 'async_result'

        try:
            request_id, transform_id, internal_id = self.get_request_id_internal_id()
            if request_id is None:
                if not force:
                    self.logger.warning(f"{self.internal_id} Not to publish message through API since the request id is None")
                    return
                request_id = 0

            if self._work_context.service == 'panda':
                self.publish_through_panda_server(request_id, transform_id, internal_id, message)
            else:
                self.publish_through_idds_server(request_id, transform_id, internal_id, message)
        except Exception as ex:
            self.logger.error(f"{self.internal_id} Failed to publish message through API: {ex}")

    @timeout_wrapper(timeout=90)
    def publish(self, ret, name=None, key=None, force=False, ret_status=True, ret_error=None, metrics=None):
        """
        Publish ``(ret_status, ret, ret_error, metrics)``.

        The messaging brokers are tried first when available; the REST API is
        used when messaging is unavailable or fails.
        """
        payload = (ret_status, ret, ret_error, metrics)
        use_stomp = with_stomp and self._broker_initialized
        stomp_failed = False
        if use_stomp:
            try:
                self.logger.info(f"{self.internal_id} publishing results through messaging brokers")
                self.publish_message(ret=payload, name=name, key=key)
                self.logger.info(f"{self.internal_id} finished to publish results through messaging brokers")
            except Exception as ex:
                self.logger.warning(f"{self.internal_id} Failed to publish result through messaging brokers: {ex}")
                stomp_failed = True

        if not use_stomp or stomp_failed:
            self.logger.info(f"{self.internal_id} publishing results through http API")
            self.publish_through_api(ret=payload, name=name, key=key, force=force)
            self.logger.info(f"{self.internal_id} finished to publish results through http API")

    def poll_messages_through_panda_server(self, request_id, transform_id, internal_id):
        """Return the messages fetched through the PanDA server (empty list on failure or when ``request_id`` is None)."""
        if request_id is None:
            self.logger.warning(f"{self.internal_id} Not to poll message through panda server, since the request_id is None")
            return []

        client = self._get_panda_idds_client()
        status, ret = client.get_messages(request_id=request_id, transform_id=transform_id, internal_id=internal_id)
        if self._is_panda_call_ok(status, ret):
            self.logger.info(f"{self.internal_id} poll message through panda server, ret: {ret}")
            messages = ret[1][1]
            self.logger.info(f"{self.internal_id} poll message through panda server, number of messages: {len(messages)}")
            return messages
        self.logger.error(f"{self.internal_id} failed to poll messages through panda server, status: {status}, ret: {ret}")
        return []

    def poll_messages_through_idds_server(self, request_id, transform_id, internal_id):
        """Return the messages fetched through the iDDS server (empty list on failure or when ``request_id`` is None)."""
        if request_id is None:
            self.logger.warning(f"{self.internal_id} Not to poll message through idds server, since the request_id is None")
            return []

        from idds.client.clientmanager import ClientManager
        client = ClientManager(host=self._work_context.get_idds_server(), timeout=60)
        status, messages = client.get_messages(request_id=request_id, transform_id=transform_id, internal_id=internal_id)
        if status:
            # This line used to say "panda server" although the iDDS server is polled here.
            self.logger.info(f"{self.internal_id} poll message through idds server, ret: {messages}")
            self.logger.info(f"{self.internal_id} poll message through idds server, number of messages: {len(messages)}")
            return messages
        self.logger.error(f"{self.internal_id} failed to poll messages through idds server, error: {messages}")
        return []

    @timeout_wrapper(timeout=90)
    def poll_messages(self, force=False):
        """
        Fetch messages through the REST API and queue their bodies.

        Nothing is polled when the request id is None unless ``force`` is
        True, in which case request id 0 is used.  Failures are logged, not raised.
        """
        try:
            request_id, transform_id, internal_id = self.get_request_id_internal_id()
            if request_id is None:
                if not force:
                    self.logger.warning(f"{self.internal_id} Not to poll message, since the request_id is None")
                    return
                request_id = 0

            if self._work_context.service == 'panda':
                messages = self.poll_messages_through_panda_server(request_id=request_id, transform_id=transform_id, internal_id=internal_id)
            else:
                messages = self.poll_messages_through_idds_server(request_id=request_id, transform_id=transform_id, internal_id=internal_id)

            for message in messages:
                self._queue.put(message['body'])
        except Exception as ex:
            self.logger.error(f"{self.internal_id} Failed to poll message: {ex}")

    def _try_subscribe(self):
        """(Re)subscribe to the brokers; on failure count it and switch messaging off."""
        try:
            self.subscribe_to_messaging_brokers(force=True)
        except Exception as ex:
            self.logger.warning(f"{self.internal_id} run subscriber fails to subscribe to message broker: {ex}")
            self._num_stomp_failures += 1
            self._is_messaging_ok = False
            self._broker_initialized = False

    def _stomp_usable(self):
        """Return True when stomp is importable, brokers are initialised and the failure budget is not exhausted."""
        return with_stomp and self._broker_initialized and self._num_stomp_failures < self._max_stomp_failures

    def run_subscriber(self, force=False):
        """
        Body of the subscriber thread.

        While no stop is requested the loop either keeps the broker
        subscriptions alive or, when messaging is not usable, polls the REST
        API periodically.  On exit one final poll is made and :meth:`stop`
        is called.
        """
        try:
            self.logger.info(f"{self.internal_id} run subscriber")
            if self._stomp_usable():
                self._try_subscribe()

            time_poll = None
            time_start = time.time()
            while self._graceful_stop and not self._graceful_stop.is_set():
                if self._stomp_usable() and self._is_messaging_ok:
                    # Re-subscribe as soon as any subscription connection dropped.
                    if any(not conn.is_connected() for conn in self._subscribe_connections):
                        self._try_subscribe()
                else:
                    if self._timeout:
                        sleep_time = min(self._timeout / 3, self._poll_period)
                    else:
                        sleep_time = self._poll_period

                    if time_poll is None or time.time() - time_poll > sleep_time:
                        try:
                            self.poll_messages(force=force)
                        except Exception as ex:
                            self.logger.info(f"{self.internal_id} run subscriber fails to poll messages: {ex}")
                        time_poll = time.time()
                time.sleep(1)

                if self._timeout and time.time() - time_start > self._timeout:
                    self.logger.info(f"{self.internal_id} timeout reached")
                    break

            if self._graceful_stop and self._graceful_stop.is_set():
                self.logger.info(f"{self.internal_id} graceful stop is set")

            # Final poll to pick up messages published just before the stop;
            # skipped while the interpreter is shutting down.
            try:
                if not sys.is_finalizing():
                    self.poll_messages(force=force)
            except Exception as ex:
                self.logger.info(f"{self.internal_id} run subscriber fails to poll messages: {ex}")

            self._is_stop = True
            self.stop()
            self.logger.info(f"{self.internal_id} subscriber finished.")
        except Exception as ex:
            self.logger.error(f"{self.internal_id} run subscriber failed with error: {ex}")
            self.logger.error(traceback.format_exc())
            self._is_stop = True
            self.stop()

    def get_results(self, nologs=True):
        """
        Return the current results; stop the subscriber once enough results arrived.

        :param nologs: suppress the debug logging of :attr:`results` for this call.
        """
        old_nologs = self._nologs
        self._nologs = nologs
        try:
            rets = self.results
            if not self._nologs:
                self.logger.debug(f'{self.internal_id} percent {self.get_results_percentage()}, results: {rets}')

            percent = self.get_results_percentage()
            if percent >= self._wait_percent:
                self.stop()
                self.logger.info(f"{self.internal_id} Got results: {percent} (number of wrong keys: {self._num_wrong_keys})")
        finally:
            # Restore the caller's logging mode even when aggregation fails.
            self._nologs = old_nologs
        return rets

    def get_results_percentage(self):
        """Return the result percentage computed by the last access to :attr:`results`."""
        return self._results_percentage

    def subscribe(self, force=False):
        """Start the subscriber thread unless it is already running."""
        if not self._subscribed:
            atexit.register(self.stop)

            self._graceful_stop = GracefulEvent()
            thread = threading.Thread(target=self.run_subscriber, kwargs={'force': force}, name="RunSubscriber")
            # Publish the state before starting the thread: a subscriber that
            # finishes quickly resets these fields through stop(), and setting
            # them afterwards would overwrite that.
            self._is_stop = False
            self._subscribed = True
            self._subscribe_thread = thread
            thread.start()
            time.sleep(1)

    def stop(self):
        """
        Request the subscriber to stop and close the publishing connections.

        Waits up to 60 seconds for the subscriber thread to flag that it stopped.
        """
        if self._graceful_stop:
            self._graceful_stop.set()
        self.disconnect()
        self._subscribed = False
        self._subscribe_thread = None
        time_start = time.time()
        while not self._is_stop and time.time() - time_start < 60:
            time.sleep(1)
        self._is_stop = True

    def __del__(self):
        # Intentionally empty: no cleanup is performed on garbage collection.
        pass

    def wait_results(self, timeout=None, force_return_results=False):
        """
        Wait for results and return them.

        The wait ends when enough results arrived, when the instance timeout
        expired, when a stop was requested, or when ``timeout`` (seconds)
        expired.

        :param timeout: maximum time for this call; None waits without a per-call limit.
        :param force_return_results: return the current results even when
                                     the wait ended through ``timeout``.
        :returns: the results, or None when the call timed out without
                  ``force_return_results``.
        """
        self.subscribe()

        get_results = False
        time_log = time.time()
        time_start = time.time()
        if timeout is None:
            self.logger.info(f"{self.internal_id} waiting for results")
        try:
            while not get_results and self._graceful_stop and not self._graceful_stop.is_set():
                self.get_results(nologs=True)
                percent = self.get_results_percentage()
                if time.time() - time_log > 600:
                    self.logger.info(f"{self.internal_id} waiting for results: {percent} (number of wrong keys: {self._num_wrong_keys})")
                    time_log = time.time()
                time.sleep(1)
                if self.is_all_results_available:
                    get_results = True
                    self.waiting_result_terminated = True
                    self.logger.info(f"{self.internal_id} Got result percentage {percent} is not smaller then wait_percent {self._wait_percent}, set waiting_result_terminated to True")
                if self._timeout is not None and self._timeout > 0 and time.time() - time_start > self._timeout:
                    self.logger.info(f"{self.internal_id} Waiting result timeout({self._timeout} seconds), set waiting_result_terminated to True")
                    get_results = True
                    self.waiting_result_terminated = True
                if timeout is not None and timeout > 0 and time.time() - time_start > timeout:
                    self.logger.info(f"{self.internal_id} timeout reached")
                    break

            percent = self.get_results_percentage()
            if timeout is None or time.time() - time_start > 600:
                self.logger.info(f"{self.internal_id} Got results: {percent} (number of wrong keys: {self._num_wrong_keys})")
        except Exception as ex:
            self.logger.error(f"Wait_results got some exception: {ex}")
            self.logger.error(traceback.format_exc())
            self._graceful_stop.set()

        if get_results or self._graceful_stop.is_set() or self.is_all_results_available or force_return_results:
            self._graceful_stop.set()

            # Pause before the final read of the results.
            time.sleep(2)
            percent = self.get_results_percentage()
            self.logger.info(f"{self.internal_id} Got results: {percent} (number of wrong keys: {self._num_wrong_keys})")

            return self.results
        return None

    def wait_result(self, timeout=None, force_return_results=False):
        """Wait like :meth:`wait_results`, then return the current results."""
        self.wait_results(timeout=timeout, force_return_results=force_return_results)
        return self.results

    def is_ok(self):
        """
        Self test: publish a random id and check that it is received back.

        :returns: True when the received result equals the published id.
        """
        try:
            self.subscribe(force=True)
            test_id = str(uuid.uuid4())
            self.publish(test_id, force=True)
            ret = self.wait_result(force_return_results=True)
            self.logger.info(f"{self.internal_id} AsyncResult: publish: {test_id}, received: {ret}")
            if test_id == ret:
                self.logger.info(f"{self.internal_id} AsyncResult is ok")
                return True
            self.logger.info(f"{self.internal_id} AsyncResult is not ok")
            return False
        except Exception as ex:
            self.logger.error(f"{self.internal_id} AsyncResult is not ok: {ex}")
            return False