Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-09 07:58:19

0001 #!/usr/bin/env python
0002 #
0003 # Licensed under the Apache License, Version 2.0 (the "License");
0004 # You may not use this file except in compliance with the License.
0005 # You may obtain a copy of the License at
0006 # http://www.apache.org/licenses/LICENSE-2.0
0007 #
0008 # Authors:
0009 # - Wen Guan, <wen.guan@cern.ch>, 2019 - 2024
0010 
0011 
0012 """
0013 operations related to Requests.
0014 """
0015 
0016 import copy
0017 
0018 from idds.common.constants import (RequestStatus, RequestLocking, WorkStatus,
0019                                    CollectionType, CollectionStatus, CollectionRelationType,
0020                                    MessageStatus, MetaStatus, CommandType)
0021 from idds.orm.base.session import read_session, transactional_session
0022 from idds.orm import requests_group as orm_requests_group
0023 from idds.orm import requests as orm_requests
0024 from idds.orm import transforms as orm_transforms
0025 from idds.orm import workprogress as orm_workprogresses
0026 from idds.orm import collections as orm_collections
0027 from idds.orm import conditions as orm_conditions
0028 from idds.orm import messages as orm_messages
0029 from idds.orm import meta as orm_meta
0030 from idds.core import messages as core_messages
0031 
0032 
def create_request(scope=None, name=None, requester=None, request_type=None,
                   username=None, userdn=None, transform_tag=None,
                   status=RequestStatus.New, locking=RequestLocking.Idle, priority=0,
                   lifetime=None, workload_id=None, request_metadata=None,
                   new_poll_period=1, update_poll_period=10, site=None,
                   cloud=None, queue=None, command=CommandType.NoneCommand,
                   new_retries=0, update_retries=0, max_new_retries=3, max_update_retries=0,
                   campaign=None, campaign_group=None, campaign_tag=None,
                   max_processing_requests=-1, additional_data_storage=None,
                   processing_metadata=None):
    """
    Create a request through ``orm_requests.create_request``.

    No database session is passed here, unlike ``add_request``, which runs
    inside a ``transactional_session``.

    :param scope: The scope of the request data.
    :param name: The name of the request data.
    :param requester: The requester, such as panda, user and so on.
    :param request_type: The type of the request, such as ESS, DAOD.
    :param transform_tag: Transform tag, such as ATLAS AMI tag.
    :param status: The request status.
    :param locking: The request locking state.
    :param priority: The priority as integer.
    :param lifetime: The lifetime as number of days.
    :param workload_id: The external workload id. When None, it is taken from
                        ``request_metadata['workload_id']`` if that value is truthy.
    :param request_metadata: The metadata as json.
    :param processing_metadata: The metadata as json.

    All other keyword arguments are forwarded unchanged to ``orm_requests.create_request``.

    :returns: whatever ``orm_requests.create_request`` returns.
    """
    # Only adopt a truthy metadata workload_id. A stored None would otherwise
    # make int() raise TypeError. This matches the guard used in add_request.
    if (workload_id is None and request_metadata
            and 'workload_id' in request_metadata and request_metadata['workload_id']):
        workload_id = int(request_metadata['workload_id'])

    kwargs = {'scope': scope, 'name': name, 'requester': requester, 'request_type': request_type,
              'username': username, 'userdn': userdn, 'command': command,
              'transform_tag': transform_tag, 'status': status, 'locking': locking,
              'priority': priority, 'lifetime': lifetime, 'workload_id': workload_id,
              'new_poll_period': new_poll_period, 'update_poll_period': update_poll_period,
              'new_retries': new_retries, 'update_retries': update_retries,
              'max_new_retries': max_new_retries, 'max_update_retries': max_update_retries,
              'site': site, 'campaign': campaign, 'campaign_group': campaign_group,
              'cloud': cloud, 'queue': queue, 'max_processing_requests': max_processing_requests,
              'campaign_tag': campaign_tag, 'additional_data_storage': additional_data_storage,
              'request_metadata': request_metadata, 'processing_metadata': processing_metadata}
    return orm_requests.create_request(**kwargs)
