File indexing completed on 2026-04-09 07:58:19
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012 """
0013 core operations related to workflow model.
0014 """
0015
0016
0017 from idds.common.constants import WorkprogressStatus, WorkprogressLocking
0018 from idds.orm.base.session import read_session, transactional_session
0019 from idds.orm import workprogress as orm_workprogress, transforms as orm_transforms
0020 from idds.workflowv2.work import WorkStatus
0021
0022
def create_workprogress(request_id, workload_id, scope, name, priority=0, status=WorkprogressStatus.New,
                        locking=WorkprogressLocking.Idle,
                        expired_at=None, errors=None, workprogress_metadata=None, processing_metadata=None):
    """
    Build a workprogress through the ORM layer.

    This is a thin wrapper that delegates to ``orm_workprogress.create_workprogress``.

    :param request_id: The request id.
    :param workload_id: The workload id.
    :param scope: The scope.
    :param name: The name.
    :param priority: The priority as integer.
    :param status: The status as integer.
    :param locking: The locking as integer.
    :param expired_at: The datetime when the workprogress will be expired at.
    :param errors: The errors as a json. Accepted for interface compatibility;
                   it is not passed on to the ORM call.
    :param workprogress_metadata: The metadata as json.
    :param processing_metadata: The metadata as json.

    :returns: workprogress.
    """
    # Collect the forwarded fields first so the delegation is a single call.
    fields = {
        'request_id': request_id,
        'workload_id': workload_id,
        'scope': scope,
        'name': name,
        'priority': priority,
        'status': status,
        'locking': locking,
        'expired_at': expired_at,
        'workprogress_metadata': workprogress_metadata,
        'processing_metadata': processing_metadata,
    }
    return orm_workprogress.create_workprogress(**fields)
0048
0049
@transactional_session
def add_workprogress(request_id, workload_id, scope, name, priority=0, status=WorkprogressStatus.New,
                     locking=WorkprogressLocking.Idle,
                     expired_at=None, errors=None, workprogress_metadata=None, processing_metadata=None,
                     session=None):
    """
    Insert a single workprogress through the ORM layer.

    :param request_id: The request id.
    :param workload_id: The workload id.
    :param scope: The scope.
    :param name: The name.
    :param priority: The priority as integer.
    :param status: The status as integer.
    :param locking: The locking as integer.
    :param expired_at: The datetime when the workprogress will be expired at.
    :param errors: The errors as a json. Accepted for interface compatibility;
                   it is not passed on to the ORM call.
    :param workprogress_metadata: The metadata as json.
    :param processing_metadata: The metadata as json.
    :param session: The database session in use.

    :raises DuplicatedObject: If a workprogress with the same name exists.
    :raises DatabaseException: If there is a database error.

    :returns: workprogress id.
    """
    # Gather everything that is handed to the ORM, including the session.
    orm_kwargs = {
        'request_id': request_id,
        'workload_id': workload_id,
        'scope': scope,
        'name': name,
        'priority': priority,
        'status': status,
        'locking': locking,
        'expired_at': expired_at,
        'workprogress_metadata': workprogress_metadata,
        'processing_metadata': processing_metadata,
        'session': session,
    }
    return orm_workprogress.add_workprogress(**orm_kwargs)
0082
0083
@transactional_session
def add_workprogresses(workprogresses, bulk_size=1000, session=None):
    """
    Insert several workprogresses through the ORM layer.

    :param workprogresses: dict of workprogress.
    :param bulk_size: Passed unchanged to the ORM call (default 1000).
    :param session: The database session in use.

    :raises DuplicatedObject: If a workprogress with the same name exists.
    :raises DatabaseException: If there is a database error.

    :returns: workprogress ids.
    """
    workprogress_ids = orm_workprogress.add_workprogresses(
        workprogresses,
        bulk_size=bulk_size,
        session=session,
    )
    return workprogress_ids
0098
0099
@read_session
def get_workprogresses(request_id=None, to_json=False, session=None):
    """
    Fetch the workprogresses that belong to a request.

    :param request_id: The request_id of the request.
    :param to_json: Whether to return json format.
    :param session: The database session in use.

    :raises NoObject: If no workprogress is found.

    :returns: list of workprogresses.
    """
    found = orm_workprogress.get_workprogresses(
        request_id=request_id,
        to_json=to_json,
        session=session,
    )
    return found
