Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-09 07:58:20

0001 #!/usr/bin/env python
0002 #
0003 # Licensed under the Apache License, Version 2.0 (the "License");
0004 # You may not use this file except in compliance with the License.
0005 # You may obtain a copy of the License at
0006 # http://www.apache.org/licenses/LICENSE-2.0
0007 #
0008 # Authors:
0009 # - Wen Guan, <wen.guan@cern.ch>, 2019 - 2020
0010 
0011 
0012 """
0013 operations related to workflow model.
0014 """
0015 
0016 import datetime
0017 
0018 import sqlalchemy
0019 from sqlalchemy.exc import DatabaseError, IntegrityError
0020 from sqlalchemy.sql.expression import asc, desc
0021 
0022 from idds.common import exceptions
0023 from idds.common.constants import WorkprogressStatus, WorkprogressLocking
0024 from idds.orm.base.session import read_session, transactional_session
0025 from idds.orm.base import models
0026 
0027 
def create_workprogress(request_id, workload_id, scope, name, priority=0, status=WorkprogressStatus.New,
                        locking=WorkprogressLocking.Idle, expired_at=None, errors=None,
                        workprogress_metadata=None, processing_metadata=None):
    """
    Build a workprogress model object.

    The object is only constructed here; nothing in this function saves it.

    :param request_id: The request id.
    :param workload_id: The workload id.
    :param scope: The scope.
    :param name: The name.
    :param priority: The priority as integer.
    :param status: The status as integer.
    :param locking: The locking as integer.
    :param expired_at: The datetime when the workprogress will be expired at.
    :param errors: The errors as a json. Only forwarded to the model when it is not None.
    :param workprogress_metadata: The metadata as json.
    :param processing_metadata: The metadata as json.

    :returns: workprogress.
    """
    fields = {
        'request_id': request_id,
        'workload_id': workload_id,
        'scope': scope,
        'name': name,
        'priority': priority,
        'status': status,
        'locking': locking,
        'expired_at': expired_at,
        'workprogress_metadata': workprogress_metadata,
        'processing_metadata': processing_metadata,
    }
    # Previously `errors` was accepted but silently dropped. Forward it only when
    # the caller supplied a value, so the default call builds the same object.
    # NOTE(review): assumes the model declares an `errors` column - confirm.
    if errors is not None:
        fields['errors'] = errors

    # Use the same model class name as every query in this module.
    return models.Workprogress(**fields)
0054 
0055 
@transactional_session
def add_workprogress(request_id, workload_id, scope, name, priority=0, status=WorkprogressStatus.New,
                     locking=WorkprogressLocking.Idle,
                     expired_at=None, errors=None, workprogress_metadata=None, processing_metadata=None,
                     session=None):
    """
    Add a workprogress.

    :param request_id: The request id.
    :param workload_id: The workload id.
    :param scope: The scope.
    :param name: The name.
    :param priority: The priority as integer.
    :param status: The status as integer.
    :param locking: The locking as integer.
    :param expired_at: The datetime when the workprogress will be expired at.
    :param errors: The errors as a json.
    :param workprogress_metadata: The metadata as json.
    :param processing_metadata: The metadata as json.
    :param session: The database session in use.

    :raises DuplicatedObject: If saving hits an integrity error.
    :raises DatabaseException: If there is a database error.

    :returns: workprogress id.
    """
    # Build the object before entering the try block so that `new_wp` is always
    # bound when the IntegrityError handler formats its message.
    new_wp = create_workprogress(request_id=request_id, workload_id=workload_id, scope=scope, name=name,
                                 priority=priority, status=status,
                                 locking=locking, expired_at=expired_at,
                                 errors=errors,
                                 workprogress_metadata=workprogress_metadata,
                                 processing_metadata=processing_metadata)
    try:
        new_wp.save(session=session)
        return new_wp.workprogress_id
    except IntegrityError as error:
        raise exceptions.DuplicatedObject('workprogress %s already exists!: %s' % (new_wp, error))
    except DatabaseError as error:
        raise exceptions.DatabaseException(error)
