Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-09 07:58:20

0001 #!/usr/bin/env python
0002 #
0003 # Licensed under the Apache License, Version 2.0 (the "License");
0004 # You may not use this file except in compliance with the License.
0005 # You may obtain a copy of the License at
0006 # http://www.apache.org/licenses/LICENSE-2.0
0007 #
0008 # Authors:
0009 # - Wen Guan, <wen.guan@cern.ch>, 2024
0010 
0011 
0012 """
0013 operations related to Conditions.
0014 """
0015 
0016 import re
0017 import datetime
0018 
0019 from sqlalchemy.exc import DatabaseError, IntegrityError
0020 
0021 from idds.common import exceptions
0022 from idds.common.constants import ConditionStatus
0023 from idds.orm.base import models
0024 from idds.orm.base.session import read_session, transactional_session
0025 
0026 
@transactional_session
def add_condition(request_id, internal_id, status=ConditionStatus.WaitForTrigger,
                  substatus=None, is_loop=False, loop_index=None, cloned_from=None,
                  evaluate_result=None, previous_transforms=None, following_transforms=None,
                  condition=None, session=None):
    """
    Create a condition record, save it and return its id.

    :param request_id: The request id.
    :param internal_id: The internal id.
    :param status: The status of the condition.
    :param substatus: The substatus of the condition.
    :param is_loop: Whether it is a loop condition.
    :param loop_index: The loop index if it is a loop.
    :param cloned_from: The original condition if it is a loop.
    :param evaluate_result: The evaluated result of the condition.
    :param previous_transforms: The previous transforms which can trigger this condition.
    :param following_transforms: The following transforms which will be triggered.
    :param condition: The condition function.
    :param session: The database session.

    :returns: The condition_id of the saved condition.
    :raises DatabaseException: If building or saving the record raises
                               TypeError or a database error.
    """

    fields = {
        'request_id': request_id,
        'internal_id': internal_id,
        'status': status,
        'substatus': substatus,
        'is_loop': is_loop,
        'loop_index': loop_index,
        'cloned_from': cloned_from,
        'evaluate_result': evaluate_result,
        'previous_transforms': previous_transforms,
        'following_transforms': following_transforms,
        'condition': condition,
    }

    try:
        new_condition = models.Condition(**fields)
        new_condition.save(session=session)
        return new_condition.condition_id
    except TypeError as e:
        raise exceptions.DatabaseException('Invalid JSON for condition: %s' % str(e))
    except DatabaseError as e:
        # The first exception argument is matched against two error codes
        # (presumably the Oracle and MySQL "value too long" codes -- confirm)
        # to give a more specific message.
        detail = e.args[0]
        too_large = re.match('.*ORA-12899.*', detail) or re.match('.*1406.*', detail)
        if too_large:
            raise exceptions.DatabaseException('Could not persist condition, condition too large: %s' % str(e))
        raise exceptions.DatabaseException('Could not persist condition: %s' % str(e))
0069 
0070 
@transactional_session
def update_condition(condition_id, parameters, session=None):
    """
    Update the condition with the given id.

    The current UTC time is written to ``updated_at`` together with the
    supplied values. The caller's ``parameters`` dict is not modified.

    :param condition_id: The condition id.
    :param parameters: Parameters as a dict (column name -> new value).
    :param session: The database session.

    :raises DatabaseException: If the update raises TypeError or a database error.
    """

    try:
        # Stamp a private copy so the caller's dict does not silently gain
        # an 'updated_at' key. The copy stays inside the try block so that a
        # non-mapping argument is still reported as a DatabaseException.
        values = dict(parameters)
        values['updated_at'] = datetime.datetime.utcnow()
        session.query(models.Condition).filter_by(condition_id=condition_id)\
               .update(values, synchronize_session=False)
    except TypeError as e:
        raise exceptions.DatabaseException('Invalid JSON for condition: %s' % str(e))
    except DatabaseError as e:
        # Distinguish "value too large" failures by the error code found in
        # the first exception argument.
        if re.match('.*ORA-12899.*', e.args[0]) \
           or re.match('.*1406.*', e.args[0]):
            raise exceptions.DatabaseException('Could not persist condition, condition too large: %s' % str(e))
        else:
            raise exceptions.DatabaseException('Could not persist condition: %s' % str(e))
0093 
0094 
@transactional_session
def update_conditions(conditions, session=None):
    """
    Update several conditions in one bulk operation.

    :param conditions: Conditions as a list of dicts. Each dict is passed to
                       ``session.bulk_update_mappings`` for the Condition
                       model (per the SQLAlchemy documentation, each mapping
                       is expected to include the primary key of the row).
    :param session: The database session.

    :raises DatabaseException: If the update raises TypeError or a database error.
    """

    try:
        # The model is models.Condition; the earlier spelling
        # 'models.Conidition' raised AttributeError on every call.
        session.bulk_update_mappings(models.Condition, conditions)
    except TypeError as e:
        raise exceptions.DatabaseException('Invalid JSON for condition: %s' % str(e))
    except DatabaseError as e:
        # Distinguish "value too large" failures by the error code found in
        # the first exception argument.
        if re.match('.*ORA-12899.*', e.args[0]) \
           or re.match('.*1406.*', e.args[0]):
            raise exceptions.DatabaseException('Could not persist condition, condition too large: %s' % str(e))
        else:
            raise exceptions.DatabaseException('Could not persist condition: %s' % str(e))
0114 
0115 
@read_session
def retrieve_conditions(request_id, internal_id=None, status=None, session=None):
    """
    Retrieve conditions matching the given criteria.

    Any criterion that is None is not used for filtering.

    :param request_id: The request id.
    :param internal_id: The internal id.
    :param status: The status of the condition.
    :param session: The database session.

    :returns: List of conditions, each converted with ``to_dict()``
              (empty list if nothing matches).
    :raises DatabaseException: If the query raises IntegrityError.
    """
    try:
        query = session.query(models.Condition)

        # Apply each filter only when a value was supplied, in a fixed order.
        criteria = (('request_id', request_id),
                    ('internal_id', internal_id),
                    ('status', status))
        for column, value in criteria:
            if value is not None:
                query = query.filter_by(**{column: value})

        rows = query.all() or []
        return [row.to_dict() for row in rows]
    except IntegrityError as e:
        raise exceptions.DatabaseException(e.args)
0146 
0147 
@transactional_session
def delete_conditions(request_id=None, internal_id=None, session=None):
    """
    Delete all conditions matching the given ids.

    An id that is None is not used for filtering.
    NOTE(review): when both ids are None no filter is applied at all, so the
    delete targets every condition -- confirm that callers intend this.

    :param request_id: The request id.
    :param internal_id: The internal id.
    :param session: The database session.

    :raises DatabaseException: If the delete raises IntegrityError.
    """
    try:
        selection = session.query(models.Condition)

        # Narrow the selection by each id that was supplied.
        for column, value in (('request_id', request_id), ('internal_id', internal_id)):
            if value is None:
                continue
            selection = selection.filter_by(**{column: value})

        selection.delete(synchronize_session=False)
    except IntegrityError as e:
        raise exceptions.DatabaseException(e.args)