Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-09 07:58:20

0001 #!/usr/bin/env python
0002 #
0003 # Licensed under the Apache License, Version 2.0 (the "License");
0004 # You may not use this file except in compliance with the License.
0005 # You may obtain a copy of the License at
0006 # http://www.apache.org/licenses/LICENSE-2.0
0007 #
0008 # Authors:
0009 # - Wen Guan, <wen.guan@cern.ch>, 2019 - 2025
0010 
0011 
0012 """
0013 operations related to Requests.
0014 """
0015 
0016 import datetime
0017 import random
0018 
0019 import sqlalchemy
0020 from sqlalchemy import and_, func, select, not_
0021 from sqlalchemy.exc import DatabaseError, IntegrityError
0022 from sqlalchemy.sql.expression import asc, desc
0023 
0024 from idds.common import exceptions
0025 from idds.common.constants import RequestType, RequestStatus, RequestLocking, CommandType
0026 from idds.common.utils import get_process_thread_info
0027 from idds.orm.base.session import read_session, transactional_session, safe_bulk_update_mappings
0028 from idds.orm.base import models
0029 
0030 
def create_request(scope=None, name=None, requester=None, request_type=None,
                   username=None, userdn=None, transform_tag=None,
                   status=RequestStatus.New, locking=RequestLocking.Idle, priority=0,
                   lifetime=None, workload_id=None, request_metadata=None,
                   new_poll_period=1, update_poll_period=10, site=None,
                   cloud=None, queue=None, command=CommandType.NoneCommand,
                   new_retries=0, update_retries=0, max_new_retries=3, max_update_retries=0,
                   group_id=None, campaign=None, campaign_scope=None, campaign_group=None,
                   campaign_tag=None, additional_data_storage=None,
                   processing_metadata=None):
    """
    Build a Request model object without saving it.

    For hyper-parameter-optimisation requests a missing scope defaults to
    'hpo' and a missing name is generated from the workload id, the current
    UTC timestamp and a random number; in either case the request metadata
    is flagged with ``is_pseudo_input``.

    :param scope: The scope of the request data.
    :param name: The name of the request data.
    :param requester: The requester, such as panda, user and so on.
    :param request_type: The type of the request, such as ESS, DAOD.
    :param username: The user name stored on the request.
    :param userdn: The user DN stored on the request.
    :param transform_tag: Transform tag, such as ATLAS AMI tag.
    :param status: The request status.
    :param locking: The request locking state.
    :param priority: The priority as integer.
    :param lifetime: The life time as number of days; when falsy the request
                     gets no expiration time.
    :param workload_id: The external workload id.
    :param request_metadata: The metadata as json. When the pseudo-input flag
                             is set, a non-empty dict passed here is updated
                             in place.
    :param new_poll_period: Poll period in seconds for new requests; skipped when falsy.
    :param update_poll_period: Poll period in seconds for updates; skipped when falsy.
    :param processing_metadata: The metadata as json.

    The remaining keyword arguments (site, cloud, queue, command, retry
    counters, group_id, campaign fields, additional_data_storage) are handed
    to the model unchanged.

    :returns: request.
    """

    hpo_types = [RequestType.HyperParameterOpt, RequestType.HyperParameterOpt.value]
    pseudo_input = False
    if request_type in hpo_types:
        if not scope:
            scope = 'hpo'
            pseudo_input = True
        if not name:
            prefix = 'hpo.' if workload_id is None else 'hpo.%s.' % workload_id
            stamp = datetime.datetime.utcnow().strftime("%Y_%m_%d_%H_%M_%S_%f")
            name = prefix + stamp + str(random.randint(1, 1000))
            pseudo_input = True

    if pseudo_input:
        if not request_metadata:
            request_metadata = {}
        request_metadata['is_pseudo_input'] = True

    expired_at = None
    if lifetime:
        expired_at = datetime.datetime.utcnow() + datetime.timedelta(days=lifetime)

    fields = dict(
        scope=scope, name=name, requester=requester, request_type=request_type,
        username=username, userdn=userdn,
        transform_tag=transform_tag, status=status, locking=locking,
        priority=priority, workload_id=workload_id,
        expired_at=expired_at, site=site, additional_data_storage=additional_data_storage,
        cloud=cloud, queue=queue, command=command,
        new_retries=new_retries, update_retries=update_retries,
        max_new_retries=max_new_retries, max_update_retries=max_update_retries,
        group_id=group_id, campaign=campaign, campaign_scope=campaign_scope,
        campaign_group=campaign_group, campaign_tag=campaign_tag,
        request_metadata=request_metadata, processing_metadata=processing_metadata,
    )
    new_request = models.Request(**fields)

    # Poll periods are set after construction and only when a non-zero value is given.
    if new_poll_period:
        new_request.new_poll_period = datetime.timedelta(seconds=new_poll_period)
    if update_poll_period:
        new_request.update_poll_period = datetime.timedelta(seconds=update_poll_period)
    return new_request
0101 
0102 
@transactional_session
def add_request(scope=None, name=None, requester=None, request_type=None,
                username=None, userdn=None, transform_tag=None,
                status=RequestStatus.New, locking=RequestLocking.Idle, priority=0,
                lifetime=None, workload_id=None, request_metadata=None,
                new_poll_period=1, update_poll_period=10, site=None,
                cloud=None, queue=None, command=CommandType.NoneCommand,
                new_retries=0, update_retries=0, max_new_retries=3, max_update_retries=0,
                group_id=None, campaign=None, campaign_scope=None, campaign_group=None,
                campaign_tag=None, additional_data_storage=None,
                processing_metadata=None, session=None):
    """
    Create a request via create_request() and save it with the given session.

    All arguments except ``session`` are forwarded unchanged to
    create_request(); see that function for their meaning.

    :param scope: The scope of the request data.
    :param name: The name of the request data.
    :param requester: The requester, such as panda, user and so on.
    :param request_type: The type of the request, such as ESS, DAOD.
    :param transform_tag: Transform tag, such as ATLAS AMI tag.
    :param status: The request status.
    :param locking: The request locking state.
    :param priority: The priority as integer.
    :param lifetime: The life time as number of days.
    :param workload_id: The external workload id.
    :param request_metadata: The metadata as json.
    :param processing_metadata: The metadata as json.
    :param session: The database session in use.

    :raises DuplicatedObject: If saving fails with an IntegrityError.
    :raises DatabaseException: If saving fails with another DatabaseError.

    :returns: request id.
    """

    request_args = dict(
        scope=scope, name=name, requester=requester, request_type=request_type,
        username=username, userdn=userdn,
        transform_tag=transform_tag, status=status, locking=locking,
        priority=priority, workload_id=workload_id, lifetime=lifetime,
        new_poll_period=new_poll_period, site=site,
        cloud=cloud, queue=queue,
        command=command,
        update_poll_period=update_poll_period,
        additional_data_storage=additional_data_storage,
        new_retries=new_retries, update_retries=update_retries,
        max_new_retries=max_new_retries, max_update_retries=max_update_retries,
        group_id=group_id, campaign=campaign, campaign_scope=campaign_scope,
        campaign_group=campaign_group, campaign_tag=campaign_tag,
        request_metadata=request_metadata, processing_metadata=processing_metadata,
    )

    try:
        request = create_request(**request_args)
        request.save(session=session)
        return request.request_id
    except IntegrityError as exc:
        # IntegrityError is handled first so it is reported as a duplicate
        # rather than as a generic database failure.
        raise exceptions.DuplicatedObject('Request %s:%s already exists!: %s' % (scope, name, exc))
    except DatabaseError as exc:
        raise exceptions.DatabaseException(exc)
0158 
0159 
@read_session
def get_request_ids_by_workload_id(workload_id, session=None):
    """
    Get the ids of all requests that carry the given workload id.

    :param workload_id: The workload_id of the request; must not be None.
    :param session: The database session in use.

    :raises WrongParameterException: If workload_id is None.
    :raises NoObject: If the query reports NoResultFound.

    :returns: List of request ids; empty when no request matches.
    """

    if workload_id is None:
        # Raise rather than return the exception: a returned exception object
        # would be mistaken by callers for the list of ids.
        raise exceptions.WrongParameterException("workload_id should not be None")

    try:
        query = session.query(models.Request.request_id)\
                       .filter(models.Request.workload_id == workload_id)
        # Each row is a one-column result; keep only the id.
        return [row[0] for row in query.all()]
    except sqlalchemy.orm.exc.NoResultFound as error:
        raise exceptions.NoObject('request with workload_id:%s cannot be found: %s' % (workload_id, error))
