Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-09 07:58:20

0001 #!/usr/bin/env python
0002 #
0003 # Licensed under the Apache License, Version 2.0 (the "License");
0004 # You may not use this file except in compliance with the License.
0005 # You may obtain a copy of the License at
0006 # http://www.apache.org/licenses/LICENSE-2.0
0007 #
0008 # Authors:
0009 # - Wen Guan, <wen.guan@cern.ch>, 2019 - 2025
0010 
0011 
0012 """
0013 operations related to Transform.
0014 """
0015 
0016 import datetime
0017 
0018 import sqlalchemy
0019 from sqlalchemy import and_, func, select
0020 from sqlalchemy.exc import DatabaseError, IntegrityError
0021 from sqlalchemy.sql.expression import asc
0022 
0023 from idds.common import exceptions
0024 from idds.common.constants import CommandType, TransformStatus, TransformLocking, CollectionRelationType
0025 from idds.common.utils import get_process_thread_info
0026 from idds.orm.base.session import read_session, transactional_session, safe_bulk_update_mappings
0027 from idds.orm.base import models
0028 
0029 
def create_transform(request_id, workload_id, transform_type, transform_tag=None,
                     priority=0, status=TransformStatus.New, name=None,
                     substatus=TransformStatus.New, locking=TransformLocking.Idle,
                     new_poll_period=1, update_poll_period=10,
                     new_retries=0, update_retries=0, max_new_retries=3, max_update_retries=0,
                     parent_transform_id=None, previous_transform_id=None, current_processing_id=None,
                     internal_id=None, has_previous_conditions=None, loop_index=None,
                     parent_internal_id=None, command=CommandType.NoneCommand,
                     cloned_from=None, triggered_conditions=None, untriggered_conditions=None,
                     site=None, retries=0, expired_at=None, transform_metadata=None):
    """
    Build a ``models.Transform`` object without saving it.

    Every argument except the two poll periods is handed to the
    ``models.Transform`` constructor under the same name.

    :param request_id: The request id.
    :param workload_id: The workload id.
    :param transform_type: Transform type.
    :param transform_tag: Transform tag.
    :param priority: priority.
    :param status: Transform status.
    :param name: Transform name.
    :param substatus: Transform substatus.
    :param locking: Transform locking.
    :param new_poll_period: Poll period in seconds; stored as a timedelta when truthy.
    :param update_poll_period: Poll period in seconds; stored as a timedelta when truthy.
    :param retries: The number of retries.
    :param expired_at: The datetime when it expires.
    :param transform_metadata: The metadata as json.

    :returns: the new (unsaved) transform object.
    """
    # Keyword order mirrors the historical constructor call.
    columns = {
        'request_id': request_id,
        'workload_id': workload_id,
        'transform_type': transform_type,
        'transform_tag': transform_tag,
        'priority': priority,
        'name': name,
        'status': status,
        'substatus': substatus,
        'locking': locking,
        'retries': retries,
        'expired_at': expired_at,
        'new_retries': new_retries,
        'update_retries': update_retries,
        'max_new_retries': max_new_retries,
        'max_update_retries': max_update_retries,
        'parent_transform_id': parent_transform_id,
        'previous_transform_id': previous_transform_id,
        'current_processing_id': current_processing_id,
        'internal_id': internal_id,
        'site': site,
        'command': command,
        'parent_internal_id': parent_internal_id,
        'has_previous_conditions': has_previous_conditions,
        'loop_index': loop_index,
        'cloned_from': cloned_from,
        'triggered_conditions': triggered_conditions,
        'untriggered_conditions': untriggered_conditions,
        'transform_metadata': transform_metadata,
    }
    transform = models.Transform(**columns)

    # A falsy period (None or 0) is skipped, so the attribute is never assigned here.
    poll_periods = (('new_poll_period', new_poll_period),
                    ('update_poll_period', update_poll_period))
    for attribute, seconds in poll_periods:
        if seconds:
            setattr(transform, attribute, datetime.timedelta(seconds=seconds))
    return transform
