File indexing completed on 2026-04-09 07:58:20
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012 """
0013 operations related to Requests.
0014 """
0015
0016 import datetime
0017 import random
0018
0019 import sqlalchemy
0020 from sqlalchemy import and_, func, select, not_
0021 from sqlalchemy.exc import DatabaseError, IntegrityError
0022 from sqlalchemy.sql.expression import asc, desc
0023
0024 from idds.common import exceptions
0025 from idds.common.constants import RequestType, RequestStatus, RequestLocking, CommandType
0026 from idds.common.utils import get_process_thread_info
0027 from idds.orm.base.session import read_session, transactional_session, safe_bulk_update_mappings
0028 from idds.orm.base import models
0029
0030
0031 def create_request(scope=None, name=None, requester=None, request_type=None,
0032 username=None, userdn=None, transform_tag=None,
0033 status=RequestStatus.New, locking=RequestLocking.Idle, priority=0,
0034 lifetime=None, workload_id=None, request_metadata=None,
0035 new_poll_period=1, update_poll_period=10, site=None,
0036 cloud=None, queue=None, command=CommandType.NoneCommand,
0037 new_retries=0, update_retries=0, max_new_retries=3, max_update_retries=0,
0038 group_id=None, campaign=None, campaign_scope=None, campaign_group=None,
0039 campaign_tag=None, additional_data_storage=None,
0040 processing_metadata=None):
0041 """
0042 Create a request.
0043
0044 :param scope: The scope of the request data.
0045 :param name: The name of the request data.
0046 :param requestr: The requester, such as panda, user and so on.
0047 :param request_type: The type of the request, such as ESS, DAOD.
0048 :param transform_tag: Transform tag, such as ATLAS AMI tag.
0049 :param status: The request status as integer.
0050 :param locking: The request locking as integer.
0051 :param priority: The priority as integer.
0052 :param lifetime: The life time as umber of days.
0053 :param workload_id: The external workload id.
0054 :param request_metadata: The metadata as json.
0055 :param processing_metadata: The metadata as json.
0056
0057 :returns: request.
0058 """
0059
0060 is_pseudo_input = None
0061 if request_type in [RequestType.HyperParameterOpt, RequestType.HyperParameterOpt.value]:
0062 if not scope:
0063 scope = 'hpo'
0064 is_pseudo_input = True
0065 if not name:
0066 if workload_id is not None:
0067 name = 'hpo.%s.' % workload_id
0068 else:
0069 name = 'hpo.'
0070 name = name + datetime.datetime.utcnow().strftime("%Y_%m_%d_%H_%M_%S_%f") + str(random.randint(1, 1000))
0071 is_pseudo_input = True
0072
0073 if is_pseudo_input:
0074 if not request_metadata:
0075 request_metadata = {}
0076 request_metadata['is_pseudo_input'] = True
0077
0078 if lifetime:
0079 expired_at = datetime.datetime.utcnow() + datetime.timedelta(days=lifetime)
0080 else:
0081 expired_at = None
0082
0083 new_request = models.Request(scope=scope, name=name, requester=requester, request_type=request_type,
0084 username=username, userdn=userdn,
0085 transform_tag=transform_tag, status=status, locking=locking,
0086 priority=priority, workload_id=workload_id,
0087 expired_at=expired_at, site=site, additional_data_storage=additional_data_storage,
0088 cloud=cloud, queue=queue, command=command,
0089 new_retries=new_retries, update_retries=update_retries,
0090 max_new_retries=max_new_retries, max_update_retries=max_update_retries,
0091 group_id=group_id, campaign=campaign, campaign_scope=campaign_scope,
0092 campaign_group=campaign_group, campaign_tag=campaign_tag,
0093 request_metadata=request_metadata, processing_metadata=processing_metadata)
0094 if new_poll_period:
0095 new_poll_period = datetime.timedelta(seconds=new_poll_period)
0096 new_request.new_poll_period = new_poll_period
0097 if update_poll_period:
0098 update_poll_period = datetime.timedelta(seconds=update_poll_period)
0099 new_request.update_poll_period = update_poll_period
0100 return new_request
0101
0102
0103 @transactional_session
0104 def add_request(scope=None, name=None, requester=None, request_type=None,
0105 username=None, userdn=None, transform_tag=None,
0106 status=RequestStatus.New, locking=RequestLocking.Idle, priority=0,
0107 lifetime=None, workload_id=None, request_metadata=None,
0108 new_poll_period=1, update_poll_period=10, site=None,
0109 cloud=None, queue=None, command=CommandType.NoneCommand,
0110 new_retries=0, update_retries=0, max_new_retries=3, max_update_retries=0,
0111 group_id=None, campaign=None, campaign_scope=None, campaign_group=None,
0112 campaign_tag=None, additional_data_storage=None,
0113 processing_metadata=None, session=None):
0114 """
0115 Add a request.
0116
0117 :param scope: The scope of the request data.
0118 :param name: The name of the request data.
0119 :param requestr: The requester, such as panda, user and so on.
0120 :param request_type: The type of the request, such as ESS, DAOD.
0121 :param transform_tag: Transform tag, such as ATLAS AMI tag.
0122 :param status: The request status as integer.
0123 :param locking: The request locking as integer.
0124 :param priority: The priority as integer.
0125 :param lifetime: The life time as umber of days.
0126 :param workload_id: The external workload id.
0127 :param request_metadata: The metadata as json.
0128 :param processing_metadata: The metadata as json.
0129
0130 :raises DuplicatedObject: If an request with the same name exists.
0131 :raises DatabaseException: If there is a database error.
0132
0133 :returns: request id.
0134 """
0135
0136 try:
0137 new_request = create_request(scope=scope, name=name, requester=requester, request_type=request_type,
0138 username=username, userdn=userdn,
0139 transform_tag=transform_tag, status=status, locking=locking,
0140 priority=priority, workload_id=workload_id, lifetime=lifetime,
0141 new_poll_period=new_poll_period, site=site,
0142 cloud=cloud, queue=queue,
0143 command=command,
0144 update_poll_period=update_poll_period,
0145 additional_data_storage=additional_data_storage,
0146 new_retries=new_retries, update_retries=update_retries,
0147 max_new_retries=max_new_retries, max_update_retries=max_update_retries,
0148 group_id=group_id, campaign=campaign, campaign_scope=campaign_scope,
0149 campaign_group=campaign_group, campaign_tag=campaign_tag,
0150 request_metadata=request_metadata, processing_metadata=processing_metadata)
0151 new_request.save(session=session)
0152 request_id = new_request.request_id
0153 return request_id
0154 except IntegrityError as error:
0155 raise exceptions.DuplicatedObject('Request %s:%s already exists!: %s' % (scope, name, error))
0156 except DatabaseError as error:
0157 raise exceptions.DatabaseException(error)
0158
0159
0160 @read_session
0161 def get_request_ids_by_workload_id(workload_id, session=None):
0162 """
0163 Get request id or raise a NoObject exception.
0164
0165 :param workload_id: The workload_id of the request.
0166 :param session: The database session in use.
0167
0168 :raises NoObject: If no request is founded.
0169
0170 :returns: Request id.
0171 """
0172
0173 if workload_id is None:
0174 return exceptions.WrongParameterException("workload_id should not be None")
0175
0176 try:
0177 query = session.query(models.Request.request_id)\
0178 .filter(models.Request.workload_id == workload_id)
0179 tmp = query.all()
0180 ret_ids = []
0181 if tmp:
0182 for req in tmp:
0183 ret_ids.append(req[0])
0184 return ret_ids
0185 except sqlalchemy.orm.exc.NoResultFound as error:
0186 raise exceptions.NoObject('request with workload_id:%s cannot be found: %s' % (workload_id, error))
0187
0188
0189 @read_session
0190 def get_request_ids_by_name(name, scope=None, exact_match=False, session=None):
0191 """
0192 Get request ids or raise a NoObject exception.
0193
0194 :param name: name of the request.
0195 :param session: The database session in use.
0196
0197 :raises NoObject: If no request is founded.
0198
0199 :returns: Request {name:id} dict.
0200 """
0201 try:
0202 if not exact_match:
0203 query = session.query(models.Request.request_id, models.Request.name)\
0204 .filter(models.Request.name.like(name.replace('*', '%')))
0205 else:
0206 query = session.query(models.Request.request_id, models.Request.name)\
0207 .filter(models.Request.name == name)
0208 if scope:
0209 query = query.filter(models.Request.scope == scope)
0210
0211 tmp = query.all()
0212 ret_ids = {}
0213 if tmp:
0214 for req in tmp:
0215 ret_ids[req[1]] = req[0]
0216 return ret_ids
0217 except sqlalchemy.orm.exc.NoResultFound as error:
0218 raise exceptions.NoObject('request with name:%s cannot be found: %s' % (name, error))
0219
0220
0221 @read_session
0222 def get_request_ids(request_id=None, workload_id=None, session=None):
0223 """
0224 Get request id or raise a NoObject exception.
0225
0226 :param request_id: the request id.
0227 :param workload_id: The workload_id of the request.
0228 :param session: The database session in use.
0229
0230 :raises NoObject: If no request is founded.
0231
0232 :returns: Request id.
0233 """
0234 if request_id:
0235 return [request_id]
0236 return get_request_ids_by_workload_id(workload_id)
0237
0238
0239 @read_session
0240 def get_request(request_id, to_json=False, session=None):
0241 """
0242 Get a request or raise a NoObject exception.
0243
0244 :param request_id: The id of the request.
0245 :param to_json: return json format.
0246
0247 :param session: The database session in use.
0248
0249 :raises NoObject: If no request is founded.
0250
0251 :returns: Request.
0252 """
0253
0254 try:
0255 query = select(models.Request).where(models.Request.request_id == request_id)
0256
0257 ret = session.execute(query).fetchone()
0258 if not ret:
0259 return None
0260 else:
0261 if to_json:
0262 return ret[0].to_dict_json()
0263 else:
0264 return ret[0].to_dict()
0265 except sqlalchemy.orm.exc.NoResultFound as error:
0266 raise exceptions.NoObject('request request_id: %s cannot be found: %s' % (request_id, error))
0267
0268
0269 @read_session
0270 def get_request_by_id_status(request_id, status=None, locking=False, session=None):
0271 """
0272 Get a request or raise a NoObject exception.
0273
0274 :param request_id: The id of the request.
0275 :param status: request status.
0276 :param locking: the locking status.
0277
0278 :param session: The database session in use.
0279
0280 :raises NoObject: If no request is founded.
0281
0282 :returns: Request.
0283 """
0284
0285 try:
0286 query = select(models.Request).filter(models.Request.request_id == request_id)
0287
0288 if status:
0289 if not isinstance(status, (list, tuple)):
0290 status = [status]
0291 if len(status) == 1:
0292 status = [status[0], status[0]]
0293 query = query.where(models.Request.status.in_(status))
0294
0295 if locking:
0296 query = query.where(models.Request.locking == RequestLocking.Idle)
0297 query = query.with_for_update(skip_locked=True)
0298
0299 ret = session.execute(query).fetchone()
0300 if not ret:
0301 return None
0302 else:
0303 if locking:
0304 ret[0].updated_at = datetime.datetime.utcnow()
0305 ret[0].locking = RequestLocking.Locking
0306 hostname, pid, thread_id, thread_name = get_process_thread_info()
0307 ret[0].locking_hostname = hostname
0308 ret[0].locking_pid = pid
0309 ret[0].locking_thread_id = thread_id
0310 ret[0].locking_thread_name = thread_name
0311
0312 return ret[0].to_dict()
0313 except sqlalchemy.orm.exc.NoResultFound as error:
0314 raise exceptions.NoObject('request request_id: %s cannot be found: %s' % (request_id, error))
0315
0316
0317 @read_session
0318 def get_requests_old(request_id=None, workload_id=None, with_detail=False, with_metadata=False,
0319 with_request=False, with_transform=False, with_processing=False, to_json=False,
0320 session=None):
0321 """
0322 Get a request or raise a NoObject exception.
0323
0324 :param workload_id: The workload id of the request.
0325 :param to_json: return json format.
0326
0327 :param session: The database session in use.
0328
0329 :raises NoObject: If no request is founded.
0330
0331 :returns: Request.
0332 """
0333 try:
0334 if with_request or not (with_transform or with_processing or with_detail or with_metadata):
0335 if with_metadata:
0336 query = session.query(models.Request)
0337
0338 if request_id:
0339 query = query.filter(models.Request.request_id == request_id)
0340 if workload_id:
0341 query = query.filter(models.Request.workload_id == workload_id)
0342
0343 tmp = query.all()
0344 rets = []
0345 if tmp:
0346 for t in tmp:
0347 if to_json:
0348 t_dict = t.to_dict_json()
0349 else:
0350 t_dict = t.to_dict()
0351 rets.append(t_dict)
0352 return rets
0353 else:
0354 query = session.query(models.Request.request_id,
0355 models.Request.scope,
0356 models.Request.name,
0357 models.Request.requester,
0358 models.Request.request_type,
0359 models.Request.username,
0360 models.Request.userdn,
0361 models.Request.transform_tag,
0362 models.Request.workload_id,
0363 models.Request.priority,
0364 models.Request.status,
0365 models.Request.substatus,
0366 models.Request.locking,
0367 models.Request.created_at,
0368 models.Request.updated_at,
0369 models.Request.next_poll_at,
0370 models.Request.accessed_at,
0371 models.Request.expired_at,
0372 models.Request.errors)
0373
0374 if request_id:
0375 query = query.filter(models.Request.request_id == request_id)
0376 if workload_id:
0377 query = query.filter(models.Request.workload_id == workload_id)
0378
0379 tmp = query.all()
0380 rets = []
0381 if tmp:
0382 for t in tmp:
0383 t2 = dict(zip(t.keys(), t))
0384 rets.append(t2)
0385 return rets
0386 elif with_transform:
0387 subquery1 = session.query(models.Collection.coll_id, models.Collection.transform_id,
0388 models.Collection.scope.label("input_coll_scope"),
0389 models.Collection.name.label("input_coll_name"),
0390 models.Collection.status.label("input_coll_status"),
0391 models.Collection.bytes.label("input_coll_bytes"),
0392 models.Collection.total_files.label("input_total_files"),
0393 models.Collection.processed_files.label("input_processed_files"),
0394 models.Collection.new_files.label("input_new_files"),
0395 models.Collection.failed_files.label("input_failed_files"),
0396 models.Collection.missing_files.label("input_missing_files"),
0397 models.Collection.processing_files.label("input_processing_files")).filter(models.Collection.relation_type == 0)
0398 subquery1 = subquery1.subquery()
0399
0400 subquery2 = session.query(models.Collection.coll_id, models.Collection.transform_id,
0401 models.Collection.scope.label("output_coll_scope"),
0402 models.Collection.name.label("output_coll_name"),
0403 models.Collection.status.label("output_coll_status"),
0404 models.Collection.bytes.label("output_coll_bytes"),
0405 models.Collection.total_files.label("output_total_files"),
0406 models.Collection.processed_files.label("output_processed_files"),
0407 models.Collection.new_files.label("output_new_files"),
0408 models.Collection.failed_files.label("output_failed_files"),
0409 models.Collection.missing_files.label("output_missing_files"),
0410 models.Collection.processing_files.label("output_processing_files")).filter(models.Collection.relation_type == 1)
0411 subquery2 = subquery2.subquery()
0412 if True:
0413 query = session.query(models.Request.request_id,
0414 models.Request.scope,
0415 models.Request.name,
0416 models.Request.requester,
0417 models.Request.request_type,
0418 models.Request.username,
0419 models.Request.userdn,
0420 models.Request.transform_tag,
0421 models.Request.workload_id,
0422 models.Request.priority,
0423 models.Request.status,
0424 models.Request.substatus,
0425 models.Request.locking,
0426 models.Request.created_at,
0427 models.Request.updated_at,
0428 models.Request.next_poll_at,
0429 models.Request.accessed_at,
0430 models.Request.expired_at,
0431 models.Request.errors,
0432 models.Transform.transform_id,
0433 models.Transform.transform_type,
0434 models.Transform.workload_id.label("transform_workload_id"),
0435 models.Transform.status.label("transform_status"),
0436 models.Transform.created_at.label("transform_created_at"),
0437 models.Transform.updated_at.label("transform_updated_at"),
0438 models.Transform.finished_at.label("transform_finished_at"),
0439 subquery1.c.input_coll_scope, subquery1.c.input_coll_name,
0440 subquery1.c.input_coll_status, subquery1.c.input_coll_bytes,
0441 subquery1.c.input_total_files,
0442 subquery1.c.input_processed_files,
0443 subquery1.c.input_processing_files,
0444 subquery1.c.input_new_files,
0445 subquery1.c.input_failed_files,
0446 subquery1.c.input_missing_files,
0447 subquery2.c.output_coll_scope, subquery2.c.output_coll_name,
0448 subquery2.c.output_coll_status, subquery2.c.output_coll_bytes,
0449 subquery2.c.output_total_files,
0450 subquery2.c.output_processed_files,
0451 subquery2.c.output_processing_files,
0452 subquery2.c.output_new_files,
0453 subquery2.c.output_failed_files,
0454 subquery2.c.output_missing_files)
0455
0456 if request_id:
0457 query = query.filter(models.Request.request_id == request_id)
0458 if workload_id:
0459 query = query.filter(models.Request.workload_id == workload_id)
0460
0461 query = query.outerjoin(models.Transform, and_(models.Request.request_id == models.Transform.request_id))
0462 query = query.outerjoin(subquery1, and_(subquery1.c.transform_id == models.Transform.transform_id))
0463 query = query.outerjoin(subquery2, and_(subquery2.c.transform_id == models.Transform.transform_id))
0464 query = query.order_by(asc(models.Request.request_id))
0465
0466 tmp = query.all()
0467 rets = []
0468 if tmp:
0469 for t in tmp:
0470
0471 t2 = dict(zip(t.keys(), t))
0472
0473 if 'request_metadata' in t2 and t2['request_metadata']:
0474 if 'workflow' in t2['request_metadata']:
0475 workflow = t2['request_metadata']['workflow']
0476 workflow_data = None
0477 if 'processing_metadata' in t2 and t2['processing_metadata'] and 'workflow_data' in t2['processing_metadata']:
0478 workflow_data = t2['processing_metadata']['workflow_data']
0479 if workflow is not None and workflow_data is not None:
0480 workflow.metadata = workflow_data
0481 t2['request_metadata']['workflow'] = workflow
0482 if 'build_workflow' in t2['request_metadata']:
0483 build_workflow = t2['request_metadata']['build_workflow']
0484 build_workflow_data = None
0485 if 'processing_metadata' in t2 and t2['processing_metadata'] and 'build_workflow_data' in t2['processing_metadata']:
0486 build_workflow_data = t2['processing_metadata']['build_workflow_data']
0487 if build_workflow is not None and build_workflow_data is not None:
0488 build_workflow.metadata = build_workflow_data
0489 t2['request_metadata']['build_workflow'] = build_workflow
0490
0491 rets.append(t2)
0492 return rets
0493 elif with_processing:
0494 subquery = session.query(models.Processing.processing_id,
0495 models.Processing.transform_id,
0496 models.Processing.workload_id,
0497 models.Processing.status.label("processing_status"),
0498 models.Processing.created_at.label("processing_created_at"),
0499 models.Processing.updated_at.label("processing_updated_at"),
0500 models.Processing.finished_at.label("processing_finished_at"))
0501 subquery = subquery.subquery()
0502
0503 if True:
0504 query = session.query(models.Request.request_id,
0505 models.Request.scope,
0506 models.Request.name,
0507 models.Request.requester,
0508 models.Request.request_type,
0509 models.Request.username,
0510 models.Request.userdn,
0511 models.Request.transform_tag,
0512 models.Request.workload_id,
0513 models.Request.priority,
0514 models.Request.status,
0515 models.Request.substatus,
0516 models.Request.locking,
0517 models.Request.created_at,
0518 models.Request.updated_at,
0519 models.Request.next_poll_at,
0520 models.Request.accessed_at,
0521 models.Request.expired_at,
0522 models.Request.errors,
0523 models.Transform.transform_id,
0524 models.Transform.workload_id.label("transform_workload_id"),
0525 models.Transform.status.label("transform_status"),
0526 subquery.c.processing_id,
0527 subquery.c.workload_id.label("processing_workload_id"),
0528 subquery.c.processing_status,
0529 subquery.c.processing_created_at,
0530 subquery.c.processing_updated_at,
0531 subquery.c.processing_finished_at)
0532
0533 if request_id:
0534 query = query.filter(models.Request.request_id == request_id)
0535 if workload_id:
0536 query = query.filter(models.Request.workload_id == workload_id)
0537
0538 query = query.outerjoin(models.Transform, and_(models.Request.request_id == models.Transform.request_id))
0539 query = query.outerjoin(subquery, and_(subquery.c.transform_id == models.Transform.transform_id))
0540 query = query.order_by(asc(models.Request.request_id))
0541
0542 tmp = query.all()
0543 rets = []
0544 if tmp:
0545 for t in tmp:
0546
0547 t2 = dict(zip(t.keys(), t))
0548
0549 if 'request_metadata' in t2 and t2['request_metadata']:
0550 if 'workflow' in t2['request_metadata']:
0551 workflow = t2['request_metadata']['workflow']
0552 workflow_data = None
0553 if 'processing_metadata' in t2 and t2['processing_metadata'] and 'workflow_data' in t2['processing_metadata']:
0554 workflow_data = t2['processing_metadata']['workflow_data']
0555 if workflow is not None and workflow_data is not None:
0556 workflow.metadata = workflow_data
0557 t2['request_metadata']['workflow'] = workflow
0558 if 'build_workflow' in t2['request_metadata']:
0559 build_workflow = t2['request_metadata']['build_workflow']
0560 build_workflow_data = None
0561 if 'processing_metadata' in t2 and t2['processing_metadata'] and 'build_workflow_data' in t2['processing_metadata']:
0562 build_workflow_data = t2['processing_metadata']['build_workflow_data']
0563 if build_workflow is not None and build_workflow_data is not None:
0564 build_workflow.metadata = build_workflow_data
0565 t2['request_metadata']['build_workflow'] = build_workflow
0566
0567 rets.append(t2)
0568 return rets
0569 elif with_detail or with_metadata:
0570 subquery1 = session.query(models.Collection.coll_id, models.Collection.transform_id,
0571 models.Collection.scope.label("input_coll_scope"),
0572 models.Collection.name.label("input_coll_name"),
0573 models.Collection.status.label("input_coll_status"),
0574 models.Collection.bytes.label("input_coll_bytes"),
0575 models.Collection.total_files.label("input_total_files"),
0576 models.Collection.processed_files.label("input_processed_files"),
0577 models.Collection.new_files.label("input_new_files"),
0578 models.Collection.failed_files.label("input_failed_files"),
0579 models.Collection.missing_files.label("input_missing_files"),
0580 models.Collection.processing_files.label("input_processing_files")).filter(models.Collection.relation_type == 0)
0581 subquery1 = subquery1.subquery()
0582
0583 subquery2 = session.query(models.Collection.coll_id, models.Collection.transform_id,
0584 models.Collection.scope.label("output_coll_scope"),
0585 models.Collection.name.label("output_coll_name"),
0586 models.Collection.status.label("output_coll_status"),
0587 models.Collection.bytes.label("output_coll_bytes"),
0588 models.Collection.total_files.label("output_total_files"),
0589 models.Collection.processed_files.label("output_processed_files"),
0590 models.Collection.new_files.label("output_new_files"),
0591 models.Collection.failed_files.label("output_failed_files"),
0592 models.Collection.missing_files.label("output_missing_files"),
0593 models.Collection.processing_files.label("output_processing_files")).filter(models.Collection.relation_type == 1)
0594 subquery2 = subquery2.subquery()
0595
0596 if with_metadata:
0597 query = session.query(models.Request.request_id,
0598 models.Request.scope,
0599 models.Request.name,
0600 models.Request.requester,
0601 models.Request.request_type,
0602 models.Request.username,
0603 models.Request.userdn,
0604 models.Request.transform_tag,
0605 models.Request.workload_id,
0606 models.Request.priority,
0607 models.Request.status,
0608 models.Request.substatus,
0609 models.Request.locking,
0610 models.Request.created_at,
0611 models.Request.updated_at,
0612 models.Request.next_poll_at,
0613 models.Request.accessed_at,
0614 models.Request.expired_at,
0615 models.Request.errors,
0616 models.Request._request_metadata.label('request_metadata'),
0617 models.Request._processing_metadata.label('processing_metadata'),
0618 models.Transform.transform_id,
0619 models.Transform.transform_type,
0620 models.Transform.name.label("transform_name"),
0621 models.Transform.workload_id.label("transform_workload_id"),
0622 models.Transform.status.label("transform_status"),
0623 models.Transform.created_at.label("transform_created_at"),
0624 models.Transform.updated_at.label("transform_updated_at"),
0625 models.Transform.finished_at.label("transform_finished_at"),
0626 subquery1.c.input_coll_scope, subquery1.c.input_coll_name,
0627 subquery1.c.input_coll_status, subquery1.c.input_coll_bytes,
0628 subquery1.c.input_total_files,
0629 subquery1.c.input_processed_files,
0630 subquery1.c.input_processing_files,
0631 subquery1.c.input_new_files,
0632 subquery1.c.input_failed_files,
0633 subquery1.c.input_missing_files,
0634 subquery2.c.output_coll_scope, subquery2.c.output_coll_name,
0635 subquery2.c.output_coll_status, subquery2.c.output_coll_bytes,
0636 subquery2.c.output_total_files,
0637 subquery2.c.output_processed_files,
0638 subquery2.c.output_processing_files,
0639 subquery2.c.output_new_files,
0640 subquery2.c.output_failed_files,
0641 subquery2.c.output_missing_files,)
0642 else:
0643 query = session.query(models.Request.request_id,
0644 models.Request.scope,
0645 models.Request.name,
0646 models.Request.requester,
0647 models.Request.request_type,
0648 models.Request.username,
0649 models.Request.userdn,
0650 models.Request.transform_tag,
0651 models.Request.workload_id,
0652 models.Request.priority,
0653 models.Request.status,
0654 models.Request.substatus,
0655 models.Request.locking,
0656 models.Request.created_at,
0657 models.Request.updated_at,
0658 models.Request.next_poll_at,
0659 models.Request.accessed_at,
0660 models.Request.expired_at,
0661 models.Request.errors,
0662 models.Transform.transform_id,
0663 models.Transform.transform_type,
0664 models.Transform.name.label("transform_name"),
0665 models.Transform.workload_id.label("transform_workload_id"),
0666 models.Transform.status.label("transform_status"),
0667 models.Transform.created_at.label("transform_created_at"),
0668 models.Transform.updated_at.label("transform_updated_at"),
0669 models.Transform.finished_at.label("transform_finished_at"),
0670 subquery1.c.input_coll_scope, subquery1.c.input_coll_name,
0671 subquery1.c.input_coll_status, subquery1.c.input_coll_bytes,
0672 subquery1.c.input_total_files,
0673 subquery1.c.input_processed_files,
0674 subquery1.c.input_processing_files,
0675 subquery1.c.input_new_files,
0676 subquery1.c.input_failed_files,
0677 subquery1.c.input_missing_files,
0678 subquery2.c.output_coll_scope, subquery2.c.output_coll_name,
0679 subquery2.c.output_coll_status, subquery2.c.output_coll_bytes,
0680 subquery2.c.output_total_files,
0681 subquery2.c.output_processed_files,
0682 subquery2.c.output_processing_files,
0683 subquery2.c.output_new_files,
0684 subquery2.c.output_failed_files,
0685 subquery2.c.output_missing_files,)
0686
0687 if request_id:
0688 query = query.filter(models.Request.request_id == request_id)
0689 if workload_id:
0690 query = query.filter(models.Request.workload_id == workload_id)
0691
0692 query = query.outerjoin(models.Transform, and_(models.Request.request_id == models.Transform.request_id))
0693 query = query.outerjoin(subquery1, and_(subquery1.c.transform_id == models.Transform.transform_id))
0694 query = query.outerjoin(subquery2, and_(subquery2.c.transform_id == models.Transform.transform_id))
0695 query = query.order_by(asc(models.Request.request_id))
0696
0697 tmp = query.all()
0698 rets = []
0699 if tmp:
0700 for t in tmp:
0701
0702 t2 = dict(zip(t.keys(), t))
0703
0704 if 'request_metadata' in t2 and t2['request_metadata']:
0705 if 'workflow' in t2['request_metadata']:
0706 workflow = t2['request_metadata']['workflow']
0707 workflow_data = None
0708 if 'processing_metadata' in t2 and t2['processing_metadata'] and 'workflow_data' in t2['processing_metadata']:
0709 workflow_data = t2['processing_metadata']['workflow_data']
0710 if workflow is not None and workflow_data is not None:
0711 workflow.metadata = workflow_data
0712 t2['request_metadata']['workflow'] = workflow
0713 if 'build_workflow' in t2['request_metadata']:
0714 build_workflow = t2['request_metadata']['build_workflow']
0715 build_workflow_data = None
0716 if 'processing_metadata' in t2 and t2['processing_metadata'] and 'build_workflow_data' in t2['processing_metadata']:
0717 build_workflow_data = t2['processing_metadata']['build_workflow_data']
0718 if build_workflow is not None and build_workflow_data is not None:
0719 build_workflow.metadata = build_workflow_data
0720 t2['request_metadata']['build_workflow'] = build_workflow
0721
0722 rets.append(t2)
0723 return rets
0724 except sqlalchemy.orm.exc.NoResultFound as error:
0725 raise exceptions.NoObject('request workload_id: %s cannot be found: %s' % (workload_id, error))
0726
0727
0728 @read_session
0729 def get_requests_only(request_id=None, workload_id=None, with_metadata=False, to_json=False, session=None):
0730 """
0731 Get requests or raise a NoObject exception.
0732
0733 :param request_id: The request id.
0734 :param workload_id: The workload id of the request.
0735 :param with_metadata: To show metadata.
0736 :param to_json: return json format.
0737
0738 :param session: The database session in use.
0739
0740 :raises NoObject: If no request is founded.
0741
0742 :returns: Requests.
0743 """
0744 if with_metadata:
0745 query = select(models.Request)
0746 if request_id:
0747 query = query.where(models.Request.request_id == request_id)
0748 if workload_id:
0749 query = query.where(models.Request.workload_id == workload_id)
0750
0751 tmp = session.execute(query).fetchall()
0752 rets = []
0753 if tmp:
0754 for t in tmp:
0755 t_dict = t[0].to_dict()
0756 rets.append(t_dict)
0757 return rets
0758 else:
0759 exception_columns = ['request_metadata', 'processing_metadata', '_request_metadata', '_processing_metadata']
0760 columns = [column for column in models.Request.__mapper__.columns if column.name not in exception_columns]
0761 column_names = [column.name for column in columns]
0762 query = select(*columns)
0763
0764 if request_id:
0765 query = query.where(models.Request.request_id == request_id)
0766 if workload_id:
0767 query = query.where(models.Request.workload_id == workload_id)
0768
0769 tmp = session.execute(query).fetchall()
0770 rets = []
0771 if tmp:
0772 for t in tmp:
0773 t_dict = dict(zip(column_names, t))
0774 rets.append(t_dict)
0775 return rets
0776
0777
0778 def get_query_collection(request_id=None, workload_id=None):
0779 """
0780 Get input collection query and output collection query.
0781 """
0782 input_query = select(models.Collection.coll_id.label("input_coll_id"),
0783 models.Collection.transform_id.label("input_coll_transform_id"),
0784 models.Collection.scope.label("input_coll_scope"),
0785 models.Collection.name.label("input_coll_name"),
0786 models.Collection.status.label("input_coll_status"),
0787 models.Collection.bytes.label("input_coll_bytes"),
0788 models.Collection.total_files.label("input_total_files"),
0789 models.Collection.processed_files.label("input_processed_files"),
0790 models.Collection.new_files.label("input_new_files"),
0791 models.Collection.preprocessing_files.label("input_preprocessing_files"),
0792 models.Collection.activated_files.label("input_activated_files"),
0793 models.Collection.failed_files.label("input_failed_files"),
0794 models.Collection.missing_files.label("input_missing_files"),
0795 models.Collection.processing_files.label("input_processing_files"))\
0796 .where(models.Collection.relation_type == 0)
0797
0798 output_query = select(models.Collection.coll_id.label("output_coll_id"),
0799 models.Collection.transform_id.label("output_coll_transform_id"),
0800 models.Collection.scope.label("output_coll_scope"),
0801 models.Collection.name.label("output_coll_name"),
0802 models.Collection.status.label("output_coll_status"),
0803 models.Collection.bytes.label("output_coll_bytes"),
0804 models.Collection.total_files.label("output_total_files"),
0805 models.Collection.processed_files.label("output_processed_files"),
0806 models.Collection.new_files.label("output_new_files"),
0807 models.Collection.preprocessing_files.label("output_preprocessing_files"),
0808 models.Collection.activated_files.label("output_activated_files"),
0809 models.Collection.failed_files.label("output_failed_files"),
0810 models.Collection.missing_files.label("output_missing_files"),
0811 models.Collection.processing_files.label("output_processing_files"))\
0812 .where(models.Collection.relation_type == 1)
0813
0814 if request_id:
0815 input_query = input_query.where(models.Request.request_id == request_id)
0816 output_query = output_query.where(models.Request.request_id == request_id)
0817 if workload_id:
0818 input_query = input_query.where(models.Request.workload_id == workload_id)
0819 output_query = output_query.where(models.Request.workload_id == workload_id)
0820 return input_query, output_query
0821
0822
0823 def get_query_transform(request_id=None, workload_id=None):
0824 """
0825 Get transform query.
0826 """
0827 columns = [models.Transform.request_id,
0828 models.Transform.transform_id,
0829 models.Transform.transform_type,
0830 models.Transform.name.label("transform_name"),
0831 models.Transform.workload_id.label("transform_workload_id"),
0832 models.Transform.status.label("transform_status"),
0833 models.Transform.created_at.label("transform_created_at"),
0834 models.Transform.updated_at.label("transform_updated_at"),
0835 models.Transform.finished_at.label("transform_finished_at")]
0836 query = select(*columns)
0837 if request_id:
0838 query = query.where(models.Request.request_id == request_id)
0839 if workload_id:
0840 query = query.where(models.Request.workload_id == workload_id)
0841
0842 return query
0843
0844
0845 def get_query_processing(request_id=None, workload_id=None):
0846 """
0847 Get processing query.
0848 """
0849 query = select(models.Processing.processing_id,
0850 models.Processing.transform_id.label("processing_transform_id"),
0851 models.Processing.workload_id.label("processing_workload_id"),
0852 models.Processing.status.label("processing_status"),
0853 models.Processing.created_at.label("processing_created_at"),
0854 models.Processing.updated_at.label("processing_updated_at"),
0855 models.Processing.finished_at.label("processing_finished_at"))
0856 if request_id:
0857 query = query.where(models.Request.request_id == request_id)
0858 if workload_id:
0859 query = query.where(models.Request.workload_id == workload_id)
0860
0861 return query
0862
0863
0864 def get_request_dict(column_names, column):
0865 """
0866 Parse request column data to dictionary
0867 """
0868 t_dict = dict(zip(column_names, column))
0869
0870 if 'request_metadata' in t_dict and t_dict['request_metadata']:
0871 if 'workflow' in t_dict['request_metadata']:
0872 workflow = t_dict['request_metadata']['workflow']
0873 workflow_data = None
0874 if 'processing_metadata' in t_dict and t_dict['processing_metadata'] and 'workflow_data' in t_dict['processing_metadata']:
0875 workflow_data = t_dict['processing_metadata']['workflow_data']
0876 if workflow is not None and workflow_data is not None:
0877 workflow.metadata = workflow_data
0878 t_dict['request_metadata']['workflow'] = workflow
0879 if 'build_workflow' in t_dict['request_metadata']:
0880 build_workflow = t_dict['request_metadata']['build_workflow']
0881 build_workflow_data = None
0882 if 'processing_metadata' in t_dict and t_dict['processing_metadata'] and 'build_workflow_data' in t_dict['processing_metadata']:
0883 build_workflow_data = t_dict['processing_metadata']['build_workflow_data']
0884 if build_workflow is not None and build_workflow_data is not None:
0885 build_workflow.metadata = build_workflow_data
0886 t_dict['request_metadata']['build_workflow'] = build_workflow
0887 return t_dict
0888
0889
0890 @read_session
0891 def get_requests_with_transform(request_id=None, workload_id=None, with_metadata=False, show_processing=False,
0892 show_collection=False, to_json=False, session=None):
0893 """
0894 Get requests or raise a NoObject exception.
0895
0896 :param request_id: The request id.
0897 :param workload_id: The workload id of the request.
0898 :param with_metadata: To show metadata.
0899 :param to_json: return json format.
0900
0901 :param session: The database session in use.
0902
0903 :raises NoObject: If no request is founded.
0904
0905 :returns: Requests.
0906 """
0907 exception_columns = ['request_metadata', 'processing_metadata', '_request_metadata', '_processing_metadata']
0908 columns = [column for column in models.Request.__mapper__.columns if column.name not in exception_columns]
0909 if with_metadata:
0910 meta_columns = [models.Request._request_metadata.label('request_metadata'),
0911 models.Request._processing_metadata.label('processing_metadata')]
0912 columns = columns + meta_columns
0913
0914 transform_query = get_query_transform(request_id=request_id, workload_id=workload_id)
0915 transform_subquery = transform_query.subquery()
0916 transform_subquery_columns = [column for column in transform_subquery.c]
0917 columns = columns + transform_subquery_columns
0918
0919 if show_processing:
0920 processing_query = get_query_processing(request_id=request_id, workload_id=workload_id)
0921 processing_subquery = processing_query.subquery()
0922 processing_subquery_columns = [column for column in processing_subquery.c]
0923 columns = columns + processing_subquery_columns
0924
0925 if show_collection:
0926 input_query, output_query = get_query_collection(request_id=request_id, workload_id=workload_id)
0927 input_subquery = input_query.subquery()
0928 output_subquery = output_query.subquery()
0929 input_subquery_columns = [column for column in input_subquery.c]
0930 output_subquery_columns = [column for column in output_subquery.c]
0931 columns = columns + input_subquery_columns + output_subquery_columns
0932
0933 column_names = [column.name for column in columns]
0934
0935 query = select(*columns)
0936 if request_id:
0937 query = query.where(models.Request.request_id == request_id)
0938 if workload_id:
0939 query = query.where(models.Request.workload_id == workload_id)
0940
0941 query = query.outerjoin(transform_subquery, and_(models.Request.request_id == transform_subquery.c.request_id))
0942 if show_processing:
0943 query = query.outerjoin(processing_subquery, and_(processing_subquery.c.processing_transform_id == transform_subquery.c.transform_id))
0944 if show_collection:
0945 query = query.outerjoin(input_subquery, and_(input_subquery.c.input_coll_transform_id == transform_subquery.c.transform_id))
0946 query = query.outerjoin(output_subquery, and_(output_subquery.c.output_coll_transform_id == transform_subquery.c.transform_id))
0947 query = query.order_by(asc(models.Request.request_id))
0948
0949 tmp = session.execute(query).fetchall()
0950 rets = []
0951 if tmp:
0952 for t in tmp:
0953 t_dict = get_request_dict(column_names, t)
0954 rets.append(t_dict)
0955 return rets
0956
0957
0958 @read_session
0959 def get_requests(request_id=None, workload_id=None, with_detail=False, with_metadata=False,
0960 with_request=False, with_transform=False, with_processing=False, to_json=False, session=None):
0961 """
0962 Get requests or raise a NoObject exception.
0963
0964 :param request_id: The request id.
0965 :param workload_id: The workload id of the request.
0966 :param with_detail: To show the details with collections information.
0967 :param with_metadata: To show metadata.
0968 :param with_request: Only show requests.
0969 :param with_transform: Show transforms as addition.
0970 :param with_processing: Show transforms and processings as addition.
0971 :param to_json: return json format.
0972
0973 :param session: The database session in use.
0974
0975 :raises NoObject: If no request is founded.
0976
0977 :returns: Requests.
0978 """
0979 try:
0980 show_transform, show_processing, show_collection = False, False, False
0981 if with_transform:
0982 show_transform = True
0983 if with_processing:
0984 show_transform, show_processing = True, True
0985 if with_detail:
0986 show_transform, show_processing, show_collection = True, True, True
0987
0988 if show_transform:
0989 return get_requests_with_transform(request_id=request_id, workload_id=workload_id,
0990 with_metadata=with_metadata, to_json=to_json,
0991 show_processing=show_processing,
0992 show_collection=show_collection,
0993 session=session)
0994 else:
0995 return get_requests_only(request_id=request_id, workload_id=workload_id,
0996 with_metadata=with_metadata, to_json=to_json,
0997 session=session)
0998 except Exception as error:
0999 raise exceptions.NoObject(f'request(request_id: {request_id}) cannot be found: {error}')
1000
1001
1002 @transactional_session
1003 def extend_requests(request_id=None, workload_id=None, lifetime=30, session=None):
1004 """
1005 extend an request's lifetime.
1006
1007 :param request_id: The id of the request.
1008 :param workload_id: The workload_id of the request.
1009 :param lifetime: The life time as umber of days.
1010 :param session: The database session in use.
1011
1012 :raises NoObject: If no request is founded.
1013 :raises DatabaseException: If there is a database error.
1014 """
1015 try:
1016 query = session.query(models.Request)
1017 if request_id:
1018 query = query.filter_by(request_id=request_id)
1019 else:
1020 query = query.filter_by(workload_id=workload_id)
1021
1022 update_items = {'expired_at': datetime.datetime.utcnow() + datetime.timedelta(days=lifetime)}
1023 query.update(update_items, synchronize_session=False)
1024 except sqlalchemy.orm.exc.NoResultFound as error:
1025 raise exceptions.NoObject('Request (workload_id: %s) cannot be found: %s' % (workload_id, error))
1026
1027
1028 @transactional_session
1029 def cancel_requests(request_id=None, workload_id=None, session=None):
1030 """
1031 cancel an request.
1032
1033 :param request_id: The id of the request.
1034 :param workload_id: The workload_id of the request.
1035 :param session: The database session in use.
1036
1037 :raises NoObject: If no request is founded.
1038 :raises DatabaseException: If there is a database error.
1039 """
1040 try:
1041 query = session.query(models.Request)
1042 if request_id:
1043 query = query.filter_by(request_id=request_id)
1044 else:
1045 query = query.filter_by(workload_id=workload_id)
1046
1047 update_items = {'status': RequestStatus.Cancel}
1048 query.update(update_items, synchronize_session=False)
1049 except sqlalchemy.orm.exc.NoResultFound as error:
1050 raise exceptions.NoObject('Request %s cannot be found: %s' % (request_id, error))
1051
1052
1053 @read_session
1054 def get_requests_by_requester(scope, name, requester, to_json=False, session=None):
1055 """
1056 Get requests.
1057
1058 :param scope: The scope of the request data.
1059 :param name: The name of the request data.
1060 :param requestr: The requester, such as panda, user and so on.
1061 :param to_json: return json format.
1062
1063 :raises NoObject: If no request is founded.
1064
1065 :returns: list of Request.
1066 """
1067
1068 try:
1069 query = session.query(models.Request)\
1070 .filter(models.Request.requester == requester)\
1071 .filter(models.Request.scope == scope)\
1072 .filter(models.Request.name.like(name.replace('*', '%')))
1073 tmp = query.all()
1074 rets = []
1075 if tmp:
1076 for req in tmp:
1077 if to_json:
1078 rets.append(req.to_dict_json())
1079 else:
1080 rets.append(req.to_dict())
1081 return rets
1082 except sqlalchemy.orm.exc.NoResultFound as error:
1083 raise exceptions.NoObject('No requests with scope:name(%s:%s) and requester(%s) %s' % (scope, name, requester, error))
1084
1085
1086 @transactional_session
1087 def get_requests_by_status_type(status, request_type=None, time_period=None, request_ids=[], locking=False,
1088 locking_for_update=False, bulk_size=None, to_json=False, by_substatus=False,
1089 min_request_id=None, new_poll=False, update_poll=False, only_return_id=False,
1090 not_lock=False, session=None):
1091 """
1092 Get requests.
1093
1094 :param status: list of status of the request data.
1095 :param request_type: The type of the request data.
1096 :param locking: Wheter to lock requests to avoid others get the same request.
1097 :param bulk_size: Size limitation per retrieve.
1098 :param to_json: return json format.
1099
1100 :raises NoObject: If no request are founded.
1101
1102 :returns: list of Request.
1103 """
1104
1105 try:
1106 if status:
1107 if not isinstance(status, (list, tuple)):
1108 status = [status]
1109 if len(status) == 1:
1110 status = [status[0], status[0]]
1111
1112 if only_return_id:
1113 query = session.query(models.Request.request_id)
1114 else:
1115 query = session.query(models.Request)
1116
1117 if status:
1118 if by_substatus:
1119 query = query.filter(models.Request.substatus.in_(status))
1120 else:
1121 query = query.filter(models.Request.status.in_(status))
1122 if new_poll:
1123 query = query.filter(models.Request.updated_at + models.Request.new_poll_period <= datetime.datetime.utcnow())
1124 if update_poll:
1125 query = query.filter(models.Request.updated_at + models.Request.update_poll_period <= datetime.datetime.utcnow())
1126
1127 if request_type is not None:
1128 query = query.filter(models.Request.request_type == request_type)
1129 if request_ids:
1130 query = query.filter(models.Request.request_id.in_(request_ids))
1131 else:
1132 if min_request_id is not None:
1133 query = query.filter(models.Request.request_id >= min_request_id)
1134 if locking:
1135 query = query.filter(models.Request.locking == RequestLocking.Idle)
1136
1137 if locking_for_update:
1138 query = query.with_for_update(skip_locked=True)
1139 else:
1140
1141
1142
1143
1144 query = query.order_by(asc(models.Request.updated_at))
1145
1146 if bulk_size:
1147 query = query.limit(bulk_size)
1148
1149 tmp = query.all()
1150 rets = []
1151 if tmp:
1152 for req in tmp:
1153 if locking:
1154 req.updated_at = datetime.datetime.utcnow()
1155 req.locking = RequestLocking.Locking
1156
1157 hostname, pid, thread_id, thread_name = get_process_thread_info()
1158 req.locking_hostname = hostname
1159 req.locking_pid = pid
1160 req.locking_thread_id = thread_id
1161 req.locking_thread_name = thread_name
1162
1163 if only_return_id:
1164 rets.append(req[0])
1165 else:
1166 if to_json:
1167 rets.append(req.to_dict_json())
1168 else:
1169 rets.append(req.to_dict())
1170 return rets
1171 except sqlalchemy.orm.exc.NoResultFound as error:
1172 raise exceptions.NoObject('No requests with status: %s, request_type: %s, time_period: %s, locking: %s, %s' % (status, request_type, time_period, locking, error))
1173
1174
1175 @transactional_session
1176 def update_request(request_id, parameters, update_request_metadata=False, locking=False, origin_status=None, session=None):
1177 """
1178 update an request.
1179
1180 :param request_id: the request id.
1181 :param parameters: A dictionary of parameters.
1182 :param session: The database session in use.
1183
1184 :raises NoObject: If no request is founded.
1185 :raises DatabaseException: If there is a database error.
1186
1187 """
1188 try:
1189 parameters['updated_at'] = datetime.datetime.utcnow()
1190
1191 if 'new_poll_period' in parameters and type(parameters['new_poll_period']) not in [datetime.timedelta]:
1192 parameters['new_poll_period'] = datetime.timedelta(seconds=parameters['new_poll_period'])
1193 if 'update_poll_period' in parameters and type(parameters['update_poll_period']) not in [datetime.timedelta]:
1194 parameters['update_poll_period'] = datetime.timedelta(seconds=parameters['update_poll_period'])
1195
1196 if 'request_metadata' in parameters:
1197 if 'workflow' in parameters['request_metadata']:
1198 workflow = parameters['request_metadata']['workflow']
1199
1200 if workflow is not None:
1201 if hasattr(workflow, 'refresh_works'):
1202 workflow.refresh_works()
1203 if 'processing_metadata' not in parameters or not parameters['processing_metadata']:
1204 parameters['processing_metadata'] = {}
1205 parameters['processing_metadata']['workflow_data'] = workflow.metadata
1206 if 'build_workflow' in parameters['request_metadata']:
1207 build_workflow = parameters['request_metadata']['build_workflow']
1208
1209 if build_workflow is not None:
1210 build_workflow.refresh_works()
1211 if 'processing_metadata' not in parameters or not parameters['processing_metadata']:
1212 parameters['processing_metadata'] = {}
1213 parameters['processing_metadata']['build_workflow_data'] = build_workflow.metadata
1214
1215 if 'request_metadata' in parameters:
1216 if not update_request_metadata:
1217 del parameters['request_metadata']
1218 else:
1219 parameters['_request_metadata'] = parameters['request_metadata']
1220 del parameters['request_metadata']
1221 if 'processing_metadata' in parameters:
1222 parameters['_processing_metadata'] = parameters['processing_metadata']
1223 del parameters['processing_metadata']
1224
1225 query = session.query(models.Request).filter_by(request_id=request_id)
1226
1227 if locking:
1228 query = query.filter(models.Request.locking == RequestLocking.Idle)
1229 query = query.with_for_update(skip_locked=True)
1230 num_rows = query.update(parameters, synchronize_session=False)
1231 return num_rows
1232 else:
1233 build_status = [RequestStatus.Built, RequestStatus.Built]
1234 if origin_status and origin_status not in build_status:
1235 query_no_built = query.filter(not_(models.Request.status.in_(build_status)))
1236
1237 num_rows = query_no_built.update(parameters, synchronize_session=False)
1238 if num_rows > 0:
1239 return num_rows
1240 else:
1241 if 'status' in parameters:
1242 parameters['status'] = origin_status
1243 num_rows = query.update(parameters, synchronize_session=False)
1244 return num_rows
1245 else:
1246 num_rows = query.update(parameters, synchronize_session=False)
1247 return num_rows
1248 except sqlalchemy.orm.exc.NoResultFound as error:
1249 raise exceptions.NoObject('Request %s cannot be found: %s' % (request_id, error))
1250 return 0
1251
1252
1253 @transactional_session
1254 def delete_requests(request_id=None, workload_id=None, session=None):
1255 """
1256 delete an request.
1257
1258 :param request_id: The id of the request.
1259 :param workload_id: The workload_id of the request.
1260 :param session: The database session in use.
1261
1262 :raises NoObject: If no request is founded.
1263 :raises DatabaseException: If there is a database error.
1264 """
1265 try:
1266 if request_id:
1267 session.query(models.Request).filter_by(request_id=request_id).delete()
1268 else:
1269 session.query(models.Request).filter_by(workload_id=workload_id).delete()
1270 except sqlalchemy.orm.exc.NoResultFound as error:
1271 raise exceptions.NoObject('Request (request_id: %s, workload_id: %s) cannot be found: %s' % (request_id, workload_id, error))
1272
1273
1274 @transactional_session
1275 def clean_locking(time_period=3600, min_request_id=None, health_items=[], force=False, hostname=None, pid=None, session=None):
1276 """
1277 Clearn locking which is older than time period.
1278
1279 :param time_period in seconds
1280 """
1281 health_dict = {}
1282 for item in health_items:
1283 hostname = item['hostname']
1284 pid = item['pid']
1285 thread_id = item['thread_id']
1286 if hostname not in health_dict:
1287 health_dict[hostname] = {}
1288 if pid not in health_dict[hostname]:
1289 health_dict[hostname][pid] = []
1290 if thread_id not in health_dict[hostname][pid]:
1291 health_dict[hostname][pid].append(thread_id)
1292 query = session.query(models.Request.request_id,
1293 models.Request.locking_hostname,
1294 models.Request.locking_pid,
1295 models.Request.locking_thread_id,
1296 models.Request.locking_thread_name,
1297 models.Request.updated_at)
1298 query = query.filter(models.Request.locking == RequestLocking.Locking)
1299 if min_request_id:
1300 query = query.filter(models.Request.request_id >= min_request_id)
1301
1302 lost_request_ids = []
1303 tmp = query.all()
1304 if tmp:
1305 for req in tmp:
1306 req_id, locking_hostname, locking_pid, locking_thread_id, locking_thread_name, updated_at = req
1307 if (
1308 (locking_hostname not in health_dict or locking_pid not in health_dict[locking_hostname])
1309 or (force and hostname == locking_hostname and pid == locking_pid)
1310 or (updated_at < datetime.datetime.utcnow() - datetime.timedelta(seconds=time_period))
1311 ):
1312 lost_request_ids.append({"request_id": req_id, 'locking': 0})
1313
1314
1315 safe_bulk_update_mappings(session, models.Request, lost_request_ids)
1316
1317
1318 @transactional_session
1319 def clean_next_poll_at(status, session=None):
1320 """
1321 Clearn next_poll_at.
1322
1323 :param status: status of the request
1324 """
1325 if not isinstance(status, (list, tuple)):
1326 status = [status]
1327 if len(status) == 1:
1328 status = [status[0], status[0]]
1329
1330 params = {'next_poll_at': datetime.datetime.utcnow()}
1331 session.query(models.Request).filter(models.Request.status.in_(status))\
1332 .update(params, synchronize_session=False)
1333
1334
1335 @read_session
1336 def get_last_request_id(status, older_than=None, session=None):
1337 """
1338 Get last request id which is older than a timestamp.
1339
1340 :param status: status of the request.
1341 :param older_than: days older than current timestamp.
1342
1343 :returns request_id
1344 """
1345 if not isinstance(status, (list, tuple)):
1346 status = [status]
1347 if len(status) == 1:
1348 status = [status[0], status[0]]
1349
1350 query = session.query(models.Request.request_id)
1351 if status:
1352 query = query.filter(models.Request.status.in_(status))
1353 query = query.filter(models.Request.updated_at <= datetime.datetime.utcnow() - datetime.timedelta(days=older_than))
1354 query = query.order_by(desc(models.Request.request_id))
1355 ret = query.first()
1356 if ret:
1357 return ret[0]
1358 return ret
1359
1360
1361 @read_session
1362 def get_num_active_requests(active_status=None, session=None):
1363 if active_status and not isinstance(active_status, (list, tuple)):
1364 active_status = [active_status]
1365 if active_status and len(active_status) == 1:
1366 active_status = [active_status[0], active_status[0]]
1367
1368 try:
1369 query = session.query(models.Request.status, models.Request.site, func.count(models.Request.request_id))
1370 if active_status:
1371 query = query.filter(models.Request.status.in_(active_status))
1372 query = query.group_by(models.Request.status, models.Request.site)
1373 tmp = query.all()
1374 return tmp
1375 except Exception as error:
1376 raise error
1377
1378
1379 @read_session
1380 def get_active_requests(active_status=None, session=None):
1381 if active_status and not isinstance(active_status, (list, tuple)):
1382 active_status = [active_status]
1383 if active_status and len(active_status) == 1:
1384 active_status = [active_status[0], active_status[0]]
1385
1386 try:
1387 query = session.query(models.Request.request_id, models.Request.status, models.Request.site)
1388 if active_status:
1389 query = query.filter(models.Request.status.in_(active_status))
1390 tmp = query.all()
1391 return tmp
1392 except Exception as error:
1393 raise error
1394
1395
1396 @read_session
1397 def get_min_request_id(difference=1000, session=None):
1398 try:
1399 seq = models.get_request_sequence()
1400 row = session.query(seq.next_value()).one()
1401 if row:
1402 max_request_id = row[0]
1403 return max_request_id - difference
1404 else:
1405 return 0
1406 except Exception:
1407 try:
1408 query = session.query(func.max(models.Request.request_id))
1409 row = query.one()
1410 if row:
1411 max_request_id = row[0]
1412 return max_request_id - difference
1413 else:
1414 return 0
1415 except Exception as error:
1416 raise error