0187 
0188 
@read_session
def get_request_ids_by_name(name, scope=None, exact_match=False, session=None):
    """
    Look up request ids by request name.

    :param name: name of the request. Unless exact_match is set, every '*'
                 in the name is turned into '%' and the name is matched with
                 SQL LIKE.
    :param scope: optional scope; when given, only requests in that scope match.
    :param exact_match: compare the name for equality instead of using LIKE.
    :param session: The database session in use.

    :raises NoObject: If the query reports NoResultFound.

    :returns: Request {name:id} dict; empty when nothing matches.
    """
    try:
        base = session.query(models.Request.request_id, models.Request.name)
        if exact_match:
            query = base.filter(models.Request.name == name)
        else:
            pattern = name.replace('*', '%')
            query = base.filter(models.Request.name.like(pattern))

        if scope:
            query = query.filter(models.Request.scope == scope)

        # Rows are (request_id, name); a later row with the same name
        # overwrites an earlier one.
        return {row[1]: row[0] for row in query.all()}
    except sqlalchemy.orm.exc.NoResultFound as error:
        raise exceptions.NoObject('request with name:%s cannot be found: %s' % (name, error))
0219 
0220 
@read_session
def get_request_ids(request_id=None, workload_id=None, session=None):
    """
    Resolve a request id and/or workload id to a list of request ids.

    :param request_id: the request id. When truthy it is returned as a
                       single-element list and no lookup is performed.
    :param workload_id: The workload_id of the request; used only when
                        request_id is not given.
    :param session: The database session in use.

    :returns: List of request ids.
    """
    if request_id:
        return [request_id]
    # Forward the session so the lookup uses the session this call received.
    return get_request_ids_by_workload_id(workload_id, session=session)
0237 
0238 
@read_session
def get_request(request_id, to_json=False, session=None):
    """
    Fetch a single request by its id.

    :param request_id: The id of the request.
    :param to_json: return the result of to_dict_json() instead of to_dict().
    :param session: The database session in use.

    :raises NoObject: If the query reports NoResultFound.

    :returns: The request as a dict, or None when no row matches.
    """

    try:
        stmt = select(models.Request).where(models.Request.request_id == request_id)
        row = session.execute(stmt).fetchone()
        if not row:
            return None

        request = row[0]
        return request.to_dict_json() if to_json else request.to_dict()
    except sqlalchemy.orm.exc.NoResultFound as error:
        raise exceptions.NoObject('request request_id: %s cannot be found: %s' % (request_id, error))
0267 
0268 
@read_session
def get_request_by_id_status(request_id, status=None, locking=False, session=None):
    """
    Fetch a request by id, optionally restricted by status and lock state.

    :param request_id: The id of the request.
    :param status: a single request status or a list/tuple of statuses; when
                   given, the request must be in one of them.
    :param locking: when true, only an Idle request is selected (FOR UPDATE,
                    skipping locked rows) and the fetched object is marked as
                    Locking together with the current host, pid and thread.
    :param session: The database session in use.

    :raises NoObject: If the query reports NoResultFound.

    :returns: The request as a dict, or None when no row matches.
    """

    try:
        stmt = select(models.Request).filter(models.Request.request_id == request_id)

        if status:
            statuses = status if isinstance(status, (list, tuple)) else [status]
            # NOTE(review): a lone status is duplicated so the IN list always
            # has two entries - presumably a backend workaround; confirm.
            if len(statuses) == 1:
                statuses = [statuses[0], statuses[0]]
            stmt = stmt.where(models.Request.status.in_(statuses))

        if locking:
            stmt = stmt.where(models.Request.locking == RequestLocking.Idle)
            stmt = stmt.with_for_update(skip_locked=True)

        row = session.execute(stmt).fetchone()
        if not row:
            return None

        request = row[0]
        if locking:
            # Record who took the lock and when.
            request.updated_at = datetime.datetime.utcnow()
            request.locking = RequestLocking.Locking
            hostname, pid, thread_id, thread_name = get_process_thread_info()
            request.locking_hostname = hostname
            request.locking_pid = pid
            request.locking_thread_id = thread_id
            request.locking_thread_name = thread_name

        return request.to_dict()
    except sqlalchemy.orm.exc.NoResultFound as error:
        raise exceptions.NoObject('request request_id: %s cannot be found: %s' % (request_id, error))