0080 
0081 
@transactional_session
def add_transform(request_id, workload_id, transform_type, transform_tag=None, priority=0, name=None,
                  status=TransformStatus.New, substatus=TransformStatus.New, locking=TransformLocking.Idle,
                  new_poll_period=1, update_poll_period=10, retries=0, expired_at=None,
                  new_retries=0, update_retries=0, max_new_retries=3, max_update_retries=0,
                  parent_transform_id=None, previous_transform_id=None, current_processing_id=None,
                  internal_id=None, has_previous_conditions=None, loop_index=None,
                  parent_internal_id=None, command=CommandType.NoneCommand,
                  cloned_from=None, triggered_conditions=None, untriggered_conditions=None,
                  transform_metadata=None, workprogress_id=None, site=None, session=None):
    """
    Create a transform via ``create_transform`` and save it.

    When ``workprogress_id`` is truthy, a ``Workprogress2transform`` relation
    row linking it to the new transform is saved as well.

    :param request_id: The request id.
    :param workload_id: The workload id.
    :param transform_type: Transform type.
    :param transform_tag: Transform tag.
    :param priority: priority.
    :param status: Transform status.
    :param locking: Transform locking.
    :param retries: The number of retries.
    :param expired_at: The datetime when it expires.
    :param transform_metadata: The metadata as json.
    :param workprogress_id: Optional workprogress id to link the transform to.
    :param session: The database session in use.

    :raises DuplicatedObject: If saving hits an IntegrityError.
    :raises DatabaseException: If there is a database error.

    :returns: transform id.
    """
    try:
        transform = create_transform(
            request_id=request_id,
            workload_id=workload_id,
            transform_type=transform_type,
            transform_tag=transform_tag,
            priority=priority,
            name=name,
            status=status,
            substatus=substatus,
            locking=locking,
            retries=retries,
            expired_at=expired_at,
            new_poll_period=new_poll_period,
            update_poll_period=update_poll_period,
            new_retries=new_retries,
            update_retries=update_retries,
            max_new_retries=max_new_retries,
            max_update_retries=max_update_retries,
            parent_transform_id=parent_transform_id,
            previous_transform_id=previous_transform_id,
            current_processing_id=current_processing_id,
            internal_id=internal_id,
            site=site,
            command=command,
            parent_internal_id=parent_internal_id,
            has_previous_conditions=has_previous_conditions,
            loop_index=loop_index,
            cloned_from=cloned_from,
            triggered_conditions=triggered_conditions,
            untriggered_conditions=untriggered_conditions,
            transform_metadata=transform_metadata,
        )
        transform.save(session=session)
        new_id = transform.transform_id

        if workprogress_id:
            link = models.Workprogress2transform(workprogress_id=workprogress_id, transform_id=new_id)
            link.save(session=session)

        return new_id
    except IntegrityError as error:
        raise exceptions.DuplicatedObject('Transform already exists!: %s' % (error))
    except DatabaseError as error:
        raise exceptions.DatabaseException(error)
0143 
0144 
@transactional_session
def add_req2transform(request_id, transform_id, session=None):
    """
    Save a relation row linking a request to a transform.

    :param request_id: Request id.
    :param transform_id: Transform id.
    :param session: The database session in use.

    :raises DuplicatedObject: If saving hits an IntegrityError.
    :raises DatabaseException: If there is a database error.
    """
    try:
        relation = models.Req2transform(request_id=request_id, transform_id=transform_id)
        relation.save(session=session)
    except IntegrityError as error:
        details = (request_id, transform_id, error)
        raise exceptions.DuplicatedObject('Request2Transform already exists!(%s:%s): %s' % details)
    except DatabaseError as error:
        raise exceptions.DatabaseException(error)
