File indexing completed on 2026-04-09 07:58:20
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012 """
0013 operations related to Events.
0014 """
0015
0016 import re
0017 import datetime
0018
0019 from sqlalchemy.exc import DatabaseError, IntegrityError, NoResultFound
0020 from sqlalchemy.sql.expression import asc, desc
0021
0022 from idds.common import exceptions
0023 from idds.common.event import EventStatus
0024 from idds.orm.base import models
0025 from idds.orm.base.session import read_session, transactional_session
0026
0027
@transactional_session
def add_event(event, session=None):
    """
    Add an event to be submitted asynchronously to a command broker.

    Pending events (status ``EventStatus.New``) with the same event type and
    actual id are looked up first. If one of them reports that it is able to
    merge the incoming event, the incoming event is merged into it and no new
    row is created.

    :param event: The Event object.
    :param session: The database session.

    :returns: The id of the newly stored event, or None when the event was
              merged into an already pending one.
    :raises DatabaseException: If the content triggers a TypeError while
                               being stored, or the database reports an error.
    """
    try:
        event_type = event._event_type
        actual_id = event.get_event_id()

        pending_rows = get_events(event_type=event_type, event_actual_id=actual_id,
                                  status=EventStatus.New, session=session)
        for pending_row in pending_rows:
            pending = pending_row['content']['event']
            if not pending.able_to_merge(event):
                continue

            # The incoming event is absorbed by the pending one and discarded.
            pending.merge(event)
            if pending.changed():
                pending_row['content']['event'] = pending
                # NOTE(review): update_event() is only given the status, so
                # the merged content assigned above is not passed on to the
                # database here -- confirm whether it should be persisted.
                update_event(pending.event_id, status=EventStatus.New, session=session)
            return None

        # No pending event could absorb this one: store it as a new row.
        priority = get_event_priority(event_type=event_type,
                                      event_actual_id=actual_id,
                                      session=session)
        new_row = models.Event(event_type=event_type,
                               event_actual_id=actual_id,
                               status=EventStatus.New,
                               priority=priority,
                               content={'event': event})
        new_row.save(session=session)
        return new_row.event_id
    except TypeError as e:
        raise exceptions.DatabaseException('Invalid JSON for content: %s' % str(e))
    except DatabaseError as e:
        # The message is matched against ORA-12899 and 1406 to single out
        # "content too large" failures.
        if re.match('.*ORA-12899.*', e.args[0]) \
           or re.match('.*1406.*', e.args[0]):
            raise exceptions.DatabaseException('Could not persist event, content too large: %s' % str(e))
        raise exceptions.DatabaseException('Could not persist event: %s' % str(e))
0071
0072
@read_session
def get_events(event_type, event_actual_id, status=None, session=None):
    """
    Get events

    A criterion that is falsy (``None``, empty, ``0``) is left out of the
    query, so only the supplied criteria restrict the result.

    :param event_type: event type.
    :param event_actual_id: event actual id.
    :param status: event status, or a list/tuple of statuses. ``None`` (the
                   default) or an empty list means "do not filter on status".
    :param session: The database session.

    :returns: A list with one dict (``to_dict()``) per matching event; empty
              when nothing matches.
    :raises DatabaseException: If the database reports an error.
    """
    try:
        # Only normalise a status that was actually supplied. Wrapping the
        # default None would produce [None, None], which is truthy and would
        # filter on "status IN (NULL, NULL)", matching no rows at all.
        if status is not None:
            if not isinstance(status, (list, tuple)):
                status = [status]
            if len(status) == 1:
                # A single status is duplicated so the IN list always carries
                # two values, as get_event_for_processing() does as well.
                status = [status[0], status[0]]

        query = session.query(models.Event)
        if event_type:
            query = query.filter_by(event_type=event_type)
        if event_actual_id:
            query = query.filter_by(event_actual_id=event_actual_id)
        if status:
            query = query.filter(models.Event.status.in_(status))

        return [row.to_dict() for row in query.all()]
    except DatabaseError as e:
        # The message text below is the same one used by the write paths of
        # this module and is kept unchanged.
        if re.match('.*ORA-12899.*', e.args[0]) \
           or re.match('.*1406.*', e.args[0]):
            raise exceptions.DatabaseException('Could not persist event, content too large: %s' % str(e))
        raise exceptions.DatabaseException('Could not persist event: %s' % str(e))