0077 
0078 
@transactional_session
def add_request(scope=None, name=None, requester=None, request_type=None,
                username=None, userdn=None, transform_tag=None,
                status=RequestStatus.New, locking=RequestLocking.Idle, priority=0,
                lifetime=None, workload_id=None, request_metadata=None,
                new_poll_period=1, update_poll_period=10, site=None,
                cloud=None, queue=None, command=CommandType.NoneCommand,
                new_retries=0, update_retries=0, max_new_retries=3, max_update_retries=0,
                campaign=None, campaign_scope=None, campaign_group=None, campaign_tag=None,
                additional_data_storage=None, max_processing_requests=-1,
                processing_metadata=None, session=None):
    """
    Add a request, linking it to a campaign request group when one can be determined.

    :param scope: The scope of the request data.
    :param name: The name of the request data.
    :param requester: The requester, such as panda, user and so on.
    :param request_type: The type of the request, such as ESS, DAOD.
    :param transform_tag: Transform tag, such as ATLAS AMI tag.
    :param status: The request status.
    :param locking: The request locking state.
    :param priority: The priority as integer.
    :param lifetime: The lifetime as number of days.
    :param workload_id: The external workload id. When None, a truthy
                        ``request_metadata['workload_id']`` is used instead.
    :param site: Target site. When falsy, ``request_metadata['workflow'].get_site()``
                 is tried on a best-effort basis.
    :param campaign, campaign_scope, campaign_group, campaign_tag: When all four are
                 truthy, the request is attached to the matching request group. The
                 group is created if none exists.
    :param request_metadata: The metadata as json.
    :param processing_metadata: The metadata as json.
    :param session: The database session in use.

    :returns: whatever ``orm_requests.add_request`` returns (documented as the request id).
    """
    # Adopt the workload id carried in the metadata, but only when it is truthy.
    if workload_id is None and request_metadata:
        meta_workload_id = request_metadata['workload_id'] if 'workload_id' in request_metadata else None
        if meta_workload_id:
            workload_id = int(meta_workload_id)

    if not site:
        # Best effort: any failure while asking the workflow leaves `site` untouched.
        try:
            workflow = request_metadata['workflow'] if request_metadata and 'workflow' in request_metadata else None
            if workflow:
                site = workflow.get_site()
        except Exception:
            pass

    group_id = None
    if all((campaign, campaign_scope, campaign_group, campaign_tag)):
        campaign_keys = dict(campaign=campaign, campaign_scope=campaign_scope,
                             campaign_group=campaign_group, campaign_tag=campaign_tag)
        req_groups = orm_requests_group.get_request_groups(session=session, **campaign_keys)
        if not req_groups:
            group_id = orm_requests_group.add_request_group(requester=requester, username=username, userdn=userdn,
                                                            lifetime=lifetime, max_new_retries=max_new_retries,
                                                            max_update_retries=max_update_retries,
                                                            max_processing_requests=max_processing_requests,
                                                            session=session, **campaign_keys)
        elif len(req_groups) == 1:
            group_id = req_groups[0]["group_id"]
        # NOTE(review): several matching groups leave group_id as None. Confirm this is intended.

    # NOTE(review): max_processing_requests is only used for a newly created group,
    # not forwarded to orm_requests.add_request. Verify against the ORM signature.
    return orm_requests.add_request(scope=scope, name=name, requester=requester, request_type=request_type,
                                    username=username, userdn=userdn, site=site,
                                    cloud=cloud, queue=queue, command=command,
                                    transform_tag=transform_tag, status=status, locking=locking,
                                    priority=priority, lifetime=lifetime, workload_id=workload_id,
                                    new_poll_period=new_poll_period, update_poll_period=update_poll_period,
                                    new_retries=new_retries, update_retries=update_retries,
                                    group_id=group_id, campaign=campaign, campaign_scope=campaign_scope,
                                    campaign_group=campaign_group, campaign_tag=campaign_tag,
                                    max_new_retries=max_new_retries, max_update_retries=max_update_retries,
                                    additional_data_storage=additional_data_storage,
                                    request_metadata=request_metadata, processing_metadata=processing_metadata,
                                    session=session)
