Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-09 07:58:19

0001 #!/usr/bin/env python
0002 #
0003 # Licensed under the Apache License, Version 2.0 (the "License");
0004 # You may not use this file except in compliance with the License.
0005 # You may obtain a copy of the License at
0006 # http://www.apache.org/licenses/LICENSE-2.0
0007 #
0008 # Authors:
0009 # - Wen Guan, <wen.guan@cern.ch>, 2020 - 2023
0010 
0011 
0012 """
0013 operations related to Health.
0014 """
0015 
0016 import datetime
0017 
0018 from idds.common.constants import HealthStatus
0019 from idds.orm import health as orm_health
0020 from idds.orm.base.session import read_session, transactional_session
0021 
0022 
@transactional_session
def add_health_item(agent, hostname, pid, thread_id, thread_name, payload, session=None):
    """
    Register a health item by delegating to the ORM layer.

    :param agent: The agent name.
    :param hostname: The hostname.
    :param pid: The pid.
    :param thread_id: The thread id.
    :param thread_name: The thread name.
    :param payload: The payload.
    :param session: The database session forwarded to the ORM call.

    :returns: Whatever ``orm_health.add_health_item`` returns.
    """
    # Collect the identifying fields once so the ORM call stays on one line.
    item_fields = {
        'agent': agent,
        'hostname': hostname,
        'pid': pid,
        'thread_id': thread_id,
        'thread_name': thread_name,
        'payload': payload,
    }
    return orm_health.add_health_item(session=session, **item_fields)
0040 
0041 
@read_session
def retrieve_health_items(session=None):
    """
    Fetch the health items through the ORM layer.

    :param session: The database session forwarded to the ORM call.

    :returns healths: List of dictionaries, as returned by
                      ``orm_health.retrieve_health_items``.
    """
    healths = orm_health.retrieve_health_items(session=session)
    return healths
0050 
0051 
@transactional_session
def clean_health(older_than=3600, hostname=None, pids=None, session=None):
    """
    Clean health items which are older than the given age.

    :param older_than: Age threshold in seconds, forwarded to the ORM call.
    :param hostname: Optional hostname forwarded to the ORM call.
    :param pids: Optional list of pids forwarded to the ORM call.
                 ``None`` (the default) is forwarded as an empty list.
    :param session: The database session forwarded to the ORM call.
    """
    # Use a fresh list per call instead of a shared mutable default, so a
    # callee that mutates the list cannot leak state into later calls.
    if pids is None:
        pids = []
    orm_health.clean_health(older_than=older_than, hostname=hostname, pids=pids, session=session)
0060 
0061 
@transactional_session
def select_agent(name, newer_than=3600, session=None):
    """
    Select one active agent among the health items registered under a name.

    Health items older than ``newer_than`` seconds are first cleaned through
    the ORM. Among the remaining items whose ``agent`` equals ``name`` and
    whose ``updated_at`` is within ``newer_than`` seconds of now, the first
    one already marked ``HealthStatus.Active`` is chosen; otherwise the most
    recently updated one is chosen and promoted to ``HealthStatus.Active``.
    Every other item of the same agent still marked active is reset to
    ``HealthStatus.Default``.

    :param name: The agent name to select for.
    :param newer_than: Maximum age in seconds of a health item to be
                       eligible; also used as the cleanup threshold.
    :param session: The database session forwarded to the ORM calls.

    :returns: The selected health item (a dictionary), or None when no
              eligible item exists.
    """
    orm_health.clean_health(older_than=newer_than, session=session)
    health_items = orm_health.retrieve_health_items(session=session)
    selected_agent = None
    selected_agent_diff = None
    # NOTE(review): naive UTC "now" is kept on purpose; it is subtracted from
    # 'updated_at', which presumably is a naive UTC datetime -- confirm before
    # switching to a timezone-aware clock.
    utc_now = datetime.datetime.utcnow()
    for health_item in health_items:
        if health_item['agent'] != name:
            continue

        time_diff = utc_now - health_item['updated_at']
        if time_diff.total_seconds() > newer_than:
            continue

        if health_item['status'] == HealthStatus.Active:
            # An already-active fresh item wins immediately.
            selected_agent = health_item
            break

        # Otherwise remember the most recently updated candidate.
        if selected_agent is None or time_diff < selected_agent_diff:
            selected_agent = health_item
            selected_agent_diff = time_diff

    if selected_agent is None:
        # Nothing eligible: return before dereferencing the selection below.
        return None

    if selected_agent['status'] != HealthStatus.Active:
        orm_health.update_health_item_status(selected_agent, status=HealthStatus.Active, session=session)
        selected_agent['status'] = HealthStatus.Active

    selected_id = selected_agent['health_id']
    for health_item in health_items:
        if (health_item['agent'] == name
                and health_item['status'] == HealthStatus.Active
                and health_item['health_id'] != selected_id):
            # Demote the *other* active item, not the one just selected.
            orm_health.update_health_item_status(health_item, status=HealthStatus.Default, session=session)
    return selected_agent
0102     return selected_agent