0162 
0163 
@transactional_session
def add_wp2transform(workprogress_id, transform_id, session=None):
    """
    Save a relation row linking a workprogress to a transform.

    :param workprogress_id: Workprogress id.
    :param transform_id: Transform id.
    :param session: The database session in use.

    :raises DuplicatedObject: If saving hits an IntegrityError.
    :raises DatabaseException: If there is a database error.
    """
    try:
        relation = models.Workprogress2transform(workprogress_id=workprogress_id, transform_id=transform_id)
        relation.save(session=session)
    except IntegrityError as error:
        details = (workprogress_id, transform_id, error)
        raise exceptions.DuplicatedObject('Workprogress2Transform already exists!(%s:%s): %s' % details)
    except DatabaseError as error:
        raise exceptions.DatabaseException(error)
0181 
0182 
@read_session
def get_transform(transform_id, request_id=None, to_json=False, session=None):
    """
    Look up a single transform by id.

    :param transform_id: Transform id.
    :param request_id: Optional request id; when truthy the transform must also belong to it.
    :param to_json: When true, serialize with ``to_dict_json()`` instead of ``to_dict()``.
    :param session: The database session in use.

    :raises NoObject: If the query raises NoResultFound.

    :returns: The transform as a dictionary, or None when no row matches.
    """
    try:
        criteria = [models.Transform.transform_id == transform_id]
        if request_id:
            criteria.append(models.Transform.request_id == request_id)

        transform = session.query(models.Transform).filter(*criteria).first()
        if not transform:
            return None
        return transform.to_dict_json() if to_json else transform.to_dict()
    except sqlalchemy.orm.exc.NoResultFound as error:
        raise exceptions.NoObject('Transform(transform_id: %s) cannot be found: %s' %
                                  (transform_id, error))
    except Exception:
        # Anything else propagates to the caller unchanged.
        raise
0214 
0215 
@read_session
def get_transform_by_id_status(transform_id, status=None, locking=False, session=None):
    """
    Look up a transform by id, optionally restricted by status, and optionally lock it.

    With ``locking`` set, only a transform whose locking state is Idle is
    selected (``FOR UPDATE SKIP LOCKED``); the returned object is then marked
    as Locking and stamped with the current host/pid/thread information.

    :param transform_id: The id of the transform.
    :param status: A transform status or a list/tuple of statuses.
    :param locking: Whether to select an idle transform and mark it as locked.
    :param session: The database session in use.

    :raises NoObject: If the query raises NoResultFound.

    :returns: The transform as a dictionary, or None when no row matches.
    """
    # NOTE(review): this function modifies the fetched object but is decorated with
    # read_session; confirm that decorator persists the lock fields.
    try:
        stmt = select(models.Transform).where(models.Transform.transform_id == transform_id)

        if status:
            wanted = status if isinstance(status, (list, tuple)) else [status]
            if len(wanted) == 1:
                # NOTE(review): a single value is duplicated before IN(); presumably a
                # workaround for a backend issue with one-item lists - confirm.
                wanted = [wanted[0], wanted[0]]
            stmt = stmt.where(models.Transform.status.in_(wanted))

        if locking:
            stmt = stmt.where(models.Transform.locking == TransformLocking.Idle)
            stmt = stmt.with_for_update(skip_locked=True)

        row = session.execute(stmt).fetchone()
        if not row:
            return None

        transform = row[0]
        if locking:
            transform.updated_at = datetime.datetime.utcnow()
            transform.locking = TransformLocking.Locking
            owner_host, owner_pid, owner_thread_id, owner_thread_name = get_process_thread_info()
            transform.locking_hostname = owner_host
            transform.locking_pid = owner_pid
            transform.locking_thread_id = owner_thread_id
            transform.locking_thread_name = owner_thread_name

        return transform.to_dict()
    except sqlalchemy.orm.exc.NoResultFound as error:
        raise exceptions.NoObject('transform transform_id: %s cannot be found: %s' % (transform_id, error))
