File indexing completed on 2026-04-09 07:58:17
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012 import datetime
0013 import logging
0014
0015 from idds.common.constants import (ProcessingStatus, CollectionStatus, ContentStatus,
0016 MessageType, MessageStatus, MessageSource, MessageDestination,
0017 ContentType, ContentRelationType)
0018 from idds.common.utils import setup_logging
0019 from idds.core import (catalog as core_catalog, messages as core_messages)
0020 from idds.agents.common.cache.redis import get_redis_cache
0021
0022 setup_logging(__name__)
0023
0024
def get_logger(logger=None):
    """Return the logger to use for this module's helpers.

    :param logger: an already configured logger, or a falsy value.
    :returns: ``logger`` itself when it is truthy, otherwise the logger
              registered under this module's ``__name__``.
    """
    return logger if logger else logging.getLogger(__name__)
0030
0031
def is_process_terminated(processing_status):
    """Tell whether ``processing_status`` is one of the terminated states.

    :param processing_status: the status value to test.
    :returns: ``True`` if the status equals any of the ``ProcessingStatus``
              members listed below, ``False`` otherwise.
    """
    terminated_states = (
        ProcessingStatus.Finished,
        ProcessingStatus.Failed,
        ProcessingStatus.SubFinished,
        ProcessingStatus.Cancelled,
        ProcessingStatus.Suspended,
        ProcessingStatus.Expired,
        ProcessingStatus.Broken,
        ProcessingStatus.FinishedOnStep,
        ProcessingStatus.FinishedOnExec,
        ProcessingStatus.FinishedTerm,
    )
    return processing_status in terminated_states
0040
0041
def handle_new_iprocessing(processing, agent_attributes, plugin=None, func_site_to_cloud=None, max_updates_per_round=2000, executors=None, logger=None, log_prefix=''):
    """Submit the work attached to a new processing through ``plugin``.

    The work object is read from ``processing['processing_metadata']['work']``
    and handed to ``plugin.submit``.  On success the processing is stamped
    with the returned workload id and the current (naive) UTC time.

    ``agent_attributes``, ``func_site_to_cloud``, ``max_updates_per_round``
    and ``executors`` are accepted but not used by this function.

    :param processing: mapping describing the processing; updated in place.
    :param plugin: object providing ``submit(work, logger=..., log_prefix=...)``
                   that returns a ``(workload_id, errors)`` pair.
    :param logger: optional logger; the module logger is used when omitted.
    :param log_prefix: text prepended to every log message.
    :returns: the 7-tuple ``(True, processing, [], [], [], [], errors)``.
    :raises Exception: when anything inside the submit step fails; the
                       message embeds the original exception text.
    """
    logger = get_logger(logger)

    payload = processing['processing_metadata']['work']

    try:
        submit_result = plugin.submit(payload, logger=logger, log_prefix=log_prefix)
        workload_id, errors = submit_result
        logger.info(log_prefix + "submit work (workload_id: %s, errors: %s)" % (workload_id, errors))
    except Exception as ex:
        failure = "submit work failed with exception: %s" % (ex)
        logger.error(log_prefix + failure)
        raise Exception(failure)

    processing['workload_id'] = workload_id
    processing['submitted_at'] = datetime.datetime.utcnow()

    return True, processing, [], [], [], [], errors
0061
0062
def handle_update_iprocessing(processing, agent_attributes, plugin=None, max_updates_per_round=2000, use_bulk_update_mappings=True, executors=None, logger=None, log_prefix=''):
    """Poll the status of a submitted processing through ``plugin``.

    A processing whose ``workload_id`` is ``None`` is reported as
    ``ProcessingStatus.Failed`` without calling the plugin.

    ``agent_attributes``, ``max_updates_per_round``,
    ``use_bulk_update_mappings`` and ``executors`` are accepted but not used
    by this function.

    :param processing: mapping with at least a ``'workload_id'`` entry.
    :param plugin: object providing ``poll(workload_id, logger=..., log_prefix=...)``.
    :param logger: optional logger; the module logger is used when omitted.
    :param log_prefix: text prepended to every log message.
    :returns: an 8-tuple whose first item is the status and whose remaining
              seven items are empty lists.
    :raises Exception: when the poll step fails; the message embeds the
                       original exception text.
    """
    logger = get_logger(logger)

    workload_id = processing['workload_id']

    try:
        if workload_id is not None:
            status = plugin.poll(workload_id, logger=logger, log_prefix=log_prefix)
            logger.info(log_prefix + "poll work (status: %s, workload_id: %s)" % (status, workload_id))
        else:
            # Nothing was ever submitted, so there is nothing to poll.
            logger.info(log_prefix + f"poll work (workload_id: {workload_id}) workload_id is None, fail the task")
            status = ProcessingStatus.Failed
    except Exception as ex:
        failure = "poll work failed with exception: %s" % (ex)
        logger.error(log_prefix + failure)
        raise Exception(failure)

    return status, [], [], [], [], [], [], []