0150 
0151 
@read_session
def get_request(request_id, to_json=False, session=None):
    """
    Fetch one request by id.

    :param request_id: Id of the request to fetch.
    :param to_json: When True, request the json format from the ORM layer.
    :param session: The database session in use.

    :raises NoObject: If no request is found.

    :returns: The request.
    """
    request = orm_requests.get_request(request_id=request_id, to_json=to_json, session=session)
    return request
0167 
0168 
@read_session
def get_request_ids_by_workload_id(workload_id, session=None):
    """
    Look up the ids of the requests carrying a given workload id.

    :param workload_id: The workload_id to match.
    :param session: The database session in use.

    :raises NoObject: If no request is found.

    :returns: Request ids.
    """
    request_ids = orm_requests.get_request_ids_by_workload_id(workload_id, session=session)
    return request_ids
0182 
0183 
@read_session
def get_request_ids_by_name(name, scope=None, exact_match=False, session=None):
    """
    Look up request ids by request name.

    :param name: Name of the request.
    :param scope: Optional scope, forwarded to the ORM lookup.
    :param exact_match: Forwarded to the ORM lookup to control name matching.
    :param session: The database session in use.

    :raises NoObject: If no request is found.

    :returns: Request {name: id} dict.
    """
    name_to_id = orm_requests.get_request_ids_by_name(name, scope=scope, exact_match=exact_match,
                                                      session=session)
    return name_to_id
0197 
0198 
@transactional_session
def get_request_by_id_status(request_id, status=None, locking=False, session=None):
    """
    Fetch a request by id, optionally filtered by status, within a transactional session.

    :param request_id: Id of the request.
    :param status: Optional status filter, forwarded to the ORM layer.
    :param locking: Forwarded to the ORM layer as its ``locking`` argument.
    :param session: The database session in use.

    :returns: Whatever ``orm_requests.get_request_by_id_status`` returns.
    """
    return orm_requests.get_request_by_id_status(request_id=request_id, status=status,
                                                 locking=locking, session=session)
0203 
0204 
@read_session
def get_requests(request_id=None, workload_id=None, with_detail=False,
                 with_request=False, with_transform=False, with_processing=False,
                 with_metadata=False, to_json=False, session=None):
    """
    Fetch requests by request id and/or workload id.

    :param request_id: The id of the request.
    :param workload_id: The workload_id of the request.
    :param with_detail, with_request, with_transform, with_processing, with_metadata:
        Flags forwarded unchanged to ``orm_requests.get_requests``.
    :param to_json: Return json format.
    :param session: The database session in use.

    :raises NoObject: If no request is found.

    :returns: Request(s) as returned by the ORM layer.
    """
    query_options = dict(request_id=request_id, workload_id=workload_id,
                         with_detail=with_detail, with_metadata=with_metadata,
                         with_request=with_request, with_transform=with_transform,
                         with_processing=with_processing, to_json=to_json)
    return orm_requests.get_requests(session=session, **query_options)
0225 
0226 
@transactional_session
def extend_requests(request_id=None, workload_id=None, lifetime=30, session=None):
    """
    Extend the lifetime of a request, selected by request id or workload id.

    :param request_id: The id of the request.
    :param workload_id: The workload_id of the request.
    :param lifetime: The lifetime as number of days.
    :param session: The database session in use.

    :returns: Whatever ``orm_requests.extend_request`` returns.
    """
    result = orm_requests.extend_request(request_id=request_id, workload_id=workload_id,
                                         lifetime=lifetime, session=session)
    return result
0238 
0239 
@transactional_session
def cancel_requests(request_id=None, workload_id=None, session=None):
    """
    Cancel a request, selected by request id or workload id.

    :param request_id: The id of the request.
    :param workload_id: The workload_id of the request.
    :param session: The database session in use.

    :returns: Whatever ``orm_requests.cancel_request`` returns.
    """
    result = orm_requests.cancel_request(request_id=request_id, workload_id=workload_id, session=session)
    return result
0249 
0250 
@transactional_session
def update_request(request_id, parameters, update_request_metadata=False, session=None):
    """
    Update a request's columns.

    :param request_id: The request id.
    :param parameters: A dictionary of column values to set.
    :param update_request_metadata: Forwarded to ``orm_requests.update_request``.
    :param session: The database session in use.

    :returns: Whatever ``orm_requests.update_request`` returns.
    """
    result = orm_requests.update_request(request_id, parameters,
                                         update_request_metadata=update_request_metadata,
                                         session=session)
    return result