0262 
0263 
@read_session
def get_transform_by_name(request_id, name, session=None):
    """
    Look up a transform by request id and name.

    :param request_id: The request id.
    :param name: transform name.
    :param session: The database session in use.

    :raises NoObject: If the query raises NoResultFound.

    :returns: The first matching transform as a dictionary, or None when no row matches.
    """
    try:
        stmt = select(models.Transform).where(models.Transform.request_id == request_id)
        stmt = stmt.where(models.Transform.name == name)

        row = session.execute(stmt).fetchone()
        if not row:
            return None
        return row[0].to_dict()
    except sqlalchemy.orm.exc.NoResultFound as error:
        raise exceptions.NoObject(f'transform (request_id: {request_id}, name: {name}) cannot be found: {error}')
0291 
0292 
@read_session
def get_transforms_with_input_collection(transform_type, transform_tag, coll_scope, coll_name, to_json=False, session=None):
    """
    Find transforms of a given type and tag that have a given input collection.

    :param transform_type: Transform type.
    :param transform_tag: Transform tag.
    :param coll_scope: The collection scope.
    :param coll_name: The collection name.
    :param to_json: When true, serialize with ``to_dict_json()`` instead of ``to_dict()``.
    :param session: The database session in use.

    :raises NoObject: If the query raises NoResultFound.

    :returns: list of transforms as dictionaries (empty when nothing matches).
    """
    try:
        # Transform ids owning an input collection with the requested scope and name.
        input_colls = session.query(models.Collection.transform_id).filter(
            models.Collection.scope == coll_scope
        ).filter(
            models.Collection.name == coll_name
        ).filter(
            models.Collection.relation_type == CollectionRelationType.Input
        ).subquery()

        on_clause = and_(input_colls.c.transform_id == models.Transform.transform_id,
                         models.Transform.transform_type == transform_type,
                         models.Transform.transform_tag == transform_tag)
        matches = session.query(models.Transform).join(input_colls, on_clause).all()

        return [tf.to_dict_json() if to_json else tf.to_dict() for tf in matches]
    except sqlalchemy.orm.exc.NoResultFound as error:
        raise exceptions.NoObject('Transform(transform_type: %s, transform_tag: %s, coll_scope: %s, coll_name: %s) cannot be found: %s' %
                                  (transform_type, transform_tag, coll_scope, coll_name, error))
    except Exception:
        # Anything else propagates to the caller unchanged.
        raise
0335 
0336 
@read_session
def get_transform_ids(workprogress_id=None, request_id=None, workload_id=None, transform_id=None, session=None):
    """
    Get the ids of the transforms matching the given filters.

    Every filter is optional and only applied when truthy.

    :param workprogress_id: Workprogress id; restricts the result to transforms
                            linked to it through ``Workprogress2transform``.
    :param request_id: Request id.
    :param workload_id: Workload id.
    :param transform_id: Transform id.
    :param session: The database session in use.

    :raises NoObject: If the query raises NoResultFound.

    :returns: list of transform ids (empty when nothing matches).
    """
    try:
        query = session.query(models.Transform.transform_id)
        if request_id:
            query = query.filter(models.Transform.request_id == request_id)
        if workload_id:
            query = query.filter(models.Transform.workload_id == workload_id)
        if transform_id:
            query = query.filter(models.Transform.transform_id == transform_id)
        if workprogress_id:
            # The ON clause must tie the relation row to the transform row; filtering on
            # workprogress_id alone pairs every transform with every relation row of
            # that workprogress.
            query = query.join(
                models.Workprogress2transform,
                and_(models.Workprogress2transform.transform_id == models.Transform.transform_id,
                     models.Workprogress2transform.workprogress_id == workprogress_id))

        return [row[0] for row in query.all()]
    except sqlalchemy.orm.exc.NoResultFound as error:
        raise exceptions.NoObject('No transforms attached with request id (%s) and transform_id (%s): %s' %
                                  (request_id, transform_id, error)) from error