0085
0086
def handle_abort_iprocessing(processing, agent_attributes, plugin=None, logger=None, log_prefix=''):
    """Ask ``plugin`` to abort the workload of a processing.

    ``agent_attributes`` is accepted but not used by this function.

    :param processing: mapping with at least a ``'workload_id'`` entry.
    :param plugin: object providing ``abort(workload_id, logger=..., log_prefix=...)``.
    :param logger: optional logger; the module logger is used when omitted.
    :param log_prefix: text prepended to every log message.
    :returns: the 4-tuple ``(status, [], [], [])`` where ``status`` is
              whatever ``plugin.abort`` returned.
    :raises Exception: when the abort step fails; the message embeds the
                       original exception text.
    """
    logger = get_logger(logger)

    target_workload = processing['workload_id']

    try:
        status = plugin.abort(target_workload, logger=logger, log_prefix=log_prefix)
        logger.info(log_prefix + "abort work (status: %s, workload_id: %s)" % (status, target_workload))
    except Exception as ex:
        failure = "abort work failed with exception: %s" % (ex)
        logger.error(log_prefix + failure)
        raise Exception(failure)

    return status, [], [], []
0100
0101
def handle_resume_iprocessing(processing, agent_attributes, plugin=None, logger=None, log_prefix=''):
    """Ask ``plugin`` to resume the workload of a processing.

    ``agent_attributes`` is accepted but not used by this function.

    :param processing: mapping with at least a ``'workload_id'`` entry.
    :param plugin: object providing ``resume(workload_id, logger=..., log_prefix=...)``.
    :param logger: optional logger; the module logger is used when omitted.
    :param log_prefix: text prepended to every log message.
    :returns: the 3-tuple ``(status, [], [])`` where ``status`` is whatever
              ``plugin.resume`` returned.
    :raises Exception: when the resume step fails; the message embeds the
                       original exception text.
    """
    logger = get_logger(logger)

    target_workload = processing['workload_id']

    try:
        status = plugin.resume(target_workload, logger=logger, log_prefix=log_prefix)
        logger.info(log_prefix + "resume work (status: %s, workload_id: %s)" % (status, target_workload))
    except Exception as ex:
        failure = "resume work failed with exception: %s" % (ex)
        logger.error(log_prefix + failure)
        raise Exception(failure)

    return status, [], []
0115
0116
def sync_iprocessing(processing, agent_attributes, terminate=False, abort=False, logger=None, log_prefix=""):
    """Copy a processing's substatus into its status and close its collections.

    When the substatus is Finished, Failed, SubFinished or Broken, every
    collection of the processing's transform is scheduled to be marked
    ``CollectionStatus.Closed``.  ``processing['status']`` is always set to
    ``processing['substatus']``.

    ``agent_attributes``, ``terminate``, ``abort``, ``logger`` and
    ``log_prefix`` are accepted but not used by this function.

    :param processing: mapping with ``'substatus'`` and ``'transform_id'``
                       entries; updated in place.
    :returns: the 3-tuple ``(processing, u_colls, None)`` where ``u_colls``
              is a list of ``{'coll_id': ..., 'status': CollectionStatus.Closed}``
              dictionaries (empty when no collection has to be closed).
    """
    closing_states = [ProcessingStatus.Finished, ProcessingStatus.Failed,
                      ProcessingStatus.SubFinished, ProcessingStatus.Broken]

    u_colls = []
    if processing['substatus'] in closing_states:
        found = core_catalog.get_collections(transform_id=processing['transform_id'])
        # A falsy result (None or empty) simply yields no updates.
        u_colls = [{'coll_id': entry['coll_id'], 'status': CollectionStatus.Closed}
                   for entry in (found or [])]

    processing['status'] = processing['substatus']

    return processing, u_colls, None
