File indexing completed on 2026-04-09 07:58:20
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012 """
0013 operations related to Conditions.
0014 """
0015
0016 import re
0017 import datetime
0018
0019 from sqlalchemy.exc import DatabaseError, IntegrityError
0020
0021 from idds.common import exceptions
0022 from idds.common.constants import ConditionStatus
0023 from idds.orm.base import models
0024 from idds.orm.base.session import read_session, transactional_session
0025
0026
@transactional_session
def add_condition(request_id, internal_id, status=ConditionStatus.WaitForTrigger,
                  substatus=None, is_loop=False, loop_index=None, cloned_from=None,
                  evaluate_result=None, previous_transforms=None, following_transforms=None,
                  condition=None, session=None):
    """
    Create a condition record, save it and return its id.

    :param request_id: The request id.
    :param internal_id: The internal id.
    :param status: The status of the condition.
    :param substatus: The substatus of the condition.
    :param is_loop: Whether it is a loop condition.
    :param loop_index: The loop index if it is a loop.
    :param cloned_from: The original condition if it is a loop.
    :param evaluate_result: The evaluated result of the condition.
    :param previous_transforms: The previous transforms which can trigger this condition.
    :param following_transforms: The following transforms which will be triggered.
    :param condition: The condition function.
    :param session: The database session.

    :raises DatabaseException: If creating or saving the record raises a
                               TypeError or a sqlalchemy DatabaseError.

    :returns: The condition_id of the saved condition.
    """
    column_values = {
        'request_id': request_id,
        'internal_id': internal_id,
        'status': status,
        'substatus': substatus,
        'is_loop': is_loop,
        'loop_index': loop_index,
        'cloned_from': cloned_from,
        'evaluate_result': evaluate_result,
        'previous_transforms': previous_transforms,
        'following_transforms': following_transforms,
        'condition': condition,
    }

    try:
        new_condition = models.Condition(**column_values)
        new_condition.save(session=session)
        return new_condition.condition_id
    except TypeError as err:
        raise exceptions.DatabaseException('Invalid JSON for condition: %s' % str(err))
    except DatabaseError as err:
        detail = err.args[0]
        # ORA-12899 / 1406 are matched against the driver message; both are
        # reported to the caller as "condition too large".
        too_large = any(re.match(pattern, detail)
                        for pattern in ('.*ORA-12899.*', '.*1406.*'))
        if too_large:
            template = 'Could not persist condition, condition too large: %s'
        else:
            template = 'Could not persist condition: %s'
        raise exceptions.DatabaseException(template % str(err))
0069
0070
@transactional_session
def update_condition(condition_id, parameters, session=None):
    """
    Update the condition that has the given id.

    The current UTC time is stored under the 'updated_at' key of
    ``parameters`` before the update is issued, so the dict passed in by
    the caller is modified in place.

    :param condition_id: The condition id.
    :param parameters: Column values to write, as a dict.
    :param session: The database session.

    :raises DatabaseException: If the update raises a TypeError or a
                               sqlalchemy DatabaseError.
    """
    try:
        parameters['updated_at'] = datetime.datetime.utcnow()
        matching = session.query(models.Condition).filter_by(condition_id=condition_id)
        matching.update(parameters, synchronize_session=False)
    except TypeError as err:
        raise exceptions.DatabaseException('Invalid JSON for condition: %s' % str(err))
    except DatabaseError as err:
        detail = err.args[0]
        # ORA-12899 / 1406 are matched against the driver message; both are
        # reported to the caller as "condition too large".
        too_large = any(re.match(pattern, detail)
                        for pattern in ('.*ORA-12899.*', '.*1406.*'))
        if too_large:
            template = 'Could not persist condition, condition too large: %s'
        else:
            template = 'Could not persist condition: %s'
        raise exceptions.DatabaseException(template % str(err))
0093
0094
@transactional_session
def update_conditions(conditions, session=None):
    """
    Update several conditions in one bulk operation.

    :param conditions: Conditions as a list of dicts, passed unchanged to
                       ``session.bulk_update_mappings``.
                       NOTE(review): each dict presumably has to carry the
                       primary key (condition_id) so the row can be located;
                       confirm against the callers.
    :param session: The database session.

    :raises DatabaseException: If the bulk update raises a TypeError or a
                               sqlalchemy DatabaseError.
    """

    try:
        # The mapped class is models.Condition; the previous spelling
        # 'Conidition' raised AttributeError on every call.
        session.bulk_update_mappings(models.Condition, conditions)
    except TypeError as e:
        raise exceptions.DatabaseException('Invalid JSON for condition: %s' % str(e))
    except DatabaseError as e:
        # ORA-12899 / 1406 are matched against the driver message; both are
        # reported to the caller as "condition too large".
        if re.match('.*ORA-12899.*', e.args[0]) \
           or re.match('.*1406.*', e.args[0]):
            raise exceptions.DatabaseException('Could not persist condition, condition too large: %s' % str(e))
        else:
            raise exceptions.DatabaseException('Could not persist condition: %s' % str(e))
0114
0115
@read_session
def retrieve_conditions(request_id, internal_id=None, status=None, session=None):
    """
    Look up conditions, filtering on every argument that is not None.

    :param request_id: The request id.
    :param internal_id: The internal id.
    :param status: The status of the condition.
    :param session: The database session.

    :raises DatabaseException: If the query raises a sqlalchemy IntegrityError.

    :returns: List of conditions, each converted with ``to_dict()``;
              an empty list when nothing matches.
    """
    try:
        query = session.query(models.Condition)

        # Filters are applied in a fixed order; a None value means
        # "do not filter on this column".
        requested = (
            ('request_id', request_id),
            ('internal_id', internal_id),
            ('status', status),
        )
        for column, value in requested:
            if value is not None:
                query = query.filter_by(**{column: value})

        rows = query.all()
        return [row.to_dict() for row in rows] if rows else []
    except IntegrityError as err:
        raise exceptions.DatabaseException(err.args)
0146
0147
@transactional_session
def delete_conditions(request_id=None, internal_id=None, session=None):
    """
    Delete the conditions that match the given ids.

    Only arguments that are not None are used as filters. When both are
    None no filter is added, so the delete is issued on the unfiltered
    Condition query.

    :param request_id: The request id.
    :param internal_id: The internal id.
    :param session: The database session.

    :raises DatabaseException: If the delete raises a sqlalchemy IntegrityError.
    """
    try:
        query = session.query(models.Condition)

        requested = (
            ('request_id', request_id),
            ('internal_id', internal_id),
        )
        for column, value in requested:
            if value is not None:
                query = query.filter_by(**{column: value})

        query.delete(synchronize_session=False)
    except IntegrityError as err:
        raise exceptions.DatabaseException(err.args)
0167 raise exceptions.DatabaseException(e.args)