Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-09 07:58:20

0001 #!/usr/bin/env python
0002 #
0003 # Licensed under the Apache License, Version 2.0 (the "License");
0004 # You may not use this file except in compliance with the License.
0005 # You may obtain a copy of the License at
0006 # http://www.apache.org/licenses/LICENSE-2.0
0007 #
0008 # Authors:
0009 # - Wen Guan, <wen.guan@cern.ch>, 2023
0010 
0011 
0012 """
0013 operations related to Events.
0014 """
0015 
0016 import re
0017 import datetime
0018 
0019 from sqlalchemy.exc import DatabaseError, IntegrityError, NoResultFound
0020 from sqlalchemy.sql.expression import asc, desc
0021 
0022 from idds.common import exceptions
0023 from idds.common.event import EventStatus
0024 from idds.orm.base import models
0025 from idds.orm.base.session import read_session, transactional_session
0026 
0027 
@transactional_session
def add_event(event, session=None):
    """
    Store an event, or fold it into an already pending one.

    Events with status New that share the incoming event's type and actual
    id are examined in order.  The incoming event is merged into the first
    one whose ``able_to_merge`` accepts it and no new row is written.  When
    no pending event accepts it, a new row is saved with status New and the
    priority returned by ``get_event_priority``.

    :param event: The Event object.
    :param session: The database session.

    :returns: The id of the newly saved row, or None when the event was
              merged into a pending one.
    :raises DatabaseException: On a TypeError or DatabaseError raised while
                               looking up or saving the event.
    """
    try:
        pending_rows = get_events(event_type=event._event_type,
                                  event_actual_id=event.get_event_id(),
                                  status=EventStatus.New,
                                  session=session)
        for pending_row in pending_rows:
            candidate = pending_row['content']['event']
            if not candidate.able_to_merge(event):
                continue

            # The incoming event is absorbed by the pending one and discarded.
            candidate.merge(event)
            if candidate.changed():
                # NOTE(review): this only touches the in-memory dict and the
                # row's status; confirm the merged content is persisted
                # somewhere, and that the event object exposes ``event_id``.
                pending_row['content']['event'] = candidate
                update_event(candidate.event_id, status=EventStatus.New, session=session)
            return None

        # Nothing pending could take the event: save it as a new row.
        priority = get_event_priority(event_type=event._event_type,
                                      event_actual_id=event.get_event_id(),
                                      session=session)
        new_row = models.Event(event_type=event._event_type,
                               event_actual_id=event.get_event_id(),
                               status=EventStatus.New,
                               priority=priority,
                               content={'event': event})
        new_row.save(session=session)
        return new_row.event_id
    except TypeError as e:
        raise exceptions.DatabaseException('Invalid JSON for content: %s' % str(e))
    except DatabaseError as e:
        message = e.args[0]
        # ORA-12899 / 1406 are the codes this module treats as "too large".
        if any(re.match(pattern, message) for pattern in ('.*ORA-12899.*', '.*1406.*')):
            template = 'Could not persist event, content too large: %s'
        else:
            template = 'Could not persist event: %s'
        raise exceptions.DatabaseException(template % str(e))
0071 
0072 
@read_session
def get_events(event_type, event_actual_id, status=None, session=None):
    """
    Get events matching the given type, actual id and status.

    ``event_type`` and ``event_actual_id`` are only used as filters when
    they are truthy.  ``status`` is only used as a filter when it is not
    None and not an empty list/tuple.

    :param event_type: event type.
    :param event_actual_id: event actual id.
    :param status: a single event status, or a list/tuple of statuses.
    :param session: The database session.

    :returns: A list with one ``to_dict()`` result per matching row
              (empty when nothing matches).
    :raises DatabaseException: If the query raises a DatabaseError.
    """
    try:
        query = session.query(models.Event)
        if event_type:
            query = query.filter_by(event_type=event_type)
        if event_actual_id:
            query = query.filter_by(event_actual_id=event_actual_id)

        # Normalise only when a status was actually supplied.  Wrapping the
        # default None first produced [None, None], which is truthy and became
        # "status IN (NULL, NULL)" -- a filter that matches no row at all.
        if status is not None:
            if not isinstance(status, (list, tuple)):
                status = [status]
            if len(status) == 1:
                # A lone status is duplicated so the IN list always carries
                # two values (kept as-is; presumably a backend workaround --
                # confirm before simplifying).
                status = [status[0], status[0]]
            if status:
                query = query.filter(models.Event.status.in_(status))

        return [row.to_dict() for row in query.all()]
    except DatabaseError as e:
        # Message wording is kept identical to the other functions of this
        # module so existing log matching keeps working.
        if re.match('.*ORA-12899.*', e.args[0]) \
           or re.match('.*1406.*', e.args[0]):
            raise exceptions.DatabaseException('Could not persist event, content too large: %s' % str(e)) from e
        else:
            raise exceptions.DatabaseException('Could not persist event: %s' % str(e)) from e