0138
0139
def get_request_id_transform_id_collection_id_map(request_id, transform_id):
    """Return the collection ids of one transform, grouped by relation type.

    A nested mapping of the form
    ``{request_id: {transform_id: {relation_type_value: [coll_id, ...]}}}``
    is kept under a single key in the cache returned by ``get_redis_cache()``.
    When the requested request/transform pair is missing from the cached
    mapping, all collections of the request are loaded from the catalog,
    merged into the mapping, and the mapping is written back to the cache.

    :param request_id: id of the request, or ``None``.
    :param transform_id: id of the transform, or ``None``.
    :returns: ``{relation_type_value: [coll_id, ...]}`` for the pair, or
              ``None`` when either id is ``None``.
    :raises KeyError: if the pair is still unknown after reloading the
                      request's collections from the catalog.
    """
    # Nothing to look up: return before touching the cache at all.
    if request_id is None or transform_id is None:
        return None

    cache = get_redis_cache()
    coll_tf_id_map_key = "req_id_trf_id_coll_id_map"
    coll_tf_id_map = cache.get(coll_tf_id_map_key, default={})
    # NOTE(review): if the cache serializes the mapping (e.g. as JSON), keys
    # may come back with a different type than the ids passed in, which would
    # make every lookup a miss -- confirm against the cache implementation.

    if request_id not in coll_tf_id_map or transform_id not in coll_tf_id_map[request_id]:
        colls = core_catalog.get_collections_by_request_ids([request_id])
        for coll in colls:
            coll_ids = (coll_tf_id_map
                        .setdefault(coll['request_id'], {})
                        .setdefault(coll['transform_id'], {})
                        .setdefault(coll['relation_type'].value, []))
            if coll['coll_id'] not in coll_ids:
                # Must be a call: the previous ``append[...]`` subscripted the
                # bound method and raised TypeError on every new collection.
                coll_ids.append(coll['coll_id'])

        # Only write back when the mapping was actually refreshed.
        cache.set(coll_tf_id_map_key, coll_tf_id_map)

    return coll_tf_id_map[request_id][transform_id]
0162
0163
def get_new_asyncresult_content(request_id, transform_id, name, path, workload_id=0, coll_id=0, map_id=0, scope='asyncresult',
                                status=ContentStatus.Available, content_relation_type=ContentRelationType.Output):
    """Build the dictionary describing a new async-result content record.

    ``status`` is used for both the ``'status'`` and ``'substatus'`` entries;
    ``'min_id'``, ``'max_id'`` and ``'bytes'`` are always ``0`` and the
    content type is always ``ContentType.PseudoContent``.

    :returns: a new dictionary; no argument is modified.
    """
    return dict(
        transform_id=transform_id,
        coll_id=coll_id,
        request_id=request_id,
        workload_id=workload_id,
        map_id=map_id,
        scope=scope,
        name=name,
        min_id=0,
        max_id=0,
        status=status,
        substatus=status,
        path=path,
        content_type=ContentType.PseudoContent,
        content_relation_type=content_relation_type,
        bytes=0,
    )
0182
0183
def handle_messages_asyncresult(messages, logger=None, log_prefix='', update_processing_interval=300):
    """Store incoming async-result messages through ``core_messages.add_message``.

    Items flagged as coming from iDDS itself (``'from_idds'`` is the boolean
    ``True`` or a string equal to ``'true'`` ignoring case) are skipped.
    Every other item's ``'msg'`` is stored as a single-element message of
    type ``MessageType.AsyncResult``.

    ``update_processing_interval`` is accepted but not used in this function.

    :param messages: iterable of items, each holding a ``'msg'`` entry whose
                     ``'body'`` carries ``'request_id'`` and optionally
                     ``'transform_id'`` (default ``0``) and ``'internal_id'``
                     (default ``None``).
    :param logger: optional logger; the module logger is used when omitted.
    :param log_prefix: prefix for log lines; defaults to
                       ``"<Message_AsyncResult>"`` when empty.
    """
    logger = get_logger(logger)
    log_prefix = log_prefix or "<Message_AsyncResult>"

    # Registry of seen (request, transform, internal) ids; it is filled here
    # but not read anywhere in this function.
    req_msgs = {}

    for item in messages:
        if 'from_idds' in item:
            origin_flag = item['from_idds']
            flag_type = type(origin_flag)
            is_true_bool = flag_type is bool and origin_flag
            if is_true_bool or (flag_type is str and origin_flag.lower() == 'true'):
                continue

        msg = item['msg']
        body = msg['body']

        request_id = body['request_id']
        transform_id = body.get('transform_id', 0)
        internal_id = body.get('internal_id', None)

        req_msgs.setdefault(request_id, {}).setdefault(transform_id, {}).setdefault(internal_id, [])

        msgs = [msg]
        core_messages.add_message(msg_type=MessageType.AsyncResult,
                                  status=MessageStatus.NoNeedDelivery,
                                  destination=MessageDestination.AsyncResult,
                                  source=MessageSource.OutSide,
                                  request_id=request_id,
                                  workload_id=None,
                                  transform_id=transform_id,
                                  internal_id=internal_id,
                                  num_contents=len(msgs),
                                  msg_content=msgs)

        logger.debug(f"{log_prefix} handle_messages_asyncresult, add {len(msgs)} for request_id {request_id} transform_id {transform_id} internal_id {internal_id}")