0315 
0316 
0317 @read_session
0318 def get_requests_old(request_id=None, workload_id=None, with_detail=False, with_metadata=False,
0319                      with_request=False, with_transform=False, with_processing=False, to_json=False,
0320                      session=None):
0321     """
0322     Get a request or raise a NoObject exception.
0323 
0324     :param workload_id: The workload id of the request.
0325     :param to_json: return json format.
0326 
0327     :param session: The database session in use.
0328 
0329     :raises NoObject: If no request is founded.
0330 
0331     :returns: Request.
0332     """
0333     try:
0334         if with_request or not (with_transform or with_processing or with_detail or with_metadata):
0335             if with_metadata:
0336                 query = session.query(models.Request)
0337 
0338                 if request_id:
0339                     query = query.filter(models.Request.request_id == request_id)
0340                 if workload_id:
0341                     query = query.filter(models.Request.workload_id == workload_id)
0342 
0343                 tmp = query.all()
0344                 rets = []
0345                 if tmp:
0346                     for t in tmp:
0347                         if to_json:
0348                             t_dict = t.to_dict_json()
0349                         else:
0350                             t_dict = t.to_dict()
0351                         rets.append(t_dict)
0352                 return rets
0353             else:
0354                 query = session.query(models.Request.request_id,
0355                                       models.Request.scope,
0356                                       models.Request.name,
0357                                       models.Request.requester,
0358                                       models.Request.request_type,
0359                                       models.Request.username,
0360                                       models.Request.userdn,
0361                                       models.Request.transform_tag,
0362                                       models.Request.workload_id,
0363                                       models.Request.priority,
0364                                       models.Request.status,
0365                                       models.Request.substatus,
0366                                       models.Request.locking,
0367                                       models.Request.created_at,
0368                                       models.Request.updated_at,
0369                                       models.Request.next_poll_at,
0370                                       models.Request.accessed_at,
0371                                       models.Request.expired_at,
0372                                       models.Request.errors)
0373 
0374                 if request_id:
0375                     query = query.filter(models.Request.request_id == request_id)
0376                 if workload_id:
0377                     query = query.filter(models.Request.workload_id == workload_id)
0378 
0379                 tmp = query.all()
0380                 rets = []
0381                 if tmp:
0382                     for t in tmp:
0383                         t2 = dict(zip(t.keys(), t))
0384                         rets.append(t2)
0385                 return rets
0386         elif with_transform:
0387             subquery1 = session.query(models.Collection.coll_id, models.Collection.transform_id,
0388                                       models.Collection.scope.label("input_coll_scope"),
0389                                       models.Collection.name.label("input_coll_name"),
0390                                       models.Collection.status.label("input_coll_status"),
0391                                       models.Collection.bytes.label("input_coll_bytes"),
0392                                       models.Collection.total_files.label("input_total_files"),
0393                                       models.Collection.processed_files.label("input_processed_files"),
0394                                       models.Collection.new_files.label("input_new_files"),
0395                                       models.Collection.failed_files.label("input_failed_files"),
0396                                       models.Collection.missing_files.label("input_missing_files"),
0397                                       models.Collection.processing_files.label("input_processing_files")).filter(models.Collection.relation_type == 0)
0398             subquery1 = subquery1.subquery()
0399 
0400             subquery2 = session.query(models.Collection.coll_id, models.Collection.transform_id,
0401                                       models.Collection.scope.label("output_coll_scope"),
0402                                       models.Collection.name.label("output_coll_name"),
0403                                       models.Collection.status.label("output_coll_status"),
0404                                       models.Collection.bytes.label("output_coll_bytes"),
0405                                       models.Collection.total_files.label("output_total_files"),
0406                                       models.Collection.processed_files.label("output_processed_files"),
0407                                       models.Collection.new_files.label("output_new_files"),
0408                                       models.Collection.failed_files.label("output_failed_files"),
0409                                       models.Collection.missing_files.label("output_missing_files"),
0410                                       models.Collection.processing_files.label("output_processing_files")).filter(models.Collection.relation_type == 1)
0411             subquery2 = subquery2.subquery()
0412             if True:
0413                 query = session.query(models.Request.request_id,
0414                                       models.Request.scope,
0415                                       models.Request.name,
0416                                       models.Request.requester,
0417                                       models.Request.request_type,
0418                                       models.Request.username,
0419                                       models.Request.userdn,
0420                                       models.Request.transform_tag,
0421                                       models.Request.workload_id,
0422                                       models.Request.priority,
0423                                       models.Request.status,
0424                                       models.Request.substatus,
0425                                       models.Request.locking,
0426                                       models.Request.created_at,
0427                                       models.Request.updated_at,
0428                                       models.Request.next_poll_at,
0429                                       models.Request.accessed_at,
0430                                       models.Request.expired_at,
0431                                       models.Request.errors,
0432                                       models.Transform.transform_id,
0433                                       models.Transform.transform_type,
0434                                       models.Transform.workload_id.label("transform_workload_id"),
0435                                       models.Transform.status.label("transform_status"),
0436                                       models.Transform.created_at.label("transform_created_at"),
0437                                       models.Transform.updated_at.label("transform_updated_at"),
0438                                       models.Transform.finished_at.label("transform_finished_at"),
0439                                       subquery1.c.input_coll_scope, subquery1.c.input_coll_name,
0440                                       subquery1.c.input_coll_status, subquery1.c.input_coll_bytes,
0441                                       subquery1.c.input_total_files,
0442                                       subquery1.c.input_processed_files,
0443                                       subquery1.c.input_processing_files,
0444                                       subquery1.c.input_new_files,
0445                                       subquery1.c.input_failed_files,
0446                                       subquery1.c.input_missing_files,
0447                                       subquery2.c.output_coll_scope, subquery2.c.output_coll_name,
0448                                       subquery2.c.output_coll_status, subquery2.c.output_coll_bytes,
0449                                       subquery2.c.output_total_files,
0450                                       subquery2.c.output_processed_files,
0451                                       subquery2.c.output_processing_files,
0452                                       subquery2.c.output_new_files,
0453                                       subquery2.c.output_failed_files,
0454                                       subquery2.c.output_missing_files)
0455 
0456             if request_id:
0457                 query = query.filter(models.Request.request_id == request_id)
0458             if workload_id:
0459                 query = query.filter(models.Request.workload_id == workload_id)
0460 
0461             query = query.outerjoin(models.Transform, and_(models.Request.request_id == models.Transform.request_id))
0462             query = query.outerjoin(subquery1, and_(subquery1.c.transform_id == models.Transform.transform_id))
0463             query = query.outerjoin(subquery2, and_(subquery2.c.transform_id == models.Transform.transform_id))
0464             query = query.order_by(asc(models.Request.request_id))
0465 
0466             tmp = query.all()
0467             rets = []
0468             if tmp:
0469                 for t in tmp:
0470                     # t2 = dict(t)
0471                     t2 = dict(zip(t.keys(), t))
0472 
0473                     if 'request_metadata' in t2 and t2['request_metadata']:
0474                         if 'workflow' in t2['request_metadata']:
0475                             workflow = t2['request_metadata']['workflow']
0476                             workflow_data = None
0477                             if 'processing_metadata' in t2 and t2['processing_metadata'] and 'workflow_data' in t2['processing_metadata']:
0478                                 workflow_data = t2['processing_metadata']['workflow_data']
0479                             if workflow is not None and workflow_data is not None:
0480                                 workflow.metadata = workflow_data
0481                                 t2['request_metadata']['workflow'] = workflow
0482                         if 'build_workflow' in t2['request_metadata']:
0483                             build_workflow = t2['request_metadata']['build_workflow']
0484                             build_workflow_data = None
0485                             if 'processing_metadata' in t2 and t2['processing_metadata'] and 'build_workflow_data' in t2['processing_metadata']:
0486                                 build_workflow_data = t2['processing_metadata']['build_workflow_data']
0487                             if build_workflow is not None and build_workflow_data is not None:
0488                                 build_workflow.metadata = build_workflow_data
0489                                 t2['request_metadata']['build_workflow'] = build_workflow
0490 
0491                     rets.append(t2)
0492             return rets
0493         elif with_processing:
0494             subquery = session.query(models.Processing.processing_id,
0495                                      models.Processing.transform_id,
0496                                      models.Processing.workload_id,
0497                                      models.Processing.status.label("processing_status"),
0498                                      models.Processing.created_at.label("processing_created_at"),
0499                                      models.Processing.updated_at.label("processing_updated_at"),
0500                                      models.Processing.finished_at.label("processing_finished_at"))
0501             subquery = subquery.subquery()
0502 
0503             if True:
0504                 query = session.query(models.Request.request_id,
0505                                       models.Request.scope,
0506                                       models.Request.name,
0507                                       models.Request.requester,
0508                                       models.Request.request_type,
0509                                       models.Request.username,
0510                                       models.Request.userdn,
0511                                       models.Request.transform_tag,
0512                                       models.Request.workload_id,
0513                                       models.Request.priority,
0514                                       models.Request.status,
0515                                       models.Request.substatus,
0516                                       models.Request.locking,
0517                                       models.Request.created_at,
0518                                       models.Request.updated_at,
0519                                       models.Request.next_poll_at,
0520                                       models.Request.accessed_at,
0521                                       models.Request.expired_at,
0522                                       models.Request.errors,
0523                                       models.Transform.transform_id,
0524                                       models.Transform.workload_id.label("transform_workload_id"),
0525                                       models.Transform.status.label("transform_status"),
0526                                       subquery.c.processing_id,
0527                                       subquery.c.workload_id.label("processing_workload_id"),
0528                                       subquery.c.processing_status,
0529                                       subquery.c.processing_created_at,
0530                                       subquery.c.processing_updated_at,
0531                                       subquery.c.processing_finished_at)
0532 
0533             if request_id:
0534                 query = query.filter(models.Request.request_id == request_id)
0535             if workload_id:
0536                 query = query.filter(models.Request.workload_id == workload_id)
0537 
0538             query = query.outerjoin(models.Transform, and_(models.Request.request_id == models.Transform.request_id))
0539             query = query.outerjoin(subquery, and_(subquery.c.transform_id == models.Transform.transform_id))
0540             query = query.order_by(asc(models.Request.request_id))
0541 
0542             tmp = query.all()
0543             rets = []
0544             if tmp:
0545                 for t in tmp:
0546                     # t2 = dict(t)
0547                     t2 = dict(zip(t.keys(), t))
0548 
0549                     if 'request_metadata' in t2 and t2['request_metadata']:
0550                         if 'workflow' in t2['request_metadata']:
0551                             workflow = t2['request_metadata']['workflow']
0552                             workflow_data = None
0553                             if 'processing_metadata' in t2 and t2['processing_metadata'] and 'workflow_data' in t2['processing_metadata']:
0554                                 workflow_data = t2['processing_metadata']['workflow_data']
0555                             if workflow is not None and workflow_data is not None:
0556                                 workflow.metadata = workflow_data
0557                                 t2['request_metadata']['workflow'] = workflow
0558                         if 'build_workflow' in t2['request_metadata']:
0559                             build_workflow = t2['request_metadata']['build_workflow']
0560                             build_workflow_data = None
0561                             if 'processing_metadata' in t2 and t2['processing_metadata'] and 'build_workflow_data' in t2['processing_metadata']:
0562                                 build_workflow_data = t2['processing_metadata']['build_workflow_data']
0563                             if build_workflow is not None and build_workflow_data is not None:
0564                                 build_workflow.metadata = build_workflow_data
0565                                 t2['request_metadata']['build_workflow'] = build_workflow
0566 
0567                     rets.append(t2)
0568             return rets
0569         elif with_detail or with_metadata:
0570             subquery1 = session.query(models.Collection.coll_id, models.Collection.transform_id,
0571                                       models.Collection.scope.label("input_coll_scope"),
0572                                       models.Collection.name.label("input_coll_name"),
0573                                       models.Collection.status.label("input_coll_status"),
0574                                       models.Collection.bytes.label("input_coll_bytes"),
0575                                       models.Collection.total_files.label("input_total_files"),
0576                                       models.Collection.processed_files.label("input_processed_files"),
0577                                       models.Collection.new_files.label("input_new_files"),
0578                                       models.Collection.failed_files.label("input_failed_files"),
0579                                       models.Collection.missing_files.label("input_missing_files"),
0580                                       models.Collection.processing_files.label("input_processing_files")).filter(models.Collection.relation_type == 0)
0581             subquery1 = subquery1.subquery()
0582 
0583             subquery2 = session.query(models.Collection.coll_id, models.Collection.transform_id,
0584                                       models.Collection.scope.label("output_coll_scope"),
0585                                       models.Collection.name.label("output_coll_name"),
0586                                       models.Collection.status.label("output_coll_status"),
0587                                       models.Collection.bytes.label("output_coll_bytes"),
0588                                       models.Collection.total_files.label("output_total_files"),
0589                                       models.Collection.processed_files.label("output_processed_files"),
0590                                       models.Collection.new_files.label("output_new_files"),
0591                                       models.Collection.failed_files.label("output_failed_files"),
0592                                       models.Collection.missing_files.label("output_missing_files"),
0593                                       models.Collection.processing_files.label("output_processing_files")).filter(models.Collection.relation_type == 1)
0594             subquery2 = subquery2.subquery()
0595 
0596             if with_metadata:
0597                 query = session.query(models.Request.request_id,
0598                                       models.Request.scope,
0599                                       models.Request.name,
0600                                       models.Request.requester,
0601                                       models.Request.request_type,
0602                                       models.Request.username,
0603                                       models.Request.userdn,
0604                                       models.Request.transform_tag,
0605                                       models.Request.workload_id,
0606                                       models.Request.priority,
0607                                       models.Request.status,
0608                                       models.Request.substatus,
0609                                       models.Request.locking,
0610                                       models.Request.created_at,
0611                                       models.Request.updated_at,
0612                                       models.Request.next_poll_at,
0613                                       models.Request.accessed_at,
0614                                       models.Request.expired_at,
0615                                       models.Request.errors,
0616                                       models.Request._request_metadata.label('request_metadata'),
0617                                       models.Request._processing_metadata.label('processing_metadata'),
0618                                       models.Transform.transform_id,
0619                                       models.Transform.transform_type,
0620                                       models.Transform.name.label("transform_name"),
0621                                       models.Transform.workload_id.label("transform_workload_id"),
0622                                       models.Transform.status.label("transform_status"),
0623                                       models.Transform.created_at.label("transform_created_at"),
0624                                       models.Transform.updated_at.label("transform_updated_at"),
0625                                       models.Transform.finished_at.label("transform_finished_at"),
0626                                       subquery1.c.input_coll_scope, subquery1.c.input_coll_name,
0627                                       subquery1.c.input_coll_status, subquery1.c.input_coll_bytes,
0628                                       subquery1.c.input_total_files,
0629                                       subquery1.c.input_processed_files,
0630                                       subquery1.c.input_processing_files,
0631                                       subquery1.c.input_new_files,
0632                                       subquery1.c.input_failed_files,
0633                                       subquery1.c.input_missing_files,
0634                                       subquery2.c.output_coll_scope, subquery2.c.output_coll_name,
0635                                       subquery2.c.output_coll_status, subquery2.c.output_coll_bytes,
0636                                       subquery2.c.output_total_files,
0637                                       subquery2.c.output_processed_files,
0638                                       subquery2.c.output_processing_files,
0639                                       subquery2.c.output_new_files,
0640                                       subquery2.c.output_failed_files,
0641                                       subquery2.c.output_missing_files,)
0642             else:
0643                 query = session.query(models.Request.request_id,
0644                                       models.Request.scope,
0645                                       models.Request.name,
0646                                       models.Request.requester,
0647                                       models.Request.request_type,
0648                                       models.Request.username,
0649                                       models.Request.userdn,
0650                                       models.Request.transform_tag,
0651                                       models.Request.workload_id,
0652                                       models.Request.priority,
0653                                       models.Request.status,
0654                                       models.Request.substatus,
0655                                       models.Request.locking,
0656                                       models.Request.created_at,
0657                                       models.Request.updated_at,
0658                                       models.Request.next_poll_at,
0659                                       models.Request.accessed_at,
0660                                       models.Request.expired_at,
0661                                       models.Request.errors,
0662                                       models.Transform.transform_id,
0663                                       models.Transform.transform_type,
0664                                       models.Transform.name.label("transform_name"),
0665                                       models.Transform.workload_id.label("transform_workload_id"),
0666                                       models.Transform.status.label("transform_status"),
0667                                       models.Transform.created_at.label("transform_created_at"),
0668                                       models.Transform.updated_at.label("transform_updated_at"),
0669                                       models.Transform.finished_at.label("transform_finished_at"),
0670                                       subquery1.c.input_coll_scope, subquery1.c.input_coll_name,
0671                                       subquery1.c.input_coll_status, subquery1.c.input_coll_bytes,
0672                                       subquery1.c.input_total_files,
0673                                       subquery1.c.input_processed_files,
0674                                       subquery1.c.input_processing_files,
0675                                       subquery1.c.input_new_files,
0676                                       subquery1.c.input_failed_files,
0677                                       subquery1.c.input_missing_files,
0678                                       subquery2.c.output_coll_scope, subquery2.c.output_coll_name,
0679                                       subquery2.c.output_coll_status, subquery2.c.output_coll_bytes,
0680                                       subquery2.c.output_total_files,
0681                                       subquery2.c.output_processed_files,
0682                                       subquery2.c.output_processing_files,
0683                                       subquery2.c.output_new_files,
0684                                       subquery2.c.output_failed_files,
0685                                       subquery2.c.output_missing_files,)
0686 
0687             if request_id:
0688                 query = query.filter(models.Request.request_id == request_id)
0689             if workload_id:
0690                 query = query.filter(models.Request.workload_id == workload_id)
0691 
0692             query = query.outerjoin(models.Transform, and_(models.Request.request_id == models.Transform.request_id))
0693             query = query.outerjoin(subquery1, and_(subquery1.c.transform_id == models.Transform.transform_id))
0694             query = query.outerjoin(subquery2, and_(subquery2.c.transform_id == models.Transform.transform_id))
0695             query = query.order_by(asc(models.Request.request_id))
0696 
0697             tmp = query.all()
0698             rets = []
0699             if tmp:
0700                 for t in tmp:
0701                     # t2 = dict(t)
0702                     t2 = dict(zip(t.keys(), t))
0703 
0704                     if 'request_metadata' in t2 and t2['request_metadata']:
0705                         if 'workflow' in t2['request_metadata']:
0706                             workflow = t2['request_metadata']['workflow']
0707                             workflow_data = None
0708                             if 'processing_metadata' in t2 and t2['processing_metadata'] and 'workflow_data' in t2['processing_metadata']:
0709                                 workflow_data = t2['processing_metadata']['workflow_data']
0710                             if workflow is not None and workflow_data is not None:
0711                                 workflow.metadata = workflow_data
0712                                 t2['request_metadata']['workflow'] = workflow
0713                         if 'build_workflow' in t2['request_metadata']:
0714                             build_workflow = t2['request_metadata']['build_workflow']
0715                             build_workflow_data = None
0716                             if 'processing_metadata' in t2 and t2['processing_metadata'] and 'build_workflow_data' in t2['processing_metadata']:
0717                                 build_workflow_data = t2['processing_metadata']['build_workflow_data']
0718                             if build_workflow is not None and build_workflow_data is not None:
0719                                 build_workflow.metadata = build_workflow_data
0720                                 t2['request_metadata']['build_workflow'] = build_workflow
0721 
0722                     rets.append(t2)
0723             return rets
0724     except sqlalchemy.orm.exc.NoResultFound as error:
0725         raise exceptions.NoObject('request workload_id: %s cannot be found: %s' % (workload_id, error))
0726 
0727 
@read_session
def get_requests_only(request_id=None, workload_id=None, with_metadata=False, to_json=False, session=None):
    """
    Fetch requests on their own, without joining any other table.

    :param request_id: When truthy, only the request with this id is selected.
    :param workload_id: When truthy, only requests with this workload id are selected.
    :param with_metadata: When truthy, whole Request objects are loaded and converted
                          with ``to_dict()``; otherwise only the mapped columns other
                          than the metadata ones are selected.
    :param to_json: Accepted for signature compatibility; not used in this function.

    :param session: The database session in use.

    :returns: A list of dictionaries, one per matching request (empty when nothing matches).
    """
    selected_names = None
    if with_metadata:
        stmt = select(models.Request)
    else:
        # The metadata columns are left out of the plain listing.
        skipped = ('request_metadata', 'processing_metadata', '_request_metadata', '_processing_metadata')
        selected = [col for col in models.Request.__mapper__.columns if col.name not in skipped]
        selected_names = [col.name for col in selected]
        stmt = select(*selected)

    if request_id:
        stmt = stmt.where(models.Request.request_id == request_id)
    if workload_id:
        stmt = stmt.where(models.Request.workload_id == workload_id)

    rows = session.execute(stmt).fetchall()
    if with_metadata:
        # Each row wraps a single Request entity in position 0.
        return [row[0].to_dict() for row in rows]
    return [dict(zip(selected_names, row)) for row in rows]