0373 
0374 
@read_session
def get_transforms(request_id=None, workload_id=None, transform_id=None, loop_index=None, internal_ids=None,
                   to_json=False, session=None):
    """
    Get the transforms matching the given filters.

    ``loop_index`` is applied whenever it is not None; every other filter is
    only applied when truthy.

    :param request_id: Request id.
    :param workload_id: Workload id.
    :param transform_id: Transform id.
    :param loop_index: Loop index.
    :param internal_ids: One internal id or a list/tuple of internal ids.
    :param to_json: When true, serialize with ``to_dict_json()`` instead of ``to_dict()``.
    :param session: The database session in use.

    :raises NoObject: If the query raises NoResultFound.

    :returns: list of transforms as dictionaries (empty when nothing matches).
    """
    try:
        transform = models.Transform
        query = session.query(transform)

        if request_id:
            query = query.filter(transform.request_id == request_id)
        if workload_id:
            query = query.filter(transform.workload_id == workload_id)
        if transform_id:
            query = query.filter(transform.transform_id == transform_id)
        if loop_index is not None:
            query = query.filter(transform.loop_index == loop_index)
        if internal_ids:
            wanted = internal_ids if isinstance(internal_ids, (list, tuple)) else [internal_ids]
            if len(wanted) == 1:
                # NOTE(review): a single value is duplicated before IN(); presumably a
                # workaround for a backend issue with one-item lists - confirm.
                wanted = [wanted[0], wanted[0]]
            query = query.filter(transform.internal_id.in_(wanted))

        return [tf.to_dict_json() if to_json else tf.to_dict() for tf in query.all()]
    except sqlalchemy.orm.exc.NoResultFound as error:
        raise exceptions.NoObject('No transforms attached with request id (%s): %s' %
                                  (request_id, error))
    except Exception:
        # Anything else propagates to the caller unchanged.
        raise
