Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-09 07:58:20

0001 #!/usr/bin/env python
0002 #
0003 # Licensed under the Apache License, Version 2.0 (the "License");
0004 # You may not use this file except in compliance with the License.
0005 # You may obtain a copy of the License at
0006 # http://www.apache.org/licenses/LICENSE-2.0
0007 #
0008 # Authors:
0009 # - Wen Guan, <wen.guan@cern.ch>, 2023
0010 
0011 
0012 """
0013 operations related to throttler.
0014 """
0015 
0016 import re
0017 
0018 from sqlalchemy.exc import DatabaseError, IntegrityError
0019 
0020 from idds.common import exceptions
0021 from idds.common.constants import ThrottlerStatus
0022 from idds.orm.base import models
0023 from idds.orm.base.session import read_session, transactional_session
0024 
0025 
@transactional_session
def add_throttler(site, status=ThrottlerStatus.Active, num_requests=None, num_transforms=None, num_processings=None, new_contents=None,
                  queue_contents=None, others=None, session=None):
    """
    Add a throttler for a site, or update the one that already exists.

    The throttlers of ``site`` are looked up first. When at least one is
    found, the first one returned is updated with every argument below that
    is not None and its id is returned. Otherwise a new ``models.Throttler``
    is created from all the arguments and saved.

    :param site: The site name.
    :param status: The throttler status (ThrottlerStatus.Active by default).
    :param num_requests: Value for the throttler's ``num_requests`` field.
    :param num_transforms: Value for the throttler's ``num_transforms`` field.
    :param num_processings: Value for the throttler's ``num_processings`` field.
    :param new_contents: Value for the throttler's ``new_contents`` field.
    :param queue_contents: Value for the throttler's ``queue_contents`` field.
    :param others: Value for the throttler's ``others`` field.
    :param session: The database session.

    :raises DatabaseException: When a TypeError or a DatabaseError is raised
                               while looking up, updating or saving.

    :returns: The throttler_id of the updated or newly created throttler.
    """
    # (field name, value) pairs that may be written to an existing throttler.
    candidate_fields = (
        ('status', status),
        ('num_requests', num_requests),
        ('num_transforms', num_transforms),
        ('num_processings', num_processings),
        ('new_contents', new_contents),
        ('queue_contents', queue_contents),
        ('others', others),
    )

    try:
        existing = get_throttlers(site=site, session=session)

        if not existing:
            # Nothing registered for this site yet: create a new row.
            new_throttler = models.Throttler(site=site,
                                             status=status,
                                             num_requests=num_requests,
                                             num_transforms=num_transforms,
                                             num_processings=num_processings,
                                             new_contents=new_contents,
                                             queue_contents=queue_contents,
                                             others=others)
            new_throttler.save(session=session)
            return new_throttler.throttler_id

        # Only the first throttler found for the site is updated; arguments
        # left as None do not overwrite the stored values.
        current_id = existing[0]['throttler_id']
        changes = {name: value for name, value in candidate_fields if value is not None}
        update_throttler(throttler_id=current_id, parameters=changes, session=session)
        return current_id
    except TypeError as e:
        raise exceptions.DatabaseException('Invalid JSON for content: %s' % str(e))
    except DatabaseError as e:
        too_large = re.match('.*ORA-12899.*', e.args[0]) or re.match('.*1406.*', e.args[0])
        if too_large:
            raise exceptions.DatabaseException('Could not persist throttler, content too large: %s' % str(e))
        raise exceptions.DatabaseException('Could not persist throttler: %s' % str(e))
0078 
0079 
@read_session
def get_throttlers(site=None, status=None, session=None):
    """
    Get throttlers, optionally filtered by site and/or status.

    :param site: site name. A falsy value (None, '') disables the site filter.
    :param status: throttler status, or a list/tuple of statuses. A falsy
                   value (None, empty list) disables the status filter.
    :param session: The database session.

    :raises DatabaseException: When the query raises a DatabaseError.

    :returns: A list with one dict (``to_dict()``) per matching throttler;
              an empty list when nothing matches.
    """
    if status and not isinstance(status, (list, tuple)):
        status = [status]
    if status and len(status) == 1:
        # A single status is duplicated so that the IN clause always receives
        # two values. NOTE(review): presumably a database backend workaround
        # carried over from the original code - confirm before removing.
        status = [status[0], status[0]]

    try:
        query = session.query(models.Throttler)
        if site:
            query = query.filter_by(site=site)
        if status:
            query = query.filter(models.Throttler.status.in_(status))

        return [row.to_dict() for row in query.all()]
    except DatabaseError as e:
        # This is a read: report it as such instead of the "could not persist"
        # / "content too large" messages used by the write paths.
        raise exceptions.DatabaseException('Could not get throttlers: %s' % str(e))
0113 
0114 
@transactional_session
def update_throttler(throttler_id=None, site=None, parameters=None, session=None):
    """
    Update the throttler(s) selected by id and/or site.

    At least one of ``throttler_id`` and ``site`` must be given; when both
    are given, both filters are applied.

    :param throttler_id: throttler id.
    :param site: site name.
    :param parameters: parameters in dict, handed to the query's ``update``.
    :param session: The database session.

    :raises DatabaseException: When both ``throttler_id`` and ``site`` are
                               None, or when the update raises a DatabaseError.
    """
    if throttler_id is None and site is None:
        raise exceptions.DatabaseException("Could not update database with both throttler_id and site None")

    try:
        query = session.query(models.Throttler)

        # Compare against None, exactly like the guard above. A truthiness
        # test would skip the filter for throttler_id=0 or site='' and the
        # UPDATE would then be applied to every throttler row.
        if throttler_id is not None:
            query = query.filter_by(throttler_id=throttler_id)
        if site is not None:
            query = query.filter_by(site=site)
        query.update(parameters, synchronize_session=False)
    except DatabaseError as e:
        if re.match('.*ORA-12899.*', e.args[0]) \
           or re.match('.*1406.*', e.args[0]):
            raise exceptions.DatabaseException('Could not persist throttler, content too large: %s' % str(e))
        else:
            raise exceptions.DatabaseException('Could not persist throttler: %s' % str(e))
0139 
0140 
@transactional_session
def delete_throttler(throttler_id, session=None):
    """
    Delete the throttler(s) whose id equals the given one.

    :param throttler_id: The throttler id.
    :param session: The database session.

    :raises DatabaseException: When the delete raises an IntegrityError; the
                               original error's ``args`` are passed along.
    """
    try:
        matching = session.query(models.Throttler).filter_by(throttler_id=throttler_id)
        matching.delete()
    except IntegrityError as err:
        raise exceptions.DatabaseException(err.args)