0260 
0261 
def generate_collection(transform, collection, relation_type=CollectionRelationType.Input):
    """
    Build the column dict used to insert ``collection`` as a collection of ``transform``.

    Side effect: a collection whose ``status`` is None is set to ``CollectionStatus.Open``.

    :param transform: Mapping providing 'transform_id', 'request_id', 'workload_id'
                      and 'expired_at'.
    :param collection: Object exposing ``coll_metadata``, ``status``, ``scope`` and ``name``.
    :param relation_type: Relation of the collection to the transform (Input/Output/Log).

    :returns: dict of collection columns. The original object is kept under 'collection'.
    """
    # Treat missing metadata as empty; `'key' in None` would otherwise raise TypeError.
    coll_metadata = collection.coll_metadata
    if coll_metadata is None:
        coll_metadata = {}

    coll_type = coll_metadata.get('coll_type', CollectionType.Dataset)

    if collection.status is None:
        collection.status = CollectionStatus.Open

    return {'transform_id': transform['transform_id'],
            'request_id': transform['request_id'],
            'workload_id': transform['workload_id'],
            'coll_type': coll_type,
            'scope': collection.scope,
            # Truncated to 254 characters, presumably to fit the DB column -- confirm.
            'name': collection.name[:254],
            'relation_type': relation_type,
            'bytes': coll_metadata.get('bytes', 0),
            'total_files': coll_metadata.get('total_files', 0),
            'new_files': coll_metadata.get('new_files', 0),
            'processed_files': 0,
            'processing_files': 0,
            'coll_metadata': coll_metadata,
            'status': collection.status,
            'expired_at': transform['expired_at'],
            'collection': collection}
0290 
0291 
def generate_collections(transform):
    """
    Build collection column dicts for every input, output and log collection of a transform.

    :param transform: Mapping whose ``['transform_metadata']['work']`` is the work object.

    :returns: list of dicts from ``generate_collection``, ordered inputs, then outputs,
              then logs. The list is empty when the work has no ``get_input_collections``.
    """
    work = transform['transform_metadata']['work']
    if not hasattr(work, 'get_input_collections'):
        return []

    # Fetch all three groups first, then convert each with its relation type.
    groups = (
        (work.get_input_collections(), CollectionRelationType.Input),
        (work.get_output_collections(), CollectionRelationType.Output),
        (work.get_log_collections(), CollectionRelationType.Log),
    )
    return [generate_collection(transform, coll, relation_type=relation)
            for colls, relation in groups
            for coll in colls]
0313 
0314 
def _add_transform_with_collections(request_id, tf, session):
    """Insert (or reuse by name) one new transform, register its collections, return its id."""
    # The workflow object is removed from the metadata before the copy/insert and
    # before the final metadata update, so it is not written with the transform.
    workflow = tf['transform_metadata']['workflow']
    del tf['transform_metadata']['workflow']

    work = tf['transform_metadata']['work']
    tf_copy = copy.deepcopy(tf)
    existing = orm_transforms.get_transform_by_name(request_id=request_id, name=tf['name'], session=session)
    if existing is None:
        tf_id = orm_transforms.add_transform(**tf_copy, session=session)
    else:
        # A transform with this name already exists for the request: reuse its id.
        tf_id = existing['transform_id']
    tf['transform_id'] = tf_id

    if hasattr(work, 'set_work_id'):
        work.set_work_id(tf_id, transforming=True)
    if hasattr(work, 'set_status'):
        work.set_status(WorkStatus.New)
    if workflow is not None and hasattr(workflow, 'refresh_works'):
        workflow.refresh_works()

    for coll in generate_collections(tf):
        collection = coll.pop('collection')
        coll['transform_id'] = tf_id
        # Record the new DB id on the collection object held by the work.
        collection.coll_id = orm_collections.add_collection(**coll, session=session)

    # Guard on the same method that is called (the guard used to test 'refresh_works').
    if hasattr(work, 'refresh_work'):
        work.refresh_work()
    # Persist the metadata again so the recorded coll_ids are stored.
    orm_transforms.update_transform(transform_id=tf_id,
                                    parameters={'transform_metadata': tf['transform_metadata']},
                                    session=session)
    return tf_id