0421 
0422 
@transactional_session
def get_transforms_by_status(status, period=None, transform_ids=None, locking=False, locking_for_update=False,
                             bulk_size=None, to_json=False, by_substatus=False, only_return_id=False,
                             not_lock=False, order_by_fifo=False, min_request_id=None, new_poll=False,
                             update_poll=False, session=None):
    """
    Get transforms by status, optionally marking them as locked.

    :param status: Transform status or list of transform status.
    :param period: Currently unused (the corresponding filter is disabled).
    :param transform_ids: Optional list of transform ids to restrict the search to.
    :param locking: Only select transforms whose locking state is Idle, and mark
                    every selected transform as Locking with the current
                    host/pid/thread information.
    :param locking_for_update: Select with ``FOR UPDATE SKIP LOCKED``; otherwise
                               the result is ordered by ascending ``updated_at``.
    :param bulk_size: Maximum number of rows to return.
    :param to_json: return json format.
    :param by_substatus: Match ``status`` against the substatus column instead of status.
    :param only_return_id: Return transform ids instead of dictionaries.
    :param not_lock: Currently unused.
    :param order_by_fifo: Currently unused (the corresponding ordering is disabled).
    :param min_request_id: Only select transforms whose request id is at least this value.
    :param new_poll: Only select transforms whose ``updated_at + new_poll_period`` has passed.
    :param update_poll: Only select transforms whose ``updated_at + update_poll_period`` has passed.
    :param session: The database session in use.

    :raises NoObject: If the query raises NoResultFound.

    :returns: list of transforms (dictionaries, or ids when ``only_return_id`` is set).
    """
    try:
        if status:
            if not isinstance(status, (list, tuple)):
                status = [status]
            if len(status) == 1:
                # NOTE(review): a single value is duplicated before IN(); presumably a
                # workaround for a backend issue with one-item lists - confirm.
                status = [status[0], status[0]]

        # Rows of a column-only query are read-only tuples, so the lock fields can only
        # be stamped on full ORM entities. Fetch entities whenever locking is requested,
        # even if the caller only wants the ids back.
        fetch_ids_only = only_return_id and not locking
        if fetch_ids_only:
            query = session.query(models.Transform.transform_id)
        else:
            query = session.query(models.Transform)

        if status:
            if by_substatus:
                query = query.filter(models.Transform.substatus.in_(status))
            else:
                query = query.filter(models.Transform.status.in_(status))
        if new_poll:
            query = query.filter(models.Transform.updated_at + models.Transform.new_poll_period <= datetime.datetime.utcnow())
        if update_poll:
            query = query.filter(models.Transform.updated_at + models.Transform.update_poll_period <= datetime.datetime.utcnow())

        if transform_ids:
            query = query.filter(models.Transform.transform_id.in_(transform_ids))
        if min_request_id:
            query = query.filter(models.Transform.request_id >= min_request_id)
        if locking:
            query = query.filter(models.Transform.locking == TransformLocking.Idle)

        if locking_for_update:
            query = query.with_for_update(skip_locked=True)
        else:
            query = query.order_by(asc(models.Transform.updated_at))

        if bulk_size:
            query = query.limit(bulk_size)

        found = query.all()

        if locking and found:
            # The lock owner is the same for every row, so resolve it once.
            hostname, pid, thread_id, thread_name = get_process_thread_info()
            locked_at = datetime.datetime.utcnow()
            for transform in found:
                transform.updated_at = locked_at
                transform.locking = TransformLocking.Locking
                transform.locking_hostname = hostname
                transform.locking_pid = pid
                transform.locking_thread_id = thread_id
                transform.locking_thread_name = thread_name

        if fetch_ids_only:
            return [row[0] for row in found]
        if only_return_id:
            return [transform.transform_id for transform in found]
        if to_json:
            return [transform.to_dict_json() for transform in found]
        return [transform.to_dict() for transform in found]
    except sqlalchemy.orm.exc.NoResultFound as error:
        raise exceptions.NoObject('No transforms attached with status (%s): %s' %
                                  (status, error)) from error
0512 
0513 
@transactional_session
def update_transform(transform_id, parameters, locking=False, session=None):
    """
    Update a transform.

    The caller's ``parameters`` dictionary is left untouched; the column values
    are derived from a copy as follows:

    - ``updated_at`` is always set to the current UTC time.
    - ``new_poll_period`` / ``update_poll_period`` given in seconds are converted to timedelta.
    - ``finished_at`` is set when ``status`` is Finished or Failed.
    - ``transform_metadata`` is never written; when it carries a ``work`` object,
      that object's ``metadata`` is stored as ``running_metadata['work_data']``.
    - ``running_metadata`` is written through the ``_running_metadata`` key.

    :param transform_id: the transform id.
    :param parameters: A dictionary of parameters.
    :param locking: Only update the transform when its locking state is Idle.
    :param session: The database session in use.

    :raises NoObject: If the query raises NoResultFound.

    :returns: The number of rows matched by the update.
    """
    try:
        # Copy first: the rewriting below deletes and renames keys.
        values = dict(parameters)
        values['updated_at'] = datetime.datetime.utcnow()

        for period_key in ('new_poll_period', 'update_poll_period'):
            if period_key in values and not isinstance(values[period_key], datetime.timedelta):
                values[period_key] = datetime.timedelta(seconds=values[period_key])

        if 'status' in values and values['status'] in [TransformStatus.Finished, TransformStatus.Finished.value,
                                                       TransformStatus.Failed, TransformStatus.Failed.value]:
            values['finished_at'] = datetime.datetime.utcnow()

        transform_metadata = values.pop('transform_metadata', None)
        if transform_metadata is not None and 'work' in transform_metadata:
            work = transform_metadata['work']
            if work is not None:
                if hasattr(work, 'refresh_work'):
                    work.refresh_work()
                # Copy the nested dictionary too so the caller's object is not extended.
                running_metadata = dict(values['running_metadata']) if 'running_metadata' in values else {}
                running_metadata['work_data'] = work.metadata
                values['running_metadata'] = running_metadata

        if 'running_metadata' in values:
            values['_running_metadata'] = values.pop('running_metadata')

        query = session.query(models.Transform).filter_by(transform_id=transform_id)
        if locking:
            query = query.filter(models.Transform.locking == TransformLocking.Idle)
            query = query.with_for_update(skip_locked=True)

        return query.update(values, synchronize_session=False)
    except sqlalchemy.orm.exc.NoResultFound as error:
        raise exceptions.NoObject('Transfrom %s cannot be found: %s' % (transform_id, error)) from error