0115
0116
@read_session
def get_workprogress(workprogress_id, to_json=False, session=None):
    """
    Fetch one workprogress by its id, or raise a NoObject exception.

    :param workprogress_id: The id of the workprogress.
    :param to_json: Whether to return json format.
    :param session: The database session in use.

    :raises NoObject: If no workprogress is found.

    :returns: Workprogress.
    """
    lookup = {
        'workprogress_id': workprogress_id,
        'to_json': to_json,
        'session': session,
    }
    return orm_workprogress.get_workprogress(**lookup)
0132
0133
@transactional_session
def get_workprogresses_by_status(status, period=None, locking=False, bulk_size=None, to_json=False, session=None):
    """
    Fetch workprogresses filtered by status.

    :param status: list of status of the workprogress data.
    :param period: Passed unchanged to the ORM call.
    :param locking: Whether to lock workprogresses to avoid others getting the same workprogress.
    :param bulk_size: Size limitation per retrieve.
    :param to_json: Whether to return json format.
    :param session: The database session in use.

    :raises NoObject: If no workprogresses are found.

    :returns: list of Workprogress.
    """
    query_args = {
        'status': status,
        'period': period,
        'locking': locking,
        'bulk_size': bulk_size,
        'to_json': to_json,
        'session': session,
    }
    return orm_workprogress.get_workprogresses_by_status(**query_args)
0151
0152
@transactional_session
def update_workprogress(workprogress_id, parameters, new_transforms=None, update_transforms=None, session=None):
    """
    Update a workprogress, optionally adding and updating transforms in the same session.

    :param workprogress_id: The workprogress id.
    :param parameters: A dictionary of parameters.
    :param new_transforms: Iterable of transform dicts to add. Each one must carry
                           ``transform_metadata['orginal_work']``; that entry is removed
                           from the dict before the transform is added.
    :param update_transforms: Mapping of transform id to the parameters to update it with.
    :param session: The database session in use.

    :raises NoObject: If no workprogress is found.
    :raises DatabaseException: If there is a database error.

    :returns: The result of the ORM ``update_workprogress`` call.
    """
    if new_transforms:
        for transform in new_transforms:
            # Take the work object out of the metadata before the dict is
            # expanded into keyword arguments for the ORM.
            work = transform['transform_metadata'].pop('orginal_work')
            transform_id = orm_transforms.add_transform(**transform, session=session)

            # Record the new transform id on the work object and reset its status.
            work.set_work_id(transform_id, transforming=True)
            work.set_status(WorkStatus.New)

    if update_transforms:
        for transform_id in update_transforms:
            changes = update_transforms[transform_id]
            orm_transforms.update_transform(transform_id=transform_id, parameters=changes, session=session)

    outcome = orm_workprogress.update_workprogress(workprogress_id=workprogress_id,
                                                   parameters=parameters,
                                                   session=session)
    return outcome
0179
0180
@transactional_session
def delete_workprogress(workprogress_id=None, session=None):
    """
    Delete a workprogress.

    :param workprogress_id: The id of the workprogress.
    :param session: The database session in use.

    :raises NoObject: If no workprogress is found.
    :raises DatabaseException: If there is a database error.

    :returns: The result of the ORM ``delete_workprogress`` call.
    """
    # Delegate to the ORM delete operation. Calling the ORM update here, with
    # no parameters, would never remove the workprogress.
    return orm_workprogress.delete_workprogress(workprogress_id=workprogress_id, session=session)
0193
0194
@transactional_session
def clean_locking(time_period=3600, session=None):
    """
    Clear locking that is older than the given time period.

    :param time_period: The time period in seconds (default 3600).
    :param session: The database session in use.

    :returns: The result of the ORM ``clean_locking`` call.
    """
    cleaned = orm_workprogress.clean_locking(
        time_period=time_period,
        session=session,
    )
    return cleaned
0203
0204
@transactional_session
def clean_next_poll_at(status, session=None):
    """
    Clear next_poll_at for workprogresses with the given status.

    :param status: Status of the workprogress.
    :param session: The database session in use.

    :returns: The result of the ORM ``clean_next_poll_at`` call.
    """
    cleaned = orm_workprogress.clean_next_poll_at(
        status=status,
        session=session,
    )
    return cleaned
0212 return orm_workprogress.clean_next_poll_at(status=status, session=session)