File indexing completed on 2026-04-09 07:58:20
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012 """
0013 operations related to throttler.
0014 """
0015
0016 import re
0017
0018 from sqlalchemy.exc import DatabaseError, IntegrityError
0019
0020 from idds.common import exceptions
0021 from idds.common.constants import ThrottlerStatus
0022 from idds.orm.base import models
0023 from idds.orm.base.session import read_session, transactional_session
0024
0025
0026 @transactional_session
0027 def add_throttler(site, status=ThrottlerStatus.Active, num_requests=None, num_transforms=None, num_processings=None, new_contents=None,
0028 queue_contents=None, others=None, session=None):
0029 """
0030 Add a throttler item
0031
0032 :param site: The site name.
0033 :param session: The database session.
0034 """
0035
0036 try:
0037 old_throttlers = get_throttlers(site=site, session=session)
0038
0039 if old_throttlers:
0040 old_throttler = old_throttlers[0]
0041 parameters = {}
0042 if status is not None:
0043 parameters['status'] = status
0044 if num_requests is not None:
0045 parameters['num_requests'] = num_requests
0046 if num_transforms is not None:
0047 parameters['num_transforms'] = num_transforms
0048 if num_processings is not None:
0049 parameters['num_processings'] = num_processings
0050 if new_contents is not None:
0051 parameters['new_contents'] = new_contents
0052 if queue_contents is not None:
0053 parameters['queue_contents'] = queue_contents
0054 if others is not None:
0055 parameters['others'] = others
0056 update_throttler(throttler_id=old_throttler['throttler_id'], parameters=parameters, session=session)
0057 return old_throttler['throttler_id']
0058 else:
0059 throttler = models.Throttler(site=site,
0060 status=status,
0061 num_requests=num_requests,
0062 num_transforms=num_transforms,
0063 num_processings=num_processings,
0064 new_contents=new_contents,
0065 queue_contents=queue_contents,
0066 others=others)
0067 throttler.save(session=session)
0068 return throttler.throttler_id
0069 except TypeError as e:
0070 raise exceptions.DatabaseException('Invalid JSON for content: %s' % str(e))
0071 except DatabaseError as e:
0072 if re.match('.*ORA-12899.*', e.args[0]) \
0073 or re.match('.*1406.*', e.args[0]):
0074 raise exceptions.DatabaseException('Could not persist throttler, content too large: %s' % str(e))
0075 else:
0076 raise exceptions.DatabaseException('Could not persist throttler: %s' % str(e))
0077 return None
0078
0079
0080 @read_session
0081 def get_throttlers(site=None, status=None, session=None):
0082 """
0083 Get throttler
0084
0085 :param site: site name.
0086 :param status: throttler status.
0087 """
0088 try:
0089 if status and not isinstance(status, (list, tuple)):
0090 status = [status]
0091 if status and len(status) == 1:
0092 status = [status[0], status[0]]
0093
0094 query = session.query(models.Throttler)
0095 if site:
0096 query = query.filter_by(site=site)
0097 if status:
0098 query = query.filter(models.Throttler.status.in_(status))
0099
0100 tmp = query.all()
0101 throttlers = []
0102 if tmp:
0103 for t in tmp:
0104 throttlers.append(t.to_dict())
0105 return throttlers
0106 except DatabaseError as e:
0107 if re.match('.*ORA-12899.*', e.args[0]) \
0108 or re.match('.*1406.*', e.args[0]):
0109 raise exceptions.DatabaseException('Could not persist throttler, content too large: %s' % str(e))
0110 else:
0111 raise exceptions.DatabaseException('Could not persist throttler: %s' % str(e))
0112 return None
0113
0114
0115 @transactional_session
0116 def update_throttler(throttler_id=None, site=None, parameters=None, session=None):
0117 """
0118 Update throttler
0119
0120 :param throttler_id: throttler id.
0121 :param parameters: parameters in dict.
0122 """
0123 try:
0124 query = session.query(models.Throttler)
0125 if throttler_id is None and site is None:
0126 raise exceptions.DatabaseException("Could not update database with both throttler_id and site None")
0127
0128 if throttler_id:
0129 query = query.filter_by(throttler_id=throttler_id)
0130 if site:
0131 query = query.filter_by(site=site)
0132 query.update(parameters, synchronize_session=False)
0133 except DatabaseError as e:
0134 if re.match('.*ORA-12899.*', e.args[0]) \
0135 or re.match('.*1406.*', e.args[0]):
0136 raise exceptions.DatabaseException('Could not persist throttler, content too large: %s' % str(e))
0137 else:
0138 raise exceptions.DatabaseException('Could not persist throttler: %s' % str(e))
0139
0140
0141 @transactional_session
0142 def delete_throttler(throttler_id, session=None):
0143 """
0144 Delete throttler with the given id.
0145
0146 :param throttler_id: The throttler id.
0147 """
0148 try:
0149 session.query(models.Throttler).filter_by(throttler_id=throttler_id).delete()
0150 except IntegrityError as e:
0151 raise exceptions.DatabaseException(e.args)