0563 
0564 
@transactional_session
def abort_resume_transforms(transform_id=None, request_id=None, abort=False, resume=False, session=None):
    """
    Set the abort or resume command on the matching transforms.

    Nothing is done (and None is returned) when neither ``abort`` nor ``resume``
    is set, or when neither ``transform_id`` nor ``request_id`` is given.
    Transforms that already carry the requested command are skipped.

    :param transform_id: The id of the transform.
    :param request_id: The request id.
    :param abort: Issue the AbortTransform command.
    :param resume: Issue the ResumeTransform command; wins when both flags are set.
    :param session: The database session in use.

    :raises NoObject: If the query raises NoResultFound.

    :returns: The number of rows matched by the update.
    """
    if not (abort or resume):
        return
    if not (transform_id or request_id):
        return

    try:
        # At least one flag is set at this point; resume takes precedence over abort.
        command = CommandType.ResumeTransform if resume else CommandType.AbortTransform
        new_values = {'command': command}

        targets = session.query(models.Transform)
        if transform_id:
            targets = targets.filter_by(transform_id=transform_id)
        if request_id:
            targets = targets.filter_by(request_id=request_id)
        targets = targets.filter(models.Transform.command != command)

        return targets.update(new_values, synchronize_session=False)
    except sqlalchemy.orm.exc.NoResultFound as error:
        raise exceptions.NoObject('Transfrom %s cannot be found: %s' % (transform_id, error))
0602 
0603 
@transactional_session
def delete_transform(transform_id=None, session=None):
    """
    Delete a transform together with its request-to-transform relation rows.

    :param transform_id: The id of the transform.
    :param session: The database session in use.

    :raises NoObject: If a query raises NoResultFound.
    """
    try:
        # The relation rows are removed first, then the transform row itself.
        for table in (models.Req2transform, models.Transform):
            session.query(table).filter_by(transform_id=transform_id).delete()
    except sqlalchemy.orm.exc.NoResultFound as error:
        raise exceptions.NoObject('Transfrom %s cannot be found: %s' % (transform_id, error))