@transactional_session
def update_request_with_transforms(request_id, parameters,
                                   origin_status=None,
                                   new_transforms=None, update_transforms=None,
                                   new_messages=None, update_messages=None,
                                   new_conditions=None, session=None):
    """
    Update a request together with its transforms, messages and conditions in one transaction.

    :param request_id: The request id.
    :param parameters: A dictionary of request column values to set.
    :param origin_status: Forwarded to ``orm_requests.update_request``.
    :param new_transforms: List of transform dicts to add. Each must carry
                           ``transform_metadata['workflow']`` and ``['work']``.
    :param update_transforms: Dict mapping transform_id to a parameters dict.
    :param new_messages: Messages to add.
    :param update_messages: Messages to update.
    :param new_conditions: Conditions to add.
    :param session: The database session in use.

    :returns: tuple (result of ``orm_requests.update_request``, new transform ids,
              updated transform ids).
    """
    new_tf_ids, update_tf_ids = [], []
    if new_transforms:
        for tf in new_transforms:
            new_tf_ids.append(_add_transform_with_collections(request_id, tf, session))

    if update_transforms:
        for tr_id in update_transforms:
            orm_transforms.update_transform(transform_id=tr_id, parameters=update_transforms[tr_id], session=session)
            # Record the id that was actually updated (previously `tf_id`, which was
            # wrong and unbound when there were no new transforms).
            update_tf_ids.append(tr_id)

    if new_messages:
        orm_messages.add_messages(new_messages, session=session)
    if update_messages:
        orm_messages.update_messages(update_messages, session=session)

    if new_conditions:
        orm_conditions.add_conditions(new_conditions, session=session)

    return orm_requests.update_request(request_id, parameters, origin_status=origin_status, session=session), new_tf_ids, update_tf_ids
0388 
0389 
@transactional_session
def update_request_with_workprogresses(request_id, parameters, new_workprogresses=None, update_workprogresses=None, session=None):
    """
    Update a request and its workprogresses in one transaction.

    :param request_id: The request id.
    :param parameters: A dictionary of request column values to set.
    :param new_workprogresses: List of new workprogresses to add.
    :param update_workprogresses: Dict mapping workprogress_id to a parameters dict.
    :param session: The database session in use.

    :returns: Whatever ``orm_requests.update_request`` returns.
    """
    if new_workprogresses:
        orm_workprogresses.add_workprogresses(new_workprogresses, session=session)
    for wp_id in update_workprogresses or ():
        orm_workprogresses.update_workprogress(wp_id, update_workprogresses[wp_id], session=session)
    return orm_requests.update_request(request_id, parameters, session=session)
0405 
0406 
@transactional_session
def get_operation_request_msgs(locking=False, bulk_size=None, session=None):
    """
    Retrieve request messages (request_id=None) and mark them as Delivered.

    :param locking: Currently unused; kept for interface compatibility.
    :param bulk_size: Forwarded as ``bulk_size`` to ``core_messages.retrieve_request_messages``.
    :param session: The database session in use.

    :returns: The messages returned by ``core_messages.retrieve_request_messages``.
    """
    msgs = core_messages.retrieve_request_messages(request_id=None, bulk_size=bulk_size, session=session)
    if msgs:
        to_updates = [{'msg_id': msg['msg_id'], 'status': MessageStatus.Delivered} for msg in msgs]
        # Use the same session so the Delivered update commits or rolls back with the retrieval.
        core_messages.update_messages(to_updates, session=session)
    return msgs
