File indexing completed on 2026-04-09 07:58:20
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012 """
0013 operations related to Transform.
0014 """
0015
0016 import datetime
0017
0018 import sqlalchemy
0019 from sqlalchemy import and_, func, select
0020 from sqlalchemy.exc import DatabaseError, IntegrityError
0021 from sqlalchemy.sql.expression import asc
0022
0023 from idds.common import exceptions
0024 from idds.common.constants import CommandType, TransformStatus, TransformLocking, CollectionRelationType
0025 from idds.common.utils import get_process_thread_info
0026 from idds.orm.base.session import read_session, transactional_session, safe_bulk_update_mappings
0027 from idds.orm.base import models
0028
0029
0030 def create_transform(request_id, workload_id, transform_type, transform_tag=None,
0031 priority=0, status=TransformStatus.New, name=None,
0032 substatus=TransformStatus.New, locking=TransformLocking.Idle,
0033 new_poll_period=1, update_poll_period=10,
0034 new_retries=0, update_retries=0, max_new_retries=3, max_update_retries=0,
0035 parent_transform_id=None, previous_transform_id=None, current_processing_id=None,
0036 internal_id=None, has_previous_conditions=None, loop_index=None,
0037 parent_internal_id=None, command=CommandType.NoneCommand,
0038 cloned_from=None, triggered_conditions=None, untriggered_conditions=None,
0039 site=None, retries=0, expired_at=None, transform_metadata=None):
0040 """
0041 Create a transform.
0042
0043 :param request_id: The request id.
0044 :param workload_id: The workload id.
0045 :param transform_type: Transform type.
0046 :param transform_tag: Transform tag.
0047 :param priority: priority.
0048 :param status: Transform status.
0049 :param locking: Transform locking.
0050 :param retries: The number of retries.
0051 :param expired_at: The datetime when it expires.
0052 :param transform_metadata: The metadata as json.
0053
0054 :returns: transform.
0055 """
0056 new_transform = models.Transform(request_id=request_id, workload_id=workload_id, transform_type=transform_type,
0057 transform_tag=transform_tag, priority=priority, name=name,
0058 status=status, substatus=substatus, locking=locking,
0059 retries=retries, expired_at=expired_at,
0060 new_retries=new_retries, update_retries=update_retries,
0061 max_new_retries=max_new_retries, max_update_retries=max_update_retries,
0062 parent_transform_id=parent_transform_id,
0063 previous_transform_id=previous_transform_id,
0064 current_processing_id=current_processing_id,
0065 internal_id=internal_id, site=site,
0066 command=command,
0067 parent_internal_id=parent_internal_id,
0068 has_previous_conditions=has_previous_conditions,
0069 loop_index=loop_index, cloned_from=cloned_from,
0070 triggered_conditions=triggered_conditions,
0071 untriggered_conditions=untriggered_conditions,
0072 transform_metadata=transform_metadata)
0073 if new_poll_period:
0074 new_poll_period = datetime.timedelta(seconds=new_poll_period)
0075 new_transform.new_poll_period = new_poll_period
0076 if update_poll_period:
0077 update_poll_period = datetime.timedelta(seconds=update_poll_period)
0078 new_transform.update_poll_period = update_poll_period
0079 return new_transform
0080
0081
0082 @transactional_session
0083 def add_transform(request_id, workload_id, transform_type, transform_tag=None, priority=0, name=None,
0084 status=TransformStatus.New, substatus=TransformStatus.New, locking=TransformLocking.Idle,
0085 new_poll_period=1, update_poll_period=10, retries=0, expired_at=None,
0086 new_retries=0, update_retries=0, max_new_retries=3, max_update_retries=0,
0087 parent_transform_id=None, previous_transform_id=None, current_processing_id=None,
0088 internal_id=None, has_previous_conditions=None, loop_index=None,
0089 parent_internal_id=None, command=CommandType.NoneCommand,
0090 cloned_from=None, triggered_conditions=None, untriggered_conditions=None,
0091 transform_metadata=None, workprogress_id=None, site=None, session=None):
0092 """
0093 Add a transform.
0094
0095 :param request_id: The request id.
0096 :param workload_id: The workload id.
0097 :param transform_type: Transform type.
0098 :param transform_tag: Transform tag.
0099 :param priority: priority.
0100 :param status: Transform status.
0101 :param locking: Transform locking.
0102 :param retries: The number of retries.
0103 :param expired_at: The datetime when it expires.
0104 :param transform_metadata: The metadata as json.
0105
0106 :raises DuplicatedObject: If a transform with the same name exists.
0107 :raises DatabaseException: If there is a database error.
0108
0109 :returns: transform id.
0110 """
0111 try:
0112 new_transform = create_transform(request_id=request_id, workload_id=workload_id, transform_type=transform_type,
0113 transform_tag=transform_tag, priority=priority, name=name,
0114 status=status, substatus=substatus, locking=locking,
0115 retries=retries, expired_at=expired_at,
0116 new_poll_period=new_poll_period,
0117 update_poll_period=update_poll_period,
0118 new_retries=new_retries, update_retries=update_retries,
0119 max_new_retries=max_new_retries, max_update_retries=max_update_retries,
0120 parent_transform_id=parent_transform_id,
0121 previous_transform_id=previous_transform_id,
0122 current_processing_id=current_processing_id,
0123 internal_id=internal_id, site=site,
0124 command=command,
0125 parent_internal_id=parent_internal_id,
0126 has_previous_conditions=has_previous_conditions,
0127 loop_index=loop_index, cloned_from=cloned_from,
0128 triggered_conditions=triggered_conditions,
0129 untriggered_conditions=untriggered_conditions,
0130 transform_metadata=transform_metadata)
0131 new_transform.save(session=session)
0132 transform_id = new_transform.transform_id
0133
0134 if workprogress_id:
0135 new_wp2transform = models.Workprogress2transform(workprogress_id=workprogress_id, transform_id=transform_id)
0136 new_wp2transform.save(session=session)
0137
0138 return transform_id
0139 except IntegrityError as error:
0140 raise exceptions.DuplicatedObject('Transform already exists!: %s' % (error))
0141 except DatabaseError as error:
0142 raise exceptions.DatabaseException(error)
0143
0144
0145 @transactional_session
0146 def add_req2transform(request_id, transform_id, session=None):
0147 """
0148 Add the relation between request_id and transform_id
0149
0150 :param request_id: Request id.
0151 :param transform_id: Transform id.
0152 :param session: The database session in use.
0153 """
0154 try:
0155 new_req2transform = models.Req2transform(request_id=request_id, transform_id=transform_id)
0156 new_req2transform.save(session=session)
0157 except IntegrityError as error:
0158 raise exceptions.DuplicatedObject('Request2Transform already exists!(%s:%s): %s' %
0159 (request_id, transform_id, error))
0160 except DatabaseError as error:
0161 raise exceptions.DatabaseException(error)
0162
0163
0164 @transactional_session
0165 def add_wp2transform(workprogress_id, transform_id, session=None):
0166 """
0167 Add the relation between workprogress_id and transform_id
0168
0169 :param workprogress_id: Workprogress id.
0170 :param transform_id: Transform id.
0171 :param session: The database session in use.
0172 """
0173 try:
0174 new_wp2transform = models.Workprogress2transform(workprogress_id=workprogress_id, transform_id=transform_id)
0175 new_wp2transform.save(session=session)
0176 except IntegrityError as error:
0177 raise exceptions.DuplicatedObject('Workprogress2Transform already exists!(%s:%s): %s' %
0178 (workprogress_id, transform_id, error))
0179 except DatabaseError as error:
0180 raise exceptions.DatabaseException(error)
0181
0182
0183 @read_session
0184 def get_transform(transform_id, request_id=None, to_json=False, session=None):
0185 """
0186 Get transform or raise a NoObject exception.
0187
0188 :param transform_id: Transform id.
0189 :param session: The database session in use.
0190
0191 :raises NoObject: If no transform is founded.
0192
0193 :returns: Transform.
0194 """
0195
0196 try:
0197 query = session.query(models.Transform)\
0198 .filter(models.Transform.transform_id == transform_id)
0199 if request_id:
0200 query = query.filter(models.Transform.request_id == request_id)
0201 ret = query.first()
0202 if not ret:
0203 return None
0204 else:
0205 if to_json:
0206 return ret.to_dict_json()
0207 else:
0208 return ret.to_dict()
0209 except sqlalchemy.orm.exc.NoResultFound as error:
0210 raise exceptions.NoObject('Transform(transform_id: %s) cannot be found: %s' %
0211 (transform_id, error))
0212 except Exception as error:
0213 raise error
0214
0215
0216 @read_session
0217 def get_transform_by_id_status(transform_id, status=None, locking=False, session=None):
0218 """
0219 Get a transform or raise a NoObject exception.
0220
0221 :param transform_id: The id of the transform.
0222 :param status: request status.
0223 :param locking: the locking status.
0224
0225 :param session: The database session in use.
0226
0227 :raises NoObject: If no request is founded.
0228
0229 :returns: Transform.
0230 """
0231
0232 try:
0233 query = select(models.Transform).where(models.Transform.transform_id == transform_id)
0234
0235 if status:
0236 if not isinstance(status, (list, tuple)):
0237 status = [status]
0238 if len(status) == 1:
0239 status = [status[0], status[0]]
0240 query = query.where(models.Transform.status.in_(status))
0241
0242 if locking:
0243 query = query.where(models.Transform.locking == TransformLocking.Idle)
0244 query = query.with_for_update(skip_locked=True)
0245
0246 ret = session.execute(query).fetchone()
0247 if not ret:
0248 return None
0249 else:
0250 if locking:
0251 ret[0].updated_at = datetime.datetime.utcnow()
0252 ret[0].locking = TransformLocking.Locking
0253 hostname, pid, thread_id, thread_name = get_process_thread_info()
0254 ret[0].locking_hostname = hostname
0255 ret[0].locking_pid = pid
0256 ret[0].locking_thread_id = thread_id
0257 ret[0].locking_thread_name = thread_name
0258
0259 return ret[0].to_dict()
0260 except sqlalchemy.orm.exc.NoResultFound as error:
0261 raise exceptions.NoObject('transform transform_id: %s cannot be found: %s' % (transform_id, error))
0262
0263
0264 @read_session
0265 def get_transform_by_name(request_id, name, session=None):
0266 """
0267 Get a transform or raise a NoObject exception.
0268
0269 :param request_id: The request id.
0270 :param name: transform name.
0271 :param locking: the locking status.
0272
0273 :param session: The database session in use.
0274
0275 :raises NoObject: If no request is founded.
0276
0277 :returns: Transform.
0278 """
0279
0280 try:
0281 query = select(models.Transform).where(models.Transform.request_id == request_id)
0282 query = query.where(models.Transform.name == name)
0283
0284 ret = session.execute(query).fetchone()
0285 if not ret:
0286 return None
0287 else:
0288 return ret[0].to_dict()
0289 except sqlalchemy.orm.exc.NoResultFound as error:
0290 raise exceptions.NoObject(f'transform (request_id: {request_id}, name: {name}) cannot be found: {error}')
0291
0292
0293 @read_session
0294 def get_transforms_with_input_collection(transform_type, transform_tag, coll_scope, coll_name, to_json=False, session=None):
0295 """
0296 Get transforms or raise a NoObject exception.
0297
0298 :param transform_type: Transform type.
0299 :param transform_tag: Transform tag.
0300 :param coll_scope: The collection scope.
0301 :param coll_name: The collection name.
0302 :param to_json: return json format.
0303
0304 :param session: The database session in use.
0305
0306 :raises NoObject: If no transform is founded.
0307
0308 :returns: Transform.
0309 """
0310
0311 try:
0312 subquery = session.query(models.Collection.transform_id)\
0313 .filter(models.Collection.scope == coll_scope)\
0314 .filter(models.Collection.name == coll_name)\
0315 .filter(models.Collection.relation_type == CollectionRelationType.Input)\
0316 .subquery()
0317 query = session.query(models.Transform)\
0318 .join(subquery, and_(subquery.c.transform_id == models.Transform.transform_id,
0319 models.Transform.transform_type == transform_type,
0320 models.Transform.transform_tag == transform_tag))
0321 tmp = query.all()
0322 rets = []
0323 if tmp:
0324 for transf in tmp:
0325 if to_json:
0326 rets.append(transf.to_dict_json())
0327 else:
0328 rets.append(transf.to_dict())
0329 return rets
0330 except sqlalchemy.orm.exc.NoResultFound as error:
0331 raise exceptions.NoObject('Transform(transform_type: %s, transform_tag: %s, coll_scope: %s, coll_name: %s) cannot be found: %s' %
0332 (transform_type, transform_tag, coll_scope, coll_name, error))
0333 except Exception as error:
0334 raise error
0335
0336
0337 @read_session
0338 def get_transform_ids(workprogress_id=None, request_id=None, workload_id=None, transform_id=None, session=None):
0339 """
0340 Get transform ids or raise a NoObject exception.
0341
0342 :param workprogress_id: Workprogress id.
0343 :param workload_id: Workload id.
0344 :param transform_id: Transform id.
0345 :param session: The database session in use.
0346
0347 :raises NoObject: If no transform is founded.
0348
0349 :returns: list of transform ids.
0350 """
0351 try:
0352 query = session.query(models.Transform.transform_id)
0353 if request_id:
0354 query = query.filter(models.Transform.request_id == request_id)
0355 if workload_id:
0356 query = query.filter(models.Transform.workload_id == workload_id)
0357 if transform_id:
0358 query = query.filter(models.Transform.transform_id == transform_id)
0359 if workprogress_id:
0360 query = query.join(models.Workprogress2transform, and_(models.Workprogress2transform.workprogress_id == workprogress_id))
0361
0362 tmp = query.all()
0363 ret_ids = []
0364 if tmp:
0365 for t in tmp:
0366 ret_ids.append(t[0])
0367 return ret_ids
0368 except sqlalchemy.orm.exc.NoResultFound as error:
0369 raise exceptions.NoObject('No transforms attached with request id (%s) and transform_id (%s): %s' %
0370 (request_id, transform_id, error))
0371 except Exception as error:
0372 raise error
0373
0374
0375 @read_session
0376 def get_transforms(request_id=None, workload_id=None, transform_id=None, loop_index=None, internal_ids=None,
0377 to_json=False, session=None):
0378 """
0379 Get transforms or raise a NoObject exception.
0380
0381 :param request_id: Request id.
0382 :param workload_id: Workload id.
0383 :param transform_id: Transform id.
0384 :param session: The database session in use.
0385
0386 :raises NoObject: If no transform is founded.
0387
0388 :returns: list of transforms.
0389 """
0390 try:
0391 query = session.query(models.Transform)
0392 if request_id:
0393 query = query.filter(models.Transform.request_id == request_id)
0394 if workload_id:
0395 query = query.filter(models.Transform.workload_id == workload_id)
0396 if transform_id:
0397 query = query.filter(models.Transform.transform_id == transform_id)
0398 if loop_index is not None:
0399 query = query.filter(models.Transform.loop_index == loop_index)
0400 if internal_ids:
0401 if not isinstance(internal_ids, (list, tuple)):
0402 internal_ids = [internal_ids]
0403 if len(internal_ids) == 1:
0404 internal_ids = [internal_ids[0], internal_ids[0]]
0405 query = query.filter(models.Transform.internal_id.in_(internal_ids))
0406
0407 tmp = query.all()
0408 rets = []
0409 if tmp:
0410 for t in tmp:
0411 if to_json:
0412 rets.append(t.to_dict_json())
0413 else:
0414 rets.append(t.to_dict())
0415 return rets
0416 except sqlalchemy.orm.exc.NoResultFound as error:
0417 raise exceptions.NoObject('No transforms attached with request id (%s): %s' %
0418 (request_id, error))
0419 except Exception as error:
0420 raise error
0421
0422
0423 @transactional_session
0424 def get_transforms_by_status(status, period=None, transform_ids=[], locking=False, locking_for_update=False,
0425 bulk_size=None, to_json=False, by_substatus=False, only_return_id=False,
0426 not_lock=False, order_by_fifo=False, min_request_id=None, new_poll=False,
0427 update_poll=False, session=None):
0428 """
0429 Get transforms or raise a NoObject exception.
0430
0431 :param status: Transform status or list of transform status.
0432 :param period: Time period in seconds.
0433 :param locking: Whether to retrieved unlocked items.
0434 :param to_json: return json format.
0435
0436 :param session: The database session in use.
0437
0438 :raises NoObject: If no transform is founded.
0439
0440 :returns: list of transform.
0441 """
0442 try:
0443 if status:
0444 if not isinstance(status, (list, tuple)):
0445 status = [status]
0446 if len(status) == 1:
0447 status = [status[0], status[0]]
0448
0449 if only_return_id:
0450 query = session.query(models.Transform.transform_id)
0451 else:
0452 query = session.query(models.Transform)
0453
0454 if status:
0455 if by_substatus:
0456 query = query.filter(models.Transform.substatus.in_(status))
0457 else:
0458 query = query.filter(models.Transform.status.in_(status))
0459 if new_poll:
0460 query = query.filter(models.Transform.updated_at + models.Transform.new_poll_period <= datetime.datetime.utcnow())
0461 if update_poll:
0462 query = query.filter(models.Transform.updated_at + models.Transform.update_poll_period <= datetime.datetime.utcnow())
0463
0464 if transform_ids:
0465 query = query.filter(models.Transform.transform_id.in_(transform_ids))
0466 if min_request_id:
0467 query = query.filter(models.Transform.request_id >= min_request_id)
0468
0469
0470 if locking:
0471 query = query.filter(models.Transform.locking == TransformLocking.Idle)
0472
0473 if locking_for_update:
0474 query = query.with_for_update(skip_locked=True)
0475 else:
0476
0477
0478
0479
0480 query = query.order_by(asc(models.Transform.updated_at))
0481
0482 if bulk_size:
0483 query = query.limit(bulk_size)
0484
0485 tmp = query.all()
0486 rets = []
0487 if tmp:
0488 for t in tmp:
0489 if locking:
0490 t.updated_at = datetime.datetime.utcnow()
0491 t.locking = TransformLocking.Locking
0492
0493 hostname, pid, thread_id, thread_name = get_process_thread_info()
0494 t.locking_hostname = hostname
0495 t.locking_pid = pid
0496 t.locking_thread_id = thread_id
0497 t.locking_thread_name = thread_name
0498
0499 if only_return_id:
0500 rets.append(t[0])
0501 else:
0502 if to_json:
0503 rets.append(t.to_dict_json())
0504 else:
0505 rets.append(t.to_dict())
0506 return rets
0507 except sqlalchemy.orm.exc.NoResultFound as error:
0508 raise exceptions.NoObject('No transforms attached with status (%s): %s' %
0509 (status, error))
0510 except Exception as error:
0511 raise error
0512
0513
0514 @transactional_session
0515 def update_transform(transform_id, parameters, locking=False, session=None):
0516 """
0517 update a transform.
0518
0519 :param transform_id: the transform id.
0520 :param parameters: A dictionary of parameters.
0521 :param session: The database session in use.
0522
0523 :raises NoObject: If no content is founded.
0524 :raises DatabaseException: If there is a database error.
0525
0526 """
0527 try:
0528 parameters['updated_at'] = datetime.datetime.utcnow()
0529
0530 if 'new_poll_period' in parameters and type(parameters['new_poll_period']) not in [datetime.timedelta]:
0531 parameters['new_poll_period'] = datetime.timedelta(seconds=parameters['new_poll_period'])
0532 if 'update_poll_period' in parameters and type(parameters['update_poll_period']) not in [datetime.timedelta]:
0533 parameters['update_poll_period'] = datetime.timedelta(seconds=parameters['update_poll_period'])
0534
0535 if 'status' in parameters and parameters['status'] in [TransformStatus.Finished, TransformStatus.Finished.value,
0536 TransformStatus.Failed, TransformStatus.Failed.value]:
0537 parameters['finished_at'] = datetime.datetime.utcnow()
0538
0539 if 'transform_metadata' in parameters and 'work' in parameters['transform_metadata']:
0540 work = parameters['transform_metadata']['work']
0541 if work is not None:
0542 if hasattr(work, 'refresh_work'):
0543 work.refresh_work()
0544 if 'running_metadata' not in parameters:
0545 parameters['running_metadata'] = {}
0546 parameters['running_metadata']['work_data'] = work.metadata
0547 if 'transform_metadata' in parameters:
0548 del parameters['transform_metadata']
0549 if 'running_metadata' in parameters:
0550 parameters['_running_metadata'] = parameters['running_metadata']
0551 del parameters['running_metadata']
0552
0553 query = session.query(models.Transform).filter_by(transform_id=transform_id)
0554 if locking:
0555 query = query.filter(models.Transform.locking == TransformLocking.Idle)
0556 query = query.with_for_update(skip_locked=True)
0557
0558 num_rows = query.update(parameters, synchronize_session=False)
0559 return num_rows
0560 except sqlalchemy.orm.exc.NoResultFound as error:
0561 raise exceptions.NoObject('Transfrom %s cannot be found: %s' % (transform_id, error))
0562 return 0
0563
0564
0565 @transactional_session
0566 def abort_resume_transforms(transform_id=None, request_id=None, abort=False, resume=False, session=None):
0567 """
0568 abort/resume transforms.
0569
0570 :param request_id: The request id.
0571 :param transform_id: The id of the transform.
0572 :param session: The database session in use.
0573
0574 :raises NoObject: If no content is founded.
0575 :raises DatabaseException: If there is a database error.
0576 """
0577 if not abort and not resume:
0578 return
0579 if not transform_id and not request_id:
0580 return
0581
0582 try:
0583 if abort:
0584
0585 parameters = {'command': CommandType.AbortTransform}
0586 command = CommandType.AbortTransform
0587 if resume:
0588
0589 parameters = {'command': CommandType.ResumeTransform}
0590 command = CommandType.ResumeTransform
0591 query = session.query(models.Transform)
0592 if transform_id:
0593 query = query.filter_by(transform_id=transform_id)
0594 if request_id:
0595 query = query.filter_by(request_id=request_id)
0596 query = query.filter(models.Transform.command != command)
0597 num_rows = query.update(parameters, synchronize_session=False)
0598 return num_rows
0599 except sqlalchemy.orm.exc.NoResultFound as error:
0600 raise exceptions.NoObject('Transfrom %s cannot be found: %s' % (transform_id, error))
0601 return 0
0602
0603
0604 @transactional_session
0605 def delete_transform(transform_id=None, session=None):
0606 """
0607 delete a transform.
0608
0609 :param transform_id: The id of the transform.
0610 :param session: The database session in use.
0611
0612 :raises NoObject: If no content is founded.
0613 :raises DatabaseException: If there is a database error.
0614 """
0615 try:
0616 session.query(models.Req2transform).filter_by(transform_id=transform_id).delete()
0617 session.query(models.Transform).filter_by(transform_id=transform_id).delete()
0618 except sqlalchemy.orm.exc.NoResultFound as error:
0619 raise exceptions.NoObject('Transfrom %s cannot be found: %s' % (transform_id, error))
0620
0621
0622 @transactional_session
0623 def clean_locking(time_period=3600, min_request_id=None, health_items=[], force=False, hostname=None, pid=None, session=None):
0624 """
0625 Clearn locking which is older than time period.
0626
0627 :param time_period in seconds
0628 """
0629 health_dict = {}
0630 for item in health_items:
0631 hostname = item['hostname']
0632 pid = item['pid']
0633 thread_id = item['thread_id']
0634 if hostname not in health_dict:
0635 health_dict[hostname] = {}
0636 if pid not in health_dict[hostname]:
0637 health_dict[hostname][pid] = []
0638 if thread_id not in health_dict[hostname][pid]:
0639 health_dict[hostname][pid].append(thread_id)
0640 query = session.query(models.Transform.transform_id,
0641 models.Transform.locking_hostname,
0642 models.Transform.locking_pid,
0643 models.Transform.locking_thread_id,
0644 models.Transform.locking_thread_name,
0645 models.Transform.updated_at)
0646 query = query.filter(models.Transform.locking == TransformLocking.Locking)
0647 if min_request_id:
0648 query = query.filter(models.Transform.request_id >= min_request_id)
0649
0650 lost_transform_ids = []
0651 tmp = query.all()
0652 if tmp:
0653 for req in tmp:
0654 tf_id, locking_hostname, locking_pid, locking_thread_id, locking_thread_name, updated_at = req
0655 if (
0656 (locking_hostname not in health_dict or locking_pid not in health_dict[locking_hostname])
0657 or (force and hostname == locking_hostname and pid == locking_pid)
0658 or (updated_at < datetime.datetime.utcnow() - datetime.timedelta(seconds=time_period))
0659 ):
0660 lost_transform_ids.append({"transform_id": tf_id, 'locking': 0})
0661
0662
0663 safe_bulk_update_mappings(session, models.Transform, lost_transform_ids)
0664
0665
0666 @transactional_session
0667 def clean_next_poll_at(status, session=None):
0668 """
0669 Clearn next_poll_at.
0670
0671 :param status: status of the transform
0672 """
0673 if not isinstance(status, (list, tuple)):
0674 status = [status]
0675 if len(status) == 1:
0676 status = [status[0], status[0]]
0677
0678 params = {'next_poll_at': datetime.datetime.utcnow()}
0679 session.query(models.Transform).filter(models.Transform.status.in_(status))\
0680 .update(params, synchronize_session=False)
0681
0682
0683 @read_session
0684 def get_num_active_transforms(active_status=None, session=None):
0685 if active_status and not isinstance(active_status, (list, tuple)):
0686 active_status = [active_status]
0687 if active_status and len(active_status) == 1:
0688 active_status = [active_status[0], active_status[0]]
0689
0690 try:
0691 query = session.query(models.Transform.status, models.Transform.site, func.count(models.Transform.transform_id))
0692 if active_status:
0693 query = query.filter(models.Transform.status.in_(active_status))
0694 query = query.group_by(models.Transform.status, models.Transform.site)
0695 tmp = query.all()
0696 return tmp
0697 except Exception as error:
0698 raise error
0699
0700
0701 @read_session
0702 def get_active_transforms(active_status=None, session=None):
0703 if active_status and not isinstance(active_status, (list, tuple)):
0704 active_status = [active_status]
0705 if active_status and len(active_status) == 1:
0706 active_status = [active_status[0], active_status[0]]
0707
0708 try:
0709 query = session.query(models.Transform.request_id,
0710 models.Transform.transform_id,
0711 models.Transform.site,
0712 models.Transform.status)
0713 if active_status:
0714 query = query.filter(models.Transform.status.in_(active_status))
0715 tmp = query.all()
0716 return tmp
0717 except Exception as error:
0718 raise error