0776 
0777 
def get_query_collection(request_id=None, workload_id=None):
    """
    Build the select statements for input and output collections.

    The input statement selects collections with ``relation_type == 0`` and labels
    its columns ``input_*``; the output statement selects collections with
    ``relation_type == 1`` and labels its columns ``output_*``.

    The optional request filters are applied through explicit joins
    (Collection -> Transform -> Request). Filtering directly on ``models.Request``
    without a join would add the requests table as an unrelated FROM element,
    giving a cartesian product and duplicated rows whenever more than one
    request matches.

    :param request_id: When truthy, keep only collections whose transform belongs to this request.
    :param workload_id: When truthy, keep only collections whose transform belongs to
                        a request with this workload id.

    :returns: A tuple ``(input_query, output_query)`` of select statements.
    """
    input_query = select(models.Collection.coll_id.label("input_coll_id"),
                         models.Collection.transform_id.label("input_coll_transform_id"),
                         models.Collection.scope.label("input_coll_scope"),
                         models.Collection.name.label("input_coll_name"),
                         models.Collection.status.label("input_coll_status"),
                         models.Collection.bytes.label("input_coll_bytes"),
                         models.Collection.total_files.label("input_total_files"),
                         models.Collection.processed_files.label("input_processed_files"),
                         models.Collection.new_files.label("input_new_files"),
                         models.Collection.preprocessing_files.label("input_preprocessing_files"),
                         models.Collection.activated_files.label("input_activated_files"),
                         models.Collection.failed_files.label("input_failed_files"),
                         models.Collection.missing_files.label("input_missing_files"),
                         models.Collection.processing_files.label("input_processing_files"))\
        .where(models.Collection.relation_type == 0)

    output_query = select(models.Collection.coll_id.label("output_coll_id"),
                          models.Collection.transform_id.label("output_coll_transform_id"),
                          models.Collection.scope.label("output_coll_scope"),
                          models.Collection.name.label("output_coll_name"),
                          models.Collection.status.label("output_coll_status"),
                          models.Collection.bytes.label("output_coll_bytes"),
                          models.Collection.total_files.label("output_total_files"),
                          models.Collection.processed_files.label("output_processed_files"),
                          models.Collection.new_files.label("output_new_files"),
                          models.Collection.preprocessing_files.label("output_preprocessing_files"),
                          models.Collection.activated_files.label("output_activated_files"),
                          models.Collection.failed_files.label("output_failed_files"),
                          models.Collection.missing_files.label("output_missing_files"),
                          models.Collection.processing_files.label("output_processing_files"))\
        .where(models.Collection.relation_type == 1)

    if request_id or workload_id:
        # Collections are tied to a request through their transform.
        on_transform = models.Transform.transform_id == models.Collection.transform_id
        input_query = input_query.join(models.Transform, on_transform)
        output_query = output_query.join(models.Transform, on_transform)
    if request_id:
        by_request = models.Transform.request_id == request_id
        input_query = input_query.where(by_request)
        output_query = output_query.where(by_request)
    if workload_id:
        # The workload id being filtered is the request's one, so join the request explicitly.
        on_request = models.Request.request_id == models.Transform.request_id
        by_workload = models.Request.workload_id == workload_id
        input_query = input_query.join(models.Request, on_request).where(by_workload)
        output_query = output_query.join(models.Request, on_request).where(by_workload)
    return input_query, output_query
