File indexing completed on 2026-04-09 07:58:20
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012 """
0013 operations related to Health.
0014 """
0015
0016 import datetime
0017 import re
0018
0019 from sqlalchemy.exc import DatabaseError, IntegrityError
0020
0021 from idds.common import exceptions
0022 from idds.orm.base import models
0023 from idds.orm.base.session import read_session, transactional_session
0024
0025
@transactional_session
def add_health_item(agent, hostname, pid, thread_id, thread_name, payload, session=None):
    """
    Add a health item, or refresh it when it already exists.

    A row matching (agent, hostname, pid, thread_id) gets its ``updated_at``
    and ``payload`` refreshed; when no row matches, a new one is inserted.

    :param agent: The agent name.
    :param hostname: The hostname.
    :param pid: The pid.
    :param thread_id: The thread id.
    :param thread_name: The thread name (only written when a new row is inserted).
    :param payload: The payload.
    :param session: The database session.

    :raises DatabaseException: If the item cannot be persisted for any reason
                               other than a unique-constraint violation.
    """

    try:
        counts = session.query(models.Health)\
                        .filter(models.Health.agent == agent)\
                        .filter(models.Health.hostname == hostname)\
                        .filter(models.Health.pid == pid)\
                        .filter(models.Health.thread_id == thread_id)\
                        .update({'updated_at': datetime.datetime.utcnow(),
                                 'payload': payload})
        if not counts:
            # Nothing was updated, so the item does not exist yet: insert it.
            new_h = models.Health(agent=agent, hostname=hostname, pid=pid,
                                  thread_id=thread_id, thread_name=thread_name,
                                  payload=payload)
            new_h.save(session=session)
    except IntegrityError as e:
        message = str(e)
        # re.search (not re.match) so the marker is found wherever it sits in
        # a multi-line driver message.
        if re.search(r'ORA-00001|unique constraint', message):
            # Duplicate row: tolerated. Presumably another writer inserted the
            # same item between the UPDATE and the INSERT above.
            print("unique constraintviolated: %s" % message)
        else:
            # Any other integrity failure must not be silently dropped.
            raise exceptions.DatabaseException('Could not persist message: %s' % message)
    except DatabaseError as e:
        raise exceptions.DatabaseException('Could not persist message: %s' % str(e))
0058
0059
@read_session
def retrieve_health_items(session=None):
    """
    Fetch every health item.

    :param session: The database session.

    :returns: List of dictionaries, one per health item (empty when there are none).
    :raises DatabaseException: If the query raises an IntegrityError.
    """
    try:
        rows = session.query(models.Health).all()
        if not rows:
            return []
        return [row.to_dict() for row in rows]
    except IntegrityError as e:
        raise exceptions.DatabaseException(e.args)
0080
0081
@transactional_session
def clean_health(older_than=3600, hostname=None, pids=None, session=None):
    """
    Clean (delete) health items, optionally restricted by age, hostname and pids.

    Each filter is applied only when its argument is truthy; when none is
    given, every health item is deleted.

    :param older_than: Age in seconds; only items whose ``updated_at`` is older
                       than this are deleted.
    :param hostname: Only delete items with this hostname.
    :param pids: Only delete items whose pid is in this list.
    :param session: The database session.
    """

    query = session.query(models.Health)
    if older_than:
        # Naive UTC timestamp, consistent with how add_health_item writes updated_at.
        query = query.filter(models.Health.updated_at < datetime.datetime.utcnow() - datetime.timedelta(seconds=older_than))
    if hostname:
        query = query.filter(models.Health.hostname == hostname)
    if pids:
        query = query.filter(models.Health.pid.in_(pids))
    query.delete()
0098
0099
@transactional_session
def update_health_item_status(item, status, session=None):
    """
    Set the status of the health item identified by ``item``.

    The row is matched on the item's agent, hostname, pid and thread_id.
    ``updated_at`` is written back from ``item['updated_at']``.

    :param item: Mapping providing 'agent', 'hostname', 'pid', 'thread_id'
                 and 'updated_at'.
    :param status: The new status.
    :param session: The database session.
    """
    matching = session.query(models.Health).filter(
        models.Health.agent == item['agent'],
        models.Health.hostname == item['hostname'],
        models.Health.pid == item['pid'],
        models.Health.thread_id == item['thread_id']
    )
    new_values = {'status': status, 'updated_at': item['updated_at']}
    matching.update(new_values)
0107 .update({'status': status, 'updated_at': item['updated_at']})