0620 
0621 
@transactional_session
def clean_locking(time_period=3600, min_request_id=None, health_items=None, force=False, hostname=None, pid=None, session=None):
    """
    Release transform locks that are considered lost.

    A transform in the Locking state has its ``locking`` column reset to 0 when
    any of the following holds:

    - the (hostname, pid) that holds the lock is not among ``health_items``
      (so with no health items every lock is released);
    - ``force`` is set and the lock is held by the given ``hostname`` and ``pid``;
    - the transform was last updated more than ``time_period`` seconds ago.

    :param time_period: Age in seconds after which a lock is considered stale.
    :param min_request_id: Only look at transforms whose request id is at least this value.
    :param health_items: Iterable of dictionaries with ``hostname`` and ``pid`` keys
                         describing the live workers.
    :param force: Release the locks held by ``hostname``/``pid`` regardless of age.
    :param hostname: Hostname used together with ``force``.
    :param pid: Process id used together with ``force``.
    :param session: The database session in use.
    """
    # Use dedicated names here: reusing ``hostname``/``pid`` as loop variables would
    # overwrite the arguments that the ``force`` check below relies on.
    live_workers = {(item['hostname'], item['pid']) for item in (health_items or [])}

    query = session.query(models.Transform.transform_id,
                          models.Transform.locking_hostname,
                          models.Transform.locking_pid,
                          models.Transform.locking_thread_id,
                          models.Transform.locking_thread_name,
                          models.Transform.updated_at)
    query = query.filter(models.Transform.locking == TransformLocking.Locking)
    if min_request_id:
        query = query.filter(models.Transform.request_id >= min_request_id)

    stale_before = datetime.datetime.utcnow() - datetime.timedelta(seconds=time_period)

    lost_transform_ids = []
    for tf_id, locking_hostname, locking_pid, _thread_id, _thread_name, updated_at in query.all():
        owner_gone = (locking_hostname, locking_pid) not in live_workers
        forced = force and hostname == locking_hostname and pid == locking_pid
        if owner_gone or forced or updated_at < stale_before:
            lost_transform_ids.append({"transform_id": tf_id, 'locking': 0})

    safe_bulk_update_mappings(session, models.Transform, lost_transform_ids)
0664 
0665 
@transactional_session
def clean_next_poll_at(status, session=None):
    """
    Reset ``next_poll_at`` to the current UTC time for all transforms in the given status.

    :param status: A transform status or a list/tuple of statuses.
    :param session: The database session in use.
    """
    wanted = status if isinstance(status, (list, tuple)) else [status]
    if len(wanted) == 1:
        # NOTE(review): a single value is duplicated before IN(); presumably a
        # workaround for a backend issue with one-item lists - confirm.
        wanted = [wanted[0], wanted[0]]

    new_values = {'next_poll_at': datetime.datetime.utcnow()}
    matching = session.query(models.Transform).filter(models.Transform.status.in_(wanted))
    matching.update(new_values, synchronize_session=False)
0681 
0682 
@read_session
def get_num_active_transforms(active_status=None, session=None):
    """
    Count transforms grouped by status and site.

    :param active_status: Optional status or list/tuple of statuses to restrict the count to.
    :param session: The database session in use.

    :returns: list of (status, site, count) rows.
    """
    if active_status:
        if not isinstance(active_status, (list, tuple)):
            active_status = [active_status]
        if len(active_status) == 1:
            # NOTE(review): a single value is duplicated before IN(); presumably a
            # workaround for a backend issue with one-item lists - confirm.
            active_status = [active_status[0], active_status[0]]

    try:
        group_columns = (models.Transform.status, models.Transform.site)
        query = session.query(*group_columns, func.count(models.Transform.transform_id))
        if active_status:
            query = query.filter(models.Transform.status.in_(active_status))
        return query.group_by(*group_columns).all()
    except Exception:
        # Errors propagate to the caller unchanged.
        raise
0699 
0700 
@read_session
def get_active_transforms(active_status=None, session=None):
    """
    List the request id, transform id, site and status of transforms.

    :param active_status: Optional status or list/tuple of statuses to restrict the result to.
    :param session: The database session in use.

    :returns: list of (request_id, transform_id, site, status) rows.
    """
    if active_status:
        if not isinstance(active_status, (list, tuple)):
            active_status = [active_status]
        if len(active_status) == 1:
            # NOTE(review): a single value is duplicated before IN(); presumably a
            # workaround for a backend issue with one-item lists - confirm.
            active_status = [active_status[0], active_status[0]]

    try:
        columns = (models.Transform.request_id,
                   models.Transform.transform_id,
                   models.Transform.site,
                   models.Transform.status)
        query = session.query(*columns)
        if active_status:
            query = query.filter(models.Transform.status.in_(active_status))
        return query.all()
    except Exception:
        # Errors propagate to the caller unchanged.
        raise