0109
0110
@transactional_session
def add_event_priority(event_type, event_actual_id, priority=10, session=None):
    """
    Store a new priority record for an event.

    Both ``last_processed_at`` and ``updated_at`` of the new record are set
    to the current UTC time.

    :param event_type: event type.
    :param event_actual_id: event actual id.
    :param priority: priority value to store (default 10).
    :param session: The database session.

    :raises DatabaseException: If the database reports an error.
    """
    try:
        record = models.EventPriority(
            event_type=event_type,
            event_actual_id=event_actual_id,
            priority=priority,
            last_processed_at=datetime.datetime.utcnow(),
            updated_at=datetime.datetime.utcnow(),
        )
        record.save(session=session)
    except DatabaseError as e:
        detail = e.args[0]
        too_large = re.match('.*ORA-12899.*', detail) or re.match('.*1406.*', detail)
        if too_large:
            raise exceptions.DatabaseException('Could not persist event, content too large: %s' % str(e))
        raise exceptions.DatabaseException('Could not persist event: %s' % str(e))
0132
0133
@transactional_session
def update_event_priority(event_type, event_actual_id, priority=10, session=None):
    """
    Update the priority record of an event, creating it when none exists.

    The matching records get the given priority and have
    ``last_processed_at`` and ``updated_at`` set to the current UTC time.
    When the update touches no row, a new record is added through
    ``add_event_priority``.

    :param event_type: event type; a falsy value leaves it out of the filter.
    :param event_actual_id: event actual id; a falsy value leaves it out of
                            the filter.
    :param priority: priority value to store (default 10).
    :param session: The database session.

    :raises DatabaseException: If the database reports an error.
    """
    try:
        new_values = dict(priority=priority,
                          last_processed_at=datetime.datetime.utcnow(),
                          updated_at=datetime.datetime.utcnow())

        matching = session.query(models.EventPriority)
        if event_type:
            matching = matching.filter_by(event_type=event_type)
        if event_actual_id:
            matching = matching.filter_by(event_actual_id=event_actual_id)

        updated_rows = matching.update(new_values, synchronize_session=False)
        if updated_rows < 1:
            # Nothing to update yet: create the record instead.
            add_event_priority(event_type=event_type, event_actual_id=event_actual_id,
                               priority=priority, session=session)
    except DatabaseError as e:
        detail = e.args[0]
        too_large = re.match('.*ORA-12899.*', detail) or re.match('.*1406.*', detail)
        if too_large:
            raise exceptions.DatabaseException('Could not persist event, content too large: %s' % str(e))
        raise exceptions.DatabaseException('Could not persist event: %s' % str(e))
    return None
0163
0164
@read_session
def get_event_priority(event_type, event_actual_id, session=None):
    """
    Compute the priority for an event from its priority record.

    The priority is the number of seconds elapsed between the record's
    ``last_processed_at`` and the current UTC time. When no record is found
    the priority is one week expressed in seconds.

    :param event_type: event type; a falsy value leaves it out of the filter.
    :param event_actual_id: event actual id; a falsy value leaves it out of
                            the filter.
    :param session: The database session.

    :returns: The priority as a number of seconds.
    :raises DatabaseException: If the database reports an error.
    """
    try:
        lookup = session.query(models.EventPriority)
        if event_type:
            lookup = lookup.filter_by(event_type=event_type)
        if event_actual_id:
            lookup = lookup.filter_by(event_actual_id=event_actual_id)

        record = lookup.first()
        if not record:
            # No record for this event: fall back to one week in seconds.
            return 3600 * 24 * 7

        last_processed_at = record.to_dict()['last_processed_at']
        elapsed = datetime.datetime.utcnow() - last_processed_at
        return elapsed.total_seconds()
    except DatabaseError as e:
        detail = e.args[0]
        too_large = re.match('.*ORA-12899.*', detail) or re.match('.*1406.*', detail)
        if too_large:
            raise exceptions.DatabaseException('Could not persist event, content too large: %s' % str(e))
        raise exceptions.DatabaseException('Could not persist event: %s' % str(e))
0195
0196
@transactional_session
def update_event(event_id, status, session=None):
    """
    Set the status of an event.

    Moving to ``EventStatus.Processing`` also records ``processing_at`` and
    moving to ``EventStatus.Processed`` also records ``processed_at``, both
    as the current UTC time.

    :param event_id: event id.
    :param status: Event status.
    :param session: The database session.

    :raises DatabaseException: If the database reports an error.
    """
    try:
        changes = dict(status=status)
        if status == EventStatus.Processing:
            changes['processing_at'] = datetime.datetime.utcnow()
        if status == EventStatus.Processed:
            changes['processed_at'] = datetime.datetime.utcnow()

        target = session.query(models.Event).filter_by(event_id=event_id)
        target.update(changes, synchronize_session=False)
    except DatabaseError as e:
        detail = e.args[0]
        too_large = re.match('.*ORA-12899.*', detail) or re.match('.*1406.*', detail)
        if too_large:
            raise exceptions.DatabaseException('Could not persist event, content too large: %s' % str(e))
        raise exceptions.DatabaseException('Could not persist event: %s' % str(e))
