File indexing completed on 2026-04-09 07:58:20
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012 """
0013 operations related to workflow model.
0014 """
0015
0016 import datetime
0017
0018 import sqlalchemy
0019 from sqlalchemy.exc import DatabaseError, IntegrityError
0020 from sqlalchemy.sql.expression import asc, desc
0021
0022 from idds.common import exceptions
0023 from idds.common.constants import WorkprogressStatus, WorkprogressLocking
0024 from idds.orm.base.session import read_session, transactional_session
0025 from idds.orm.base import models
0026
0027
0028 def create_workprogress(request_id, workload_id, scope, name, priority=0, status=WorkprogressStatus.New,
0029 locking=WorkprogressLocking.Idle, expired_at=None, errors=None,
0030 workprogress_metadata=None, processing_metadata=None):
0031 """
0032 Create a workprogress.
0033
0034 :param request_id: The request id.
0035 :param workload_id: The workload id.
0036 :param scope: The scope.
0037 :param name: The name.
0038 :param status: The status as integer.
0039 :param locking: The locking as integer.
0040 :param priority: The priority as integer.
0041 :param expired_at: The datetime when the workprogress will be expired at.
0042 :param errors: The errors as a json.
0043 :param workprogress_metadata: The metadata as json.
0044 :param processing_metadata: The metadata as json.
0045
0046 :returns: workprogress.
0047 """
0048 new_wp = models.WorkProgress(request_id=request_id, workload_id=workload_id, scope=scope, name=name,
0049 priority=priority, status=status,
0050 locking=locking, expired_at=expired_at,
0051 workprogress_metadata=workprogress_metadata,
0052 processing_metadata=processing_metadata)
0053 return new_wp
0054
0055
0056 @transactional_session
0057 def add_workprogress(request_id, workload_id, scope, name, priority=0, status=WorkprogressStatus.New,
0058 locking=WorkprogressLocking.Idle,
0059 expired_at=None, errors=None, workprogress_metadata=None, processing_metadata=None,
0060 session=None):
0061 """
0062 Add a workprogress.
0063
0064 :param request_id: The request id.
0065 :param workload_id: The workload id.
0066 :param scope: The scope.
0067 :param name: The name.
0068 :param status: The status as integer.
0069 :param locking: The locking as integer.
0070 :param priority: The priority as integer.
0071 :param expired_at: The datetime when the workprogress will be expired at.
0072 :param errors: The errors as a json.
0073 :param workprogress_metadata: The metadata as json.
0074 :param processing_metadata: The metadata as json.
0075
0076 :raises DuplicatedObject: If a workprogress with the same name exists.
0077 :raises DatabaseException: If there is a database error.
0078
0079 :returns: workprogress id.
0080 """
0081
0082 try:
0083 new_wp = create_workprogress(request_id=request_id, workload_id=workload_id, scope=scope, name=name,
0084 priority=priority, status=status,
0085 locking=locking, expired_at=expired_at,
0086 workprogress_metadata=workprogress_metadata,
0087 processing_metadata=processing_metadata)
0088 new_wp.save(session=session)
0089 wp_id = new_wp.workprogress_id
0090 return wp_id
0091 except IntegrityError as error:
0092 raise exceptions.DuplicatedObject('workprogress %s already exists!: %s' % (new_wp, error))
0093 except DatabaseError as error:
0094 raise exceptions.DatabaseException(error)
0095
0096
0097 @transactional_session
0098 def add_workprogresses(workprogresses, bulk_size=1000, session=None):
0099 """
0100 Add workprogresses.
0101
0102 :param workprogresses: dict of workprogress.
0103 :param session: session.
0104
0105 :raises DuplicatedObject: If a collection with the same name exists.
0106 :raises DatabaseException: If there is a database error.
0107
0108 :returns: workprogress ids.
0109 """
0110 sub_params = [workprogresses[i:i + bulk_size] for i in range(0, len(workprogresses), bulk_size)]
0111
0112 try:
0113 for sub_param in sub_params:
0114 session.bulk_insert_mappings(models.Workprogress, sub_param)
0115 wp_ids = [None for _ in range(len(workprogresses))]
0116 return wp_ids
0117 except IntegrityError as error:
0118 raise exceptions.DuplicatedObject('Duplicated objects: %s' % (error))
0119 except DatabaseError as error:
0120 raise exceptions.DatabaseException(error)
0121
0122
0123 @read_session
0124 def get_workprogresses(request_id=None, to_json=False, session=None):
0125 """
0126 Get workprogresses with request_id.
0127
0128 :param request_id: The request_id of the request.
0129 :param to_json: Whether to return json format.
0130 :param session: The database session in use.
0131
0132 :raises NoObject: If no workprogress is founded.
0133
0134 :returns: list of workprogresses.
0135 """
0136
0137 try:
0138 query = session.query(models.Workprogress)
0139 if request_id is not None:
0140 query = query.filter(models.Workprogress.request_id == request_id)
0141 tmp = query.all()
0142 rets = []
0143 if tmp:
0144 for t in tmp:
0145 if to_json:
0146 rets.append(t.to_dict_json())
0147 else:
0148 rets.append(t.to_dict())
0149 return rets
0150 except sqlalchemy.orm.exc.NoResultFound as error:
0151 raise exceptions.NoObject('workprogress with request_id:%s cannot be found: %s' % (request_id, error))
0152
0153
0154 @read_session
0155 def get_workprogress(workprogress_id, to_json=False, session=None):
0156 """
0157 Get a workprogress or raise a NoObject exception.
0158
0159 :param workprogress_id: The id of the workprogress.
0160 :param to_json: whether to return json format.
0161
0162 :param session: The database session in use.
0163
0164 :raises NoObject: If no workprogress is founded.
0165
0166 :returns: Workprogress.
0167 """
0168
0169 try:
0170 query = session.query(models.Workprogress)\
0171 .filter(models.Workprogress.workprogress_id == workprogress_id)
0172
0173 ret = query.first()
0174 if not ret:
0175 return None
0176 else:
0177 if to_json:
0178 return ret.to_dict_json()
0179 else:
0180 return ret.to_dict()
0181 except sqlalchemy.orm.exc.NoResultFound as error:
0182 raise exceptions.NoObject('workprogress workprogress_id: %s cannot be found: %s' % (workprogress_id, error))
0183
0184
0185 @transactional_session
0186 def get_workprogresses_by_status(status, period=None, locking=False, bulk_size=None, to_json=False, session=None):
0187 """
0188 Get workprogresses.
0189
0190 :param status: list of status of the workprogress data.
0191 :param locking: Wheter to lock workprogresses to avoid others get the same workprogress.
0192 :param bulk_size: Size limitation per retrieve.
0193 :param to_json: whether to return json format.
0194
0195 :raises NoObject: If no workprogresses are founded.
0196
0197 :returns: list of Workprogress.
0198 """
0199
0200 try:
0201 if status is None:
0202 raise exceptions.WrongParameterException("status should not be None")
0203 if not isinstance(status, (list, tuple)):
0204 status = [status]
0205 if len(status) == 1:
0206 status = [status[0], status[0]]
0207
0208 query = session.query(models.Workprogress)\
0209 .filter(models.Workprogress.status.in_(status))\
0210 .filter(models.Workprogress.next_poll_at < datetime.datetime.utcnow())
0211
0212 if period is not None:
0213 query = query.filter(models.Workprogress.updated_at < datetime.datetime.utcnow() - datetime.timedelta(seconds=period))
0214 if locking:
0215 query = query.filter(models.Workprogress.locking == WorkprogressLocking.Idle)
0216 query = query.order_by(asc(models.Workprogress.updated_at))\
0217 .order_by(desc(models.Workprogress.priority))
0218 if bulk_size:
0219 query = query.limit(bulk_size)
0220
0221 if locking:
0222 query = query.with_for_update(nowait=True, skip_locked=True)
0223
0224 tmp = query.all()
0225 rets = []
0226 if tmp:
0227 for t in tmp:
0228 if to_json:
0229 rets.append(t.to_dict_json())
0230 else:
0231 rets.append(t.to_dict())
0232 return rets
0233 except sqlalchemy.orm.exc.NoResultFound as error:
0234 raise exceptions.NoObject('No workprogresses with status: %s, period: %s, locking: %s, %s' % (status, period, locking, error))
0235
0236
0237 @transactional_session
0238 def update_workprogress(workprogress_id, parameters, session=None):
0239 """
0240 update a workprogress.
0241
0242 :param workprogress_id: the workprogress id.
0243 :param parameters: A dictionary of parameters.
0244 :param session: The database session in use.
0245
0246 :raises NoObject: If no workprogress is founded.
0247 :raises DatabaseException: If there is a database error.
0248
0249 """
0250 try:
0251 parameters['updated_at'] = datetime.datetime.utcnow()
0252
0253 session.query(models.Workprogress).filter_by(workprogress_id=workprogress_id)\
0254 .update(parameters, synchronize_session=False)
0255 except sqlalchemy.orm.exc.NoResultFound as error:
0256 raise exceptions.NoObject('Workprogress %s cannot be found: %s' % (workprogress_id, error))
0257
0258
0259 @transactional_session
0260 def delete_workprogress(workprogress_id=None, session=None):
0261 """
0262 delete a workprogress.
0263
0264 :param workprogress_id: The id of the workprogress.
0265 :param session: The database session in use.
0266
0267 :raises NoObject: If no workprogress is founded.
0268 :raises DatabaseException: If there is a database error.
0269 """
0270 try:
0271 session.query(models.Workprogress).filter_by(workprogress_id=workprogress_id).delete()
0272 except sqlalchemy.orm.exc.NoResultFound as error:
0273 raise exceptions.NoObject('Workprogress (workprogress_id: %s) cannot be found: %s' % (workprogress_id, error))
0274
0275
0276 @transactional_session
0277 def clean_locking(time_period=3600, session=None):
0278 """
0279 Clean locking which is older than time period.
0280
0281 :param time_period in seconds
0282 """
0283
0284 params = {'locking': 0}
0285 session.query(models.Workprogress).filter(models.Workprogress.locking == WorkprogressLocking.Locking)\
0286 .filter(models.Workprogress.updated_at < datetime.datetime.utcnow() - datetime.timedelta(seconds=time_period))\
0287 .update(params, synchronize_session=False)
0288
0289
0290 @transactional_session
0291 def clean_next_poll_at(status, session=None):
0292 """
0293 Clean next_poll_at.
0294
0295 :param status: status of the workprogress
0296 """
0297 if not isinstance(status, (list, tuple)):
0298 status = [status]
0299 if len(status) == 1:
0300 status = [status[0], status[0]]
0301
0302 params = {'next_poll_at': datetime.datetime.utcnow()}
0303 session.query(models.Workprogress).filter(models.Workprogress.status.in_(status))\
0304 .update(params, synchronize_session=False)