0821 
0822 
def get_query_transform(request_id=None, workload_id=None):
    """
    Build the select statement for transforms.

    The statement selects the transform's request id, transform id and type, plus
    its name, workload id, status and timestamps under ``transform_*`` labels.

    The request filters are expressed against the transform itself (request id)
    or through an explicit join to the request (workload id). Filtering on
    ``models.Request`` without a join would add the requests table as an
    unrelated FROM element, giving a cartesian product and duplicated rows
    whenever more than one request matches.

    :param request_id: When truthy, keep only transforms belonging to this request.
    :param workload_id: When truthy, keep only transforms belonging to a request
                        with this workload id.

    :returns: The select statement.
    """
    columns = [models.Transform.request_id,
               models.Transform.transform_id,
               models.Transform.transform_type,
               models.Transform.name.label("transform_name"),
               models.Transform.workload_id.label("transform_workload_id"),
               models.Transform.status.label("transform_status"),
               models.Transform.created_at.label("transform_created_at"),
               models.Transform.updated_at.label("transform_updated_at"),
               models.Transform.finished_at.label("transform_finished_at")]
    query = select(*columns)
    if request_id:
        query = query.where(models.Transform.request_id == request_id)
    if workload_id:
        # The filter is on the request's workload id (not the transform's own one),
        # so the request has to be joined explicitly.
        query = query.join(models.Request, models.Request.request_id == models.Transform.request_id)
        query = query.where(models.Request.workload_id == workload_id)

    return query
0843 
0844 
def get_query_processing(request_id=None, workload_id=None):
    """
    Build the select statement for processings.

    The statement selects the processing id, plus the processing's transform id,
    workload id, status and timestamps under ``processing_*`` labels.

    The optional request filters are applied through explicit joins
    (Processing -> Transform -> Request). Filtering directly on ``models.Request``
    without a join would add the requests table as an unrelated FROM element,
    giving a cartesian product and duplicated rows whenever more than one
    request matches.

    :param request_id: When truthy, keep only processings whose transform belongs to this request.
    :param workload_id: When truthy, keep only processings whose transform belongs to
                        a request with this workload id.

    :returns: The select statement.
    """
    query = select(models.Processing.processing_id,
                   models.Processing.transform_id.label("processing_transform_id"),
                   models.Processing.workload_id.label("processing_workload_id"),
                   models.Processing.status.label("processing_status"),
                   models.Processing.created_at.label("processing_created_at"),
                   models.Processing.updated_at.label("processing_updated_at"),
                   models.Processing.finished_at.label("processing_finished_at"))
    if request_id or workload_id:
        # Processings are tied to a request through their transform.
        query = query.join(models.Transform, models.Transform.transform_id == models.Processing.transform_id)
    if request_id:
        query = query.where(models.Transform.request_id == request_id)
    if workload_id:
        # The workload id being filtered is the request's one, so join the request explicitly.
        query = query.join(models.Request, models.Request.request_id == models.Transform.request_id)
        query = query.where(models.Request.workload_id == workload_id)

    return query
