File indexing completed on 2026-04-09 07:58:20
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012 """
0013 operations related to Commands.
0014 """
0015
0016 import re
0017 import datetime
0018
0019 import sqlalchemy
0020 from sqlalchemy import or_
0021 from sqlalchemy.exc import DatabaseError, IntegrityError
0022
0023 from idds.common import exceptions
0024 from idds.common.constants import CommandLocation, CommandLocking
0025 from idds.orm.base import models
0026 from idds.orm.base.session import read_session, transactional_session
0027
0028
@transactional_session
def add_command(cmd_type, status, request_id, workload_id, transform_id,
                username=None, retries=0, processing_id=None,
                source=CommandLocation.Rest, destination=CommandLocation.Clerk,
                cmd_content=None, session=None):
    """
    Create a new Command row, save it and return its id.

    The row is always created with ``substatus=0`` and ``locking=0``.

    :param cmd_type: The type of the command.
    :param status: The status of the command.
    :param request_id: The request id.
    :param workload_id: The workload id.
    :param transform_id: The transform id.
    :param username: Value stored in the command's ``username`` field.
    :param retries: Value stored in the command's ``retries`` field.
    :param processing_id: The processing id.
    :param source: The source where the command is from.
    :param destination: Value stored in the command's ``destination`` field.
    :param cmd_content: The command content as JSON.
    :param session: The database session.

    :raises DatabaseException: If a TypeError or DatabaseError occurs while
                               building or saving the command.

    :returns: The ``cmd_id`` of the saved command.
    """
    # Keyword order mirrors the model constructor call this replaces.
    fields = dict(request_id=request_id, workload_id=workload_id,
                  transform_id=transform_id, cmd_type=cmd_type,
                  status=status, substatus=0, locking=0,
                  source=source, destination=destination,
                  username=username, retries=retries,
                  processing_id=processing_id,
                  cmd_content=cmd_content)

    try:
        new_cmd = models.Command(**fields)
        new_cmd.save(session=session)
        return new_cmd.cmd_id
    except TypeError as err:
        # A TypeError here is reported to the caller as unserialisable content.
        raise exceptions.DatabaseException('Invalid JSON for cmd_content: %s' % str(err))
    except DatabaseError as err:
        detail = err.args[0]
        # These two codes in the error text are reported as "content too large".
        oversized = re.match('.*ORA-12899.*', detail) or re.match('.*1406.*', detail)
        if not oversized:
            raise exceptions.DatabaseException('Could not persist command: %s' % str(err))
        raise exceptions.DatabaseException('Could not persist command, cmd_content too large: %s' % str(err))
0067
0068
@transactional_session
def update_commands(commands, bulk_size=1000, session=None):
    """
    Update Command rows in bulk through ``session.bulk_update_mappings``.

    :param commands: The mappings handed to ``bulk_update_mappings``
                     (presumably dictionaries that include ``cmd_id`` -- confirm with callers).
    :param bulk_size: Accepted for interface compatibility; not used by this function.
    :param session: The database session.

    :raises DatabaseException: If a TypeError or DatabaseError occurs during the update.
    """
    try:
        session.bulk_update_mappings(models.Command, commands)
    except TypeError as err:
        raise exceptions.DatabaseException('Invalid JSON for cmd_content: %s' % str(err))
    except DatabaseError as err:
        detail = err.args[0]
        # These two codes in the error text are reported as "content too large".
        oversized = re.match('.*ORA-12899.*', detail) or re.match('.*1406.*', detail)
        if not oversized:
            raise exceptions.DatabaseException('Could not persist command: %s' % str(err))
        raise exceptions.DatabaseException('Could not persist command, cmd_content too large: %s' % str(err))