0418 
0419 
@transactional_session
def get_requests_by_status_type(status, request_type=None, time_period=None, locking=False, bulk_size=None, to_json=False,
                                by_substatus=False, not_lock=False, next_poll_at=None, new_poll=False, update_poll=False,
                                min_request_id=None, only_return_id=False, session=None):
    """
    Get requests by status and type.

    :param status: List of statuses of the request data.
    :param request_type: The type of the request data.
    :param time_period: Delay in seconds before last update.
    :param locking: Whether to lock requests so others do not get the same request.
    :param bulk_size: Size limit per retrieval.
    :param to_json: Return json format.
    :param min_request_id: Lower bound on request id. When None, the stored
                           'min_request_id' meta value is used, or 0 if unset.
    :param session: The database session in use.

    :returns: list of Request.
    """
    # NOTE(review): next_poll_at is accepted but not forwarded to the ORM query.
    if min_request_id is None:
        min_request_id = get_min_request_id(session=session) or 0

    return orm_requests.get_requests_by_status_type(status, request_type, time_period,
                                                    locking=locking, locking_for_update=False,
                                                    bulk_size=bulk_size, min_request_id=min_request_id,
                                                    not_lock=not_lock, new_poll=new_poll,
                                                    update_poll=update_poll, only_return_id=only_return_id,
                                                    to_json=to_json, by_substatus=by_substatus,
                                                    session=session)
0447 
0448 
@transactional_session
def clean_locking(time_period=3600, min_request_id=None, health_items=None, force=False, hostname=None, pid=None, session=None):
    """
    Clean request locks older than the time period.

    :param time_period: Age threshold in seconds.
    :param min_request_id: Forwarded to ``orm_requests.clean_locking``.
    :param health_items: Forwarded to ``orm_requests.clean_locking``. Defaults to a
                         fresh empty list on each call.
    :param force: Forwarded to ``orm_requests.clean_locking``.
    :param hostname: Forwarded to ``orm_requests.clean_locking``.
    :param pid: Forwarded to ``orm_requests.clean_locking``.
    :param session: The database session in use.
    """
    # A `[]` default would be one list shared by every call.
    if health_items is None:
        health_items = []
    orm_requests.clean_locking(time_period=time_period, min_request_id=min_request_id, health_items=health_items,
                               force=force, hostname=hostname, pid=pid, session=session)
0458 
0459 
@transactional_session
def clean_next_poll_at(status, session=None):
    """
    Clean next_poll_at for requests in the given status.

    :param status: Status of the requests, forwarded to ``orm_requests.clean_next_poll_at``.
    :param session: The database session in use.
    """
    orm_requests.clean_next_poll_at(session=session, status=status)
0468 
0469 
@read_session
def get_last_request_id(status, older_than=None, session=None):
    """
    Get the last request id that is older than a timestamp.

    :param status: Status of the request.
    :param older_than: Days older than the current timestamp.
    :param session: The database session in use.

    :returns: request_id, as returned by ``orm_requests.get_last_request_id``.
    """
    last_id = orm_requests.get_last_request_id(status=status, older_than=older_than, session=session)
    return last_id
0481 
0482 
@read_session
def get_num_active_requests(active_status=None, session=None):
    """
    Count active requests, delegating to ``orm_requests.get_num_active_requests``.

    :param active_status: Statuses considered active; forwarded unchanged.
    :param session: The database session in use.
    """
    count = orm_requests.get_num_active_requests(active_status=active_status, session=session)
    return count
0486 
0487 
@read_session
def get_active_requests(active_status=None, session=None):
    """
    Fetch active requests, delegating to ``orm_requests.get_active_requests``.

    :param active_status: Statuses considered active; forwarded unchanged.
    :param session: The database session in use.
    """
    active = orm_requests.get_active_requests(active_status=active_status, session=session)
    return active
0491 
0492 
@transactional_session
def set_min_request_id(min_request_id, session=None):
    """
    Store min_request_id as an active 'min_request_id' meta item.

    :param min_request_id: Int of min_request_id.
    :param session: The database session in use.
    """
    # Use the caller's session (was `session=None`) so the write joins the current transaction.
    orm_meta.add_meta_item(name='min_request_id', status=MetaStatus.Active, description="min request id",
                           meta_info={"min_request_id": min_request_id}, session=session)
0502 
0503 
@read_session
def get_min_request_id(session=None):
    """
    Read the stored min request id.

    :param session: The database session in use.

    :returns: The 'min_request_id' value from the meta item's meta_info, or None
              when the meta item (or the key) is absent.
    """
    meta = orm_meta.get_meta_item(name='min_request_id', session=session)
    return meta['meta_info'].get("min_request_id", None) if meta else None