0095 
0096 
@transactional_session
def add_workprogresses(workprogresses, bulk_size=1000, session=None):
    """
    Add workprogresses with bulk inserts.

    :param workprogresses: list of dicts, one mapping per workprogress.
    :param bulk_size: Maximum number of mappings per bulk insert. A value that is
                      None, zero or negative inserts everything in a single batch.
    :param session: The database session in use.

    :raises DuplicatedObject: If the insert hits an integrity error.
    :raises DatabaseException: If there is a database error.

    :returns: a list with one None per input workprogress; this function does not
              read any ids back after the bulk insert.
    """
    total = len(workprogresses)

    # A negative step used to yield no batches at all (nothing inserted, yet a
    # "successful" return) and a zero step raised ValueError from range().
    if not bulk_size or bulk_size < 0:
        bulk_size = total or 1

    batches = [workprogresses[start:start + bulk_size] for start in range(0, total, bulk_size)]

    try:
        for batch in batches:
            session.bulk_insert_mappings(models.Workprogress, batch)
        return [None] * total
    except IntegrityError as error:
        raise exceptions.DuplicatedObject('Duplicated objects: %s' % (error))
    except DatabaseError as error:
        raise exceptions.DatabaseException(error)
0121 
0122 
@read_session
def get_workprogresses(request_id=None, to_json=False, session=None):
    """
    Fetch workprogresses, optionally restricted to one request.

    :param request_id: The request_id to filter on; no filter is applied when None.
    :param to_json: Whether to convert rows with to_dict_json() instead of to_dict().
    :param session: The database session in use.

    :raises NoObject: If the query raises NoResultFound.

    :returns: list of workprogresses as dictionaries (empty when nothing matches).
    """
    try:
        wp_query = session.query(models.Workprogress)
        if request_id is not None:
            wp_query = wp_query.filter(models.Workprogress.request_id == request_id)

        # A falsy result is treated as "no rows".
        rows = wp_query.all() or []
        if to_json:
            return [row.to_dict_json() for row in rows]
        return [row.to_dict() for row in rows]
    except sqlalchemy.orm.exc.NoResultFound as error:
        raise exceptions.NoObject('workprogress with request_id:%s cannot be found: %s' % (request_id, error))
0152 
0153 
@read_session
def get_workprogress(workprogress_id, to_json=False, session=None):
    """
    Fetch a single workprogress by its id.

    :param workprogress_id: The id of the workprogress.
    :param to_json: Whether to convert the row with to_dict_json() instead of to_dict().
    :param session: The database session in use.

    :raises NoObject: If the query raises NoResultFound.

    :returns: the workprogress as a dictionary, or None when no row matches.
    """
    try:
        wp_query = session.query(models.Workprogress)
        wp_query = wp_query.filter(models.Workprogress.workprogress_id == workprogress_id)

        found = wp_query.first()
        if found:
            return found.to_dict_json() if to_json else found.to_dict()
        return None
    except sqlalchemy.orm.exc.NoResultFound as error:
        raise exceptions.NoObject('workprogress workprogress_id: %s cannot be found: %s' % (workprogress_id, error))
0183 
0184 
@transactional_session
def get_workprogresses_by_status(status, period=None, locking=False, bulk_size=None, to_json=False, session=None):
    """
    Get workprogresses whose status matches and whose next_poll_at is in the past.

    :param status: A status or a list/tuple of statuses of the workprogress data.
    :param period: When not None, only rows whose updated_at is older than this
                   many seconds are returned.
    :param locking: Whether to lock workprogresses to avoid others getting the same
                    workprogress. When set, only rows with Idle locking are selected
                    and the select is issued FOR UPDATE, skipping locked rows.
    :param bulk_size: Size limitation per retrieve.
    :param to_json: Whether to convert rows with to_dict_json() instead of to_dict().
    :param session: The database session in use.

    :raises WrongParameterException: If status is None.
    :raises NoObject: If the query raises NoResultFound.

    :returns: list of workprogresses as dictionaries (empty when nothing matches).
    """

    try:
        if status is None:
            raise exceptions.WrongParameterException("status should not be None")
        if not isinstance(status, (list, tuple)):
            status = [status]
        if len(status) == 1:
            # NOTE(review): presumably duplicated to avoid a single-element IN
            # list on some backend - confirm before removing.
            status = [status[0], status[0]]

        # Take one timestamp so both time filters compare against the same instant.
        now = datetime.datetime.utcnow()

        query = session.query(models.Workprogress)\
                       .filter(models.Workprogress.status.in_(status))\
                       .filter(models.Workprogress.next_poll_at < now)

        if period is not None:
            query = query.filter(models.Workprogress.updated_at < now - datetime.timedelta(seconds=period))
        if locking:
            query = query.filter(models.Workprogress.locking == WorkprogressLocking.Idle)
        query = query.order_by(asc(models.Workprogress.updated_at))\
                     .order_by(desc(models.Workprogress.priority))
        if bulk_size:
            query = query.limit(bulk_size)

        if locking:
            # NOWAIT and SKIP LOCKED are alternative lock-wait policies; requesting
            # both renders an invalid locking clause on backends that emit them.
            # SKIP LOCKED matches the intent: ignore rows someone else holds.
            query = query.with_for_update(skip_locked=True)

        rows = query.all() or []
        if to_json:
            return [row.to_dict_json() for row in rows]
        return [row.to_dict() for row in rows]
    except sqlalchemy.orm.exc.NoResultFound as error:
        raise exceptions.NoObject('No workprogresses with status: %s, period: %s, locking: %s, %s' % (status, period, locking, error))