0862 
0863 
def get_request_dict(column_names, column):
    """
    Convert one result row into a dictionary and attach workflow metadata.

    The names and values are zipped into a dictionary. If that dictionary has a
    non-empty 'request_metadata' entry, then for each of its 'workflow' and
    'build_workflow' objects the matching 'workflow_data' / 'build_workflow_data'
    value from 'processing_metadata' (when present and not None) is assigned to
    the object's ``metadata`` attribute.

    :param column_names: The column names, in the same order as the row values.
    :param column: The row values.

    :returns: The dictionary built from the row.
    """
    record = dict(zip(column_names, column))

    request_meta = record.get('request_metadata')
    if not request_meta:
        return record

    processing_meta = record.get('processing_metadata')
    # (key of the workflow object in request_metadata, key of its data in processing_metadata)
    pairs = (('workflow', 'workflow_data'), ('build_workflow', 'build_workflow_data'))
    for wf_key, data_key in pairs:
        if wf_key not in request_meta:
            continue
        wf = request_meta[wf_key]
        wf_data = None
        if processing_meta and data_key in processing_meta:
            wf_data = processing_meta[data_key]
        if wf is None or wf_data is None:
            continue
        wf.metadata = wf_data
        request_meta[wf_key] = wf
    return record
0888 
0889 
@read_session
def get_requests_with_transform(request_id=None, workload_id=None, with_metadata=False, show_processing=False,
                                show_collection=False, to_json=False, session=None):
    """
    Fetch requests outer-joined with their transforms, optionally with processings and collections.

    :param request_id: When truthy, only the request with this id is selected.
    :param workload_id: When truthy, only requests with this workload id are selected.
    :param with_metadata: When truthy, the request and processing metadata columns are selected too.
    :param show_processing: When truthy, processing columns are outer-joined on the transform id.
    :param show_collection: When truthy, input and output collection columns are outer-joined
                            on the transform id.
    :param to_json: Accepted for signature compatibility; not used in this function.

    :param session: The database session in use.

    :returns: A list of dictionaries, one per joined row, ordered by ascending request id.
    """
    # Start from every mapped request column except the metadata ones.
    skipped = ('request_metadata', 'processing_metadata', '_request_metadata', '_processing_metadata')
    selected = [col for col in models.Request.__mapper__.columns if col.name not in skipped]
    if with_metadata:
        selected.append(models.Request._request_metadata.label('request_metadata'))
        selected.append(models.Request._processing_metadata.label('processing_metadata'))

    transform_sub = get_query_transform(request_id=request_id, workload_id=workload_id).subquery()
    selected.extend(transform_sub.c)

    processing_sub = None
    if show_processing:
        processing_sub = get_query_processing(request_id=request_id, workload_id=workload_id).subquery()
        selected.extend(processing_sub.c)

    input_sub, output_sub = None, None
    if show_collection:
        input_stmt, output_stmt = get_query_collection(request_id=request_id, workload_id=workload_id)
        input_sub = input_stmt.subquery()
        output_sub = output_stmt.subquery()
        selected.extend(input_sub.c)
        selected.extend(output_sub.c)

    names = [col.name for col in selected]

    stmt = select(*selected)
    if request_id:
        stmt = stmt.where(models.Request.request_id == request_id)
    if workload_id:
        stmt = stmt.where(models.Request.workload_id == workload_id)

    # Everything hangs off the transform: request -> transform -> processing / collections.
    stmt = stmt.outerjoin(transform_sub, and_(models.Request.request_id == transform_sub.c.request_id))
    if show_processing:
        stmt = stmt.outerjoin(processing_sub,
                              and_(processing_sub.c.processing_transform_id == transform_sub.c.transform_id))
    if show_collection:
        stmt = stmt.outerjoin(input_sub,
                              and_(input_sub.c.input_coll_transform_id == transform_sub.c.transform_id))
        stmt = stmt.outerjoin(output_sub,
                              and_(output_sub.c.output_coll_transform_id == transform_sub.c.transform_id))
    stmt = stmt.order_by(asc(models.Request.request_id))

    rows = session.execute(stmt).fetchall()
    return [get_request_dict(names, row) for row in rows]
0956 
0957 
@read_session
def get_requests(request_id=None, workload_id=None, with_detail=False, with_metadata=False,
                 with_request=False, with_transform=False, with_processing=False, to_json=False, session=None):
    """
    Fetch requests, optionally joined with transforms, processings and collections.

    :param request_id: The request id.
    :param workload_id: The workload id of the request.
    :param with_detail: Include transforms, processings and collections.
    :param with_metadata: Include the metadata.
    :param with_request: Accepted for signature compatibility; not used in this function.
    :param with_transform: Include transforms.
    :param with_processing: Include transforms and processings.
    :param to_json: Passed through to the underlying query function.

    :param session: The database session in use.

    :raises NoObject: If any exception is raised while querying.

    :returns: A list of dictionaries.
    """
    try:
        # Each level of detail implies the ones before it:
        # with_detail -> collections + processings + transforms,
        # with_processing -> processings + transforms.
        show_collection = bool(with_detail)
        show_processing = bool(with_processing or with_detail)

        if not (with_transform or show_processing):
            return get_requests_only(request_id=request_id, workload_id=workload_id,
                                     with_metadata=with_metadata, to_json=to_json,
                                     session=session)

        return get_requests_with_transform(request_id=request_id, workload_id=workload_id,
                                           with_metadata=with_metadata, to_json=to_json,
                                           show_processing=show_processing,
                                           show_collection=show_collection,
                                           session=session)
    except Exception as error:
        raise exceptions.NoObject(f'request(request_id: {request_id}) cannot be found: {error}')
1000 
1001 
@transactional_session
def extend_requests(request_id=None, workload_id=None, lifetime=30, session=None):
    """
    Push the expiry time of the matching requests into the future.

    Requests are matched by request_id when it is truthy, otherwise by workload_id.
    Their 'expired_at' is set to the current UTC time plus `lifetime` days.

    :param request_id: The id of the request.
    :param workload_id: The workload_id of the request, used only when request_id is not set.
    :param lifetime: The lifetime as a number of days.
    :param session: The database session in use.

    :raises NoObject: If the ORM reports NoResultFound.
    """
    try:
        if request_id:
            criteria = {'request_id': request_id}
        else:
            criteria = {'workload_id': workload_id}
        matching = session.query(models.Request).filter_by(**criteria)

        new_expiry = datetime.datetime.utcnow() + datetime.timedelta(days=lifetime)
        matching.update({'expired_at': new_expiry}, synchronize_session=False)
    except sqlalchemy.orm.exc.NoResultFound as error:
        raise exceptions.NoObject('Request (workload_id: %s) cannot be found: %s' % (workload_id, error))
1026 
1027 
@transactional_session
def cancel_requests(request_id=None, workload_id=None, session=None):
    """
    Set the status of the matching requests to RequestStatus.Cancel.

    Requests are matched by request_id when it is truthy, otherwise by workload_id.

    :param request_id: The id of the request.
    :param workload_id: The workload_id of the request, used only when request_id is not set.
    :param session: The database session in use.

    :raises NoObject: If the ORM reports NoResultFound.
    """
    try:
        if request_id:
            criteria = {'request_id': request_id}
        else:
            criteria = {'workload_id': workload_id}
        matching = session.query(models.Request).filter_by(**criteria)
        matching.update({'status': RequestStatus.Cancel}, synchronize_session=False)
    except sqlalchemy.orm.exc.NoResultFound as error:
        raise exceptions.NoObject('Request %s cannot be found: %s' % (request_id, error))
