Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-09 07:58:19

0001 #!/usr/bin/env python
0002 #
0003 # Licensed under the Apache License, Version 2.0 (the "License");
0004 # You may not use this file except in compliance with the License.
0005 # You may obtain a copy of the License at
0006 # http://www.apache.org/licenses/LICENSE-2.0
0007 #
0008 # Authors:
0009 # - Wen Guan, <wen.guan@cern.ch>, 2019
0010 
0011 
0012 """
0013 operations related to Commands.
0014 """
0015 
0016 from idds.common.constants import CommandLocation, CommandLocking, CommandStatus
0017 from idds.orm.base.session import read_session, transactional_session
0018 from idds.orm import commands as orm_commands
0019 
0020 
@transactional_session
def add_command(request_id, cmd_type, cmd_content,
                status=CommandStatus.New, workload_id=None,
                transform_id=None,
                username=None, retries=0, processing_id=0,
                source=CommandLocation.Rest,
                destination=CommandLocation.Clerk, session=None):
    """
    Add a command by delegating to the ORM layer.

    Every argument is forwarded unchanged, by keyword, to
    ``orm_commands.add_command``.

    :param request_id: The request id attached to the command.
    :param cmd_type: The type of the cmd as a number, e.g., finished_stagein.
    :param cmd_content: The command cmd_content as JSON.
    :param status: The status about the command (defaults to CommandStatus.New).
    :param workload_id: The workload id attached to the command.
    :param transform_id: The transform id attached to the command.
    :param username: The username attached to the command.
    :param retries: The retries value stored with the command (defaults to 0).
    :param processing_id: The processing id attached to the command (defaults to 0).
    :param source: The source where the command is from
                   (defaults to CommandLocation.Rest).
    :param destination: The destination of the command
                        (defaults to CommandLocation.Clerk).
    :param session: The database session.

    :returns: The value returned by ``orm_commands.add_command``.
    """
    # Collect the command attributes once so the ORM call stays readable.
    command_fields = {
        'request_id': request_id,
        'workload_id': workload_id,
        'transform_id': transform_id,
        'processing_id': processing_id,
        'cmd_type': cmd_type,
        'cmd_content': cmd_content,
        'status': status,
        'source': source,
        'destination': destination,
        'username': username,
        'retries': retries,
    }
    return orm_commands.add_command(session=session, **command_fields)
0042 
0043 
@read_session
def retrieve_commands(bulk_size=None, cmd_type=None, status=None, destination=None,
                      source=None, request_id=None, workload_id=None, transform_id=None,
                      processing_id=None, session=None):
    """
    Retrieve commands by delegating to the ORM layer.

    Every argument is forwarded unchanged, by keyword, to
    ``orm_commands.retrieve_commands``.

    :param bulk_size: Number of commands as an integer.
    :param cmd_type: Return only specified cmd_type.
    :param status: The status about the command
    :param destination: The destination of the command.
    :param source: The source where the command is from.
    :param request_id: The request id to select on.
    :param workload_id: The workload id to select on.
    :param transform_id: The transform id to select on.
    :param processing_id: The processing id to select on.
    :param session: The database session.

    :returns commands: List of dictionaries
    """
    # Selection criteria handed to the ORM query as they were received.
    criteria = {
        'cmd_type': cmd_type,
        'status': status,
        'source': source,
        'destination': destination,
        'request_id': request_id,
        'workload_id': workload_id,
        'transform_id': transform_id,
        'processing_id': processing_id,
    }
    return orm_commands.retrieve_commands(bulk_size=bulk_size, session=session, **criteria)
0064 
0065 
@transactional_session
def delete_commands(commands, session=None):
    """
    Delete the given commands by delegating to the ORM layer.

    :param commands: The commands to delete as a list of dictionaries.
    :param session: The database session.

    :returns: The value returned by ``orm_commands.delete_commands``.
    """
    outcome = orm_commands.delete_commands(session=session, commands=commands)
    return outcome
0074 
0075 
@transactional_session
def update_commands(commands, session=None):
    """
    Update the given commands by delegating to the ORM layer.

    :param commands: The commands to be updated as a list of dictionaries.
    :param session: The database session.

    :returns: The value returned by ``orm_commands.update_commands``.
    """
    outcome = orm_commands.update_commands(session=session, commands=commands)
    return outcome
0084 
0085 
@transactional_session
def get_commands_by_status(status, locking=False, period=None, session=None):
    """
    Get commands by status and, when requested, mark them as locked.

    The commands are fetched through ``orm_commands.get_commands_by_status``.
    If ``locking`` is true, each returned command is then updated with
    ``CommandLocking.Locking`` using the same database session as the query.

    :param status: Command status.
    :param locking: Whether only retrieves unlocked items. When true, the
                    returned commands are also marked as locked.
    :param period: Forwarded unchanged to the ORM query.
    :param session: The database session in use.

    :returns: list of commands.
    """
    cmds = orm_commands.get_commands_by_status(status=status, locking=locking,
                                               period=period, session=session)
    if locking:
        # One update entry per fetched command, keyed by its cmd_id.
        parameters = [{'cmd_id': cmd['cmd_id'],
                       'locking': CommandLocking.Locking}
                      for cmd in cmds]
        # Forward the session explicitly, as every other ORM call in this
        # module does, so the lock update runs in the same session as the
        # select above instead of one the ORM layer would have to obtain
        # on its own.
        orm_commands.update_commands(commands=parameters, session=session)
    return cmds