Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-09 07:58:17

0001 #!/usr/bin/env python
0002 #
0003 # Licensed under the Apache License, Version 2.0 (the "License");
0004 # You may not use this file except in compliance with the License.
0005 # You may obtain a copy of the License at
0006 # http://www.apache.org/licenses/LICENSE-2.0
0007 #
0008 # Authors:
0009 # - Wen Guan, <wen.guan@cern.ch>, 2024
0010 
0011 
0012 import datetime
0013 import logging
0014 
0015 from idds.common.constants import (ProcessingStatus, CollectionStatus, ContentStatus,
0016                                    MessageType, MessageStatus, MessageSource, MessageDestination,
0017                                    ContentType, ContentRelationType)
0018 from idds.common.utils import setup_logging
0019 from idds.core import (catalog as core_catalog, messages as core_messages)
0020 from idds.agents.common.cache.redis import get_redis_cache
0021 
0022 setup_logging(__name__)
0023 
0024 
def get_logger(logger=None):
    """Return *logger* unchanged if given, otherwise the module-level logger."""
    return logger if logger else logging.getLogger(__name__)
0030 
0031 
def is_process_terminated(processing_status):
    """Tell whether *processing_status* is one of the terminal processing states."""
    terminal_statuses = (ProcessingStatus.Finished, ProcessingStatus.Failed,
                         ProcessingStatus.SubFinished, ProcessingStatus.Cancelled,
                         ProcessingStatus.Suspended, ProcessingStatus.Expired,
                         ProcessingStatus.Broken, ProcessingStatus.FinishedOnStep,
                         ProcessingStatus.FinishedOnExec, ProcessingStatus.FinishedTerm)
    return processing_status in terminal_statuses
0040 
0041 
def handle_new_iprocessing(processing, agent_attributes, plugin=None, func_site_to_cloud=None, max_updates_per_round=2000, executors=None, logger=None, log_prefix=''):
    """Submit a new iprocessing's work through *plugin* and record the workload id.

    :param processing: processing record; ``processing_metadata['work']`` holds
        the work object handed to the plugin.
    :param plugin: submitter plugin exposing
        ``submit(work, logger=..., log_prefix=...)`` returning ``(workload_id, errors)``.
    :returns: ``(True, processing, update_collections, new_contents,
        new_input_dependency_contents, ret_msgs, errors)``; the list slots are
        always empty here and only kept to mirror the other handlers' contract.
    :raises Exception: when the plugin submission fails.
    """
    logger = get_logger(logger)

    work = processing['processing_metadata']['work']

    try:
        workload_id, errors = plugin.submit(work, logger=logger, log_prefix=log_prefix)
        logger.info(log_prefix + "submit work (workload_id: %s, errors: %s)" % (workload_id, errors))
    except Exception as ex:
        err_msg = "submit work failed with exception: %s" % (ex)
        logger.error(log_prefix + err_msg)
        # chain the original exception so the root cause stays visible in tracebacks
        raise Exception(err_msg) from ex

    processing['workload_id'] = workload_id
    # NOTE(review): utcnow() is naive; presumably the persistence layer stores
    # naive UTC timestamps — confirm before switching to timezone-aware datetimes.
    processing['submitted_at'] = datetime.datetime.utcnow()

    return True, processing, [], [], [], [], errors
0061 
0062 
def handle_update_iprocessing(processing, agent_attributes, plugin=None, max_updates_per_round=2000, use_bulk_update_mappings=True, executors=None, logger=None, log_prefix=''):
    """Poll the plugin for the current status of an iprocessing's workload.

    :returns: ``(status, [], [], [], [], [], [], [])`` — the empty list slots
        only mirror the richer contract of the other processing handlers.
    :raises Exception: when the plugin poll fails.
    """
    logger = get_logger(logger)

    workload_id = processing['workload_id']

    try:
        if workload_id is not None:
            status = plugin.poll(workload_id, logger=logger, log_prefix=log_prefix)
            logger.info(log_prefix + "poll work (status: %s, workload_id: %s)" % (status, workload_id))
        else:
            # Nothing was ever submitted, so there is nothing to poll: fail the task.
            logger.info(log_prefix + f"poll work (workload_id: {workload_id}) workload_id is None, fail the task")
            status = ProcessingStatus.Failed
    except Exception as ex:
        err_msg = "poll work failed with exception: %s" % (ex)
        logger.error(log_prefix + err_msg)
        raise Exception(err_msg)

    return status, [], [], [], [], [], [], []
0085 
0086 
def handle_abort_iprocessing(processing, agent_attributes, plugin=None, logger=None, log_prefix=''):
    """Ask the plugin to abort the workload attached to *processing*.

    :returns: ``(status, [], [], [])`` where status is the plugin-reported state.
    :raises Exception: when the plugin abort call fails.
    """
    logger = get_logger(logger)

    workload_id = processing['workload_id']

    try:
        abort_status = plugin.abort(workload_id, logger=logger, log_prefix=log_prefix)
        logger.info(log_prefix + "abort work (status: %s, workload_id: %s)" % (abort_status, workload_id))
    except Exception as ex:
        err_msg = "abort work failed with exception: %s" % (ex)
        logger.error(log_prefix + err_msg)
        raise Exception(err_msg)
    return abort_status, [], [], []
