File indexing completed on 2026-04-09 07:58:19
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012 """
0013 operations related to Health.
0014 """
0015
0016 import datetime
0017
0018 from idds.common.constants import HealthStatus
0019 from idds.orm import health as orm_health
0020 from idds.orm.base.session import read_session, transactional_session
0021
0022
@transactional_session
def add_health_item(agent, hostname, pid, thread_id, thread_name, payload, session=None):
    """
    Record a health item through the ORM layer.

    All arguments are forwarded unchanged, as keywords, to
    ``orm_health.add_health_item``.

    :param agent: The agent name.
    :param hostname: The hostname.
    :param pid: The pid.
    :param thread_id: The thread id.
    :param thread_name: The thread name.
    :param payload: The payload.
    :param session: The database session in use.

    :returns: Whatever ``orm_health.add_health_item`` returns.
    """
    # Gather the item description once so the ORM call stays on a single line.
    item_fields = {
        'agent': agent,
        'hostname': hostname,
        'pid': pid,
        'thread_id': thread_id,
        'thread_name': thread_name,
        'payload': payload,
    }
    return orm_health.add_health_item(session=session, **item_fields)
0040
0041
@read_session
def retrieve_health_items(session=None):
    """
    Fetch the health items from the ORM layer.

    :param session: The database session in use.

    :returns healths: The result of ``orm_health.retrieve_health_items``
                      (documented by this module as a list of dictionaries).
    """
    healths = orm_health.retrieve_health_items(session=session)
    return healths
0050
0051
@transactional_session
def clean_health(older_than=3600, hostname=None, pids=None, session=None):
    """
    Clean items which are older than the given time.

    The arguments are forwarded to ``orm_health.clean_health``.

    :param older_than: Age threshold in seconds.
    :param hostname: Optional hostname forwarded to the ORM layer.
    :param pids: Optional list of pids forwarded to the ORM layer. When
                 omitted (or None) a fresh empty list is passed.
    :param session: The database session in use.
    """
    # A literal ``[]`` default would be one list object shared by every call;
    # build a new list per call so nothing downstream can leak state through it.
    if pids is None:
        pids = []
    orm_health.clean_health(older_than=older_than, hostname=hostname, pids=pids, session=session)
0060
0061
@transactional_session
def select_agent(name, newer_than=3600, session=None):
    """
    Select one active agent among the health items registered under a name.

    The function first calls ``orm_health.clean_health`` with ``newer_than``
    as its ``older_than`` threshold, then scans the remaining health items:

    * items of other agents, or items whose ``updated_at`` is more than
      ``newer_than`` seconds in the past, are ignored;
    * the first candidate that is already ``HealthStatus.Active`` wins;
    * otherwise the most recently updated candidate is chosen and its status
      is set to ``HealthStatus.Active``.

    Every other item of the same agent name that is still marked Active is
    reset to ``HealthStatus.Default``.

    :param name: The agent name to select for.
    :param newer_than: Maximum age in seconds of a candidate's last update;
                       also used as the clean-up threshold.
    :param session: The database session in use.

    :returns: The selected health item, or None when there is no candidate.
    """
    orm_health.clean_health(older_than=newer_than, session=session)
    health_items = orm_health.retrieve_health_items(session=session)

    selected_agent = None
    selected_agent_diff = None
    # NOTE(review): naive UTC "now"; presumably 'updated_at' is stored as a
    # naive UTC datetime as well -- confirm against the ORM model.
    utc_now = datetime.datetime.utcnow()
    for health_item in health_items:
        if health_item['agent'] != name:
            continue

        time_diff = utc_now - health_item['updated_at']
        if time_diff.total_seconds() > newer_than:
            continue

        if health_item['status'] == HealthStatus.Active:
            # An already-active candidate is kept as is.
            selected_agent = health_item
            break

        # Otherwise remember the candidate with the smallest age.
        if selected_agent is None or time_diff < selected_agent_diff:
            selected_agent = health_item
            selected_agent_diff = time_diff

    if not selected_agent:
        # Nothing to promote; skipping the demotion pass also avoids
        # subscripting None when a stale Active item is still listed.
        return selected_agent

    if selected_agent['status'] != HealthStatus.Active:
        orm_health.update_health_item_status(selected_agent, status=HealthStatus.Active, session=session)
        selected_agent['status'] = HealthStatus.Active

    # Demote every *other* Active item of this agent name. The item being
    # demoted must be the one passed to the ORM, not the selected agent.
    for health_item in health_items:
        if health_item['agent'] != name:
            continue
        if health_item['status'] != HealthStatus.Active:
            continue
        if health_item['health_id'] == selected_agent['health_id']:
            continue
        orm_health.update_health_item_status(health_item, status=HealthStatus.Default, session=session)
    return selected_agent