File indexing completed on 2026-04-09 07:58:19
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012 """
0013 operations related to Commands.
0014 """
0015
0016 from idds.common.constants import CommandLocation, CommandLocking, CommandStatus
0017 from idds.orm.base.session import read_session, transactional_session
0018 from idds.orm import commands as orm_commands
0019
0020
@transactional_session
def add_command(request_id, cmd_type, cmd_content,
                status=CommandStatus.New, workload_id=None,
                transform_id=None,
                username=None, retries=0, processing_id=0,
                source=CommandLocation.Rest,
                destination=CommandLocation.Clerk, session=None):
    """
    Add a command to be submitted asynchronously to a command broker.

    All arguments are forwarded unchanged, by keyword, to
    ``orm_commands.add_command``.

    :param request_id: The request id forwarded to the ORM layer.
    :param cmd_type: The type of the cmd as a number, e.g., finished_stagein.
    :param cmd_content: The command cmd_content as JSON.
    :param status: The status about the command (default ``CommandStatus.New``).
    :param workload_id: The workload id forwarded to the ORM layer.
    :param transform_id: The transform id forwarded to the ORM layer.
    :param username: The username forwarded to the ORM layer.
    :param retries: The retries value forwarded to the ORM layer (default 0).
    :param processing_id: The processing id forwarded to the ORM layer (default 0).
    :param source: The source where the command is from
                   (default ``CommandLocation.Rest``).
    :param destination: The destination forwarded to the ORM layer
                        (default ``CommandLocation.Clerk``).
    :param session: The database session.

    :returns: Whatever ``orm_commands.add_command`` returns.
    """
    # Collect the command attributes once so the ORM call stays on one line.
    command_fields = {
        'request_id': request_id,
        'workload_id': workload_id,
        'transform_id': transform_id,
        'processing_id': processing_id,
        'cmd_type': cmd_type,
        'cmd_content': cmd_content,
        'status': status,
        'source': source,
        'destination': destination,
        'username': username,
        'retries': retries,
    }
    return orm_commands.add_command(session=session, **command_fields)
0042
0043
@read_session
def retrieve_commands(bulk_size=None, cmd_type=None, status=None, destination=None,
                      source=None, request_id=None, workload_id=None, transform_id=None,
                      processing_id=None, session=None):
    """
    Retrieve up to ``bulk_size`` commands.

    All arguments are forwarded unchanged, by keyword, to
    ``orm_commands.retrieve_commands``.

    :param bulk_size: Number of commands as an integer.
    :param cmd_type: Return only specified cmd_type.
    :param status: The status about the command
    :param destination: The destination forwarded to the ORM layer.
    :param source: The source where the command is from.
    :param request_id: The request id forwarded to the ORM layer.
    :param workload_id: The workload id forwarded to the ORM layer.
    :param transform_id: The transform id forwarded to the ORM layer.
    :param processing_id: The processing id forwarded to the ORM layer.
    :param session: The database session.

    :returns commands: List of dictionaries
    """
    # Selection criteria, kept apart from the size limit and the session.
    criteria = {
        'cmd_type': cmd_type,
        'status': status,
        'source': source,
        'destination': destination,
        'request_id': request_id,
        'workload_id': workload_id,
        'transform_id': transform_id,
        'processing_id': processing_id,
    }
    return orm_commands.retrieve_commands(bulk_size=bulk_size, session=session, **criteria)
0064
0065
@transactional_session
def delete_commands(commands, session=None):
    """
    Delete all commands with the given IDs.

    Delegates to ``orm_commands.delete_commands``.

    :param commands: The commands to delete as a list of dictionaries.
    :param session: The database session.

    :returns: Whatever ``orm_commands.delete_commands`` returns.
    """
    outcome = orm_commands.delete_commands(commands=commands, session=session)
    return outcome
0074
0075
@transactional_session
def update_commands(commands, session=None):
    """
    Update all commands status with the given IDs.

    Delegates to ``orm_commands.update_commands``.

    :param commands: The commands to be updated as a list of dictionaries.
    :param session: The database session.

    :returns: Whatever ``orm_commands.update_commands`` returns.
    """
    call_kwargs = {'commands': commands, 'session': session}
    return orm_commands.update_commands(**call_kwargs)
0084
0085
@transactional_session
def get_commands_by_status(status, locking=False, period=None, session=None):
    """
    Get commands by status, optionally marking the returned commands as locked.

    :param status: Command status.
    :param locking: Whether only retrieves unlocked items. When true, every
                    returned command is also updated with
                    ``locking=CommandLocking.Locking``.
    :param period: Forwarded unchanged to ``orm_commands.get_commands_by_status``.
    :param session: The database session in use.

    :returns: list of commands.
    """
    cmds = orm_commands.get_commands_by_status(status=status, locking=locking,
                                               period=period, session=session)
    if locking:
        # One update entry per retrieved command, keyed by its cmd_id.
        parameters = [{'cmd_id': cmd['cmd_id'],
                       'locking': CommandLocking.Locking}
                      for cmd in cmds]
        # Pass the session explicitly, like every other ORM call in this
        # module, so the lock update uses the same session as the read above
        # instead of whatever the ORM layer does when no session is given.
        orm_commands.update_commands(commands=parameters, session=session)
    return cmds
0106 return cmds