Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-09 07:58:20

0001 #!/usr/bin/env python
0002 #
0003 # Licensed under the Apache License, Version 2.0 (the "License");
0004 # You may not use this file except in compliance with the License.
0005 # You may obtain a copy of the License at
0006 # http://www.apache.org/licenses/LICENSE-2.0OA
0007 #
0008 # Authors:
0009 # - Wen Guan, <wen.guan@cern.ch>, 2020
0010 
0011 
0012 """
0013 operations related to Health.
0014 """
0015 
0016 import datetime
0017 import re
0018 
0019 from sqlalchemy.exc import DatabaseError, IntegrityError
0020 
0021 from idds.common import exceptions
0022 from idds.orm.base import models
0023 from idds.orm.base.session import read_session, transactional_session
0024 
0025 
0026 @transactional_session
0027 def add_health_item(agent, hostname, pid, thread_id, thread_name, payload, session=None):
0028     """
0029     Add a health item.
0030 
0031     :param agent: The agent name.
0032     :param hostname: The hostname.
0033     :param pid: The pid.
0034     :param thread_id: The thread id.
0035     :param thread_name: The thread name.
0036     :param payload: The payload.
0037     :param session: The database session.
0038     """
0039 
0040     try:
0041         counts = session.query(models.Health)\
0042                         .filter(models.Health.agent == agent)\
0043                         .filter(models.Health.hostname == hostname)\
0044                         .filter(models.Health.pid == pid)\
0045                         .filter(models.Health.thread_id == thread_id)\
0046                         .update({'updated_at': datetime.datetime.utcnow(),
0047                                  'payload': payload})
0048         if not counts:
0049             new_h = models.Health(agent=agent, hostname=hostname, pid=pid,
0050                                   thread_id=thread_id, thread_name=thread_name,
0051                                   payload=payload)
0052             new_h.save(session=session)
0053     except IntegrityError as e:
0054         if re.match('.*ORA-00001.*', e.args[0]) or re.match('.*unique constraint.*', e.args[0]):
0055             print("unique constraintviolated: %s" % str(e))
0056     except DatabaseError as e:
0057         raise exceptions.DatabaseException('Could not persist message: %s' % str(e))
0058 
0059 
0060 @read_session
0061 def retrieve_health_items(session=None):
0062     """
0063     Retrieve health items.
0064 
0065     :param session: The database session.
0066 
0067     :returns healths: List of dictionaries
0068     """
0069     items = []
0070     try:
0071         query = session.query(models.Health)
0072 
0073         tmp = query.all()
0074         if tmp:
0075             for t in tmp:
0076                 items.append(t.to_dict())
0077         return items
0078     except IntegrityError as e:
0079         raise exceptions.DatabaseException(e.args)
0080 
0081 
0082 @transactional_session
0083 def clean_health(older_than=3600, hostname=None, pids=[], session=None):
0084     """
0085     Clearn items which is older than the time.
0086 
0087     :param older_than in seconds
0088     """
0089 
0090     query = session.query(models.Health)
0091     if older_than:
0092         query = query.filter(models.Health.updated_at < datetime.datetime.utcnow() - datetime.timedelta(seconds=older_than))
0093     if hostname:
0094         query = query.filter(models.Health.hostname == hostname)
0095     if pids:
0096         query = query.filter(models.Health.pid.in_(pids))
0097     query.delete()
0098 
0099 
0100 @transactional_session
0101 def update_health_item_status(item, status, session=None):
0102     session.query(models.Health)\
0103            .filter(models.Health.agent == item['agent'])\
0104            .filter(models.Health.hostname == item['hostname'])\
0105            .filter(models.Health.pid == item['pid'])\
0106            .filter(models.Health.thread_id == item['thread_id'])\
0107            .update({'status': status, 'updated_at': item['updated_at']})