1051 
1052 
@read_session
def get_requests_by_requester(scope, name, requester, to_json=False, session=None):
    """
    Fetch the requests of one requester by scope and name pattern.

    :param scope: The scope of the request data (exact match).
    :param name: The name of the request data; every '*' is turned into the SQL LIKE wildcard '%'.
    :param requester: The requester, such as panda, user and so on (exact match).
    :param to_json: When truthy, each request is converted with ``to_dict_json()``
                    instead of ``to_dict()``.
    :param session: The database session in use.

    :raises NoObject: If the ORM reports NoResultFound.

    :returns: list of Request dictionaries.
    """

    try:
        name_pattern = name.replace('*', '%')
        matches = session.query(models.Request).filter(
            models.Request.requester == requester,
            models.Request.scope == scope,
            models.Request.name.like(name_pattern),
        ).all()
        return [req.to_dict_json() if to_json else req.to_dict() for req in matches]
    except sqlalchemy.orm.exc.NoResultFound as error:
        raise exceptions.NoObject('No requests with scope:name(%s:%s) and requester(%s) %s' % (scope, name, requester, error))
1084 
1085 
@transactional_session
def get_requests_by_status_type(status, request_type=None, time_period=None, request_ids=None, locking=False,
                                locking_for_update=False, bulk_size=None, to_json=False, by_substatus=False,
                                min_request_id=None, new_poll=False, update_poll=False, only_return_id=False,
                                not_lock=False, session=None):
    """
    Get requests by status, optionally marking them as locked.

    :param status: A status or a list of statuses of the request data.
    :param request_type: The type of the request data.
    :param time_period: Only reported in the error message; not used for filtering.
    :param request_ids: Restrict to these request ids; when empty or None, `min_request_id` applies instead.
    :param locking: Whether to lock requests to avoid others getting the same request. Only requests
                    whose locking is RequestLocking.Idle are selected; each returned request is set
                    to RequestLocking.Locking, its updated_at is refreshed and the locking
                    host/process/thread information is recorded.
    :param locking_for_update: Select with FOR UPDATE SKIP LOCKED; when not set, the result is
                               ordered by ascending updated_at instead.
    :param bulk_size: Size limitation per retrieve.
    :param to_json: Return json format (``to_dict_json()`` instead of ``to_dict()``).
    :param by_substatus: Match `status` against the substatus column instead of the status column.
    :param min_request_id: Lower bound (inclusive) on the request id, used only when `request_ids` is empty.
    :param new_poll: Only requests whose updated_at + new_poll_period is not later than the current UTC time.
    :param update_poll: Only requests whose updated_at + update_poll_period is not later than the current UTC time.
    :param only_return_id: Return request ids instead of request dictionaries.
    :param not_lock: Accepted for signature compatibility; not used in this function.
    :param session: The database session in use.

    :raises NoObject: If the ORM reports NoResultFound.

    :returns: list of Request dictionaries, or list of request ids when `only_return_id` is set.
    """

    try:
        if status:
            if not isinstance(status, (list, tuple)):
                status = [status]
            if len(status) == 1:
                # A single status is duplicated so that the IN (...) clause always gets two values.
                # NOTE(review): the reason is not visible here (presumably a database quirk) - kept as is.
                status = [status[0], status[0]]

        # Locking writes attributes on the returned objects. Rows of a column-only query are
        # immutable, so the full entity must be loaded whenever locking is requested, even if
        # only the ids are returned to the caller.
        ids_only_query = bool(only_return_id) and not locking
        if ids_only_query:
            query = session.query(models.Request.request_id)
        else:
            query = session.query(models.Request)

        if status:
            if by_substatus:
                query = query.filter(models.Request.substatus.in_(status))
            else:
                query = query.filter(models.Request.status.in_(status))
        if new_poll:
            query = query.filter(models.Request.updated_at + models.Request.new_poll_period <= datetime.datetime.utcnow())
        if update_poll:
            query = query.filter(models.Request.updated_at + models.Request.update_poll_period <= datetime.datetime.utcnow())

        if request_type is not None:
            query = query.filter(models.Request.request_type == request_type)
        if request_ids:
            query = query.filter(models.Request.request_id.in_(request_ids))
        else:
            if min_request_id is not None:
                query = query.filter(models.Request.request_id >= min_request_id)
        if locking:
            query = query.filter(models.Request.locking == RequestLocking.Idle)

        if locking_for_update:
            query = query.with_for_update(skip_locked=True)
        else:
            query = query.order_by(asc(models.Request.updated_at))

        if bulk_size:
            query = query.limit(bulk_size)

        tmp = query.all()
        rets = []
        if locking and tmp:
            # The same for every row, so it is looked up once.
            hostname, pid, thread_id, thread_name = get_process_thread_info()
        for req in tmp:
            if locking:
                req.updated_at = datetime.datetime.utcnow()
                req.locking = RequestLocking.Locking

                req.locking_hostname = hostname
                req.locking_pid = pid
                req.locking_thread_id = thread_id
                req.locking_thread_name = thread_name

            if only_return_id:
                # Column-only rows carry the id in position 0; entities expose it as an attribute.
                rets.append(req[0] if ids_only_query else req.request_id)
            elif to_json:
                rets.append(req.to_dict_json())
            else:
                rets.append(req.to_dict())
        return rets
    except sqlalchemy.orm.exc.NoResultFound as error:
        raise exceptions.NoObject('No requests with status: %s, request_type: %s, time_period: %s, locking: %s, %s' % (status, request_type, time_period, locking, error))
1173 
1174 
@transactional_session
def update_request(request_id, parameters, update_request_metadata=False, locking=False, origin_status=None, session=None):
    """
    Update a request.

    :param request_id: the request id.
    :param parameters: A dictionary of parameters (column name -> new value).
                       The dictionary passed by the caller is not modified.
    :param update_request_metadata: If False, a 'request_metadata' entry in
                       parameters is dropped before the update; if True it is
                       written through the '_request_metadata' key.
    :param locking: If True, only update the request when its locking state is
                    RequestLocking.Idle.
    :param origin_status: When set to a status other than Built, the update is
                    first attempted only on rows whose status is not Built
                    (see the inline comments for the fallback).
    :param session: The database session in use.

    :raises NoObject: If no request is founded.
    :raises DatabaseException: If there is a database error.

    :returns: The number of rows reported by the UPDATE statement.
    """
    try:
        # Work on a shallow copy: the key renames/deletions below must not
        # leak back into the dictionary owned by the caller.
        parameters = dict(parameters)
        parameters['updated_at'] = datetime.datetime.utcnow()

        # Poll periods given as plain numbers are interpreted as seconds.
        for period_key in ('new_poll_period', 'update_poll_period'):
            if period_key in parameters and not isinstance(parameters[period_key], datetime.timedelta):
                parameters[period_key] = datetime.timedelta(seconds=parameters[period_key])

        if 'request_metadata' in parameters:
            # Tolerate an explicit None so the membership tests below do not fail.
            request_metadata = parameters['request_metadata'] or {}
            workflow_keys = (('workflow', 'workflow_data'),
                             ('build_workflow', 'build_workflow_data'))
            for workflow_key, data_key in workflow_keys:
                if workflow_key not in request_metadata:
                    continue
                workflow = request_metadata[workflow_key]
                if workflow is None:
                    continue
                if hasattr(workflow, 'refresh_works'):
                    workflow.refresh_works()
                # Copy before adding the workflow data so a processing_metadata
                # dict supplied by the caller is left untouched.
                processing_metadata = dict(parameters.get('processing_metadata') or {})
                processing_metadata[data_key] = workflow.metadata
                parameters['processing_metadata'] = processing_metadata

            metadata_value = parameters.pop('request_metadata')
            if update_request_metadata:
                parameters['_request_metadata'] = metadata_value

        if 'processing_metadata' in parameters:
            parameters['_processing_metadata'] = parameters.pop('processing_metadata')

        query = session.query(models.Request).filter_by(request_id=request_id)

        if locking:
            query = query.filter(models.Request.locking == RequestLocking.Idle)
            query = query.with_for_update(skip_locked=True)
            return query.update(parameters, synchronize_session=False)

        # NOTE(review): the duplicated entry looks like the same single-status
        # handling used by the other status filters in this module - confirm.
        build_status = [RequestStatus.Built, RequestStatus.Built]
        if origin_status and origin_status not in build_status:
            query_no_built = query.filter(not_(models.Request.status.in_(build_status)))
            num_rows = query_no_built.update(parameters, synchronize_session=False)
            if num_rows > 0:
                return num_rows

            # Nothing matched with the "not Built" filter: retry without it,
            # replacing any requested status with origin_status.
            if 'status' in parameters:
                parameters['status'] = origin_status
            return query.update(parameters, synchronize_session=False)

        return query.update(parameters, synchronize_session=False)
    except sqlalchemy.orm.exc.NoResultFound as error:
        raise exceptions.NoObject('Request %s cannot be found: %s' % (request_id, error))