0235 
0236 
@transactional_session
def update_workprogress(workprogress_id, parameters, session=None):
    """
    Update a workprogress.

    The row's updated_at is always set to the current UTC time, overriding any
    'updated_at' entry supplied in parameters. The caller's dictionary is not
    modified.

    :param workprogress_id: the workprogress id.
    :param parameters: A dictionary of column values to set.
    :param session: The database session in use.

    :raises NoObject: If the update raises NoResultFound.
    """
    try:
        # Work on a copy: the original injected 'updated_at' into the caller's dict.
        values = dict(parameters)
        values['updated_at'] = datetime.datetime.utcnow()

        session.query(models.Workprogress).filter_by(workprogress_id=workprogress_id)\
               .update(values, synchronize_session=False)
    except sqlalchemy.orm.exc.NoResultFound as error:
        raise exceptions.NoObject('Workprogress %s cannot be found: %s' % (workprogress_id, error))
0257 
0258 
@transactional_session
def delete_workprogress(workprogress_id=None, session=None):
    """
    Delete the workprogress rows that match the given id.

    :param workprogress_id: The id of the workprogress.
    :param session: The database session in use.

    :raises NoObject: If the delete raises NoResultFound.
    """
    try:
        targets = session.query(models.Workprogress)
        targets = targets.filter_by(workprogress_id=workprogress_id)
        targets.delete()
    except sqlalchemy.orm.exc.NoResultFound as error:
        raise exceptions.NoObject('Workprogress (workprogress_id: %s) cannot be found: %s' % (workprogress_id, error))
0274 
0275 
@transactional_session
def clean_locking(time_period=3600, session=None):
    """
    Reset the locking of workprogresses that have stayed locked too long.

    Rows whose locking is WorkprogressLocking.Locking and whose updated_at is
    older than time_period seconds get their locking column set to 0.

    :param time_period: age threshold in seconds.
    :param session: The database session in use.
    """
    stale = session.query(models.Workprogress)
    stale = stale.filter(models.Workprogress.locking == WorkprogressLocking.Locking)

    cutoff = datetime.datetime.utcnow() - datetime.timedelta(seconds=time_period)
    stale = stale.filter(models.Workprogress.updated_at < cutoff)

    # NOTE(review): 0 is presumably the value of WorkprogressLocking.Idle - confirm.
    stale.update({'locking': 0}, synchronize_session=False)
0288 
0289 
@transactional_session
def clean_next_poll_at(status, session=None):
    """
    Set next_poll_at to the current UTC time for workprogresses in the given status.

    :param status: a status or a list/tuple of statuses of the workprogress.
    :param session: The database session in use.
    """
    statuses = status if isinstance(status, (list, tuple)) else [status]
    if len(statuses) == 1:
        # A single status is listed twice before being handed to in_().
        statuses = [statuses[0]] * 2

    new_values = {'next_poll_at': datetime.datetime.utcnow()}
    matching = session.query(models.Workprogress)
    matching = matching.filter(models.Workprogress.status.in_(statuses))
    matching.update(new_values, synchronize_session=False)