0219
0220
@transactional_session
def get_event_for_processing(event_type, num_events=1, session=None):
    """
    Get event for processing

    Picks up to ``num_events`` events in status ``EventStatus.New``, highest
    priority first and lowest event id first among equal priorities. For
    every event handed out, its priority record is refreshed through
    ``update_event_priority`` and its status is set to
    ``EventStatus.Processing``.

    :param event_type: event type; a falsy value selects events of any type.
    :param num_events: maximum number of events to return (default 1). A
                       value below 1 returns an empty list.
    :param session: The database session.

    :returns: A list of event model objects, detached from the session.
    :raises DatabaseException: If the database reports an error.
    """
    if num_events < 1:
        # Nothing requested: do not claim any event.
        return []

    try:
        query = session.query(models.Event)
        if event_type:
            query = query.filter_by(event_type=event_type)
        query = query.filter(models.Event.status.in_([EventStatus.New, EventStatus.New]))
        query = query.order_by(desc(models.Event.priority), asc(models.Event.event_id))
        # Let the database cut the result down instead of loading every
        # pending event and discarding all but the first few.
        query = query.limit(num_events)

        events = []
        for event in query.all():
            # Detach the row so the returned object is no longer tracked by
            # this session.
            session.expunge(event)
            update_event_priority(event.event_type, event.get_event_id(), session=session)
            update_event(event.event_id, status=EventStatus.Processing, session=session)
            events.append(event)
        return events
    except NoResultFound:
        return []
    except DatabaseError as e:
        if re.match('.*ORA-12899.*', e.args[0]) \
           or re.match('.*1406.*', e.args[0]):
            raise exceptions.DatabaseException('Could not persist event, content too large: %s' % str(e))
        raise exceptions.DatabaseException('Could not persist event: %s' % str(e))
0261
0262
@transactional_session
def delete_event(event_id, session=None):
    """
    Remove the event that has the given id.

    :param event_id: The event id.
    :param session: The database session.

    :raises DatabaseException: If the deletion raises an IntegrityError.
    """
    try:
        selected = session.query(models.Event).filter_by(event_id=event_id)
        selected.delete()
    except IntegrityError as e:
        raise exceptions.DatabaseException(e.args)
0274
0275
@transactional_session
def add_event_archive(event, session=None):
    """
    Copy an event into the event archive.

    The archive record takes over the id, type, actual id, status, priority,
    timestamps and content of the given event.

    :param event: The Event object.
    :param session: The database session.

    :returns: The event id of the archive record.
    :raises DatabaseException: If the content triggers a TypeError while
                               being stored, or the database reports an error.
    """
    copied_fields = ('event_id', 'event_type', 'event_actual_id', 'status', 'priority',
                     'created_at', 'processing_at', 'processed_at', 'content')
    try:
        values = {}
        for field in copied_fields:
            values[field] = getattr(event, field)
        archived = models.EventArchive(**values)
        archived.save(session=session)
        return archived.event_id
    except TypeError as e:
        raise exceptions.DatabaseException('Invalid JSON for content: %s' % str(e))
    except DatabaseError as e:
        detail = e.args[0]
        too_large = re.match('.*ORA-12899.*', detail) or re.match('.*1406.*', detail)
        if too_large:
            raise exceptions.DatabaseException('Could not persist event, content too large: %s' % str(e))
        raise exceptions.DatabaseException('Could not persist event: %s' % str(e))
0306
0307
@transactional_session
def clean_event(event, to_archive=True, session=None):
    """
    Mark an event as processed and remove it from the event table.

    The event object gets status ``EventStatus.Processed`` and the current
    UTC time as ``processed_at`` before its row is deleted.

    :param event: The event object; it is modified in place.
    :param to_archive: Whether to add the event to the archive afterwards.
    :param session: The database session.
    """
    event.status = EventStatus.Processed
    event.processed_at = datetime.datetime.utcnow()
    delete_event(event.event_id, session=session)
    if not to_archive:
        return
    add_event_archive(event, session=session)
0315
0316
@transactional_session
def fail_event(event, to_archive=True, session=None):
    """
    Mark an event as failed and remove it from the event table.

    The event object gets status ``EventStatus.Failed`` and the current UTC
    time as ``processed_at`` before its row is deleted.

    :param event: The event object; it is modified in place.
    :param to_archive: Whether to add the event to the archive afterwards.
    :param session: The database session.
    """
    event.status = EventStatus.Failed
    event.processed_at = datetime.datetime.utcnow()
    delete_event(event.event_id, session=session)
    if not to_archive:
        return
    add_event_archive(event, session=session)