Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-09 07:58:19

0001 #!/usr/bin/env python
0002 #
0003 # Licensed under the Apache License, Version 2.0 (the "License");
0004 # You may not use this file except in compliance with the License.
0005 # You may obtain a copy of the License at
0006 # http://www.apache.org/licenses/LICENSE-2.0OA
0007 #
0008 # Authors:
0009 # - Wen Guan, <wen.guan@cern.ch>, 2020
0010 
0011 
0012 """
0013 core operations related to workflow model.
0014 """
0015 
0016 # from idds.common import exceptions
0017 from idds.common.constants import WorkprogressStatus, WorkprogressLocking
0018 from idds.orm.base.session import read_session, transactional_session
0019 from idds.orm import workprogress as orm_workprogress, transforms as orm_transforms
0020 from idds.workflowv2.work import WorkStatus
0021 
0022 
0023 def create_workprogress(request_id, workload_id, scope, name, priority=0, status=WorkprogressStatus.New,
0024                         locking=WorkprogressLocking.Idle,
0025                         expired_at=None, errors=None, workprogress_metadata=None, processing_metadata=None):
0026     """
0027     Create a workprogress.
0028 
0029     :param request_id: The request id.
0030     :param workload_id: The workload id.
0031     :param scope: The scope.
0032     :param name: The name.
0033     :param status: The status as integer.
0034     :param locking: The locking as integer.
0035     :param priority: The priority as integer.
0036     :param expired_at: The datetime when the workprogress will be expired at.
0037     :param errors: The errors as a json.
0038     :param workprogress_metadata: The metadata as json.
0039     :param processing_metadata: The metadata as json.
0040 
0041     :returns: workprogress.
0042     """
0043     return orm_workprogress.create_workprogress(request_id=request_id, workload_id=workload_id, scope=scope, name=name,
0044                                                 priority=priority, status=status,
0045                                                 locking=locking, expired_at=expired_at,
0046                                                 workprogress_metadata=workprogress_metadata,
0047                                                 processing_metadata=processing_metadata)
0048 
0049 
0050 @transactional_session
0051 def add_workprogress(request_id, workload_id, scope, name, priority=0, status=WorkprogressStatus.New,
0052                      locking=WorkprogressLocking.Idle,
0053                      expired_at=None, errors=None, workprogress_metadata=None, processing_metadata=None,
0054                      session=None):
0055     """
0056     Add a workprogress.
0057 
0058     :param request_id: The request id.
0059     :param workload_id: The workload id.
0060     :param scope: The scope.
0061     :param name: The name.
0062     :param status: The status as integer.
0063     :param locking: The locking as integer.
0064     :param priority: The priority as integer.
0065     :param expired_at: The datetime when the workprogress will be expired at.
0066     :param errors: The errors as a json.
0067     :param workprogress_metadata: The metadata as json.
0068     :param processing_metadata: The metadata as json.
0069 
0070     :raises DuplicatedObject: If a workprogress with the same name exists.
0071     :raises DatabaseException: If there is a database error.
0072 
0073     :returns: workprogress id.
0074     """
0075 
0076     return orm_workprogress.add_workprogress(request_id=request_id, workload_id=workload_id,
0077                                              scope=scope, name=name, priority=priority, status=status,
0078                                              locking=locking, expired_at=expired_at,
0079                                              workprogress_metadata=workprogress_metadata,
0080                                              processing_metadata=processing_metadata,
0081                                              session=session)
0082 
0083 
0084 @transactional_session
0085 def add_workprogresses(workprogresses, bulk_size=1000, session=None):
0086     """
0087     Add workprogresses.
0088 
0089     :param workprogresses: dict of workprogress.
0090     :param session: session.
0091 
0092     :raises DuplicatedObject: If a collection with the same name exists.
0093     :raises DatabaseException: If there is a database error.
0094 
0095     :returns: workprogress ids.
0096     """
0097     return orm_workprogress.add_workprogresses(workprogresses, bulk_size=bulk_size, session=session)
0098 
0099 
0100 @read_session
0101 def get_workprogresses(request_id=None, to_json=False, session=None):
0102     """
0103     Get workprogresses with request_id.
0104 
0105     :param request_id: The request_id of the request.
0106     :param to_json: Whether to return json format.
0107     :param session: The database session in use.
0108 
0109     :raises NoObject: If no workprogress is founded.
0110 
0111     :returns: list of workprogresses.
0112     """
0113 
0114     return orm_workprogress.get_workprogresses(request_id=request_id, to_json=to_json, session=session)
0115 
0116 
0117 @read_session
0118 def get_workprogress(workprogress_id, to_json=False, session=None):
0119     """
0120     Get a workprogress or raise a NoObject exception.
0121 
0122     :param workprogress_id: The id of the workprogress.
0123     :param to_json: whether to return json format.
0124 
0125     :param session: The database session in use.
0126 
0127     :raises NoObject: If no workprogress is founded.
0128 
0129     :returns: Workprogress.
0130     """
0131     return orm_workprogress.get_workprogress(workprogress_id=workprogress_id, to_json=to_json, session=session)
0132 
0133 
0134 @transactional_session
0135 def get_workprogresses_by_status(status, period=None, locking=False, bulk_size=None, to_json=False, session=None):
0136     """
0137     Get workprogresses.
0138 
0139     :param status: list of status of the workprogress data.
0140     :param locking: Wheter to lock workprogresses to avoid others get the same workprogress.
0141     :param bulk_size: Size limitation per retrieve.
0142     :param to_json: whether to return json format.
0143 
0144     :raises NoObject: If no workprogresses are founded.
0145 
0146     :returns: list of Workprogress.
0147     """
0148 
0149     return orm_workprogress.get_workprogresses_by_status(status=status, period=period, locking=locking,
0150                                                          bulk_size=bulk_size, to_json=to_json, session=session)
0151 
0152 
0153 @transactional_session
0154 def update_workprogress(workprogress_id, parameters, new_transforms=None, update_transforms=None, session=None):
0155     """
0156     update a workprogress.
0157 
0158     :param workprogress_id: the workprogress id.
0159     :param parameters: A dictionary of parameters.
0160     :param session: The database session in use.
0161 
0162     :raises NoObject: If no workprogress is founded.
0163     :raises DatabaseException: If there is a database error.
0164 
0165     """
0166 
0167     if new_transforms:
0168         for tf in new_transforms:
0169             orginal_work = tf['transform_metadata']['orginal_work']
0170             del tf['transform_metadata']['orginal_work']
0171             tf_id = orm_transforms.add_transform(**tf, session=session)
0172             # work = tf['transform_metadata']['work']
0173             orginal_work.set_work_id(tf_id, transforming=True)
0174             orginal_work.set_status(WorkStatus.New)
0175     if update_transforms:
0176         for tr_id in update_transforms:
0177             orm_transforms.update_transform(transform_id=tr_id, parameters=update_transforms[tr_id], session=session)
0178     return orm_workprogress.update_workprogress(workprogress_id=workprogress_id, parameters=parameters, session=session)
0179 
0180 
0181 @transactional_session
0182 def delete_workprogress(workprogress_id=None, session=None):
0183     """
0184     delete a workprogress.
0185 
0186     :param workprogress_id: The id of the workprogress.
0187     :param session: The database session in use.
0188 
0189     :raises NoObject: If no workprogress is founded.
0190     :raises DatabaseException: If there is a database error.
0191     """
0192     return orm_workprogress.update_workprogress(workprogress_id=workprogress_id, session=session)
0193 
0194 
0195 @transactional_session
0196 def clean_locking(time_period=3600, session=None):
0197     """
0198     Clean locking which is older than time period.
0199 
0200     :param time_period in seconds
0201     """
0202     return orm_workprogress.clean_locking(time_period=time_period, session=session)
0203 
0204 
0205 @transactional_session
0206 def clean_next_poll_at(status, session=None):
0207     """
0208     Clean next_poll_at.
0209 
0210     :param status: status of the workprogress
0211     """
0212     return orm_workprogress.clean_next_poll_at(status=status, session=session)