0081
0082
@read_session
def retrieve_command(cmd_type=None, status=None, source=None,
                     destination=None, request_id=None, workload_id=None,
                     transform_id=None, processing_id=None, bulk_size=None, session=None):
    """
    Retrieve commands matching the given criteria.

    Every criterion left as ``None`` is ignored. When ``bulk_size`` is truthy
    the result is ordered by ``created_at`` and limited to ``bulk_size`` rows.

    :param cmd_type: Return only commands with this cmd_type.
    :param status: Return only commands with this status.
    :param source: Return only commands from this source.
    :param destination: Return only commands with this destination.
    :param request_id: Return only commands with this request id.
    :param workload_id: Return only commands with this workload id.
    :param transform_id: Return only commands with this transform id.
    :param processing_id: Return only commands with this processing id.
    :param bulk_size: Maximum number of commands as an integer.
    :param session: The database session.

    :raises DatabaseException: If the query raises an IntegrityError.

    :returns command: List of dictionaries
    """
    # (column name, requested value) pairs, applied in this order.
    criteria = (
        ('cmd_type', cmd_type),
        ('status', status),
        ('source', source),
        ('destination', destination),
        ('request_id', request_id),
        ('workload_id', workload_id),
        ('transform_id', transform_id),
        ('processing_id', processing_id),
    )

    try:
        query = session.query(models.Command)

        for column, wanted in criteria:
            if wanted is not None:
                query = query.filter_by(**{column: wanted})

        if bulk_size:
            query = query.order_by(models.Command.created_at).limit(bulk_size)

        rows = query.all()
        return [row.to_dict() for row in rows] if rows else []
    except IntegrityError as err:
        raise exceptions.DatabaseException(err.args)
0130
0131
@transactional_session
def delete_command(command, session=None):
    """
    Delete all commands with the given IDs.

    :param command: The commands to delete as a list of dictionaries; each
                    dictionary must provide a ``cmd_id`` key.
    :param session: The database session.

    :raises DatabaseException: If the delete raises an IntegrityError.
    """
    # The loop variable is deliberately named differently from the ``command``
    # parameter so the argument is never rebound while it is being iterated.
    id_conditions = [models.Command.cmd_id == item['cmd_id'] for item in command]

    if not id_conditions:
        # Nothing to delete: do not issue a query at all.
        return

    try:
        # One DELETE matching any of the requested ids; the session's
        # in-memory state is not synchronised with the deleted rows.
        session.query(models.Command).\
            filter(or_(*id_conditions)).\
            delete(synchronize_session=False)
    except IntegrityError as e:
        raise exceptions.DatabaseException(e.args)
0150
0151
@transactional_session
def get_commands_by_status(status, locking=False, period=None, bulk_size=None, session=None):
    """
    Get commands by status.

    :param status: Command status, or a list/tuple of statuses. A falsy value
                   disables the status filter.
    :param locking: Whether only retrieves unlocked items
                    (``locking == CommandLocking.Idle``).
    :param period: If truthy, only commands whose ``updated_at`` is at least
                   this many seconds in the past are returned.
    :param bulk_size: If truthy, the maximum number of commands to return.
    :param session: The database session in use.

    :raises NoObject: If the query raises NoResultFound.

    :returns: list of commands as dictionaries.
    """
    try:
        query = session.query(models.Command)

        if status:
            if not isinstance(status, (list, tuple)):
                status = [status]
            if len(status) == 1:
                # NOTE(review): a lone status is duplicated so the IN clause
                # always receives two values -- presumably a database backend
                # workaround; confirm before simplifying.
                status = [status[0], status[0]]
            query = query.filter(models.Command.status.in_(status))

        if period:
            # Naive UTC "now" minus the period; assumes updated_at is stored
            # as naive UTC -- TODO confirm against the model definition.
            cutoff = datetime.datetime.utcnow() - datetime.timedelta(seconds=period)
            query = query.filter(models.Command.updated_at <= cutoff)

        if locking:
            query = query.filter(models.Command.locking == CommandLocking.Idle)

        if bulk_size:
            query = query.limit(bulk_size)

        return [row.to_dict() for row in query.all()]
    except sqlalchemy.orm.exc.NoResultFound as error:
        raise exceptions.NoObject('No commands attached with status (%s): %s' %
                                  (status, error))