1251 
1252 
@transactional_session
def delete_requests(request_id=None, workload_id=None, session=None):
    """
    Delete requests selected by request id or, failing that, by workload id.

    If neither selector is given nothing is deleted.

    :param request_id: The id of the request. Takes precedence over workload_id.
    :param workload_id: The workload_id of the request.
    :param session: The database session in use.

    :raises NoObject: If no request is founded.
    :raises DatabaseException: If there is a database error.
    """
    try:
        query = session.query(models.Request)
        if request_id:
            query.filter_by(request_id=request_id).delete()
        elif workload_id is not None:
            query.filter_by(workload_id=workload_id).delete()
        # else: no selector was supplied. Filtering on workload_id=None would
        # delete every request whose workload_id is NULL, so do nothing.
    except sqlalchemy.orm.exc.NoResultFound as error:
        raise exceptions.NoObject('Request (request_id: %s, workload_id: %s) cannot be found: %s' % (request_id, workload_id, error))
1272 
1273 
@transactional_session
def clean_locking(time_period=3600, min_request_id=None, health_items=None, force=False, hostname=None, pid=None, session=None):
    """
    Clean lockings on requests whose lock owner is gone or whose lock is too old.

    A request in RequestLocking.Locking state is released (its 'locking'
    column is set to 0) when any of the following holds:

    - its locking hostname/pid pair is not listed in health_items;
    - force is set and the lock is held by the given hostname and pid;
    - it was last updated more than time_period seconds ago.

    :param time_period: maximum age of a lock, in seconds.
    :param min_request_id: if set, only requests with request_id >= this value are checked.
    :param health_items: iterable of dicts with the keys 'hostname', 'pid' and
                         'thread_id'. When empty or None, no lock owner is
                         listed, so every locked request is released.
    :param force: release the locks held by (hostname, pid) regardless of age.
    :param hostname: hostname used together with force.
    :param pid: process id used together with force.
    :param session: The database session in use.
    """
    # hostname -> pid -> [thread_id, ...] of the listed lock owners.
    # Loop-local names are used on purpose: reusing 'hostname'/'pid' here would
    # overwrite the function arguments needed by the 'force' check below.
    health_dict = {}
    for item in health_items or []:
        item_hostname = item['hostname']
        item_pid = item['pid']
        item_thread_id = item['thread_id']
        thread_ids = health_dict.setdefault(item_hostname, {}).setdefault(item_pid, [])
        if item_thread_id not in thread_ids:
            thread_ids.append(item_thread_id)

    query = session.query(models.Request.request_id,
                          models.Request.locking_hostname,
                          models.Request.locking_pid,
                          models.Request.locking_thread_id,
                          models.Request.locking_thread_name,
                          models.Request.updated_at)
    query = query.filter(models.Request.locking == RequestLocking.Locking)
    if min_request_id:
        query = query.filter(models.Request.request_id >= min_request_id)

    lost_request_ids = []
    for req_id, locking_hostname, locking_pid, _locking_thread_id, _locking_thread_name, updated_at in query.all():
        owner_unknown = locking_pid not in health_dict.get(locking_hostname, {})
        forced = force and hostname == locking_hostname and pid == locking_pid
        if (
            owner_unknown
            or forced       # noqa W503
            or (updated_at < datetime.datetime.utcnow() - datetime.timedelta(seconds=time_period))    # noqa W503
        ):
            lost_request_ids.append({"request_id": req_id, 'locking': 0})

    # session.bulk_update_mappings(models.Request, lost_request_ids)
    safe_bulk_update_mappings(session, models.Request, lost_request_ids)
1316 
1317 
@transactional_session
def clean_next_poll_at(status, session=None):
    """
    Reset next_poll_at to the current UTC time for requests in the given status.

    :param status: a single status or a list/tuple of statuses of the requests.
    :param session: The database session in use.
    """
    statuses = status if isinstance(status, (list, tuple)) else [status]
    if len(statuses) == 1:
        # A single status is always passed to in_() as a two-element list.
        only_status = statuses[0]
        statuses = [only_status, only_status]

    now = datetime.datetime.utcnow()
    matching = session.query(models.Request).filter(models.Request.status.in_(statuses))
    matching.update({'next_poll_at': now}, synchronize_session=False)
1333 
1334 
@read_session
def get_last_request_id(status, older_than=None, session=None):
    """
    Get the largest request id among requests in the given status, optionally
    restricted to requests last updated at least older_than days ago.

    :param status: a single status or a list/tuple of statuses of the request.
    :param older_than: days older than current timestamp. When None, no age
                       restriction is applied.
    :param session: The database session in use.

    :returns request_id, or None when no request matches.
    """
    if not isinstance(status, (list, tuple)):
        status = [status]
    if len(status) == 1:
        status = [status[0], status[0]]

    query = session.query(models.Request.request_id)
    if status:
        query = query.filter(models.Request.status.in_(status))
    if older_than is not None:
        # Only build the cutoff when requested: timedelta(days=None) raises TypeError.
        cutoff = datetime.datetime.utcnow() - datetime.timedelta(days=older_than)
        query = query.filter(models.Request.updated_at <= cutoff)
    query = query.order_by(desc(models.Request.request_id))

    ret = query.first()
    return ret[0] if ret else None
1359 
1360 
@read_session
def get_num_active_requests(active_status=None, session=None):
    """
    Count requests grouped by status and site.

    :param active_status: a single status or a list/tuple of statuses to
                          restrict the count to. When empty or None, requests
                          in every status are counted.
    :param session: The database session in use.

    :returns: list of (status, site, number of requests) rows.
    """
    if active_status and not isinstance(active_status, (list, tuple)):
        active_status = [active_status]
    if active_status and len(active_status) == 1:
        active_status = [active_status[0], active_status[0]]

    # Errors propagate unchanged to the caller; no wrapping is needed here.
    query = session.query(models.Request.status, models.Request.site, func.count(models.Request.request_id))
    if active_status:
        query = query.filter(models.Request.status.in_(active_status))
    query = query.group_by(models.Request.status, models.Request.site)
    return query.all()
1377 
1378 
@read_session
def get_active_requests(active_status=None, session=None):
    """
    List the id, status and site of requests.

    :param active_status: a single status or a list/tuple of statuses to
                          restrict the listing to. When empty or None,
                          requests in every status are returned.
    :param session: The database session in use.

    :returns: list of (request_id, status, site) rows.
    """
    if active_status and not isinstance(active_status, (list, tuple)):
        active_status = [active_status]
    if active_status and len(active_status) == 1:
        active_status = [active_status[0], active_status[0]]

    # Errors propagate unchanged to the caller; no wrapping is needed here.
    query = session.query(models.Request.request_id, models.Request.status, models.Request.site)
    if active_status:
        query = query.filter(models.Request.status.in_(active_status))
    return query.all()
1394 
1395 
@read_session
def get_min_request_id(difference=1000, session=None):
    """
    Get a lower bound for request ids: the current maximum id minus difference.

    The maximum is first read from the request sequence; if that fails for any
    reason, it is read from max(request_id) on the requests table instead.

    :param difference: value subtracted from the maximum request id.
    :param session: The database session in use.

    :returns: maximum request id minus difference, or 0 when no maximum is available.
    """
    try:
        seq = models.get_request_sequence()
        # NOTE(review): next_value() presumably advances the sequence on every
        # call - confirm that consuming an id here is acceptable.
        row = session.query(seq.next_value()).one()
        if row:
            max_request_id = row[0]
            return max_request_id - difference
        else:
            return 0
    except Exception:
        # Deliberate best-effort fallback when the sequence cannot be queried.
        row = session.query(func.max(models.Request.request_id)).one()
        max_request_id = row[0] if row else None
        if max_request_id is None:
            # max() over an empty table yields NULL; there is no id to offset.
            return 0
        return max_request_id - difference