Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-09 07:58:20

0001 #!/usr/bin/env python
0002 #
0003 # Licensed under the Apache License, Version 2.0 (the "License");
0004 # You may not use this file except in compliance with the License.
0005 # You may obtain a copy of the License at
0006 # http://www.apache.org/licenses/LICENSE-2.0OA
0007 #
0008 # Authors:
0009 # - Wen Guan, <wen.guan@cern.ch>, 2019
0010 
0011 
0012 """
0013 operations related to Commands.
0014 """
0015 
0016 import re
0017 import datetime
0018 
0019 import sqlalchemy
0020 from sqlalchemy import or_
0021 from sqlalchemy.exc import DatabaseError, IntegrityError
0022 
0023 from idds.common import exceptions
0024 from idds.common.constants import CommandLocation, CommandLocking
0025 from idds.orm.base import models
0026 from idds.orm.base.session import read_session, transactional_session
0027 
0028 
0029 @transactional_session
0030 def add_command(cmd_type, status, request_id, workload_id, transform_id,
0031                 username=None, retries=0, processing_id=None,
0032                 source=CommandLocation.Rest, destination=CommandLocation.Clerk,
0033                 cmd_content=None, session=None):
0034     """
0035     Add a command to be submitted asynchronously to a command broker.
0036 
0037     :param cmd_type: The type of the cmd as a number, e.g., finished_stagein.
0038     :param status: The status about the command
0039     :param source: The source where the command is from.
0040     :param request_id: The request id.
0041     :param workload_id: The workload id.
0042     :param transform_id: The transform id.
0043     :param cmd_content: The command cmd_content as JSON.
0044     :param session: The database session.
0045     """
0046 
0047     try:
0048         cmd = models.Command(request_id=request_id, workload_id=workload_id,
0049                              transform_id=transform_id, cmd_type=cmd_type,
0050                              status=status, substatus=0, locking=0,
0051                              source=source, destination=destination,
0052                              username=username, retries=retries,
0053                              processing_id=processing_id,
0054                              cmd_content=cmd_content)
0055 
0056         cmd.save(session=session)
0057         cmd_id = cmd.cmd_id
0058         return cmd_id
0059     except TypeError as e:
0060         raise exceptions.DatabaseException('Invalid JSON for cmd_content: %s' % str(e))
0061     except DatabaseError as e:
0062         if re.match('.*ORA-12899.*', e.args[0]) \
0063            or re.match('.*1406.*', e.args[0]):
0064             raise exceptions.DatabaseException('Could not persist command, cmd_content too large: %s' % str(e))
0065         else:
0066             raise exceptions.DatabaseException('Could not persist command: %s' % str(e))
0067 
0068 
0069 @transactional_session
0070 def update_commands(commands, bulk_size=1000, session=None):
0071     try:
0072         session.bulk_update_mappings(models.Command, commands)
0073     except TypeError as e:
0074         raise exceptions.DatabaseException('Invalid JSON for cmd_content: %s' % str(e))
0075     except DatabaseError as e:
0076         if re.match('.*ORA-12899.*', e.args[0]) \
0077            or re.match('.*1406.*', e.args[0]):
0078             raise exceptions.DatabaseException('Could not persist command, cmd_content too large: %s' % str(e))
0079         else:
0080             raise exceptions.DatabaseException('Could not persist command: %s' % str(e))
0081 
0082 
0083 @read_session
0084 def retrieve_command(cmd_type=None, status=None, source=None,
0085                      destination=None, request_id=None, workload_id=None,
0086                      transform_id=None, processing_id=None, bulk_size=None, session=None):
0087     """
0088     Retrieve up to $bulk command.
0089 
0090     :param bulk: Number of command as an integer.
0091     :param cmd_type: Return only specified cmd_type.
0092     :param status: The status about the command
0093     :param source: The source where the command is from.
0094     :param session: The database session.
0095 
0096     :returns command: List of dictionaries
0097     """
0098     command = []
0099     try:
0100         query = session.query(models.Command)
0101 
0102         if cmd_type is not None:
0103             query = query.filter_by(cmd_type=cmd_type)
0104         if status is not None:
0105             query = query.filter_by(status=status)
0106         if source is not None:
0107             query = query.filter_by(source=source)
0108         if destination is not None:
0109             query = query.filter_by(destination=destination)
0110         if request_id is not None:
0111             query = query.filter_by(request_id=request_id)
0112         if workload_id is not None:
0113             query = query.filter_by(workload_id=workload_id)
0114         if transform_id is not None:
0115             query = query.filter_by(transform_id=transform_id)
0116         if processing_id is not None:
0117             query = query.filter_by(processing_id=processing_id)
0118 
0119         if bulk_size:
0120             query = query.order_by(models.Command.created_at).limit(bulk_size)
0121         # query = query.with_for_update(nowait=True)
0122 
0123         tmp = query.all()
0124         if tmp:
0125             for t in tmp:
0126                 command.append(t.to_dict())
0127         return command
0128     except IntegrityError as e:
0129         raise exceptions.DatabaseException(e.args)
0130 
0131 
0132 @transactional_session
0133 def delete_command(command, session=None):
0134     """
0135     Delete all command with the given IDs.
0136 
0137     :param command: The command to delete as a list of dictionaries.
0138     """
0139     command_condition = []
0140     for command in command:
0141         command_condition.append(models.Command.cmd_id == command['cmd_id'])
0142 
0143     try:
0144         if command_condition:
0145             session.query(models.Command).\
0146                 filter(or_(*command_condition)).\
0147                 delete(synchronize_session=False)
0148     except IntegrityError as e:
0149         raise exceptions.DatabaseException(e.args)
0150 
0151 
0152 @transactional_session
0153 def get_commands_by_status(status, locking=False, period=None, bulk_size=None, session=None):
0154     """
0155     Get commands
0156 
0157     :param status: Command status.
0158     :param locking: Whether only retrieves unlocked items.
0159 
0160     :param session: The database session in use.
0161 
0162     :returns: list of commands.
0163     """
0164     try:
0165         if status:
0166             if not isinstance(status, (list, tuple)):
0167                 status = [status]
0168             if len(status) == 1:
0169                 status = [status[0], status[0]]
0170 
0171         query = session.query(models.Command)
0172         if status:
0173             query = query.filter(models.Command.status.in_(status))
0174 
0175         if period:
0176             query = query.filter(models.Command.updated_at <= datetime.datetime.utcnow() - datetime.timedelta(seconds=period))
0177 
0178         if locking:
0179             query = query.filter(models.Command.locking == CommandLocking.Idle)
0180         # query = query.with_for_update(skip_locked=True)
0181         # query = query.order_by(asc(models.Command.updated_at))
0182 
0183         if bulk_size:
0184             query = query.limit(bulk_size)
0185 
0186         tmp = query.all()
0187         rets = []
0188         if tmp:
0189             rets = [t.to_dict() for t in tmp]
0190         return rets
0191     except sqlalchemy.orm.exc.NoResultFound as error:
0192         raise exceptions.NoObject('No commands attached with status (%s): %s' %
0193                                   (status, error))
0194     except Exception as error:
0195         raise error