0109 
0110 
@transactional_session
def add_event_priority(event_type, event_actual_id, priority=10, session=None):
    """
    Create a priority record for an (event type, actual id) pair.

    Both ``last_processed_at`` and ``updated_at`` are set to the current
    UTC time.

    :param event_type: event type.
    :param event_actual_id: event actual id.
    :param priority: priority value to store (default 10).
    :param session: The database session.

    :raises DatabaseException: If saving raises a DatabaseError.
    """
    try:
        fields = {
            'event_type': event_type,
            'event_actual_id': event_actual_id,
            'priority': priority,
            'last_processed_at': datetime.datetime.utcnow(),
            'updated_at': datetime.datetime.utcnow(),
        }
        record = models.EventPriority(**fields)
        record.save(session=session)
    except DatabaseError as e:
        message = e.args[0]
        if any(re.match(pattern, message) for pattern in ('.*ORA-12899.*', '.*1406.*')):
            template = 'Could not persist event, content too large: %s'
        else:
            template = 'Could not persist event: %s'
        raise exceptions.DatabaseException(template % str(e))
0132 
0133 
@transactional_session
def update_event_priority(event_type, event_actual_id, priority=10, session=None):
    """
    Set the priority of matching priority records, creating one if needed.

    Matching records get the new priority and have ``last_processed_at``
    and ``updated_at`` set to the current UTC time.  When no record was
    updated, a new one is inserted through ``add_event_priority``.  A falsy
    ``event_type`` or ``event_actual_id`` leaves that column unfiltered.

    :param event_type: event type.
    :param event_actual_id: event actual id.
    :param priority: priority value to store (default 10).
    :param session: The database session.

    :returns: None.
    :raises DatabaseException: If the update or insert raises a DatabaseError.
    """
    try:
        new_values = {
            'priority': priority,
            'last_processed_at': datetime.datetime.utcnow(),
            'updated_at': datetime.datetime.utcnow(),
        }
        matching = session.query(models.EventPriority)
        if event_type:
            matching = matching.filter_by(event_type=event_type)
        if event_actual_id:
            matching = matching.filter_by(event_actual_id=event_actual_id)

        updated = matching.update(new_values, synchronize_session=False)
        if updated >= 1:
            return None

        # No record exists yet for this pair: insert one instead.
        add_event_priority(event_type=event_type,
                           event_actual_id=event_actual_id,
                           priority=priority,
                           session=session)
    except DatabaseError as e:
        message = e.args[0]
        if any(re.match(pattern, message) for pattern in ('.*ORA-12899.*', '.*1406.*')):
            template = 'Could not persist event, content too large: %s'
        else:
            template = 'Could not persist event: %s'
        raise exceptions.DatabaseException(template % str(e))
    return None
0163 
0164 
@read_session
def get_event_priority(event_type, event_actual_id, session=None):
    """
    Compute the priority for an (event type, actual id) pair.

    The priority is the number of seconds elapsed since the
    ``last_processed_at`` of the first matching priority record.  When no
    record matches, one week expressed in seconds is returned.  A falsy
    ``event_type`` or ``event_actual_id`` leaves that column unfiltered.

    :param event_type: event type.
    :param event_actual_id: event actual id.
    :param session: The database session.

    :returns: The priority as a number of seconds.
    :raises DatabaseException: If the query raises a DatabaseError.
    """
    try:
        lookup = session.query(models.EventPriority)
        if event_type:
            lookup = lookup.filter_by(event_type=event_type)
        if event_actual_id:
            lookup = lookup.filter_by(event_actual_id=event_actual_id)

        record = lookup.first()
        if not record:
            # No record for this pair: fall back to one week in seconds.
            return 3600 * 24 * 7

        last_processed = record.to_dict()['last_processed_at']
        elapsed = datetime.datetime.utcnow() - last_processed
        return elapsed.total_seconds()
    except DatabaseError as e:
        message = e.args[0]
        if any(re.match(pattern, message) for pattern in ('.*ORA-12899.*', '.*1406.*')):
            template = 'Could not persist event, content too large: %s'
        else:
            template = 'Could not persist event: %s'
        raise exceptions.DatabaseException(template % str(e))
0195 
0196 
@transactional_session
def update_event(event_id, status, session=None):
    """
    Set the status of the event with the given id.

    Moving to Processing also stamps ``processing_at`` with the current UTC
    time; moving to Processed stamps ``processed_at``.

    :param event_id: event id.
    :param status: Event status.
    :param session: The database session.

    :raises DatabaseException: If the update raises a DatabaseError.
    """
    try:
        changes = {'status': status}
        utcnow = datetime.datetime.utcnow
        if status == EventStatus.Processing:
            changes['processing_at'] = utcnow()
        if status == EventStatus.Processed:
            changes['processed_at'] = utcnow()

        target = session.query(models.Event).filter_by(event_id=event_id)
        target.update(changes, synchronize_session=False)
    except DatabaseError as e:
        message = e.args[0]
        if any(re.match(pattern, message) for pattern in ('.*ORA-12899.*', '.*1406.*')):
            template = 'Could not persist event, content too large: %s'
        else:
            template = 'Could not persist event: %s'
        raise exceptions.DatabaseException(template % str(e))
