Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-09 07:58:32

0001 #!/usr/bin/env python
0002 #
0003 # Licensed under the Apache License, Version 2.0 (the "License");
0004 # You may not use this file except in compliance with the License.
0005 # You may obtain a copy of the License at
0006 # http://www.apache.org/licenses/LICENSE-2.0OA
0007 #
0008 # Authors:
0009 # - Wen Guan, <wen.guan@cern.ch>, 2023 - 2024
0010 
0011 import atexit
0012 import logging
0013 import os
0014 import random
0015 import socket
0016 import sys
0017 import threading
0018 import time
0019 import traceback
0020 import uuid
0021 
0022 from queue import Queue
0023 
0024 try:
0025     import stomp
0026     with_stomp = True
0027 except Exception as ex:
0028     print(f"Failed to import stomp, with_stomp is False: {ex}")
0029     with_stomp = False
0030 
0031 from idds.common.constants import WorkflowType, GracefulEvent
0032 from idds.common.utils import json_dumps, json_loads, setup_logging, get_unique_id_for_dict, timeout_wrapper, is_panda_client_verbose
0033 from .base import Base
0034 
0035 
0036 setup_logging(__name__)
0037 logging.getLogger("stomp").setLevel(logging.CRITICAL)
0038 
0039 
0040 class MessagingListener(stomp.ConnectionListener):
0041     '''
0042     Messaging Listener
0043     '''
0044     def __init__(self, broker, output_queue, logger=None):
0045         '''
0046         __init__
0047         '''
0048         self.name = "MessagingListener"
0049         self.__broker = broker
0050         self.__output_queue = output_queue
0051         # self.logger = logging.getLogger(self.__class__.__name__)
0052         if logger:
0053             self.logger = logger
0054         else:
0055             self.logger = logging.getLogger(self.__class__.__name__)
0056 
0057     def on_error(self, frame):
0058         '''
0059         Error handler
0060         '''
0061         self.logger.error('[broker] [%s]: %s', self.__broker, frame.body)
0062 
0063     def on_message(self, frame):
0064         self.logger.debug('[broker] [%s]: headers: %s, body: %s', self.__broker, frame.headers, frame.body)
0065         self.__output_queue.put(json_loads(frame.body))
0066 
0067 
0068 class MapResult(object):
0069     def __init__(self):
0070         self._name_results = {}
0071         self._results = {}
0072         self._details = {}
0073         self._name_details = {}
0074 
0075     def __str__(self):
0076         return str(self._name_results)
0077 
0078     def add_result(self, name=None, args=None, key=None, result=None, details=None):
0079         if key is None:
0080             key = get_unique_id_for_dict(args)
0081             name_key = '%s:%s' % (name, key)
0082         else:
0083             # name_key = key
0084             # name = ':'.join(name_key.split(":")[:-1])
0085             # key = name_key.split(":")[-1]
0086             name_key = '%s:%s' % (name, key)
0087 
0088         self._name_results[name_key] = result
0089         self._results[key] = result
0090         self._details[key] = details   # {"status": status, "error": error, "metris": metrics}
0091         self._name_details[name_key] = details
0092 
0093     def has_result(self, name=None, args=None, key=None):
0094         if key is None:
0095             key = get_unique_id_for_dict(args)
0096 
0097         name_key = '%s:%s' % (name, key)
0098         if name_key in self._name_results:
0099             return True
0100         else:
0101             if key in self._result:
0102                 return True
0103         return False
0104 
0105     def get_result(self, name=None, args=None, key=None, verbose=False, with_details=False):
0106         if verbose:
0107             logging.info("get_result: key %s, name: %s, args: %s" % (key, name, args))
0108             logging.info("get_result: results: %s, name_results: %s" % (self._results, self._name_results))
0109             logging.info(f"get_result: details: {self._details}, name_details: {self._name_details}")
0110 
0111         if key is None:
0112             key = get_unique_id_for_dict(args)
0113 
0114         name_key = '%s:%s' % (name, key)
0115         if name_key in self._name_results:
0116             ret = self._name_results.get(name_key, None)
0117             details = self._name_details.get(name_key, None)
0118         else:
0119             ret = self._results.get(key, None)
0120             details = self._details.get(key, None)
0121         if verbose:
0122             logging.info("get_result: name key %s, args key %s, ret: %s, details: %s" % (name_key, key, ret, details))
0123         if with_details:
0124             return ret, details
0125         return ret
0126 
0127     def set_result(self, name=None, args=None, key=None, value=None, verbose=False, details=None):
0128         if verbose:
0129             logging.info("set_result: key %s, name: %s, args: %s, value: %s, details: %s" % (key, name, args, value, details))
0130             logging.info("set_result: results: %s, name_results: %s" % (self._results, self._name_results))
0131             logging.info(f"get_result: details: {self._details}, name_details: {self._name_details}")
0132 
0133         if key is None:
0134             key = get_unique_id_for_dict(args)
0135         name_key = '%s:%s' % (name, key)
0136 
0137         self._name_results[name_key] = value
0138         self._results[key] = value
0139         self._name_details[name_key] = details
0140         self._details[key] = details
0141 
0142         if verbose:
0143             logging.info("set_result: name key %s, args key %s, value: %s, details: %s" % (name_key, key, value, details))
0144 
0145     def get_all_results(self):
0146         return self._results
0147 
0148     def get_dict_results(self):
0149         return {'results': self._results, 'name_results': self._name_results, 'details': self._details, 'name_details': self._name_details}
0150 
0151     def set_from_dict_results(self, results):
0152         self._results = results.get('results', {})
0153         self._name_results = results.get('name_results', {})
0154         self._details = results.get('details', {})
0155         self._name_details = results.get('name_details', {})
0156 
0157 
0158 class AsyncResult(Base):
0159 
0160     def __init__(self, work_context, name=None, wait_num=1, wait_keys=[], multi_jobs_kwargs_list=[], current_job_kwargs=None, map_results=False,
0161                  wait_percent=1, internal_id=None, timeout=None):
0162         """
0163         Init a workflow.
0164         """
0165         super(AsyncResult, self).__init__()
0166         if internal_id:
0167             self.internal_id = internal_id
0168         self._work_context = work_context
0169         try:
0170             ret = self._work_context.init_brokers()
0171             if ret:
0172                 self._broker_initialized = True
0173             else:
0174                 self._broker_initialized = False
0175         except Exception as ex:
0176             logging.warn(f"{self.internal_id} Failed to initialize messaging broker, will use Rest: {ex}")
0177             self._broker_initialized = False
0178 
0179         self._name = name
0180         self._queue = Queue()
0181 
0182         self._connections = []
0183         self._subscribe_connections = []
0184         self._graceful_stop = False
0185         self._is_stop = False
0186         self._subscribe_thread = None
0187         self._subscribed = False
0188 
0189         self._results = []
0190         self._bad_results = []
0191         self._results_percentage = 0
0192         self._map_results = map_results
0193         self.waiting_result_terminated = False
0194 
0195         self._metrics = {}
0196 
0197         self._wait_num = wait_num
0198         if not self._wait_num:
0199             self._wait_num = 1
0200         self._wait_keys = set(wait_keys)
0201         self._multi_jobs_kwargs_list = multi_jobs_kwargs_list
0202         self._current_job_kwargs = current_job_kwargs
0203 
0204         self._wait_percent = wait_percent
0205         self._num_wrong_keys = 0
0206 
0207         self._timeout = timeout
0208 
0209         self._nologs = False
0210 
0211         self._num_stomp_failures = 0
0212         self._max_stomp_failures = 5
0213         try:
0214             max_stomp_failures = os.environ.get("AYNC_RESULT_MAX_STOMP_FAILURES", None)
0215             if max_stomp_failures:
0216                 max_stomp_failures = int(max_stomp_failures)
0217                 self._max_stomp_failures = max_stomp_failures
0218         except Exception:
0219             pass
0220 
0221         self._poll_period = 300
0222         try:
0223             poll_period = os.environ.get("AYNC_RESULT_POLL_PERIOD", None)
0224             if poll_period:
0225                 poll_period = int(poll_period)
0226                 self._poll_period = poll_period
0227         except Exception:
0228             pass
0229 
0230         self._is_messaging_ok = True
0231         self._is_polling_ok = True
0232 
0233     @property
0234     def logger(self):
0235         return logging.getLogger(self.__class__.__name__)
0236 
0237     @logger.setter
0238     def logger(self, value):
0239         pass
0240 
0241     @property
0242     def wait_keys(self):
0243         if len(self._wait_keys) > 0:
0244             self._wait_num = len(self._wait_keys)
0245             return self._wait_keys
0246         if self._multi_jobs_kwargs_list:
0247             for kwargs in self._multi_jobs_kwargs_list:
0248                 k = get_unique_id_for_dict(kwargs)
0249                 k = "%s:%s" % (self._name, k)
0250                 request_id, transform_id, internal_id = self.get_request_id_internal_id()
0251                 self.logger.info(f"request_id {request_id} transform_id {transform_id} internal_id {internal_id} args ({kwargs}) to key: {k}")
0252                 self._wait_keys.add(k)
0253             self._wait_num = len(self._wait_keys)
0254         return self._wait_keys
0255 
0256     @wait_keys.setter
0257     def wait_keys(self, value):
0258         self._wait_keys = set(value)
0259 
0260     @property
0261     def is_all_results_available(self):
0262         percent = self.get_results_percentage()
0263         if percent >= self._wait_percent:
0264             return True
0265 
0266     @is_all_results_available.setter
0267     def is_all_results_available(self, value):
0268         raise Exception(f"{self.internal_id} Not allowd to set is_all_results_available")
0269 
0270     @property
0271     def is_stop(self):
0272         return self._is_stop
0273 
0274     @is_stop.setter
0275     def is_stop(self, value):
0276         raise Exception(f"{self.internal_id} Not allowd to set is_stop")
0277 
0278     @property
0279     def is_finished(self):
0280         if self._graceful_stop and self._graceful_stop.is_set():
0281             percent = self.get_results_percentage()
0282             if percent >= self._wait_percent:
0283                 return True
0284         return False
0285 
0286     @is_finished.setter
0287     def is_finished(self, value):
0288         raise Exception(f"{self.internal_id} Not allowd to set is_finished")
0289 
0290     @property
0291     def is_subfinished(self):
0292         if self._graceful_stop and self._graceful_stop.is_set():
0293             percent = self.get_results_percentage()
0294             if percent > 0 and percent < self._wait_percent:
0295                 return True
0296         return False
0297 
0298     @is_subfinished.setter
0299     def is_subfinished(self, value):
0300         raise Exception(f"{self.internal_id} Not allowd to set is_subfinished")
0301 
0302     @property
0303     def is_failed(self):
0304         if self._graceful_stop and self._graceful_stop.is_set():
0305             percent = self.get_results_percentage()
0306             if percent <= 0:
0307                 return True
0308         return False
0309 
0310     @is_failed.setter
0311     def is_failed(self, value):
0312         raise Exception(f"{self.internal_id} Not allowd to set is_failed")
0313 
0314     @property
0315     def is_terminated(self):
0316         return self._graceful_stop and self._graceful_stop.is_set()
0317 
0318     @is_terminated.setter
0319     def is_terminated(self, value):
0320         raise Exception(f"{self.internal_id} Not allowd to set is_terminated")
0321 
0322     @property
0323     def results(self):
0324         has_new_data = False
0325         while not self._queue.empty():
0326             ret = self._queue.get()
0327             has_new_data = True
0328             try:
0329                 internal_id = ret['internal_id']
0330                 if internal_id == self.internal_id:
0331                     self._results.append(ret)
0332                 else:
0333                     self._bad_results.append(ret)
0334             except Exception as ex:
0335                 self.logger.error(f"{self.internal_id} Received bad result: {ret}: {ex}")
0336         if self._bad_results:
0337             self.logger.error(f"{self.internal_id} Received bad results: {self._bad_results}")
0338 
0339         if not self._nologs:
0340             self.logger.debug(f"{self.internal_id} _results: {self._results}, bad_results: {self._bad_results}")
0341             self.logger.debug(f"{self.internal_id} wait_keys: {self.wait_keys}, wait_num: {self._wait_num}")
0342 
0343         rets_list = []
0344         failure_list = []
0345         for result in self._results:
0346             # name = result['name']
0347             # key = result['key']
0348             ret = result['ret']
0349             status, output, error, metrics = ret
0350             if status is True:
0351                 rets_list.append(result)
0352             else:
0353                 failure_list.append(result)
0354         if not self._nologs:
0355             self.logger.debug(f"{self.internal_id} rets_list: {rets_list}, failure_list: {failure_list}")
0356 
0357         if self._map_results:
0358             rets = {}
0359             if len(self.wait_keys) > 0:
0360                 for result in rets_list:
0361                     name = result['name']
0362                     key = result['key']
0363                     name_key = f"{name}:{key}"
0364                     if name_key in self.wait_keys:
0365                         ret = result['ret']
0366                         status, output, error, metrics = ret
0367                         rets[name_key] = output
0368                 self._results_percentage = len(list(rets.keys())) * 1.0 / len(self.wait_keys)
0369             else:
0370                 for result in rets_list:
0371                     name = result['name']
0372                     key = result['key']
0373                     name_key = f"{name}:{key}"
0374                     ret = result['ret']
0375                     status, output, error, metrics = ret
0376 
0377                     rets[name_key] = output
0378                 self._results_percentage = len(list(rets.keys())) * 1.0 / self._wait_num
0379 
0380             ret_map = MapResult()
0381             for k in rets_list:
0382                 name = result['name']
0383                 key = result['key']
0384                 name_key = f"{name}:{key}"
0385                 ret = result['ret']
0386                 status, output, error, metrics = ret
0387                 self._metrics[name_key] = metrics
0388 
0389                 details = {"status": status, "error": error, "metrics": metrics}
0390 
0391                 ret_map.add_result(name=name, key=key, result=output, details=details)
0392 
0393             if has_new_data:
0394                 self.logger.debug(f'{self.internal_id} percent {self._results_percentage}, results: {ret_map}')
0395 
0396             return ret_map
0397         else:
0398             rets = []
0399             if len(self.wait_keys) > 0:
0400                 for result in rets_list:
0401                     name = result['name']
0402                     key = result['key']
0403                     name_key = f"{name}:{key}"
0404                     if name_key in self.wait_keys:
0405                         ret = result['ret']
0406                         status, output, error, metrics = ret
0407                         self._metrics[name_key] = metrics
0408                         rets.append(output)
0409 
0410                 self._results_percentage = len(rets) * 1.0 / len(self.wait_keys)
0411             else:
0412                 for result in rets_list:
0413                     name = result['name']
0414                     key = result['key']
0415                     name_key = f"{name}:{key}"
0416                     ret = result['ret']
0417                     status, output, error, metrics = ret
0418                     self._metrics[name_key] = metrics
0419 
0420                     rets.append(output)
0421                 self._results_percentage = len(rets) * 1.0 / self._wait_num
0422 
0423             if has_new_data:
0424                 self.logger.debug(f'{self.internal_id} percent {self._results_percentage}, results: {rets}')
0425 
0426             if self._wait_num == 1:
0427                 if rets:
0428                     return rets[0]
0429             return rets
0430 
0431     @results.setter
0432     def results(self, value):
0433         raise Exception(f"{self.internal_id} Not allowed to set results.")
0434         if type(value) not in [list, tuple]:
0435             raise Exception(f"{self.internal_id} Results must be list or tuple, currently it is {value}")
0436         self._results = value
0437 
0438     @property
0439     def metrics(self):
0440         return self._metrics
0441 
0442     @metrics.setter
0443     def metrics(self, value):
0444         raise Exception(f"{self.internal_id} Not allowed to set metrics.")
0445 
0446     def disconnect(self):
0447         for con in self._connections:
0448             try:
0449                 if con.is_connected():
0450                     con.disconnect()
0451             except Exception:
0452                 pass
0453         self._connections = []
0454 
0455     def disconnect_subscribe(self):
0456         for con in self._subscribe_connections:
0457             try:
0458                 if con.is_connected():
0459                     con.disconnect()
0460             except Exception:
0461                 pass
0462         self._subscribe_connections = []
0463 
0464     def has_connections(self, conns):
0465         if conns:
0466             for con in conns:
0467                 try:
0468                     if con.is_connected():
0469                         return True
0470                 except Exception:
0471                     pass
0472         return False
0473 
0474     def get_connections(self, conns):
0475         if conns:
0476             for con in conns:
0477                 try:
0478                     if con.is_connected():
0479                         return con
0480                 except Exception:
0481                     pass
0482         return None
0483 
0484     def connect_to_messaging_broker(self):
0485         conn = self.get_connections(self._connections)
0486         if conn:
0487             return conn
0488 
0489         workflow_context = self._work_context
0490         brokers = workflow_context.brokers
0491 
0492         brokers = brokers.split(",")
0493         broker = random.sample(brokers, k=1)[0]
0494 
0495         self.logger.info("Got broker: %s" % (broker))
0496 
0497         timeout = workflow_context.broker_timeout
0498         self.disconnect()
0499 
0500         broker, port = broker.split(":")
0501         conn = stomp.Connection12(host_and_ports=[(broker, port)],
0502                                   keepalive=True,
0503                                   heartbeats=(30000, 30000),     # half minute = num / 1000
0504                                   timeout=timeout)
0505         conn.connect(workflow_context.broker_username, workflow_context.broker_password, wait=True)
0506         self._connections = [conn]
0507         return conn
0508 
0509     def subscribe_to_messaging_brokers(self, force=False):
0510         if self._subscribed and not force and self._subscribe_connections:
0511             return self._subscribe_connections
0512 
0513         workflow_context = self._work_context
0514         brokers = workflow_context.brokers
0515         conns = []
0516 
0517         broker_addresses = []
0518         if not brokers:
0519             raise Exception(f"brokers <{brokers}> not defined")
0520         else:
0521             for b in brokers.split(","):
0522                 try:
0523                     b, port = b.split(":")
0524 
0525                     addrinfos = socket.getaddrinfo(b, 0, socket.AF_INET, 0, socket.IPPROTO_TCP)
0526                     for addrinfo in addrinfos:
0527                         b_addr = addrinfo[4][0]
0528                         broker_addresses.append((b_addr, port))
0529                 except socket.gaierror as error:
0530                     self.logger.error(f'{self.internal_id} Cannot resolve hostname {b}: {error}')
0531                     # self._graceful_stop.set()
0532 
0533         self.logger.info(f"{self.internal_id} Resolved broker addresses: {broker_addresses}")
0534 
0535         timeout = workflow_context.broker_timeout
0536 
0537         self.disconnect_subscribe()
0538 
0539         listener = MessagingListener(brokers, self._queue, logger=self.logger)
0540         conns = []
0541         for broker, port in broker_addresses:
0542             conn = stomp.Connection12(host_and_ports=[(broker, port)],
0543                                       keepalive=True,
0544                                       heartbeats=(30000, 30000),     # half minute = num / 1000
0545                                       timeout=timeout)
0546             conn.set_listener("messag-subscriber", listener)
0547             conn.connect(workflow_context.broker_username, workflow_context.broker_password, wait=True)
0548             if workflow_context.workflow_type in [WorkflowType.iWorkflow, WorkflowType.iWorkflowLocal]:
0549                 subscribe_id = 'idds-workflow_%s' % self.internal_id
0550                 # subscribe_selector = {'selector': "type = 'iworkflow' AND request_id = %s" % workflow_context.request_id}
0551                 # subscribe_selector = {'selector': "type = 'iworkflow' AND internal_id = '%s'" % self.internal_id}
0552                 subscribe_selector = {'selector': "internal_id = '%s'" % self.internal_id}
0553             elif workflow_context.workflow_type == WorkflowType.iWork:
0554                 subscribe_id = 'idds-work_%s' % self.internal_id
0555                 # subscribe_selector = {'selector': "type = 'iwork' AND request_id = %s AND transform_id = %s " % (workflow_context.request_id,
0556                 #                                                                                                  workflow_context.transform_id)}
0557                 # subscribe_selector = {'selector': "type = 'iwork' AND internal_id = '%s'" % self.internal_id}
0558                 subscribe_selector = {'selector': "internal_id = '%s'" % self.internal_id}
0559             else:
0560                 subscribe_id = 'idds-workflow_%s' % self.internal_id
0561                 subscribe_selector = None
0562             # subscribe_selector = None
0563             # conn.subscribe(destination=workflow_context.broker_destination, id=subscribe_id,
0564             #                ack='auto', conf=subscribe_selector)
0565             conn.subscribe(destination=workflow_context.broker_destination, id=subscribe_id,
0566                            ack='auto', headers=subscribe_selector)
0567             self.logger.info(f"{self.internal_id} subscribe to {broker}:{port} with selector: {subscribe_selector}")
0568             conns.append(conn)
0569         self._subscribe_connections = conns
0570         return conns
0571 
0572     def get_message(self, ret, name=None, key=None):
0573         message = {}
0574         workflow_context = self._work_context
0575         if key is None:
0576             if self._current_job_kwargs:
0577                 key = get_unique_id_for_dict(self._current_job_kwargs)
0578         if name is None:
0579             name = self._name
0580         # key = "%s:%s" % (self._name, key)
0581         self.logger.info(f"{self.internal_id} publish args ({self._current_job_kwargs}) to key: {key}")
0582 
0583         if workflow_context.workflow_type in [WorkflowType.iWorkflow, WorkflowType.iWorkflowLocal]:
0584             headers = {'persistent': 'true',
0585                        'channel': 'asyncresult',
0586                        'type': 'iworkflow',
0587                        'internal_id': str(self.internal_id),
0588                        'request_id': workflow_context.request_id}
0589             body = {'ret': ret, 'name': name, 'key': key, 'internal_id': self.internal_id, 'type': 'iworkflow',
0590                     'request_id': workflow_context.request_id}
0591             message = {"headers": headers, "body": body}
0592         elif workflow_context.workflow_type == WorkflowType.iWork:
0593             headers = {'persistent': 'true',
0594                        'channel': 'asyncresult',
0595                        'type': 'iwork',
0596                        'internal_id': str(self.internal_id),
0597                        'request_id': workflow_context.request_id,
0598                        'transform_id': workflow_context.transform_id}
0599             body = {'ret': ret, 'name': name, 'key': key, 'internal_id': self.internal_id, 'type': 'iwork',
0600                     'request_id': workflow_context.request_id,
0601                     'transform_id': workflow_context.transform_id}
0602             message = {"headers": headers, "body": body}
0603         return message
0604 
0605     def publish_message(self, ret, name=None, key=None):
0606         message = self.get_message(ret=ret, name=name, key=key)
0607         headers = message['headers']
0608         body = message['body']
0609         conn = self.connect_to_messaging_broker()
0610         workflow_context = self._work_context
0611         if workflow_context.workflow_type in [WorkflowType.iWorkflow, WorkflowType.iWorkflowLocal]:
0612             conn.send(body=json_dumps(body),
0613                       destination=workflow_context.broker_destination,
0614                       id='idds-iworkflow_%s' % self.internal_id,
0615                       ack='auto',
0616                       headers=headers
0617                       )
0618             self.logger.info(f"{self.internal_id} published header: {headers}, body: {body}")
0619         elif workflow_context.workflow_type == WorkflowType.iWork:
0620             conn.send(body=json_dumps(body),
0621                       destination=workflow_context.broker_destination,
0622                       id='idds-iwork_%s' % self.internal_id,
0623                       ack='auto',
0624                       headers=headers
0625                       )
0626             self.logger.info(f"{self.internal_id} published header: {headers}, body: {body}")
0627         # self.disconnect()
0628 
0629     def get_request_id_internal_id(self):
0630         workflow_context = self._work_context
0631         request_id, transform_id, internal_id = None, None, None
0632         if workflow_context.workflow_type in [WorkflowType.iWorkflow, WorkflowType.iWorkflowLocal]:
0633             request_id = workflow_context.request_id
0634             transform_id = 0
0635             # internal_id = workflow_context.internal_id
0636         elif workflow_context.workflow_type == WorkflowType.iWork:
0637             request_id = workflow_context.request_id
0638             transform_id = workflow_context.transform_id
0639             # internal_id = workflow_context.internal_id
0640         else:
0641             request_id = workflow_context.request_id
0642             transform_id = 0
0643             # internal_id = workflow_context.internal_id
0644         internal_id = self.internal_id
0645         return request_id, transform_id, internal_id
0646 
0647     def publish_through_panda_server(self, request_id, transform_id, internal_id, message):
0648         import idds.common.utils as idds_utils
0649         import pandaclient.idds_api as idds_api
0650         idds_server = self._work_context.get_idds_server()
0651         # request_id = self._context.request_id
0652         client = idds_api.get_api(idds_utils.json_dumps,
0653                                   idds_host=idds_server,
0654                                   compress=True,
0655                                   verbose=is_panda_client_verbose(),
0656                                   manager=True)
0657         status, ret = client.send_messages(request_id=request_id, transform_id=transform_id, internal_id=internal_id, msgs=[message])
0658         if status == 0 and type(ret) in (list, tuple) and len(ret) > 1 and ret[0] is True and type(ret[1]) in (list, tuple) and ret[1][0] is True:
0659             self.logger.info(f"{self.internal_id} published message through panda server: {message}")
0660         else:
0661             self.logger.error(f"{self.internal_id} failed to publish message through panda server, status: {status}, ret: {ret}")
0662 
0663     def publish_through_idds_server(self, request_id, transform_id, internal_id, message):
0664         from idds.client.clientmanager import ClientManager
0665         client = ClientManager(host=self._work_context.get_idds_server(), timeout=60)
0666         status, ret = client.send_messages(request_id=request_id, transform_id=transform_id, internal_id=internal_id, msgs=[message])
0667         if status:
0668             self.logger.info(f"{self.internal_id} published message through idds server: {message}")
0669         else:
0670             self.logger.error(f"{self.internal_id} failed to publish message through idds server, status: {status}, ret: {ret}")
0671 
0672     def publish_through_api(self, ret, name=None, key=None, force=False):
0673         message = self.get_message(ret=ret, name=name, key=key)
0674         # headers = message['headers']
0675         # body = message['body']
0676         message['msg_type'] = 'async_result'
0677 
0678         try:
0679             request_id, transform_id, internal_id = self.get_request_id_internal_id()
0680             if request_id is None:
0681                 if force:
0682                     request_id = 0
0683                 else:
0684                     self.logger.warn(f"{self.internal_id} Not to publish message through API since the request id is None")
0685                     return
0686 
0687             if self._work_context.service == 'panda':
0688                 self.publish_through_panda_server(request_id, transform_id, internal_id, message)
0689             else:
0690                 self.publish_through_idds_server(request_id, transform_id, internal_id, message)
0691         except Exception as ex:
0692             self.logger.error(f"{self.internal_id} Failed to publish message through API: {ex}")
0693 
0694     @timeout_wrapper(timeout=90)
0695     def publish(self, ret, name=None, key=None, force=False, ret_status=True, ret_error=None, metrics=None):
0696         stomp_failed = False
0697         if with_stomp and self._broker_initialized:
0698             try:
0699                 self.logger.info(f"{self.internal_id} publishing results through messaging brokers")
0700                 self.publish_message(ret=(ret_status, ret, ret_error, metrics), name=name, key=key)
0701                 self.logger.info(f"{self.internal_id} finished to publish results through messaging brokers")
0702             except Exception as ex:
0703                 self.logger.warn(f"{self.internal_id} Failed to publish result through messaging brokers: {ex}")
0704                 stomp_failed = True
0705 
0706         if not with_stomp or not self._broker_initialized or stomp_failed:
0707             self.logger.info(f"{self.internal_id} publishing results through http API")
0708             self.publish_through_api(ret=(ret_status, ret, ret_error, metrics), name=name, key=key, force=force)
0709             self.logger.info(f"{self.internal_id} finished to publish results through http API")
0710 
0711     def poll_messages_through_panda_server(self, request_id, transform_id, internal_id):
0712         if request_id is None:
0713             self.logger.warn(f"{self.internal_id} Not to poll message through panda server, since the request_id is None")
0714             return []
0715 
0716         import idds.common.utils as idds_utils
0717         import pandaclient.idds_api as idds_api
0718         idds_server = self._work_context.get_idds_server()
0719         # request_id = self._work_context.request_id
0720         client = idds_api.get_api(idds_utils.json_dumps,
0721                                   idds_host=idds_server,
0722                                   compress=True,
0723                                   verbose=is_panda_client_verbose(),
0724                                   manager=True)
0725         status, ret = client.get_messages(request_id=request_id, transform_id=transform_id, internal_id=internal_id)
0726         if status == 0 and type(ret) in (list, tuple) and len(ret) > 1 and ret[0] is True and type(ret[1]) in (list, tuple) and ret[1][0] is True:
0727             self.logger.info(f"{self.internal_id} poll message through panda server, ret: {ret}")
0728             messages = ret[1][1]
0729             self.logger.info(f"{self.internal_id} poll message through panda server, number of messages: {len(messages)}")
0730             return messages
0731         else:
0732             self.logger.error(f"{self.internal_id} failed to poll messages through panda server, status: {status}, ret: {ret}")
0733             return []
0734 
0735     def poll_messages_through_idds_server(self, request_id, transform_id, internal_id):
0736         if request_id is None:
0737             self.logger.warn(f"{self.internal_id} Not to poll message through idds server, since the request_id is None")
0738             return []
0739 
0740         from idds.client.clientmanager import ClientManager
0741         client = ClientManager(host=self._work_context.get_idds_server(), timeout=60)
0742         status, messages = client.get_messages(request_id=request_id, transform_id=transform_id, internal_id=internal_id)
0743         if status:
0744             self.logger.info(f"{self.internal_id} poll message through panda server, ret: {messages}")
0745             self.logger.info(f"{self.internal_id} poll message through idds server, number of messages: {len(messages)}")
0746             return messages
0747         else:
0748             self.logger.error(f"{self.internal_id} failed to poll messages through idds server, error: {messages}")
0749             return []
0750 
0751     @timeout_wrapper(timeout=90)
0752     def poll_messages(self, force=False):
0753         try:
0754             request_id, transform_id, internal_id = self.get_request_id_internal_id()
0755             if request_id is None:
0756                 if force:
0757                     request_id = 0
0758                 else:
0759                     self.logger.warn(f"{self.internal_id} Not to poll message, since the request_id is None")
0760                     return
0761 
0762             if self._work_context.service == 'panda':
0763                 messages = self.poll_messages_through_panda_server(request_id=request_id, transform_id=transform_id, internal_id=internal_id)
0764             else:
0765                 messages = self.poll_messages_through_idds_server(request_id=request_id, transform_id=transform_id, internal_id=internal_id)
0766 
0767             for message in messages:
0768                 body = message['body']
0769                 self._queue.put(body)
0770         except Exception as ex:
0771             self.logger.error(f"{self.internal_id} Failed to poll message: {ex}")
0772 
0773     def run_subscriber(self, force=False):
0774         try:
0775             self.logger.info(f"{self.internal_id} run subscriber")
0776             if with_stomp and self._broker_initialized and self._num_stomp_failures < self._max_stomp_failures:
0777                 try:
0778                     self.subscribe_to_messaging_brokers(force=True)
0779                 except Exception as ex:
0780                     self.logger.warn(f"{self.internal_id} run subscriber fails to subscribe to message broker: {ex}")
0781                     self._num_stomp_failures += 1
0782                     self._is_messaging_ok = False
0783                     self._broker_initialized = False
0784 
0785             time_poll = None
0786             time_start = time.time()
0787             while self._graceful_stop and not self._graceful_stop.is_set():
0788                 if with_stomp and self._broker_initialized and self._is_messaging_ok and self._num_stomp_failures < self._max_stomp_failures:
0789                     has_failed_conns = False
0790                     for conn in self._subscribe_connections:
0791                         if not conn.is_connected():
0792                             has_failed_conns = True
0793                     if has_failed_conns:
0794                         try:
0795                             self.subscribe_to_messaging_brokers(force=True)
0796                         except Exception as ex:
0797                             self.logger.warn(f"{self.internal_id} run subscriber fails to subscribe to message broker: {ex}")
0798                             self._num_stomp_failures += 1
0799                             self._is_messaging_ok = False
0800                             self._broker_initialized = False
0801                     time.sleep(1)
0802                 else:
0803                     if self._timeout:
0804                         sleep_time = min(self._timeout / 3, self._poll_period)
0805                     else:
0806                         sleep_time = self._poll_period
0807 
0808                     if time_poll is None or time.time() - time_poll > sleep_time:
0809                         try:
0810                             self.poll_messages(force=force)
0811                         except Exception as ex:
0812                             self.logger.info(f"{self.internal_id} run subscriber fails to poll messages: {ex}")
0813                         time_poll = time.time()
0814                     time.sleep(1)
0815 
0816                 if self._timeout and time.time() - time_start > self._timeout:
0817                     self.logger.info(f"{self.internal_id} timeout reached")
0818                     break
0819 
0820             if self._graceful_stop and self._graceful_stop.is_set():
0821                 self.logger.info(f"{self.internal_id} graceful stop is set")
0822 
0823             try:
0824                 if not sys.is_finalizing():
0825                     self.poll_messages(force=force)
0826             except Exception as ex:
0827                 self.logger.info(f"{self.internal_id} run subscriber fails to poll messages: {ex}")
0828 
0829             self._is_stop = True
0830             self.stop()
0831             self.logger.info(f"{self.internal_id} subscriber finished.")
0832         except Exception as ex:
0833             self.logger.error(f"{self.internal_id} run subscriber failed with error: {ex}")
0834             self.logger.error(traceback.format_exc())
0835             self._is_stop = True
0836             self.stop()
0837 
0838     def get_results(self, nologs=True):
0839         old_nologs = self._nologs
0840         self._nologs = nologs
0841         rets = self.results
0842         if not self._nologs:
0843             self.logger.debug(f'{self.internal_id} percent {self.get_results_percentage()}, results: {rets}')
0844 
0845         percent = self.get_results_percentage()
0846         if percent >= self._wait_percent:
0847             self.stop()
0848             self.logger.info(f"{self.internal_id} Got results: {percent} (number of wrong keys: {self._num_wrong_keys})")
0849         self._nologs = old_nologs
0850         return rets
0851 
0852     def get_results_percentage(self):
0853         return self._results_percentage
0854 
0855     def subscribe(self, force=False):
0856         if not self._subscribed:
0857             atexit.register(self.stop)
0858 
0859             self._graceful_stop = GracefulEvent()
0860             thread = threading.Thread(target=self.run_subscriber, kwargs={'force': force}, name="RunSubscriber")
0861             thread.start()
0862             time.sleep(1)
0863             self._subscribed = True
0864             self._subscribe_thread = thread
0865             self._is_stop = False
0866 
0867     def stop(self):
0868         if self._graceful_stop:
0869             self._graceful_stop.set()
0870         self.disconnect()
0871         self._subscribed = False
0872         self._subscribe_thread = None
0873         time_start = time.time()
0874         while not self._is_stop and time.time() - time_start < 60:
0875             time.sleep(1)
0876         self._is_stop = True
0877 
0878     def __del__(self):
0879         # self.stop()
0880         pass
0881 
0882     def wait_results(self, timeout=None, force_return_results=False):
0883         self.subscribe()
0884 
0885         get_results = False
0886         time_log = time.time()
0887         time_start = time.time()
0888         if timeout is None:
0889             self.logger.info(f"{self.internal_id} waiting for results")
0890         try:
0891             while not get_results and self._graceful_stop and not self._graceful_stop.is_set():
0892                 self.get_results(nologs=True)
0893                 percent = self.get_results_percentage()
0894                 if time.time() - time_log > 600:  # 10 minutes
0895                     self.logger.info(f"{self.internal_id} waiting for results: {percent} (number of wrong keys: {self._num_wrong_keys})")
0896                     time_log = time.time()
0897                 time.sleep(1)
0898                 if self.is_all_results_available:
0899                     get_results = True
0900                     self.waiting_result_terminated = True
0901                     self.logger.info(f"{self.internal_id} Got result percentage {percent} is not smaller then wait_percent {self._wait_percent}, set waiting_result_terminated to True")
0902                 if self._timeout is not None and self._timeout > 0 and time.time() - time_start > self._timeout:
0903                     # global timeout
0904                     self.logger.info(f"{self.internal_id} Waiting result timeout({self._timeout} seconds), set waiting_result_terminated to True")
0905                     get_results = True
0906                     self.waiting_result_terminated = True
0907                 if timeout is not None and timeout > 0 and time.time() - time_start > timeout:
0908                     # local timeout
0909                     self.logger.info(f"{self.internal_id} timeout reached")
0910                     break
0911 
0912             percent = self.get_results_percentage()
0913             if timeout is None or time.time() - time_start > 600:
0914                 self.logger.info(f"{self.internal_id} Got results: {percent} (number of wrong keys: {self._num_wrong_keys})")
0915         except Exception as ex:
0916             self.logger.error(f"Wait_results got some exception: {ex}")
0917             self.logger.error(traceback.format_exc())
0918             self._graceful_stop.set()
0919 
0920         if get_results or self._graceful_stop.is_set() or self.is_all_results_available or force_return_results:
0921             # stop the subscriber
0922             self._graceful_stop.set()
0923             # wait the subscriber to finish
0924             time.sleep(2)
0925             percent = self.get_results_percentage()
0926             self.logger.info(f"{self.internal_id} Got results: {percent} (number of wrong keys: {self._num_wrong_keys})")
0927 
0928             results = self.results
0929             return results
0930         return None
0931 
0932     def wait_result(self, timeout=None, force_return_results=False):
0933         self.wait_results(timeout=timeout, force_return_results=force_return_results)
0934         results = self.results
0935         return results
0936 
0937     def is_ok(self):
0938         try:
0939             self.subscribe(force=True)
0940             test_id = str(uuid.uuid4())
0941             self.publish(test_id, force=True)
0942             ret = self.wait_result(force_return_results=True)
0943             self.logger.info(f"{self.internal_id} AsyncResult: publish: {test_id}, received: {ret}")
0944             if test_id == ret:
0945                 self.logger.info(f"{self.internal_id} AsyncResult is ok")
0946                 return True
0947             else:
0948                 self.logger.info(f"{self.internal_id} AsyncResult is not ok")
0949                 return False
0950         except Exception as ex:
0951             self.logger.error(f"{self.internal_id} AsyncResult is not ok: {ex}")
0952             return False