0100 
0101 
def handle_resume_iprocessing(processing, agent_attributes, plugin=None, logger=None, log_prefix=''):
    """Ask the plugin to resume the workload attached to *processing*.

    :returns: ``(status, [], [])`` where status is the plugin-reported state.
    :raises Exception: when the plugin resume call fails.
    """
    logger = get_logger(logger)

    workload_id = processing['workload_id']

    try:
        resume_status = plugin.resume(workload_id, logger=logger, log_prefix=log_prefix)
        logger.info(log_prefix + "resume work (status: %s, workload_id: %s)" % (resume_status, workload_id))
    except Exception as ex:
        err_msg = "resume work failed with exception: %s" % (ex)
        logger.error(log_prefix + err_msg)
        raise Exception(err_msg)
    return resume_status, [], []
0115 
0116 
def sync_iprocessing(processing, agent_attributes, terminate=False, abort=False, logger=None, log_prefix=""):
    """Promote the processing substatus to status and close its collections
    when the substatus is terminal.

    :returns: ``(processing, update_collections, None)`` where update_collections
        lists ``{'coll_id', 'status'}`` updates closing every collection of the
        processing's transform.
    """
    update_collections = []
    terminal_substatuses = [ProcessingStatus.Finished, ProcessingStatus.Failed,
                            ProcessingStatus.SubFinished, ProcessingStatus.Broken]
    if processing['substatus'] in terminal_substatuses:
        collections = core_catalog.get_collections(transform_id=processing['transform_id'])
        if collections:
            update_collections = [{'coll_id': coll['coll_id'],
                                   'status': CollectionStatus.Closed}
                                  for coll in collections]

    processing['status'] = processing['substatus']

    return processing, update_collections, None
0138 
0139 
def get_request_id_transform_id_collection_id_map(request_id, transform_id):
    """Map a (request_id, transform_id) pair to its collection ids, grouped by
    relation type.

    The mapping is cached in redis under one shared key; on a cache miss for
    the requested pair, all collections of the request are loaded from the
    catalog and merged into the cached structure.

    :returns: dict ``{relation_type_value: [coll_id, ...]}`` for the pair, or
        None when either id is None.
    """
    cache = get_redis_cache()
    coll_tf_id_map_key = "req_id_trf_id_coll_id_map"
    coll_tf_id_map = cache.get(coll_tf_id_map_key, default={})

    if request_id is None or transform_id is None:
        return None

    if request_id not in coll_tf_id_map or transform_id not in coll_tf_id_map[request_id]:
        colls = core_catalog.get_collections_by_request_ids([request_id])
        for coll in colls:
            per_request = coll_tf_id_map.setdefault(coll['request_id'], {})
            per_transform = per_request.setdefault(coll['transform_id'], {})
            coll_ids = per_transform.setdefault(coll['relation_type'].value, [])
            if coll['coll_id'] not in coll_ids:
                # BUG FIX: original used append[...] (subscription of the bound
                # method) instead of append(...), which raised TypeError at runtime.
                coll_ids.append(coll['coll_id'])

        cache.set(coll_tf_id_map_key, coll_tf_id_map)

    return coll_tf_id_map[request_id][transform_id]
0162 
0163 
def get_new_asyncresult_content(request_id, transform_id, name, path, workload_id=0, coll_id=0, map_id=0, scope='asyncresult',
                                status=ContentStatus.Available, content_relation_type=ContentRelationType.Output):
    """Build a pseudo-content record carrying an async result.

    min_id/max_id/bytes are fixed at 0 and content_type is always
    PseudoContent; substatus is set equal to status.
    """
    return dict(transform_id=transform_id,
                coll_id=coll_id,
                request_id=request_id,
                workload_id=workload_id,
                map_id=map_id,
                scope=scope,
                name=name,
                min_id=0,
                max_id=0,
                status=status,
                substatus=status,
                path=path,
                content_type=ContentType.PseudoContent,
                content_relation_type=content_relation_type,
                bytes=0)
0182 
0183 
0184 def handle_messages_asyncresult(messages, logger=None, log_prefix='', update_processing_interval=300):
0185     logger = get_logger(logger)
0186     if not log_prefix:
0187         log_prefix = "<Message_AsyncResult>"
0188 
0189     req_msgs = {}
0190 
0191     for item in messages:
0192         if 'from_idds' in item:
0193             if type(item['from_idds']) in [bool] and item['from_idds'] or type(item['from_idds']) in [str] and item['from_idds'].lower() == 'true':
0194                 continue
0195 
0196         msg = item['msg']
0197 
0198         # ret = msg['ret']
0199         # key = msg['key']
0200         # internal_id = msg['internal_id']
0201         # msg_type = msg['type']
0202         request_id = msg['body']['request_id']
0203         transform_id = msg['body'].get('transform_id', 0)
0204         internal_id = msg['body'].get('internal_id', None)
0205         # if msg_type in ['iworkflow']:
0206 
0207         if request_id not in req_msgs:
0208             req_msgs[request_id] = {}
0209         if transform_id not in req_msgs[request_id]:
0210             req_msgs[request_id][transform_id] = {}
0211         if internal_id not in req_msgs[request_id][transform_id]:
0212             req_msgs[request_id][transform_id][internal_id] = []
0213 
0214         msgs = [msg]
0215         core_messages.add_message(msg_type=MessageType.AsyncResult,
0216                                   status=MessageStatus.NoNeedDelivery,
0217                                   destination=MessageDestination.AsyncResult,
0218                                   source=MessageSource.OutSide,
0219                                   request_id=request_id,
0220                                   workload_id=None,
0221                                   transform_id=transform_id,
0222                                   internal_id=internal_id,
0223                                   num_contents=len(msgs),
0224                                   msg_content=msgs)
0225 
0226         logger.debug(f"{log_prefix} handle_messages_asyncresult, add {len(msgs)} for request_id {request_id} transform_id {transform_id} internal_id {internal_id}")