File indexing completed on 2026-04-09 07:58:19
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012 """
0013 operations related to Requests.
0014 """
0015
0016 import copy
0017
0018 from idds.common.constants import (RequestStatus, RequestLocking, WorkStatus,
0019 CollectionType, CollectionStatus, CollectionRelationType,
0020 MessageStatus, MetaStatus, CommandType)
0021 from idds.orm.base.session import read_session, transactional_session
0022 from idds.orm import requests_group as orm_requests_group
0023 from idds.orm import requests as orm_requests
0024 from idds.orm import transforms as orm_transforms
0025 from idds.orm import workprogress as orm_workprogresses
0026 from idds.orm import collections as orm_collections
0027 from idds.orm import conditions as orm_conditions
0028 from idds.orm import messages as orm_messages
0029 from idds.orm import meta as orm_meta
0030 from idds.core import messages as core_messages
0031
0032
0033 def create_request(scope=None, name=None, requester=None, request_type=None,
0034 username=None, userdn=None, transform_tag=None,
0035 status=RequestStatus.New, locking=RequestLocking.Idle, priority=0,
0036 lifetime=None, workload_id=None, request_metadata=None,
0037 new_poll_period=1, update_poll_period=10, site=None,
0038 cloud=None, queue=None, command=CommandType.NoneCommand,
0039 new_retries=0, update_retries=0, max_new_retries=3, max_update_retries=0,
0040 campaign=None, campaign_group=None, campaign_tag=None,
0041 max_processing_requests=-1, additional_data_storage=None,
0042 processing_metadata=None):
0043 """
0044 Add a request.
0045
0046 :param scope: The scope of the request data.
0047 :param name: The name of the request data.
0048 :param requestr: The requester, such as panda, user and so on.
0049 :param request_type: The type of the request, such as ESS, DAOD.
0050 :param transform_tag: Transform tag, such as ATLAS AMI tag.
0051 :param status: The request status as integer.
0052 :param locking: The request locking as integer.
0053 :param priority: The priority as integer.
0054 :param lifetime: The life time as umber of days.
0055 :param workload_id: The external workload id.
0056 :param request_metadata: The metadata as json.
0057 :param processing_metadata: The metadata as json.
0058
0059 :returns: request id.
0060 """
0061 if workload_id is None and request_metadata and 'workload_id' in request_metadata:
0062 workload_id = int(request_metadata['workload_id'])
0063
0064
0065 kwargs = {'scope': scope, 'name': name, 'requester': requester, 'request_type': request_type,
0066 'username': username, 'userdn': userdn, 'command': command,
0067 'transform_tag': transform_tag, 'status': status, 'locking': locking,
0068 'priority': priority, 'lifetime': lifetime, 'workload_id': workload_id,
0069 'new_poll_period': new_poll_period, 'update_poll_period': update_poll_period,
0070 'new_retries': new_retries, 'update_retries': update_retries,
0071 'max_new_retries': max_new_retries, 'max_update_retries': max_update_retries,
0072 'site': site, 'campaign': campaign, 'campaign_group': campaign_group,
0073 'cloud': cloud, 'queue': queue, 'max_processing_requests': max_processing_requests,
0074 'campaign_tag': campaign_tag, 'additional_data_storage': additional_data_storage,
0075 'request_metadata': request_metadata, 'processing_metadata': processing_metadata}
0076 return orm_requests.create_request(**kwargs)
0077
0078
0079 @transactional_session
0080 def add_request(scope=None, name=None, requester=None, request_type=None,
0081 username=None, userdn=None, transform_tag=None,
0082 status=RequestStatus.New, locking=RequestLocking.Idle, priority=0,
0083 lifetime=None, workload_id=None, request_metadata=None,
0084 new_poll_period=1, update_poll_period=10, site=None,
0085 cloud=None, queue=None, command=CommandType.NoneCommand,
0086 new_retries=0, update_retries=0, max_new_retries=3, max_update_retries=0,
0087 campaign=None, campaign_scope=None, campaign_group=None, campaign_tag=None,
0088 additional_data_storage=None, max_processing_requests=-1,
0089 processing_metadata=None, session=None):
0090 """
0091 Add a request.
0092
0093 :param scope: The scope of the request data.
0094 :param name: The name of the request data.
0095 :param requestr: The requester, such as panda, user and so on.
0096 :param request_type: The type of the request, such as ESS, DAOD.
0097 :param transform_tag: Transform tag, such as ATLAS AMI tag.
0098 :param status: The request status as integer.
0099 :param locking: The request locking as integer.
0100 :param priority: The priority as integer.
0101 :param lifetime: The life time as umber of days.
0102 :param workload_id: The external workload id.
0103 :param request_metadata: The metadata as json.
0104 :param processing_metadata: The metadata as json.
0105
0106 :returns: request id.
0107 """
0108 if workload_id is None and request_metadata and 'workload_id' in request_metadata and request_metadata['workload_id']:
0109 workload_id = int(request_metadata['workload_id'])
0110
0111 if not site:
0112 try:
0113 if request_metadata and 'workflow' in request_metadata and request_metadata['workflow']:
0114 w = request_metadata['workflow']
0115 site = w.get_site()
0116 except Exception:
0117 pass
0118
0119 group_id = None
0120 if campaign and campaign_scope and campaign_group and campaign_tag:
0121 req_groups = orm_requests_group.get_request_groups(campaign=campaign, campaign_scope=campaign_scope,
0122 campaign_group=campaign_group, campaign_tag=campaign_tag,
0123 session=session)
0124 if req_groups:
0125 if len(req_groups) == 1:
0126 group_id = req_groups[0]["group_id"]
0127 else:
0128 group_id = orm_requests_group.add_request_group(campaign=campaign, campaign_scope=campaign_scope,
0129 campaign_group=campaign_group, campaign_tag=campaign_tag,
0130 requester=requester, username=username, userdn=userdn,
0131 lifetime=lifetime, max_new_retries=max_new_retries,
0132 max_update_retries=max_update_retries,
0133 max_processing_requests=max_processing_requests,
0134 session=session)
0135
0136 kwargs = {'scope': scope, 'name': name, 'requester': requester, 'request_type': request_type,
0137 'username': username, 'userdn': userdn, 'site': site,
0138 'cloud': cloud, 'queue': queue, 'command': command,
0139 'transform_tag': transform_tag, 'status': status, 'locking': locking,
0140 'priority': priority, 'lifetime': lifetime, 'workload_id': workload_id,
0141 'new_poll_period': new_poll_period, 'update_poll_period': update_poll_period,
0142 'new_retries': new_retries, 'update_retries': update_retries,
0143 'group_id': group_id, 'campaign': campaign, 'campaign_scope': campaign_scope,
0144 'campaign_group': campaign_group, 'campaign_tag': campaign_tag,
0145 'max_new_retries': max_new_retries, 'max_update_retries': max_update_retries,
0146 'additional_data_storage': additional_data_storage,
0147 'request_metadata': request_metadata, 'processing_metadata': processing_metadata,
0148 'session': session}
0149 return orm_requests.add_request(**kwargs)
0150
0151
0152 @read_session
0153 def get_request(request_id, to_json=False, session=None):
0154 """
0155 Get a request or raise a NoObject exception.
0156
0157 :param request_id: The id of the request.
0158 :param to_json: return json format.
0159
0160 :param session: The database session in use.
0161
0162 :raises NoObject: If no request is founded.
0163
0164 :returns: Request.
0165 """
0166 return orm_requests.get_request(request_id=request_id, to_json=to_json, session=session)
0167
0168
0169 @read_session
0170 def get_request_ids_by_workload_id(workload_id, session=None):
0171 """
0172 Get request id or raise a NoObject exception.
0173
0174 :param workload_id: The workload_id of the request.
0175 :param session: The database session in use.
0176
0177 :raises NoObject: If no request is founded.
0178
0179 :returns: Request ids.
0180 """
0181 return orm_requests.get_request_ids_by_workload_id(workload_id, session=session)
0182
0183
0184 @read_session
0185 def get_request_ids_by_name(name, scope=None, exact_match=False, session=None):
0186 """
0187 Get request ids or raise a NoObject exception.
0188
0189 :param name: name of the request.
0190 :param session: The database session in use.
0191
0192 :raises NoObject: If no request is founded.
0193
0194 :returns: Request {name:id} dict.
0195 """
0196 return orm_requests.get_request_ids_by_name(name, scope=scope, exact_match=exact_match, session=session)
0197
0198
0199 @transactional_session
0200 def get_request_by_id_status(request_id, status=None, locking=False, session=None):
0201 req = orm_requests.get_request_by_id_status(request_id=request_id, status=status, locking=locking, session=session)
0202 return req
0203
0204
0205 @read_session
0206 def get_requests(request_id=None, workload_id=None, with_detail=False,
0207 with_request=False, with_transform=False, with_processing=False,
0208 with_metadata=False, to_json=False, session=None):
0209 """
0210 Get a request or raise a NoObject exception.
0211
0212 :param request_id: The id of the request.
0213 :param workload_id: The workload_id of the request.
0214 :param to_json: return json format.
0215
0216 :raises NoObject: If no request is founded.
0217
0218 :returns: Request.
0219 """
0220 return orm_requests.get_requests(request_id=request_id, workload_id=workload_id,
0221 with_detail=with_detail, with_metadata=with_metadata,
0222 with_request=with_request, with_transform=with_transform,
0223 with_processing=with_processing,
0224 to_json=to_json, session=session)
0225
0226
0227 @transactional_session
0228 def extend_requests(request_id=None, workload_id=None, lifetime=30, session=None):
0229 """
0230 extend an request's lifetime.
0231
0232 :param request_id: The id of the request.
0233 :param workload_id: The workload_id of the request.
0234 :param lifetime: The life time as umber of days.
0235 """
0236 return orm_requests.extend_request(request_id=request_id, workload_id=workload_id, lifetime=lifetime,
0237 session=session)
0238
0239
0240 @transactional_session
0241 def cancel_requests(request_id=None, workload_id=None, session=None):
0242 """
0243 cancel an request.
0244
0245 :param request_id: The id of the request.
0246 :param workload_id: The workload_id of the request.
0247 """
0248 return orm_requests.cancel_request(request_id=request_id, workload_id=workload_id, session=session)
0249
0250
0251 @transactional_session
0252 def update_request(request_id, parameters, update_request_metadata=False, session=None):
0253 """
0254 update an request.
0255
0256 :param request_id: the request id.
0257 :param parameters: A dictionary of parameters.
0258 """
0259 return orm_requests.update_request(request_id, parameters, update_request_metadata=update_request_metadata, session=session)
0260
0261
0262 def generate_collection(transform, collection, relation_type=CollectionRelationType.Input):
0263 coll_metadata = collection.coll_metadata
0264
0265 if 'coll_type' in coll_metadata:
0266 coll_type = coll_metadata['coll_type']
0267 else:
0268 coll_type = CollectionType.Dataset
0269
0270 if collection.status is None:
0271 collection.status = CollectionStatus.Open
0272
0273 coll = {'transform_id': transform['transform_id'],
0274 'request_id': transform['request_id'],
0275 'workload_id': transform['workload_id'],
0276 'coll_type': coll_type,
0277 'scope': collection.scope,
0278 'name': collection.name[:254],
0279 'relation_type': relation_type,
0280 'bytes': coll_metadata['bytes'] if 'bytes' in coll_metadata else 0,
0281 'total_files': coll_metadata['total_files'] if 'total_files' in coll_metadata else 0,
0282 'new_files': coll_metadata['new_files'] if 'new_files' in coll_metadata else 0,
0283 'processed_files': 0,
0284 'processing_files': 0,
0285 'coll_metadata': coll_metadata,
0286 'status': collection.status,
0287 'expired_at': transform['expired_at'],
0288 'collection': collection}
0289 return coll
0290
0291
0292 def generate_collections(transform):
0293 work = transform['transform_metadata']['work']
0294
0295 if not hasattr(work, 'get_input_collections'):
0296 return []
0297
0298 input_collections = work.get_input_collections()
0299 output_collections = work.get_output_collections()
0300 log_collections = work.get_log_collections()
0301
0302 input_colls, output_colls, log_colls = [], [], []
0303 for input_coll in input_collections:
0304 in_coll = generate_collection(transform, input_coll, relation_type=CollectionRelationType.Input)
0305 input_colls.append(in_coll)
0306 for output_coll in output_collections:
0307 out_coll = generate_collection(transform, output_coll, relation_type=CollectionRelationType.Output)
0308 output_colls.append(out_coll)
0309 for log_coll in log_collections:
0310 l_coll = generate_collection(transform, log_coll, relation_type=CollectionRelationType.Log)
0311 log_colls.append(l_coll)
0312 return input_colls + output_colls + log_colls
0313
0314
0315 @transactional_session
0316 def update_request_with_transforms(request_id, parameters,
0317 origin_status=None,
0318 new_transforms=None, update_transforms=None,
0319 new_messages=None, update_messages=None,
0320 new_conditions=None, session=None):
0321 """
0322 update an request.
0323
0324 :param request_id: the request id.
0325 :param parameters: A dictionary of parameters.
0326 :param new_transforms: list of transforms
0327 :param update_transforms: list of transforms
0328 """
0329 new_tf_ids, update_tf_ids = [], []
0330 if new_transforms:
0331 for tf in new_transforms:
0332
0333
0334
0335 workflow = tf['transform_metadata']['workflow']
0336 del tf['transform_metadata']['workflow']
0337
0338 work = tf['transform_metadata']['work']
0339 tf_copy = copy.deepcopy(tf)
0340 ret_tf = orm_transforms.get_transform_by_name(request_id=request_id, name=tf['name'], session=session)
0341 if ret_tf is None:
0342 tf_id = orm_transforms.add_transform(**tf_copy, session=session)
0343 else:
0344 tf_id = ret_tf['transform_id']
0345 tf['transform_id'] = tf_id
0346
0347
0348
0349
0350 if hasattr(work, 'set_work_id'):
0351 work.set_work_id(tf_id, transforming=True)
0352 if hasattr(work, 'set_status'):
0353 work.set_status(WorkStatus.New)
0354 if workflow is not None:
0355 if hasattr(workflow, 'refresh_works'):
0356 workflow.refresh_works()
0357
0358 collections = generate_collections(tf)
0359 for coll in collections:
0360 collection = coll['collection']
0361 del coll['collection']
0362 coll['transform_id'] = tf_id
0363 coll_id = orm_collections.add_collection(**coll, session=session)
0364
0365 collection.coll_id = coll_id
0366
0367
0368 if hasattr(work, 'refresh_works'):
0369 work.refresh_work()
0370 orm_transforms.update_transform(transform_id=tf_id,
0371 parameters={'transform_metadata': tf['transform_metadata']},
0372 session=session)
0373 new_tf_ids.append(tf_id)
0374 if update_transforms:
0375 for tr_id in update_transforms:
0376 orm_transforms.update_transform(transform_id=tr_id, parameters=update_transforms[tr_id], session=session)
0377 update_tf_ids.append(tf_id)
0378
0379 if new_messages:
0380 orm_messages.add_messages(new_messages, session=session)
0381 if update_messages:
0382 orm_messages.update_messages(update_messages, session=session)
0383
0384 if new_conditions:
0385 orm_conditions.add_conditions(new_conditions, session=session)
0386
0387 return orm_requests.update_request(request_id, parameters, origin_status=origin_status, session=session), new_tf_ids, update_tf_ids
0388
0389
0390 @transactional_session
0391 def update_request_with_workprogresses(request_id, parameters, new_workprogresses=None, update_workprogresses=None, session=None):
0392 """
0393 update an request.
0394
0395 :param request_id: the request id.
0396 :param parameters: A dictionary of parameters.
0397 :param new_workprogresses: list of new workprogresses.
0398 """
0399 if new_workprogresses:
0400 orm_workprogresses.add_workprogresses(new_workprogresses, session=session)
0401 if update_workprogresses:
0402 for workprogress_id in update_workprogresses:
0403 orm_workprogresses.update_workprogress(workprogress_id, update_workprogresses[workprogress_id], session=session)
0404 return orm_requests.update_request(request_id, parameters, session=session)
0405
0406
0407 @transactional_session
0408 def get_operation_request_msgs(locking=False, bulk_size=None, session=None):
0409 msgs = core_messages.retrieve_request_messages(request_id=None, bulk_size=bulk_size, session=session)
0410 if msgs:
0411
0412 to_updates = []
0413 for msg in msgs:
0414 to_updates.append({'msg_id': msg['msg_id'],
0415 'status': MessageStatus.Delivered})
0416 core_messages.update_messages(to_updates)
0417 return msgs
0418
0419
0420 @transactional_session
0421 def get_requests_by_status_type(status, request_type=None, time_period=None, locking=False, bulk_size=None, to_json=False,
0422 by_substatus=False, not_lock=False, next_poll_at=None, new_poll=False, update_poll=False,
0423 min_request_id=None, only_return_id=False, session=None):
0424 """
0425 Get requests by status and type
0426
0427 :param status: list of status of the request data.
0428 :param request_type: The type of the request data.
0429 :param time_period: Delay of seconds before last update.
0430 :param locking: Wheter to lock requests to avoid others get the same request.
0431 :param bulk_size: Size limitation per retrieve.
0432 :param to_json: return json format.
0433
0434 :returns: list of Request.
0435 """
0436 if min_request_id is None:
0437 min_request_id = get_min_request_id(session=session)
0438 if not min_request_id:
0439 min_request_id = 0
0440
0441 reqs = orm_requests.get_requests_by_status_type(status, request_type, time_period, locking=locking, locking_for_update=False,
0442 bulk_size=bulk_size, min_request_id=min_request_id, not_lock=not_lock,
0443 new_poll=new_poll, update_poll=update_poll, only_return_id=only_return_id,
0444 to_json=to_json, by_substatus=by_substatus, session=session)
0445
0446 return reqs
0447
0448
0449 @transactional_session
0450 def clean_locking(time_period=3600, min_request_id=None, health_items=[], force=False, hostname=None, pid=None, session=None):
0451 """
0452 Clearn locking which is older than time period.
0453
0454 :param time_period in seconds
0455 """
0456 orm_requests.clean_locking(time_period=time_period, min_request_id=min_request_id, health_items=health_items,
0457 force=force, hostname=hostname, pid=pid, session=session)
0458
0459
0460 @transactional_session
0461 def clean_next_poll_at(status, session=None):
0462 """
0463 Clearn next_poll_at.
0464
0465 :param status: status of the request
0466 """
0467 orm_requests.clean_next_poll_at(status=status, session=session)
0468
0469
0470 @read_session
0471 def get_last_request_id(status, older_than=None, session=None):
0472 """
0473 Get last request id which is older than a timestamp.
0474
0475 :param status: status of the request.
0476 :param older_than: days older than current timestamp.
0477
0478 :returns request_id
0479 """
0480 return orm_requests.get_last_request_id(status=status, older_than=older_than, session=session)
0481
0482
0483 @read_session
0484 def get_num_active_requests(active_status=None, session=None):
0485 return orm_requests.get_num_active_requests(active_status=active_status, session=session)
0486
0487
0488 @read_session
0489 def get_active_requests(active_status=None, session=None):
0490 return orm_requests.get_active_requests(active_status=active_status, session=session)
0491
0492
0493 @transactional_session
0494 def set_min_request_id(min_request_id, session=None):
0495 """
0496 Set min request id
0497
0498 :param min_request_id: Int of min_request_id.
0499 """
0500 orm_meta.add_meta_item(name='min_request_id', status=MetaStatus.Active, description="min request id",
0501 meta_info={"min_request_id": min_request_id}, session=None)
0502
0503
0504 @read_session
0505 def get_min_request_id(session=None):
0506 """
0507 Get min request id
0508
0509 :returns min_request_id: Int of min_request_id.
0510 """
0511 meta = orm_meta.get_meta_item(name='min_request_id', session=session)
0512 if not meta:
0513 return None
0514 else:
0515 return meta['meta_info'].get("min_request_id", None)