0219 
0220 
@transactional_session
def get_event_for_processing(event_type, num_events=1, session=None):
    """
    Pick pending events and mark them as being processed.

    Events with status New are ordered by descending priority and then by
    ascending event id.  Each selected event is detached from the session,
    its priority record is refreshed through ``update_event_priority`` and
    its row is set to Processing through ``update_event``.

    :param event_type: event type; a falsy value selects every type.
    :param num_events: maximum number of events to return.  Values below 1
                       are treated as 1, as the earlier implementation
                       always took at least one available event.
    :param session: The database session.

    :returns: A list of detached event model objects (possibly empty).
              NOTE(review): the objects are expunged before the status
              update, so their in-memory ``status`` is presumably still New;
              confirm callers do not rely on it.
    :raises DatabaseException: If a query or update raises a DatabaseError.
    """
    try:
        query = session.query(models.Event)
        if event_type:
            query = query.filter_by(event_type=event_type)
        status = [EventStatus.New, EventStatus.New]
        query = query.filter(models.Event.status.in_(status))
        query = query.order_by(desc(models.Event.priority))
        query = query.order_by(asc(models.Event.event_id))
        # Let the database cap the result instead of loading every pending
        # event only to keep the first few of them.
        query = query.limit(max(num_events, 1))

        events = []
        for event in query.all():
            session.expunge(event)
            update_event_priority(event.event_type, event.get_event_id(), session=session)
            update_event(event.event_id, status=EventStatus.Processing, session=session)
            events.append(event)
        return events
    except NoResultFound:
        return []
    except DatabaseError as e:
        if re.match('.*ORA-12899.*', e.args[0]) \
           or re.match('.*1406.*', e.args[0]):
            raise exceptions.DatabaseException('Could not persist event, content too large: %s' % str(e)) from e
        else:
            raise exceptions.DatabaseException('Could not persist event: %s' % str(e)) from e
0261 
0262 
@transactional_session
def delete_event(event_id, session=None):
    """
    Remove the event row that has the given id.

    :param event_id: The event id.
    :param session: The database session.

    :raises DatabaseException: If the delete raises an IntegrityError.
    """
    try:
        doomed = session.query(models.Event).filter_by(event_id=event_id)
        doomed.delete()
    except IntegrityError as e:
        raise exceptions.DatabaseException(e.args)
0274 
0275 
@transactional_session
def add_event_archive(event, session=None):
    """
    Copy an event into the archive table.

    The archive row takes over the event's id, type, actual id, status,
    priority, timestamps and content.

    :param event: The Event object.
    :param session: The database session.

    :returns: The event id of the saved archive row.
    :raises DatabaseException: On a TypeError or DatabaseError raised while
                               building or saving the archive row.
    """
    copied_fields = ('event_id', 'event_type', 'event_actual_id', 'status',
                     'priority', 'created_at', 'processing_at', 'processed_at',
                     'content')
    try:
        values = {name: getattr(event, name) for name in copied_fields}
        archived = models.EventArchive(**values)
        archived.save(session=session)
        return archived.event_id
    except TypeError as e:
        raise exceptions.DatabaseException('Invalid JSON for content: %s' % str(e))
    except DatabaseError as e:
        message = e.args[0]
        if any(re.match(pattern, message) for pattern in ('.*ORA-12899.*', '.*1406.*')):
            template = 'Could not persist event, content too large: %s'
        else:
            template = 'Could not persist event: %s'
        raise exceptions.DatabaseException(template % str(e))
0306 
0307 
@transactional_session
def clean_event(event, to_archive=True, session=None):
    """
    Finish an event as Processed and remove it from the event table.

    The event object is marked Processed with ``processed_at`` set to the
    current UTC time, its row is deleted, and it is optionally archived.

    :param event: The event object; its status and processed_at are modified.
    :param to_archive: Whether to copy the event into the archive.
    :param session: The database session.
    """
    event.status = EventStatus.Processed
    finished_at = datetime.datetime.utcnow()
    event.processed_at = finished_at

    delete_event(event.event_id, session=session)
    if not to_archive:
        return
    add_event_archive(event, session=session)
0315 
0316 
@transactional_session
def fail_event(event, to_archive=True, session=None):
    """
    Finish an event as Failed and remove it from the event table.

    The event object is marked Failed with ``processed_at`` set to the
    current UTC time, its row is deleted, and it is optionally archived.

    :param event: The event object; its status and processed_at are modified.
    :param to_archive: Whether to copy the event into the archive.
    :param session: The database session.
    """
    event.status = EventStatus.Failed
    finished_at = datetime.datetime.utcnow()
    event.processed_at = finished_at

    delete_event(event.event_id, session=session)
    if not to_archive:
        return
    add_event_archive(event, session=session)