#!/usr/bin/env python
#
# Licensed under the Apache License, Version 2.0 (the "License");
# You may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
#
# Authors:
# - Wen Guan, <wen.guan@cern.ch>, 2022 - 2025

import concurrent.futures
import json
import logging
import time
import threading
import traceback

from idds.common.constants import (CommandType,
                                   ProcessingStatus,
                                   CollectionStatus,
                                   ContentStatus, ContentType,
                                   ContentRelationType,
                                   WorkStatus,
                                   TransformType,
                                   TransformType2MessageTypeMap,
                                   MessageType, MessageTypeStr,
                                   MessageStatus, MessageSource,
                                   MessageDestination,
                                   get_work_status_from_transform_processing_status)
from idds.common.utils import setup_logging, get_list_chunks
from idds.core import (transforms as core_transforms,
                       processings as core_processings,
                       catalog as core_catalog)
from idds.agents.common.cache.redis import get_redis_cache


setup_logging(__name__)


def get_logger(logger=None):
    if logger:
        return logger
    logger = logging.getLogger(__name__)
    return logger


def get_new_content(request_id, transform_id, workload_id, map_id, input_content, content_relation_type=ContentRelationType.Input,
                    content_name_id_map=None, es_name=None, sub_map_id=None, order_id=None):
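    """
    Build one content dictionary for the given input content and relation type.

    min_id/max_id default to 0 and status/substatus to ContentStatus.New when
    absent from input_content. For InputDependency contents, the name is
    replaced by the content_id of the matching output content, resolved via
    content_name_id_map (refreshed from the catalog on a cache miss).
    """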
    content = {'transform_id': transform_id,
               'coll_id': input_content['coll_id'],
               'request_id': request_id,
               # 'workload_id': workload_id,
               'map_id': map_id,
               'scope': input_content['scope'],
               'name': input_content['name'],
               'min_id': input_content['min_id'] if 'min_id' in input_content else 0,
               'max_id': input_content['max_id'] if 'max_id' in input_content else 0,
               'status': input_content['status'] if 'status' in input_content and input_content['status'] is not None else ContentStatus.New,
               'substatus': input_content['substatus'] if 'substatus' in input_content and input_content['substatus'] is not None else ContentStatus.New,
               'path': input_content['path'] if 'path' in input_content else None,
               'content_type': input_content['content_type'] if 'content_type' in input_content else ContentType.File,
               'content_relation_type': content_relation_type,
               'bytes': input_content['bytes'],
               'adler32': input_content['adler32'],
               'content_metadata': input_content['content_metadata']}
    if content['min_id'] is None:
        content['min_id'] = 0
    if content['max_id'] is None:
        content['max_id'] = 0
    if 'sub_map_id' in input_content:
        content['sub_map_id'] = input_content['sub_map_id']
    if 'dep_sub_map_id' in input_content:
        content['dep_sub_map_id'] = input_content['dep_sub_map_id']

    if order_id is not None:
        content['min_id'] = order_id
        content['max_id'] = order_id
    if sub_map_id is not None:
        content['sub_map_id'] = sub_map_id
    if 'sub_map_id' not in content or content['sub_map_id'] is None:
        content['sub_map_id'] = 0
    if es_name is not None and content_relation_type == ContentRelationType.Output:
        content['path'] = es_name

    # for input dependency, convert name to content_id
    if content_relation_type == ContentRelationType.InputDependency:
        if content_name_id_map is None:
            content_name_id_map = {}
        if input_content['coll_id'] not in content_name_id_map:
            content_name_id_map_new = get_output_content_name_to_content_id_map(request_id=request_id, coll_ids=[input_content['coll_id']])
            content_name_id_map.update(content_name_id_map_new)
        if content['coll_id'] in content_name_id_map and content['name'] in content_name_id_map[content['coll_id']]:
            content_dep_id = content_name_id_map[content['coll_id']][content['name']]
            content['content_dep_id'] = content_dep_id
            content['name'] = str(content_dep_id)

    return content


def is_process_terminated(processing_status):
    return processing_status in [ProcessingStatus.Finished, ProcessingStatus.Failed,
                                 ProcessingStatus.SubFinished, ProcessingStatus.Cancelled,
                                 ProcessingStatus.Suspended, ProcessingStatus.Expired,
                                 ProcessingStatus.Broken, ProcessingStatus.FinishedOnStep,
                                 ProcessingStatus.FinishedOnExec, ProcessingStatus.FinishedTerm]


def is_process_finished(processing_status):
    return processing_status in [ProcessingStatus.Finished]


def is_all_contents_available(contents):
    for content in contents:
        if type(content) is dict:
            if content['substatus'] not in [ContentStatus.Available, ContentStatus.FakeAvailable]:
                return False
        else:
            # tuple of (content_id, status)
            if content[1] not in [ContentStatus.Available, ContentStatus.FakeAvailable]:
                return False
    return True


def is_all_contents_terminated(contents, terminated=False):
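    """
    Return True if every content is in a terminated substatus (Available,
    FakeAvailable, FinalFailed or Missing; with terminated=True, Failed also
    counts). Contents may be dicts or (content_id, status) tuples.
    """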
    terminated_status = [ContentStatus.Available, ContentStatus.FakeAvailable,
                         ContentStatus.FinalFailed, ContentStatus.Missing]
    if terminated:
        terminated_status = [ContentStatus.Available, ContentStatus.FakeAvailable, ContentStatus.Failed,
                             ContentStatus.FinalFailed, ContentStatus.Missing]

    for content in contents:
        if type(content) is dict:
            if content['substatus'] not in terminated_status:
                return False
        else:
            if content[1] not in terminated_status:
                return False
    return True


def is_input_dependency_terminated(input_dependency):
    if type(input_dependency) is dict:
        if input_dependency['substatus'] in [ContentStatus.Available, ContentStatus.FakeAvailable,
                                             ContentStatus.FinalFailed, ContentStatus.Missing]:
            return True
    else:
        if input_dependency[1] in [ContentStatus.Available, ContentStatus.FakeAvailable,
                                   ContentStatus.FinalFailed, ContentStatus.Missing]:
            return True
    return False


def is_all_contents_terminated_but_not_available(inputs, terminated=False):
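    """
    Return True only when all inputs are terminated but at least one of them
    is not Available, i.e. the map finished with failed or missing contents.
    """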
    terminated_status = [ContentStatus.Available, ContentStatus.FakeAvailable,
                         ContentStatus.FinalFailed, ContentStatus.Missing]
    if terminated:
        terminated_status = [ContentStatus.Available, ContentStatus.FakeAvailable, ContentStatus.Failed,
                             ContentStatus.FinalFailed, ContentStatus.Missing]

    all_contents_available = True
    for content in inputs:
        if type(content) is dict:
            if content['substatus'] not in terminated_status:
                return False
            if content['substatus'] not in [ContentStatus.Available]:
                all_contents_available = False
        else:
            if content[1] not in terminated_status:
                return False
            if content[1] not in [ContentStatus.Available]:
                all_contents_available = False
    if all_contents_available:
        return False
    return True


def is_all_contents_available_with_status_map(inputs_dependency, content_status_map):
    for content_id in inputs_dependency:
        status = content_status_map[str(content_id)]
        if status not in [ContentStatus.Available, ContentStatus.FakeAvailable]:
            return False
    return True


def is_all_contents_terminated_with_status_map(input_dependency, content_status_map):
    for content_id in input_dependency:
        status = content_status_map[str(content_id)]
        if status not in [ContentStatus.Available, ContentStatus.FakeAvailable,
                          ContentStatus.FinalFailed, ContentStatus.Missing]:
            return False
    return True


def get_collection_ids(collections):
    coll_ids = [coll.coll_id for coll in collections]
    return coll_ids


def get_input_output_maps(request_id, transform_id, work, with_deps=True, page_num=None, page_size=None, with_panda_id=None, status=None, match_content_ext=False):
    """
    Get the mapped input/output/log contents of a transform, keyed by map_id.

    :param with_panda_id: When True, return only map_ids where at least one output content
        has a panda_id in content_metadata. When False, return only map_ids where no output
        content has a panda_id. When None (default), return all map_ids.
    """
    # link collections
    input_collections = work.get_input_collections()
    output_collections = work.get_output_collections()
    log_collections = work.get_log_collections()

    # for coll in input_collections + output_collections + log_collections:
    #     coll_model = core_catalog.get_collection(coll_id=coll.coll_id)
    #     coll.collection = coll_model

    input_coll_ids = get_collection_ids(input_collections)
    output_coll_ids = get_collection_ids(output_collections)
    log_coll_ids = get_collection_ids(log_collections)

    mapped_input_output_maps = core_transforms.get_transform_input_output_maps(request_id,
                                                                               transform_id,
                                                                               input_coll_ids=input_coll_ids,
                                                                               output_coll_ids=output_coll_ids,
                                                                               log_coll_ids=log_coll_ids,
                                                                               with_sub_map_id=work.with_sub_map_id(),
                                                                               with_deps=with_deps,
                                                                               page_num=page_num,
                                                                               page_size=page_size,
                                                                               match_content_ext=match_content_ext,
                                                                               status=status)

    if with_panda_id is not None:
        filtered = {}
        for map_id, map_contents in mapped_input_output_maps.items():
            outputs = map_contents.get('outputs', [])
            has_panda_id = any(
                content.get('content_metadata', {}).get('panda_ids') or  # noqa W504
                content.get('content_metadata', {}).get('panda_id')
                for content in outputs
            )
            if with_panda_id == has_panda_id:
                filtered[map_id] = map_contents
        return filtered

    # work_name_to_coll_map = core_transforms.get_work_name_to_coll_map(request_id=transform['request_id'])
    # work.set_work_name_to_coll_map(work_name_to_coll_map)

    # new_input_output_maps = work.get_new_input_output_maps(mapped_input_output_maps)
    return mapped_input_output_maps


def get_ext_content_ids(request_id, transform_id, work):
    contents_ids = core_catalog.get_contents_ext_ids(request_id=request_id, transform_id=transform_id)
    return contents_ids


def get_output_content_name_to_content_id_map(request_id=None, coll_ids=[]):
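    """
    Build a {coll_id: {name: content_id}} map for the output contents of the
    given collections; used to resolve input dependencies by name.
    """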
    contents = core_catalog.get_contents(coll_id=coll_ids, request_id=request_id, content_relation_type=ContentRelationType.Output)
    content_name_id_map = {}
    for content in contents:
        if content['coll_id'] not in content_name_id_map:
            content_name_id_map[content['coll_id']] = {}
        if content['name'] not in content_name_id_map[content['coll_id']]:
            content_name_id_map[content['coll_id']][content['name']] = {}
        # if content['map_id'] not in content_name_id_map[content['coll_id']][content['name']]:
        #     content_name_id_map[content['coll_id']][content['name']][content['map_id']] = {}
        # content_name_id_map[content['coll_id']][content['name']][content['sub_map_id']] = content['content_id']
        content_name_id_map[content['coll_id']][content['name']] = content['content_id']
    return content_name_id_map


def get_new_contents(request_id, transform_id, workload_id, new_input_output_maps, max_updates_per_round=2000,
                     input_dependency_coll_ids=[], logger=None, log_prefix=''):
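    """
    Convert new_input_output_maps into new content dictionaries, split into
    chunks so that each database round handles at most about
    max_updates_per_round rows.

    Returns a list of chunks; each chunk is a tuple
    (new_input_contents, new_output_contents, new_log_contents, new_input_dependency_contents).
    """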
    logger = get_logger(logger)

    logger.debug(log_prefix + "get_new_contents")
    if input_dependency_coll_ids:
        content_name_id_map = get_output_content_name_to_content_id_map(request_id=request_id, coll_ids=input_dependency_coll_ids)
    else:
        content_name_id_map = {}

    new_input_contents, new_output_contents, new_log_contents = [], [], []
    new_input_dependency_contents = []
    new_input_dep_coll_ids = []
    chunks = []
    for map_id in new_input_output_maps:
        if "sub_maps" not in new_input_output_maps[map_id] or not new_input_output_maps[map_id]["sub_maps"]:
            inputs = new_input_output_maps[map_id]['inputs'] if 'inputs' in new_input_output_maps[map_id] else []
            inputs_dependency = new_input_output_maps[map_id]['inputs_dependency'] if 'inputs_dependency' in new_input_output_maps[map_id] else []
            outputs = new_input_output_maps[map_id]['outputs'] if 'outputs' in new_input_output_maps[map_id] else []
            logs = new_input_output_maps[map_id]['logs'] if 'logs' in new_input_output_maps[map_id] else []

            for input_content in inputs:
                content = get_new_content(request_id, transform_id, workload_id, map_id, input_content, content_relation_type=ContentRelationType.Input)
                new_input_contents.append(content)
            for input_content in inputs_dependency:
                content = get_new_content(request_id, transform_id, workload_id, map_id, input_content,
                                          content_relation_type=ContentRelationType.InputDependency,
                                          content_name_id_map=content_name_id_map)
                new_input_dependency_contents.append(content)
                if content['coll_id'] not in new_input_dep_coll_ids:
                    new_input_dep_coll_ids.append(content['coll_id'])
            for output_content in outputs:
                content = get_new_content(request_id, transform_id, workload_id, map_id, output_content, content_relation_type=ContentRelationType.Output)
                new_output_contents.append(content)
            for log_content in logs:
                content = get_new_content(request_id, transform_id, workload_id, map_id, log_content, content_relation_type=ContentRelationType.Log)
                new_log_contents.append(content)

            total_num_updates = len(new_input_contents) + len(new_output_contents) + len(new_log_contents) + len(new_input_dependency_contents)
            if total_num_updates > max_updates_per_round:
                chunk = new_input_contents, new_output_contents, new_log_contents, new_input_dependency_contents
                chunks.append(chunk)

                new_input_contents, new_output_contents, new_log_contents = [], [], []
                new_input_dependency_contents = []
        else:
            sub_maps = new_input_output_maps[map_id]["sub_maps"]
            for sub_map in sub_maps:
                sub_map_id = sub_map['sub_map_id']
                order_id = sub_map['order_id']
                inputs = sub_map['inputs'] if 'inputs' in sub_map else []
                inputs_dependency = sub_map['inputs_dependency'] if 'inputs_dependency' in sub_map else []
                outputs = sub_map['outputs'] if 'outputs' in sub_map else []
                logs = sub_map['logs'] if 'logs' in sub_map else []

                for input_content in inputs:
                    content = get_new_content(request_id, transform_id, workload_id, map_id, input_content,
                                              content_relation_type=ContentRelationType.Input,
                                              sub_map_id=sub_map_id, order_id=order_id)
                    new_input_contents.append(content)
                for input_content in inputs_dependency:
                    content = get_new_content(request_id, transform_id, workload_id, map_id, input_content,
                                              content_relation_type=ContentRelationType.InputDependency,
                                              sub_map_id=sub_map_id, order_id=order_id,
                                              content_name_id_map=content_name_id_map)
                    new_input_dependency_contents.append(content)
                    if content['coll_id'] not in new_input_dep_coll_ids:
                        new_input_dep_coll_ids.append(content['coll_id'])
                for output_content in outputs:
                    content = get_new_content(request_id, transform_id, workload_id, map_id, output_content,
                                              content_relation_type=ContentRelationType.Output,
                                              sub_map_id=sub_map_id, order_id=order_id)
                    new_output_contents.append(content)
                for log_content in logs:
                    content = get_new_content(request_id, transform_id, workload_id, map_id, log_content,
                                              content_relation_type=ContentRelationType.Log,
                                              sub_map_id=sub_map_id, order_id=order_id)
                    new_log_contents.append(content)

            total_num_updates = len(new_input_contents) + len(new_output_contents) + len(new_log_contents) + len(new_input_dependency_contents)
            if total_num_updates > max_updates_per_round:
                chunk = new_input_contents, new_output_contents, new_log_contents, new_input_dependency_contents
                chunks.append(chunk)

                new_input_contents, new_output_contents, new_log_contents = [], [], []
                new_input_dependency_contents = []

    total_num_updates = len(new_input_contents) + len(new_output_contents) + len(new_log_contents) + len(new_input_dependency_contents)
    if total_num_updates > 0:
        chunk = new_input_contents, new_output_contents, new_log_contents, new_input_dependency_contents
        chunks.append(chunk)

    # return new_input_contents, new_output_contents, new_log_contents, new_input_dependency_contents
    return chunks


def get_update_content(content):
    updated_content = {'content_id': content['content_id'],
                       'request_id': content['request_id'],
                       # 'substatus': content['substatus'],
                       'status': content['substatus']}
    content['status'] = content['substatus']
    return updated_content, content


def get_update_contents(request_id, transform_id, workload_id, input_output_maps):
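    """
    Propagate input-dependency substatuses to the inputs and outputs of each
    sub map: inputs become Available when all dependencies are available, or
    Missing when all dependencies terminated without being available.

    Returns (updated_contents, updated_input_contents_full, updated_output_contents_full).
    """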
    updated_contents = []
    updated_input_contents_full, updated_output_contents_full = [], []

    for map_id in input_output_maps:
        inputs = input_output_maps[map_id]['inputs'] if 'inputs' in input_output_maps[map_id] else []
        inputs_dependency = input_output_maps[map_id]['inputs_dependency'] if 'inputs_dependency' in input_output_maps[map_id] else []
        outputs = input_output_maps[map_id]['outputs'] if 'outputs' in input_output_maps[map_id] else []
        # logs = input_output_maps[map_id]['logs'] if 'logs' in input_output_maps[map_id] else []

        input_output_sub_maps = get_input_output_sub_maps(inputs, outputs, inputs_dependency)
        for sub_map_id in input_output_sub_maps:
            inputs_sub = input_output_sub_maps[sub_map_id]['inputs']
            outputs_sub = input_output_sub_maps[sub_map_id]['outputs']
            inputs_dependency_sub = input_output_sub_maps[sub_map_id]['inputs_dependency']

            content_update_status = None
            if is_all_contents_available(inputs_dependency_sub):
                # logger.debug("all input dependency available: %s, inputs: %s" % (str(inputs_dependency), str(inputs)))
                content_update_status = ContentStatus.Available
            elif is_all_contents_terminated(inputs_dependency_sub):
                # logger.debug("all input dependency terminated: %s, inputs: %s, outputs: %s" % (str(inputs_dependency), str(inputs), str(outputs)))
                content_update_status = ContentStatus.Missing

            if content_update_status:
                for content in inputs_sub:
                    content['substatus'] = content_update_status
                    if content['status'] != content['substatus']:
                        updated_content, content = get_update_content(content)
                        updated_contents.append(updated_content)
                        updated_input_contents_full.append(content)
            if content_update_status in [ContentStatus.Missing]:
                for content in outputs_sub:
                    content['substatus'] = content_update_status
                    if content['status'] != content['substatus']:
                        updated_content, content = get_update_content(content)
                        updated_contents.append(updated_content)
                        updated_output_contents_full.append(content)

            for content in outputs_sub:
                if content['status'] != content['substatus']:
                    updated_content, content = get_update_content(content)
                    updated_contents.append(updated_content)
                    updated_output_contents_full.append(content)
    return updated_contents, updated_input_contents_full, updated_output_contents_full


def get_message_type(work_type, input_type='file'):
    work_type_value = str(work_type.value)
    if work_type_value not in TransformType2MessageTypeMap:
        return TransformType2MessageTypeMap['0'][input_type]
    else:
        return TransformType2MessageTypeMap[work_type_value][input_type]


def generate_file_messages(request_id, transform_id, workload_id, work, files, relation_type):
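    """
    Build one 'file' message body for the given contents. For ES (event
    service) works, the file path is used as the name and duplicates are
    skipped; FakeAvailable is reported as Available.
    """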
    if work:
        work_type = work.get_work_type()
    else:
        work_type = TransformType.Processing

    i_msg_type, i_msg_type_str = get_message_type(work_type, input_type='file')
    no_dup_files = {}
    files_message = []
    for file in files:
        filename = file['name']
        if work and work.es:
            filename = file['path']
            if filename in no_dup_files:
                continue
            else:
                no_dup_files[filename] = None

        file_status = file['substatus'].name
        if file['substatus'] == ContentStatus.FakeAvailable:
            file_status = ContentStatus.Available.name
        file_message = {'scope': file['scope'],
                        'name': filename,
                        'path': file['path'],
                        'map_id': file['map_id'],
                        'content_id': file['content_id'] if 'content_id' in file else None,
                        'external_coll_id': file['external_coll_id'] if 'external_coll_id' in file else None,
                        'external_content_id': file['external_content_id'] if 'external_content_id' in file else None,
                        'status': file_status}
        files_message.append(file_message)
    msg_content = {'msg_type': i_msg_type_str.value,
                   'request_id': request_id,
                   'transform_id': transform_id,
                   'workload_id': workload_id,
                   'relation_type': relation_type,
                   'files': files_message}
    num_msg_content = len(files_message)
    return i_msg_type, msg_content, num_msg_content


def generate_content_ext_messages(request_id, transform_id, workload_id, work, files, relation_type, input_output_maps):
    i_msg_type = MessageType.ContentExt
    i_msg_type_str = MessageTypeStr.ContentExt

    output_contents = []
    for map_id in input_output_maps:
        outputs = input_output_maps[map_id]['outputs'] if 'outputs' in input_output_maps[map_id] else []
        # for content in outputs:
        #    # content_map[content['content_id']] = content
        output_contents += outputs

    files_message = core_catalog.combine_contents_ext(output_contents, files, with_status_name=True)
    msg_content = {'msg_type': i_msg_type_str.value,
                   'request_id': request_id,
                   'workload_id': workload_id,
                   'transform_id': transform_id,
                   'relation_type': relation_type,
                   'files': files_message}
    num_msg_content = len(files_message)
    return i_msg_type, msg_content, num_msg_content


def generate_collection_messages(request_id, transform_id, workload_id, work, collection, relation_type):
    coll_name = collection.name
    if coll_name.endswith(".idds.stagein"):
        coll_name = coll_name.replace(".idds.stagein", "")

    i_msg_type, i_msg_type_str = get_message_type(work.get_work_type(), input_type='collection')
    msg_content = {'msg_type': i_msg_type_str.value,
                   'request_id': request_id,
                   'workload_id': workload_id,
                   'transform_id': transform_id,
                   'relation_type': relation_type,
                   'collections': [{'scope': collection.scope,
                                    'name': coll_name,
                                    'status': collection.status.name}],
                   'output': work.get_output_data(),
                   'error': work.get_terminated_msg()}
    num_msg_content = 1
    return i_msg_type, msg_content, num_msg_content


def generate_work_messages(request_id, transform_id, workload_id, work, relation_type):
    i_msg_type, i_msg_type_str = get_message_type(work.get_work_type(), input_type='work')
    msg_content = {'msg_type': i_msg_type_str.value,
                   'request_id': request_id,
                   'workload_id': workload_id,
                   'transform_id': transform_id,
                   'relation_type': relation_type,
                   'status': work.get_status().name,
                   'output': work.get_output_data(),
                   'error': work.get_terminated_msg()}
    num_msg_content = 1
    return i_msg_type, msg_content, num_msg_content


def generate_messages(request_id, transform_id, workload_id, work, msg_type='file', files=[], relation_type='input', input_output_maps=None):
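    """
    Generate messages of the requested type ('file', 'content_ext',
    'collection' or 'work') for the given work, e.g. (illustrative call):
        msgs = generate_messages(req_id, tf_id, wl_id, work, msg_type='work')
    Returns a list of message dictionaries for the core message layer.
    """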
    if msg_type == 'file':
        i_msg_type, msg_content, num_msg_content = generate_file_messages(request_id, transform_id, workload_id, work, files=files, relation_type=relation_type)
        msg = {'msg_type': i_msg_type,
               'status': MessageStatus.New,
               'source': MessageSource.Carrier,
               'destination': MessageDestination.Outside,
               'request_id': request_id,
               'workload_id': workload_id,
               'transform_id': transform_id,
               'num_contents': num_msg_content,
               'msg_content': msg_content}
        return [msg]
    elif msg_type == 'content_ext':
        i_msg_type, msg_content, num_msg_content = generate_content_ext_messages(request_id, transform_id, workload_id, work, files=files,
                                                                                 relation_type=relation_type,
                                                                                 input_output_maps=input_output_maps)
        msg = {'msg_type': i_msg_type,
               'status': MessageStatus.New,
               'source': MessageSource.Carrier,
               'destination': MessageDestination.ContentExt,
               'request_id': request_id,
               'workload_id': workload_id,
               'transform_id': transform_id,
               'num_contents': num_msg_content,
               'msg_content': msg_content}
        return [msg]
    elif msg_type == 'collection':
        msg_type_contents = []
        for coll in files:
            msg_type_content = generate_collection_messages(request_id, transform_id, workload_id, work, coll, relation_type=relation_type)
            msg_type_contents.append(msg_type_content)

        msgs = []
        for i_msg_type, msg_content, num_msg_content in msg_type_contents:
            msg = {'msg_type': i_msg_type,
                   'status': MessageStatus.New,
                   'source': MessageSource.Carrier,
                   'destination': MessageDestination.Outside,
                   'request_id': request_id,
                   'workload_id': workload_id,
                   'transform_id': transform_id,
                   'num_contents': num_msg_content,
                   'msg_content': msg_content}
            msgs.append(msg)
        return msgs
    elif msg_type == 'work':
        # link collections
        input_collections = work.get_input_collections()
        output_collections = work.get_output_collections()
        log_collections = work.get_log_collections()

        msg_type_contents = []
        msg_type_content = generate_work_messages(request_id, transform_id, workload_id, work, relation_type='input')
        msg_type_contents.append(msg_type_content)
        for coll in input_collections:
            msg_type_content = generate_collection_messages(request_id, transform_id, workload_id, work, coll, relation_type='input')
            msg_type_contents.append(msg_type_content)
        for coll in output_collections:
            msg_type_content = generate_collection_messages(request_id, transform_id, workload_id, work, coll, relation_type='output')
            msg_type_contents.append(msg_type_content)
        for coll in log_collections:
            msg_type_content = generate_collection_messages(request_id, transform_id, workload_id, work, coll, relation_type='log')
            msg_type_contents.append(msg_type_content)

        msgs = []
        for i_msg_type, msg_content, num_msg_content in msg_type_contents:
            msg = {'msg_type': i_msg_type,
                   'status': MessageStatus.New,
                   'source': MessageSource.Carrier,
                   'destination': MessageDestination.Outside,
                   'request_id': request_id,
                   'workload_id': workload_id,
                   'transform_id': transform_id,
                   'num_contents': num_msg_content,
                   'msg_content': msg_content}
            msgs.append(msg)
        return msgs


def update_processing_contents_thread(logger, log_prefix, log_msg, kwargs):
    try:
        logger = get_logger(logger)
        logger.debug(log_prefix + log_msg)
        core_processings.update_processing_contents(**kwargs)
        logger.debug(log_prefix + " end")
    except Exception as ex:
        logger.error(f"{log_prefix}update_processing_contents_thread: {ex}")
        raise Exception(f"update_processing_contents_thread: {ex}")


def wait_futures_finish(ret_futures, func_name, logger, log_prefix, timeout=180):
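    """
    Block until all futures in ret_futures complete, logging a progress line
    every `timeout` seconds. If any future raised, the last exception is
    re-raised after all futures have finished.
    """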
    logger = get_logger(logger)
    logger.debug(f"{log_prefix}{func_name}: wait_futures_finish")
    # Wait for all subprocesses to complete
    steps = 0
    ex = None
    while True:
        steps += 1
        # Wait for all subprocesses to complete within the timeout
        completed, ret_futures = concurrent.futures.wait(ret_futures, timeout=timeout, return_when=concurrent.futures.ALL_COMPLETED)

        for c in completed:
            try:
                _ = c.result()   # This will raise the exception if the thread failed
            except Exception as e:
                ex = e
                logger.error(f"{log_prefix} {func_name}: thread failed: {e}")

        if len(ret_futures) > 0:
            logger.debug(log_prefix + "%s thread: %s threads have been running for more than %s seconds" % (func_name, len(ret_futures), steps * timeout))
        else:
            break
    logger.debug(log_prefix + "%s: wait_futures_finish end" % func_name)
    if ex:
        raise ex


def handle_new_processing(processing, agent_attributes, func_site_to_cloud=None, max_updates_per_round=2000, executors=None, logger=None, log_prefix=''):
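    """
    Handle a processing in the New state: build the new input/output maps for
    its work, insert the new contents (optionally through the executors thread
    pool, one thread per chunk), fix missing content_dep_id values, then
    reload the maps to update the number of unmapped inputs.
    """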
    logger = get_logger(logger)

    proc = processing['processing_metadata']['processing']
    work = proc.work
    work.set_agent_attributes(agent_attributes, processing)
    request_id = processing['request_id']
    transform_id = processing['transform_id']
    workload_id = processing['workload_id']

    ret_msgs = []
    new_contents = []
    new_input_dependency_contents = []
    update_collections = []

    input_output_maps = get_input_output_maps(request_id, transform_id, work, with_deps=False)
    new_input_output_maps = work.get_new_input_output_maps(input_output_maps)
    if hasattr(work, 'input_dependency_coll_ids'):
        input_dependency_coll_ids = work.input_dependency_coll_ids
    else:
        input_dependency_coll_ids = []
    ret_new_contents_chunks = get_new_contents(request_id, transform_id, workload_id, new_input_output_maps,
                                               max_updates_per_round=max_updates_per_round,
                                               input_dependency_coll_ids=input_dependency_coll_ids,
                                               logger=logger, log_prefix=log_prefix)
    if not ret_new_contents_chunks:
        logger.debug(log_prefix + "handle_new_processing: no new contents")

    if executors is None:
        for ret_new_contents in ret_new_contents_chunks:
            new_input_contents, new_output_contents, new_log_contents, new_input_dependency_contents = ret_new_contents

            # for new_input_dependency_contents, already converted name to content_dep_id, don't need to separate it
            # new_contents = new_input_contents + new_output_contents + new_log_contents
            new_contents = new_input_contents + new_output_contents + new_log_contents + new_input_dependency_contents

            # not generate new messages
            # if new_input_contents:
            #     msgs = generate_messages(request_id, transform_id, workload_id, work, msg_type='file', files=new_input_contents, relation_type='input')
            #     ret_msgs = ret_msgs + msgs
            # if new_output_contents:
            #     msgs = generate_messages(request_id, transform_id, workload_id, work, msg_type='file', files=new_input_contents, relation_type='output')
            #     ret_msgs = ret_msgs + msgs
            logger.debug(log_prefix + "handle_new_processing: add %s new contents" % (len(new_contents)))
            core_processings.update_processing_contents(update_processing=None,
                                                        request_id=request_id,
                                                        transform_id=transform_id,
                                                        new_contents=new_contents,
                                                        # new_input_dependency_contents=new_input_dependency_contents,
                                                        messages=ret_msgs)
    else:
        if ret_new_contents_chunks:
            ret_futures = set()
            for ret_new_contents in ret_new_contents_chunks:
                new_input_contents, new_output_contents, new_log_contents, new_input_dependency_contents = ret_new_contents
                # new_contents = new_input_contents + new_output_contents + new_log_contents
                new_contents = new_input_contents + new_output_contents + new_log_contents + new_input_dependency_contents
                log_msg = "handle_new_processing thread: add %s new contents" % (len(new_contents))
                kwargs = {'update_processing': None,
                          'request_id': request_id,
                          'transform_id': transform_id,
                          'new_contents': new_contents,
                          # 'new_input_dependency_contents': new_input_dependency_contents,
                          'messages': ret_msgs}
                f = executors.submit(update_processing_contents_thread, logger, log_prefix, log_msg, kwargs)
                ret_futures.add(f)
            wait_futures_finish(ret_futures, "handle_new_processing", logger, log_prefix)

    # fix missing content_dep_id for input_dependency contents
    ret_fix = core_processings.update_processing_contents(update_processing=None,
                                                          request_id=request_id,
                                                          transform_id=transform_id,
                                                          fix_missing_content_dep_id=True)
    num_added_contents, num_updated_contents, num_added_messages, num_updated_messages = ret_fix
    logger.debug(log_prefix + "handle_new_processing: fix missing content_dep_id for input_dependency contents, num_fixed: %s" % num_updated_contents)

    # reload the updated input_output_maps.
    # once all inputs are filled correctly, has_new_input is automatically set to False and this
    # step is skipped in later rounds, which avoids unnecessary database queries.
    input_output_maps = get_input_output_maps(request_id, transform_id, work, with_deps=False)
    new_input_output_maps = work.get_new_input_output_maps(input_output_maps)
    if not work.has_new_inputs:
        logger.debug(log_prefix + "handle_new_processing: no new input_output_maps after reload")
        processing["num_unmapped"] = 0
    else:
        num_unmapped = len(new_input_output_maps) if new_input_output_maps else 1
        processing["num_unmapped"] = num_unmapped
    core_processings.update_processing(processing['processing_id'],
                                       parameters={'num_unmapped': processing["num_unmapped"]})
    logger.debug(log_prefix + f"handle_new_processing: num_unmapped={processing['num_unmapped']}")

    logger.debug(log_prefix + "handle_new_processing: finish")

    # return True, processing, update_collections, new_contents, new_input_dependency_contents, ret_msgs, errors
    return True, processing, update_collections, [], [], ret_msgs, None


def handle_prepared_processing(processing, agent_attributes, func_site_to_cloud=None, max_updates_per_round=2000, executors=None, logger=None, log_prefix=''):
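    """
    Submit a prepared processing through work.submit_processing and, on
    success, attach the returned workload_id to its collections.
    """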
    logger = get_logger(logger)

    proc = processing['processing_metadata']['processing']
    work = proc.work
    work.set_agent_attributes(agent_attributes, processing)

    if func_site_to_cloud:
        work.set_func_site_to_cloud(func_site_to_cloud)
    status, workload_id, errors = work.submit_processing(processing)
    logger.info(log_prefix + "submit_processing (status: %s, workload_id: %s, errors: %s)" % (status, workload_id, errors))

    if not status:
        logger.error(log_prefix + "Failed to submit processing (status: %s, workload_id: %s, errors: %s)" % (status, workload_id, errors))
        return False, processing, [], [], [], [], errors

    ret_msgs = []
    update_collections = []
    if proc.workload_id:
        # processing['workload_id'] = proc.workload_id
        input_collections = work.get_input_collections()
        output_collections = work.get_output_collections()
        log_collections = work.get_log_collections()
        for coll in input_collections + output_collections + log_collections:
            u_coll = {'coll_id': coll.coll_id, 'workload_id': proc.workload_id}
            update_collections.append(u_coll)

    # return True, processing, update_collections, new_contents, new_input_dependency_contents, ret_msgs, errors
    return True, processing, update_collections, [], [], ret_msgs, errors


def get_updated_contents_by_request(request_id, transform_id, workload_id, work, terminated=False, input_output_maps=None,
                                    logger=None, log_prefix=''):
    logger = get_logger(logger)

    status_to_check = [ContentStatus.Available, ContentStatus.FakeAvailable, ContentStatus.FinalFailed,
                       ContentStatus.Missing, ContentStatus.Failed, ContentStatus.Lost,
                       ContentStatus.Deleted]
    contents = core_catalog.get_contents_by_request_transform(request_id=request_id, transform_id=transform_id,
                                                              status=status_to_check, status_updated=True)
    updated_contents, updated_contents_full_input, updated_contents_full_output = [], [], []
    updated_contents_full_input_deps = []
    for content in contents:
        if (content['status'] != content['substatus']) and content['substatus'] in status_to_check:
            u_content = {'content_id': content['content_id'],
                         'request_id': content['request_id'],
                         'status': content['substatus']}
            updated_contents.append(u_content)
            if content['content_relation_type'] == ContentRelationType.Output:
                updated_contents_full_output.append(content)
            elif content['content_relation_type'] == ContentRelationType.Input:
                updated_contents_full_input.append(content)
            elif content['content_relation_type'] == ContentRelationType.InputDependency:
                updated_contents_full_input_deps.append(content)
    # logger.debug(log_prefix + "get_updated_contents_by_request: updated_contents[:3]: %s" % str(updated_contents[:3]))
    return updated_contents, updated_contents_full_input, updated_contents_full_output, updated_contents_full_input_deps


def get_input_output_sub_maps(inputs, outputs, inputs_dependency, logs=[]):
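    """
    Group inputs, outputs, input dependencies and logs by sub_map_id.
    Returns {sub_map_id: {'inputs': [...], 'outputs': [...], 'logs': [...],
    'inputs_dependency': [...]}}.
    """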
    input_output_sub_maps = {}
    for content in inputs:
        sub_map_id = content['sub_map_id']
        if sub_map_id not in input_output_sub_maps:
            input_output_sub_maps[sub_map_id] = {'inputs': [], 'outputs': [], 'logs': [], 'inputs_dependency': []}
        input_output_sub_maps[sub_map_id]['inputs'].append(content)
    for content in inputs_dependency:
        sub_map_id = content['sub_map_id']
        if sub_map_id not in input_output_sub_maps:
            input_output_sub_maps[sub_map_id] = {'inputs': [], 'outputs': [], 'logs': [], 'inputs_dependency': []}
        input_output_sub_maps[sub_map_id]['inputs_dependency'].append(content)
    for content in outputs:
        sub_map_id = content['sub_map_id']
        if sub_map_id not in input_output_sub_maps:
            input_output_sub_maps[sub_map_id] = {'inputs': [], 'outputs': [], 'logs': [], 'inputs_dependency': []}
        input_output_sub_maps[sub_map_id]['outputs'].append(content)
    for content in logs:
        sub_map_id = content['sub_map_id']
        if sub_map_id not in input_output_sub_maps:
            input_output_sub_maps[sub_map_id] = {'inputs': [], 'outputs': [], 'logs': [], 'inputs_dependency': []}
        input_output_sub_maps[sub_map_id]['logs'].append(content)
    return input_output_sub_maps


def get_updated_contents_by_input_output_maps(input_output_maps=None, terminated=False, max_updates_per_round=2000, with_deps=False, es=False, logger=None, log_prefix=''):
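    """
    Collect contents whose substatus changed (substatus != status) from the
    given input/output maps, chunked by max_updates_per_round. Each chunk is
    a tuple (updated_contents, full_inputs, full_outputs, full_input_deps,
    new_update_contents).
    """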
    updated_contents, updated_contents_full_input, updated_contents_full_output = [], [], []
    updated_contents_full_input_deps = []
    new_update_contents = []

    chunks = []
    status_to_check = [ContentStatus.Available, ContentStatus.FakeAvailable, ContentStatus.FinalFailed,
                       ContentStatus.Missing, ContentStatus.Failed, ContentStatus.Lost,
                       ContentStatus.Deleted]
    available_status = [ContentStatus.Available, ContentStatus.FakeAvailable]
    if terminated:
        terminated_status = [ContentStatus.Available, ContentStatus.FakeAvailable,
                             ContentStatus.FinalFailed, ContentStatus.Missing,
                             ContentStatus.Lost, ContentStatus.Deleted,
                             ContentStatus.Failed]
    else:
        terminated_status = [ContentStatus.Available, ContentStatus.FakeAvailable,
                             ContentStatus.FinalFailed, ContentStatus.Missing,
                             ContentStatus.Lost, ContentStatus.Deleted]

    for map_id in input_output_maps:
        inputs = input_output_maps[map_id]['inputs'] if 'inputs' in input_output_maps[map_id] else []
        inputs_dependency = input_output_maps[map_id]['inputs_dependency'] if 'inputs_dependency' in input_output_maps[map_id] else []
        outputs = input_output_maps[map_id]['outputs'] if 'outputs' in input_output_maps[map_id] else []
        # logs = input_output_maps[map_id]['logs'] if 'logs' in input_output_maps[map_id] else []

        # input_output_sub_maps = get_input_output_sub_maps(inputs, outputs, inputs_dependency)

        has_updated_inputs, all_inputs_available, all_inputs_terminated = False, True, True
        for content in inputs:
            if content['substatus'] not in available_status:
                all_inputs_available = False
            if content['substatus'] not in terminated_status:
                all_inputs_terminated = False
            if (content['status'] != content['substatus']) and content['substatus'] in status_to_check:
                has_updated_inputs = True
                u_content = {'content_id': content['content_id'],
                             'request_id': content['request_id'],
                             'status': content['substatus']}
                updated_contents.append(u_content)
                u_content_substatus = {'content_id': content['content_id'],
                                       'substatus': content['substatus'],
                                       'request_id': content['request_id'],
                                       'transform_id': content['transform_id'],
                                       'workload_id': content['workload_id'],
                                       'coll_id': content['coll_id']}
                new_update_contents.append(u_content_substatus)
                if not es:
                    updated_contents_full_input.append(content)
        if es and has_updated_inputs:
            # for es, multiple contents map to one ES job
            # the 'path' is the ES job name; the last iterated input is used
            # as a template for the job-level record
            if all_inputs_available:
                content_copy = content.copy()
                content_copy['name'] = content_copy['path']
                content_copy['status'] = ContentStatus.Available
                content_copy['substatus'] = ContentStatus.Available
                updated_contents_full_input.append(content_copy)
            elif all_inputs_terminated:
                content_copy = content.copy()
                content_copy['name'] = content_copy['path']
                content_copy['status'] = ContentStatus.Missing
                content_copy['substatus'] = ContentStatus.Missing
                updated_contents_full_input.append(content_copy)

        for content in outputs:
            if (content['status'] != content['substatus']) and content['substatus'] in status_to_check:
                u_content = {'content_id': content['content_id'],
                             'request_id': content['request_id'],
                             'status': content['substatus']}
                updated_contents.append(u_content)
                u_content_substatus = {'content_id': content['content_id'],
                                       'substatus': content['substatus'],
                                       'request_id': content['request_id'],
                                       'transform_id': content['transform_id'],
                                       'workload_id': content['workload_id'],
                                       'coll_id': content['coll_id']}
                new_update_contents.append(u_content_substatus)
                updated_contents_full_output.append(content)

        for content in inputs_dependency:
            if (content['status'] != content['substatus']) and content['substatus'] in status_to_check:
                u_content = {'content_id': content['content_id'],
                             'request_id': content['request_id'],
                             'status': content['substatus']}
                updated_contents.append(u_content)
                updated_contents_full_input_deps.append(content)

        if len(updated_contents) > max_updates_per_round:
            chunk = updated_contents, updated_contents_full_input, updated_contents_full_output, updated_contents_full_input_deps, new_update_contents
            chunks.append(chunk)

            updated_contents, updated_contents_full_input, updated_contents_full_output = [], [], []
            updated_contents_full_input_deps = []
            new_update_contents = []

    if len(updated_contents) > 0:
        chunk = updated_contents, updated_contents_full_input, updated_contents_full_output, updated_contents_full_input_deps, new_update_contents
        chunks.append(chunk)

    # return updated_contents, updated_contents_full_input, updated_contents_full_output, updated_contents_full_input_deps, new_update_contents
    return chunks


0931 def get_transform_dependency_map(transform_id, logger=None, log_prefix=''):
0932     cache = get_redis_cache()
0933     transform_dependcy_map_key = "transform_dependcy_map_%s" % transform_id
0934     transform_dependcy_map = cache.get(transform_dependcy_map_key, default={})
0935     return transform_dependcy_map
0936 
0937 
0938 def set_transform_dependency_map(transform_id, transform_dependcy_map, logger=None, log_prefix=''):
0939     cache = get_redis_cache()
0940     transform_dependcy_map_key = "transform_dependcy_map_%s" % transform_id
0941     cache.set(transform_dependcy_map_key, transform_dependcy_map)
0942 
0943 
0944 def get_content_dependcy_map(request_id, logger=None, log_prefix=''):
0945     cache = get_redis_cache()
0946     content_dependcy_map_key = "request_content_dependcy_map_%s" % request_id
0947     content_dependcy_map = cache.get(content_dependcy_map_key, default={})
0948 
0949     request_dependcy_map_key = "request_dependcy_map_%s" % request_id
0950     request_dependcy_map = cache.get(request_dependcy_map_key, default=[])
0951 
0952     collection_dependcy_map_key = "request_collections_dependcy_map_%s" % request_id
0953     collection_dependcy_map = cache.get(collection_dependcy_map_key, default=[])
0954 
0955     return content_dependcy_map, request_dependcy_map, collection_dependcy_map
0956 
0957 
0958 def set_content_dependcy_map(request_id, content_dependcy_map, request_dependcy_map,
0959                              collection_dependcy_map, logger=None, log_prefix=''):
0960     cache = get_redis_cache()
0961     content_dependcy_map_key = "request_content_dependcy_map_%s" % request_id
0962     cache.set(content_dependcy_map_key, content_dependcy_map)
0963 
0964     request_dependcy_map_key = "request_dependcy_map_%s" % request_id
0965     cache.set(request_dependcy_map_key, request_dependcy_map)
0966 
0967     collection_dependcy_map_key = "request_collections_dependcy_map_%s" % request_id
0968     cache.set(collection_dependcy_map_key, collection_dependcy_map)
0969 
0970 
0971 def get_content_status_map(request_id, logger=None, log_prefix=''):
0972     cache = get_redis_cache()
0973     content_status_map_key = "request_content_status_map_%s" % request_id
0974     content_status_map = cache.get(content_status_map_key, default={})
0975     return content_status_map
0976 
0977 
0978 def set_content_status_map(request_id, content_status_map, logger=None, log_prefix=''):
0979     cache = get_redis_cache()
0980     content_status_map_key = "request_content_status_map_%s" % request_id
0981     cache.set(content_status_map_key, content_status_map)
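
# The get_*/set_* helpers above all follow the same Redis-backed pattern:
# a per-request or per-transform key, cache.get(key, default=...) to read,
# cache.set(key, value) to write back. Illustrative use:
#     cache = get_redis_cache()
#     cache.set("request_content_status_map_123", {"101": 1})
#     cache.get("request_content_status_map_123", default={})  # -> {"101": 1}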
0982 
0983 
0984 def get_input_dependency_map_by_request(request_id, transform_id, workload_id, work, logger=None, log_prefix=''):
0985     logger = get_logger(logger)
0986 
0987     content_dependcy_map, request_dependcy_map, collection_dependcy_map = get_content_dependcy_map(request_id, logger=logger, log_prefix=log_prefix)
0988     content_status_map = get_content_status_map(request_id, logger=logger, log_prefix=log_prefix)
0989 
0990     transform_dependcy_maps = {}
0991 
0992     refresh = False
0993     if not content_dependcy_map or not content_status_map:
0994         refresh = True
0995     elif transform_id and transform_id not in request_dependcy_map:
0996         refresh = True
0997     elif work:
0998         output_collections = work.get_output_collections()
0999         for coll in output_collections:
1000             if coll.coll_id not in collection_dependcy_map:
1001                 refresh = True
1002 
1003     for tf_id in request_dependcy_map:
1004         transform_dependcy_maps[str(tf_id)] = get_transform_dependency_map(tf_id, logger=logger, log_prefix=log_prefix)
1005         if not transform_dependcy_maps[str(tf_id)]:
1006             refresh = True
1007 
1008     if refresh:
1009         logger.debug(log_prefix + "refresh content_dependcy_map")
1010         content_dependcy_map = {}
1011         request_dependcy_map = []
1012         collection_dependcy_map = []
1013         content_status_map = {}
1014         transform_dependcy_maps = {}
1015 
1016         content_output_name2id = {}
1017         content_input_deps = []
1018 
1019         contents = core_catalog.get_contents_by_request_transform(request_id=request_id)
1020         # logger.debug("contents: ", contents)
1021         for content in contents:
1022             if content['transform_id'] not in request_dependcy_map:
1023                 request_dependcy_map.append(content['transform_id'])
1024             if content['coll_id'] not in collection_dependcy_map:
1025                 collection_dependcy_map.append(content['coll_id'])
1026 
1027             content_status_map[str(content['content_id'])] = content['substatus'].value
1028 
1029             str_tf_id = str(content['transform_id'])
1030             str_map_id = str(content['map_id'])
1031             if str_tf_id not in transform_dependcy_maps:
1032                 transform_dependcy_maps[str_tf_id] = get_transform_dependency_map(str_tf_id, logger=logger, log_prefix=log_prefix)
1033             if str_map_id not in transform_dependcy_maps[str_tf_id]:
1034                 transform_dependcy_maps[str_tf_id][str_map_id] = {'inputs': [], 'outputs': [], 'input_deps': []}
1035 
1036             if content['content_relation_type'] == ContentRelationType.Output:
1037                 if content['coll_id'] not in content_output_name2id:
1038                     content_output_name2id[content['coll_id']] = {}
1040                 content_output_name2id[content['coll_id']][content['name']] = content
1041                 # content_id, status
1042                 transform_dependcy_maps[str_tf_id][str_map_id]['outputs'].append(content['content_id'])
1043             elif content['content_relation_type'] == ContentRelationType.InputDependency:
1044                 content_input_deps.append(content)
1045                 # content_id, status
1046                 transform_dependcy_maps[str_tf_id][str_map_id]['input_deps'].append(content['content_id'])
1047             elif content['content_relation_type'] == ContentRelationType.Input:
1048                 # content_id, status
1049                 transform_dependcy_maps[str_tf_id][str_map_id]['inputs'].append(content['content_id'])
1050         # logger.debug("content_output_name2id: ", content_output_name2id)
1051 
1052         for content in content_input_deps:
1053             dep_coll_id = content['coll_id']
1054             if dep_coll_id not in content_output_name2id:
1055                 logger.warning(log_prefix + "contents for dep_coll_id %s are not added yet" % dep_coll_id)
1056             else:
1057                 dep_content = content_output_name2id[dep_coll_id].get(content['name'], None)
1058                 if dep_content:
1059                     dep_content_id = str(dep_content['content_id'])
1060                     if dep_content_id not in content_dependcy_map:
1061                         content_dependcy_map[dep_content_id] = []
1062                     content_dependcy_map[dep_content_id].append((content['content_id'], content['transform_id'], content['map_id']))
1063                 else:
1064                     logger.error(log_prefix + "Failed to find input dependency for content_id: %s" % content['content_id'])
1065 
1066         set_content_dependcy_map(request_id, content_dependcy_map, request_dependcy_map,
1067                                  collection_dependcy_map, logger=logger, log_prefix=log_prefix)
1068         for str_tf_id in transform_dependcy_maps:
1069             set_transform_dependency_map(str_tf_id, transform_dependcy_maps[str_tf_id], logger=logger, log_prefix=log_prefix)
1070         set_content_status_map(request_id, content_status_map, logger=logger, log_prefix=log_prefix)
1071 
1072     return content_dependcy_map, transform_dependcy_maps, content_status_map
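
# Shapes of the cached structures built above (illustrative):
#   content_dependcy_map:     {str(output_content_id):
#                              [(dependent_content_id, transform_id, map_id), ...]}
#   transform_dependcy_maps:  {str(transform_id): {str(map_id):
#                              {'inputs': [...], 'outputs': [...], 'input_deps': [...]}}}
#   content_status_map:       {str(content_id): ContentStatus.<name>.value}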
1073 
1074 
1075 def get_content_status_with_status_map(content_ids, content_status_map):
1076     content_id_status = []
1077     for content_id in content_ids:
1078         status_value = content_status_map[str(content_id)]
1079         status = ContentStatus(status_value)
1080         content_id_status.append((content_id, status))
1081     return content_id_status
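
# Usage sketch (illustrative): the status map stores ContentStatus enum
# values keyed by str(content_id), so
#     get_content_status_with_status_map([101], {'101': ContentStatus.Available.value})
# returns [(101, ContentStatus.Available)].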
1082 
1083 
1084 def trigger_release_inputs_no_deps(request_id, transform_id, workload_id, work, input_output_maps, logger=None, log_prefix=''):
1085     logger = get_logger(logger)
1086 
1087     update_contents = []
1088     update_input_contents_full = {}
1089     update_input_contents_full[transform_id] = []
1090 
1091     for map_id in input_output_maps:
1092         inputs = input_output_maps[map_id].get('inputs', [])
1093         inputs_dependency = input_output_maps[map_id].get('inputs_dependency', [])
1094         outputs = input_output_maps[map_id].get('outputs', [])
1095         # logs = input_output_maps[map_id].get('logs', [])
1096 
1097         input_output_sub_maps = get_input_output_sub_maps(inputs, outputs, inputs_dependency)
1098         for sub_map_id in input_output_sub_maps:
1099             inputs_sub = input_output_sub_maps[sub_map_id]['inputs']
1100             # outputs_sub = input_output_sub_maps[sub_map_id]['outputs']
1101             inputs_dependency_sub = input_output_sub_maps[sub_map_id]['inputs_dependency']
1102 
1103             if not inputs_dependency_sub:
1104                 for content in inputs_sub:
1105                     if content['substatus'] != ContentStatus.Available:
1106                         u_content = {'content_id': content['content_id'],
1107                                      'request_id': content['request_id'],
1108                                      # 'status': ContentStatus.Available,
1109                                      'substatus': ContentStatus.Available}
1110                         update_contents.append(u_content)
1111                         content['status'] = ContentStatus.Available
1112                         content['substatus'] = ContentStatus.Available
1113                         update_input_contents_full[transform_id].append(content)
1114     return update_contents, update_input_contents_full
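
# Release-without-dependencies rule above: when a sub-map has no input
# dependencies at all, its inputs can be released immediately by setting
# substatus to Available; the returned update dicts carry only the fields
# needed for the DB update (content_id, request_id, substatus).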
1115 
1116 
1117 def trigger_release_inputs(request_id, transform_id, workload_id, work, updated_contents_full_output, updated_contents_full_input,
1118                            updated_contents_full_input_deps, input_output_maps, logger=None, log_prefix=''):
1119     logger = get_logger(logger)
1120 
1121     status_to_check = [ContentStatus.Available, ContentStatus.FakeAvailable, ContentStatus.FinalFailed, ContentStatus.Missing]
1122     # status_to_check_fake = [ContentStatus.FakeAvailable, ContentStatus.Missing]
1123 
1124     update_contents = []
1125     update_contents_status = {}
1126     update_contents_status_name = {}
1127     update_input_contents_full = {}
1128     update_input_contents_full[transform_id] = []
1129 
1130     for status in status_to_check:
1131         update_contents_status[status.name] = []
1132         update_contents_status_name[status.name] = status
1133 
1134     for content in updated_contents_full_output:
1135         # update the status
1136         # u_content = {'content_id': content['content_id'], 'status': content['substatus']}
1137         # update_contents.append(u_content)
1138 
1139         if content['substatus'] in status_to_check:
1140             update_contents_status[content['substatus'].name].append(content['content_id'])
1141 
1142     for map_id in input_output_maps:
1143         inputs = input_output_maps[map_id].get('inputs', [])
1144         inputs_dependency = input_output_maps[map_id].get('inputs_dependency', [])
1145         outputs = input_output_maps[map_id].get('outputs', [])
1146         # logs = input_output_maps[map_id].get('logs', [])
1147 
1148         input_output_sub_maps = get_input_output_sub_maps(inputs, outputs, inputs_dependency)
1149         for sub_map_id in input_output_sub_maps:
1150             inputs_sub = input_output_sub_maps[sub_map_id]['inputs']
1151             outputs_sub = input_output_sub_maps[sub_map_id]['outputs']
1152             inputs_dependency_sub = input_output_sub_maps[sub_map_id]['inputs_dependency']
1153 
1154             input_content_update_status = None
1155             if is_all_contents_available(inputs_dependency_sub):
1156                 input_content_update_status = ContentStatus.Available
1157             elif is_all_contents_terminated(inputs_dependency_sub):
1158                 input_content_update_status = ContentStatus.Missing
1159             if input_content_update_status:
1160                 # for content in inputs_dependency_sub:
1161                 #     u_content = {'content_id': content['content_id'], 'status': content['substatus']}
1162                 #     update_contents.append(u_content)
1164                 for content in inputs_sub:
1165                     u_content = {'content_id': content['content_id'],
1166                                  'request_id': content['request_id'],
1167                                  'substatus': input_content_update_status}
1168                     update_contents.append(u_content)
1169                     content['status'] = input_content_update_status
1170                     content['substatus'] = input_content_update_status
1171                     update_input_contents_full[transform_id].append(content)
1172 
1173             output_content_update_status = None
1174             if is_all_contents_available(inputs_sub):
1175                 # wait for the job to finish
1176                 # for content in inputs:
1177                 #     u_content = {'content_id': content['content_id'], 'status': content['substatus']}
1178                 #     update_contents.append(u_content)
1179                 pass
1180             elif is_all_contents_terminated_but_not_available(inputs_sub):
1181                 # for content in inputs:
1182                 #     u_content = {'content_id': content['content_id'], 'status': content['substatus']}
1183                 #     update_contents.append(u_content)
1185                 output_content_update_status = ContentStatus.Missing
1186             if output_content_update_status:
1187                 for content in outputs_sub:
1188                     u_content = {'content_id': content['content_id'],
1189                                  'request_id': content['request_id'],
1190                                  'substatus': output_content_update_status}
1191                     update_contents.append(u_content)
1192 
1193     return update_contents, update_input_contents_full, update_contents_status_name, update_contents_status
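
# Release decision above, summarized: per sub-map, inputs become Available
# once every input dependency is Available, or Missing once all dependencies
# are terminated without all being Available; outputs become Missing when
# the inputs themselves are terminated but not all Available.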
1194 
1195 
1196 def poll_missing_outputs(input_output_maps, contents_ext=[], max_updates_per_round=2000, process_status=None):
1197     content_updates_missing, updated_contents_full_missing = [], []
1198 
1199     chunks = []
1200     for map_id in input_output_maps:
1201         inputs = input_output_maps[map_id].get('inputs', [])
1202         inputs_dependency = input_output_maps[map_id].get('inputs_dependency', [])
1203         outputs = input_output_maps[map_id].get('outputs', [])
1204         # logs = input_output_maps[map_id].get('logs', [])
1205 
1206         input_output_sub_maps = get_input_output_sub_maps(inputs, outputs, inputs_dependency)
1207         for sub_map_id in input_output_sub_maps:
1208             inputs_sub = input_output_sub_maps[sub_map_id]['inputs']
1209             outputs_sub = input_output_sub_maps[sub_map_id]['outputs']
1210             # inputs_dependency_sub = input_output_sub_maps[sub_map_id]['inputs_dependency']
1211 
1212             content_update_status = None
1213             if is_all_contents_terminated_but_not_available(inputs_sub) or process_status in [ProcessingStatus.Cancelled]:
1214                 content_update_status = ContentStatus.Missing
1215 
1216                 for content in outputs_sub:
1217                     content['substatus'] = content_update_status
1218                     if content['status'] != content['substatus']:
1219                         u_content = {'content_id': content['content_id'],
1220                                      'request_id': content['request_id'],
1221                                      'substatus': content['substatus']}
1222 
1223                         content_updates_missing.append(u_content)
1224                         updated_contents_full_missing.append(content)
1225 
1226         if len(content_updates_missing) > max_updates_per_round:
1227             chunk = content_updates_missing, updated_contents_full_missing
1228             chunks.append(chunk)
1229             content_updates_missing, updated_contents_full_missing = [], []
1230     if len(content_updates_missing) > 0:
1231         chunk = content_updates_missing, updated_contents_full_missing
1232         chunks.append(chunk)
1233     # return content_updates_missing, updated_contents_full_missing
1234     return chunks
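
# Each chunk yielded above is a 2-tuple
# (content_updates_missing, updated_contents_full_missing): the first holds
# minimal update dicts for the DB, the second the full content records used
# to generate output messages.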
1235 
1236 
1237 def has_external_content_id(request_id, transform_id):
1238     """
1239     Return True if all Input contents for the transform have external_content_id set.
1240     Queries the database directly with a COUNT to avoid loading all map data.
1241     request_id is included because the database uses it for virtual table partitioning.
1242     """
1243     return not core_catalog.has_input_contents_without_external_id(request_id, transform_id)
1244 
1245 
1246 def get_update_external_content_ids(request_id, input_output_maps, external_content_ids, es=False):
1247     name_to_id_map = {}
1248     update_contents = []
1249     if not es:
1250         for map_id in input_output_maps:
1251             inputs = input_output_maps[map_id].get('inputs', [])
1252             outputs = input_output_maps[map_id].get('outputs', [])
1253             for content in inputs + outputs:
1254                 if content['name'] not in name_to_id_map:
1255                     name_to_id_map[content['name']] = []
1256                 name_to_id_map[content['name']].append(content['content_id'])
1257     else:
1258         # for ES jobs, 'path' stores the ES file name
1259         for map_id in input_output_maps:
1260             inputs = input_output_maps[map_id].get('inputs', [])
1261             outputs = input_output_maps[map_id].get('outputs', [])
1262             for content in inputs + outputs:
1263                 if content['path'] not in name_to_id_map:
1264                     name_to_id_map[content['path']] = []
1265                 name_to_id_map[content['path']].append(content['content_id'])
1266     for dataset in external_content_ids:
1267         dataset_id = dataset['dataset']['id']
1268         files = dataset['files']
1269         for file_item in files:
1270             lfn = file_item['lfn']
1271             # remove scope '00000:'
1272             pos = lfn.find(":")
1273             if pos >= 0:
1274                 lfn = lfn[pos + 1:]
1275             file_id = file_item['id']
1276             content_ids = name_to_id_map.get(lfn, [])
1277             for content_id in content_ids:
1278                 update_content = {'content_id': content_id,
1279                                   'request_id': request_id,
1280                                   'external_coll_id': dataset_id,
1281                                   'external_content_id': file_id}
1282                 update_contents.append(update_content)
1283     return update_contents
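
# LFN scope stripping above (illustrative): an LFN such as
# "user.someone:file_001.root" is cut at the first ':' to leave
# "file_001.root"; an LFN without ':' is kept unchanged (find returns -1).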
1284 
1285 
1286 def get_update_external_content_ids_from_name_map(name_to_id_map, external_content_ids, request_id):
1287     """
1288     Build the list of external content ID updates from a pre-built name_to_id_map.
1289     Used by handle_update_processing_new to avoid loading the full input_output_maps.
1290     """
1291     update_contents = []
1292     for dataset in external_content_ids:
1293         dataset_id = dataset['dataset']['id']
1294         files = dataset['files']
1295         for file_item in files:
1296             lfn = file_item['lfn']
1297             # remove scope '00000:'
1298             pos = lfn.find(":")
1299             if pos >= 0:
1300                 lfn = lfn[pos + 1:]
1301             file_id = file_item['id']
1302             content_ids = name_to_id_map.get(lfn, [])
1303             for content_id in content_ids:
1304                 update_content = {'content_id': content_id,
1305                                   'request_id': request_id,
1306                                   'external_coll_id': dataset_id,
1307                                   'external_content_id': file_id}
1308                 update_contents.append(update_content)
1309     return update_contents
1310 
1311 
1312 def handle_update_processing(processing, agent_attributes, max_updates_per_round=2000, use_bulk_update_mappings=True, executors=None, logger=None, log_prefix=''):
1313     logger = get_logger(logger)
1314 
1315     ret_msgs = []
1316     new_contents = []
1317     new_input_output_maps = {}
1318 
1319     request_id = processing['request_id']
1320     transform_id = processing['transform_id']
1321     workload_id = processing['workload_id']
1322 
1323     proc = processing['processing_metadata']['processing']
1324     work = proc.work
1325     work.set_agent_attributes(agent_attributes, processing)
1326 
1327     if processing['command'] in [CommandType.AbortProcessing]:
1328         handle_abort_processing(processing, agent_attributes=agent_attributes, sync=False, logger=logger, log_prefix=log_prefix)
1329     if processing['command'] in [CommandType.ResumeProcessing]:
1330         handle_resume_processing(processing, agent_attributes=agent_attributes, logger=logger, log_prefix=log_prefix)
1331 
1332     input_output_maps = get_input_output_maps(request_id, transform_id, work, with_deps=False)
1333     logger.debug(log_prefix + "get_input_output_maps: len: %s" % len(input_output_maps))
1334     logger.debug(log_prefix + "get_input_output_maps.keys[:3]: %s" % str(list(input_output_maps.keys())[:3]))
1335 
1336     if work.has_external_content_id() and not has_external_content_id(request_id, transform_id):
1337         external_content_ids = work.get_external_content_ids(processing, log_prefix=log_prefix)
1338         update_external_content_ids = get_update_external_content_ids(request_id, input_output_maps, external_content_ids, es=work.es)
1339         core_catalog.update_contents(update_external_content_ids)
1340 
1341     num_inputs = None
1342     if hasattr(work, "num_inputs"):
1343         num_inputs = work.num_inputs
1344     num_input_output_maps = len(input_output_maps)
1345     if (processing["num_unmapped"] is not None and processing["num_unmapped"] > 0) or work.has_new_inputs or (num_inputs is not None and num_inputs > num_input_output_maps):
1346         new_input_output_maps = work.get_new_input_output_maps(input_output_maps)
1347         logger.debug(log_prefix + "get_new_input_output_maps: len: %s" % len(new_input_output_maps))
1348         logger.debug(log_prefix + "get_new_input_output_maps.keys[:3]: %s" % str(list(new_input_output_maps.keys())[:3]))
1349     if num_inputs:
1350         processing["num_unmapped"] = num_inputs - num_input_output_maps
1351 
1352     contents_ext = []
1353     if work.require_ext_contents():
1354         contents_ext = get_ext_content_ids(request_id, transform_id, work)
1355         job_info_maps = core_catalog.get_contents_ext_maps()
1356         ret_poll_processing = work.poll_processing_updates(processing, input_output_maps, contents_ext=contents_ext,
1357                                                            job_info_maps=job_info_maps, executors=executors, log_prefix=log_prefix)
1358         process_status, content_updates, new_input_output_maps1, updated_contents_full, parameters, new_contents_ext, update_contents_ext = ret_poll_processing
1359     else:
1360         ret_poll_processing = work.poll_processing_updates(processing, input_output_maps, log_prefix=log_prefix)
1361         new_contents_ext, update_contents_ext = [], []
1362         process_status, content_updates, new_input_output_maps1, updated_contents_full, parameters = ret_poll_processing
1363 
1364     new_input_output_maps.update(new_input_output_maps1)
1365     logger.debug(log_prefix + "poll_processing_updates process_status: %s" % process_status)
1366     logger.debug(log_prefix + "poll_processing_updates content_updates[:3]: %s" % content_updates[:3])
1367     logger.debug(log_prefix + "poll_processing_updates new_input_output_maps1.keys[:3]: %s" % (list(new_input_output_maps1.keys())[:3]))
1368     logger.debug(log_prefix + "poll_processing_updates updated_contents_full[:3]: %s" % (updated_contents_full[:3]))
1369     logger.debug(log_prefix + f"poll_processing_updates parameters: {parameters}")
1370 
1371     ret_futures = set()
1372 
1373     if hasattr(work, 'input_dependency_coll_ids'):
1374         input_dependency_coll_ids = work.input_dependency_coll_ids
1375     else:
1376         input_dependency_coll_ids = []
1377 
1378     ret_new_contents_chunks = get_new_contents(request_id, transform_id, workload_id, new_input_output_maps,
1379                                                input_dependency_coll_ids=input_dependency_coll_ids,
1380                                                max_updates_per_round=max_updates_per_round)
1381     for ret_new_contents in ret_new_contents_chunks:
1382         new_input_contents, new_output_contents, new_log_contents, new_input_dependency_contents = ret_new_contents
1383 
1384         ret_msgs = []
1385         # do not generate messages for new contents
1386         # if new_input_contents:
1387         #     msgs = generate_messages(request_id, transform_id, workload_id, work, msg_type='file',
1388         #                              files=new_input_contents, relation_type='input')
1389         #     ret_msgs = ret_msgs + msgs
1390         # if new_output_contents:
1391         #     msgs = generate_messages(request_id, transform_id, workload_id, work, msg_type='file',
1392         #                              files=new_output_contents, relation_type='output')
1393         #     ret_msgs = ret_msgs + msgs
1394 
1395         # for new_input_dependency_contents, names are already converted to content_dep_id, so they don't need to be separated
1396         # new_contents = new_input_contents + new_output_contents + new_log_contents
1397         new_contents = new_input_contents + new_output_contents + new_log_contents + new_input_dependency_contents
1398 
1399         if executors is None:
1400             logger.debug(log_prefix + "handle_update_processing: add %s new contents" % (len(new_contents)))
1401             core_processings.update_processing_contents(update_processing=None,
1402                                                         new_contents=new_contents,
1403                                                         # new_input_dependency_contents=new_input_dependency_contents,
1404                                                         request_id=request_id,
1405                                                         # transform_id=transform_id,
1406                                                         use_bulk_update_mappings=use_bulk_update_mappings,
1407                                                         messages=ret_msgs)
1408         else:
1409             log_msg = "handle_update_processing thread: add %s new contents" % (len(new_contents))
1410             kwargs = {'update_processing': None,
1411                       'request_id': request_id,
1412                       'new_contents': new_contents,
1413                       # 'new_input_dependency_contents': new_input_dependency_contents,
1414                       'use_bulk_update_mappings': use_bulk_update_mappings,
1415                       'messages': ret_msgs}
1416             f = executors.submit(update_processing_contents_thread, logger, log_prefix, log_msg, kwargs)
1417             ret_futures.add(f)
1418 
1419     ret_msgs = []
1420     content_updates_missing_chunks = poll_missing_outputs(input_output_maps, contents_ext=contents_ext,
1421                                                           max_updates_per_round=max_updates_per_round,
1422                                                           process_status=process_status)
1423     for content_updates_missing_chunk in content_updates_missing_chunks:
1424         content_updates_missing, updated_contents_full_missing = content_updates_missing_chunk
1425         msgs = []
1426         if updated_contents_full_missing:
1427             msgs = generate_messages(request_id, transform_id, workload_id, work, msg_type='file',
1428                                      files=updated_contents_full_missing, relation_type='output')
1429         if executors is None:
1430             logger.debug(log_prefix + "handle_update_processing: update %s missing contents" % (len(content_updates_missing)))
1431             core_processings.update_processing_contents(update_processing=None,
1432                                                         update_contents=content_updates_missing,
1433                                                         request_id=request_id,
1434                                                         # transform_id=transform_id,
1435                                                         # use_bulk_update_mappings=use_bulk_update_mappings,
1436                                                         use_bulk_update_mappings=False,
1437                                                         messages=msgs)
1438         else:
1439             log_msg = "handle_update_processing thread: update %s missing contents" % (len(content_updates_missing))
1440             kwargs = {'update_processing': None,
1441                       'request_id': request_id,
1442                       'update_contents': content_updates_missing,
1443                       'use_bulk_update_mappings': False,
1444                       'messages': msgs}
1445             f = executors.submit(update_processing_contents_thread, logger, log_prefix, log_msg, kwargs)
1446             ret_futures.add(f)
1447 
1448     if updated_contents_full:
1449         updated_contents_full_chunks = get_list_chunks(updated_contents_full, bulk_size=max_updates_per_round)
1450         for updated_contents_full_chunk in updated_contents_full_chunks:
1451             msgs = generate_messages(request_id, transform_id, workload_id, work, msg_type='file',
1452                                      files=updated_contents_full_chunk, relation_type='output')
1453             if executors is None:
1454                 log_msg = "handle_update_processing: update %s messages" % (len(msgs))
1455                 logger.debug(log_prefix + log_msg)
1456                 core_processings.update_processing_contents(update_processing=None,
1457                                                             request_id=request_id,
1458                                                             messages=msgs)
1459             else:
1460                 log_msg = "handle_update_processing thread: update %s messages" % (len(msgs))
1461                 kwargs = {'update_processing': None,
1462                           'request_id': request_id,
1463                           'messages': msgs}
1464                 f = executors.submit(update_processing_contents_thread, logger, log_prefix, log_msg, kwargs)
1465                 ret_futures.add(f)
1466 
1467     if new_contents_ext:
1468         new_contents_ext_chunks = get_list_chunks(new_contents_ext, bulk_size=max_updates_per_round)
1469         for new_contents_ext_chunk in new_contents_ext_chunks:
1470             if executors is None:
1471                 log_msg = "handle_update_processing: add %s ext contents" % (len(new_contents_ext_chunk))
1472                 logger.debug(log_prefix + log_msg)
1473                 core_processings.update_processing_contents(update_processing=None,
1474                                                             request_id=request_id,
1475                                                             new_contents_ext=new_contents_ext_chunk)
1476             else:
1477                 log_msg = "handle_update_processing thread: add %s ext contents" % (len(new_contents_ext_chunk))
1478                 kwargs = {'update_processing': None,
1479                           'request_id': request_id,
1480                           'new_contents_ext': new_contents_ext_chunk}
1481                 f = executors.submit(update_processing_contents_thread, logger, log_prefix, log_msg, kwargs)
1482                 ret_futures.add(f)
1483 
1484     if update_contents_ext:
1485         update_contents_ext_chunks = get_list_chunks(update_contents_ext, bulk_size=max_updates_per_round)
1486         for update_contents_ext_chunk in update_contents_ext_chunks:
1487             if executors is None:
1488                 log_msg = "handle_update_processing: update %s ext contents" % (len(update_contents_ext_chunk))
1489                 logger.debug(log_prefix + log_msg)
1490                 core_processings.update_processing_contents(update_processing=None,
1491                                                             request_id=request_id,
1492                                                             update_contents_ext=update_contents_ext_chunk)
1493             else:
1494                 log_msg = "handle_update_processing thread: update %s ext contents" % (len(update_contents_ext_chunk))
1495                 kwargs = {'update_processing': None,
1496                           'request_id': request_id,
1497                           'update_contents_ext': update_contents_ext_chunk}
1498                 f = executors.submit(update_processing_contents_thread, logger, log_prefix, log_msg, kwargs)
1499                 ret_futures.add(f)
1500 
1501     if content_updates:
1502         content_updates_chunks = get_list_chunks(content_updates, bulk_size=max_updates_per_round)
1503         for content_updates_chunk in content_updates_chunks:
1504             if executors is None:
1505                 log_msg = "handle_update_processing: update %s contents" % (len(content_updates_chunk))
1506                 logger.debug(log_prefix + log_msg)
1507                 core_processings.update_processing_contents(update_processing=None,
1508                                                             request_id=request_id,
1509                                                             # transform_id=transform_id,
1510                                                             use_bulk_update_mappings=use_bulk_update_mappings,
1511                                                             update_contents=content_updates_chunk)
1512             else:
1513                 log_msg = "handle_update_processing thread: update %s contents" % (len(content_updates_chunk))
1514                 kwargs = {'update_processing': None,
1515                           'request_id': request_id,
1516                           'use_bulk_update_mappings': use_bulk_update_mappings,
1517                           'update_contents': content_updates_chunk}
1518                 f = executors.submit(update_processing_contents_thread, logger, log_prefix, log_msg, kwargs)
1519                 ret_futures.add(f)
1520 
1521     if len(ret_futures) > 0:
1522         wait_futures_finish(ret_futures, "handle_update_processing", logger, log_prefix)
1523 
1524     if not parameters:
1525         parameters = {}
1526     parameters["num_unmapped"] = processing["num_unmapped"]
1527 
1528     # return process_status, new_contents, new_input_dependency_contents, ret_msgs, content_updates + content_updates_missing, parameters, new_contents_ext, update_contents_ext
1529     return process_status, [], [], ret_msgs, [], parameters, [], []
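
# Executor usage sketch (illustrative): when `executors` is a
# concurrent.futures.ThreadPoolExecutor, every chunk above is flushed via
# update_processing_contents_thread and the futures are joined by
# wait_futures_finish before the function returns, e.g.:
#     with concurrent.futures.ThreadPoolExecutor(max_workers=4) as pool:
#         handle_update_processing(processing, agent_attributes, executors=pool)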
1530 
1531 
1532 def get_unmatched_panda_id_updates(processing, request_id, transform_id, work, input_output_maps, logger=None, log_prefix=''):
1533     """
1534     Find panda job IDs that are not yet recorded in any output content's content_metadata,
1535     resolve them to input file names via work.get_processing_job_name_to_ids, and return
1536     content updates that set panda_ids on the matching output contents.
1537 
1538     :returns: list of content update dicts {content_id, request_id, content_metadata}
1539     """
1540     logger = get_logger(logger)
1541 
1542     def _to_id_list(value):
1543         """Normalise a panda_id/panda_ids value to a flat list of IDs."""
1544         if value is None:
1545             return []
1546         if isinstance(value, str):
1547             return [int(v.strip()) for v in value.split(',') if v.strip()]
1548         if isinstance(value, list):
1549             return value
1550         return [value]
1551 
1552     # Step 1: collect known panda_ids and build input_name -> output_contents map
1553     known_panda_ids = set()
1554     name_to_outputs = {}
1555     for map_id in input_output_maps:
1556         outputs = input_output_maps[map_id].get('outputs', [])
1557         inputs = input_output_maps[map_id].get('inputs', [])
1558         for content in outputs:
1559             meta = content.get('content_metadata') or {}
1560             existing = meta.get('panda_ids', []) or meta.get('panda_id', [])
1561             known_panda_ids.update(_to_id_list(existing))
1562         for content in inputs:
1563             name = content.get('name')
1564             if name:
1565                 if name not in name_to_outputs:
1566                     name_to_outputs[name] = []
1567                 name_to_outputs[name].extend(outputs)
1568 
1569     # Step 2: get all panda job ids for this processing from PanDA
1570     all_job_ids = work.get_processing_job_ids(processing, log_prefix=log_prefix)
1571     all_job_ids = _to_id_list(all_job_ids)
1572     logger.debug(log_prefix + "get_unmatched_panda_id_updates: all_job_ids: %s, known_panda_ids: %s"
1573                  % (len(all_job_ids), len(known_panda_ids)))
1574 
1575     # Step 3: find unmatched job ids
1576     unmatched_job_ids = [jid for jid in all_job_ids if jid not in known_panda_ids]
1577     logger.debug(log_prefix + "get_unmatched_panda_id_updates: unmatched_job_ids: %s" % len(unmatched_job_ids))
1578     if not unmatched_job_ids:
1579         return []
1580 
1581     # Step 4: resolve unmatched job ids to input file names
1582     job_name_to_ids = work.get_processing_job_name_to_ids(processing, unmatched_job_ids, log_prefix=log_prefix)
1583     logger.debug(log_prefix + "get_unmatched_panda_id_updates: job_name_to_ids: %s" % len(job_name_to_ids))
1584 
1585     # Step 5: match job names to input content names and build content_metadata updates
1586     update_contents = []
1587     for job_name, panda_ids in job_name_to_ids.items():
1588         if job_name not in name_to_outputs or not panda_ids:
1589             continue
1590         for output_content in name_to_outputs[job_name]:
1591             content_metadata = dict(output_content.get('content_metadata') or {})
1592             registered_panda_ids = _to_id_list(content_metadata.get('panda_ids') or content_metadata.get('panda_id'))
1593             merged = sorted(set(registered_panda_ids) | set(panda_ids))
1594             content_metadata['panda_ids'] = merged
1595             update_contents.append({'content_id': output_content['content_id'],
1596                                     'request_id': request_id,
1597                                     'content_metadata': content_metadata})
1598     return update_contents
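
# _to_id_list normalization above (illustrative):
#     _to_id_list(None)      -> []
#     _to_id_list("1, 2,3")  -> [1, 2, 3]
#     _to_id_list([4, 5])    -> [4, 5]
#     _to_id_list(6)         -> [6]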
1599 
1600 
1601 def handle_update_processing_new(processing, agent_attributes, max_updates_per_round=2000, max_jobs_per_round=None, use_bulk_update_mappings=True, executors=None, logger=None, log_prefix=''):
1602     """
1603     Handle update processing with optional paginated job polling to reduce peak memory usage.
1604 
1605     :param max_jobs_per_round: maximum number of map_ids (jobs) to poll per round.
1606         When set, input_output_maps are loaded and polled in pages of this size,
1607         flushing results to DB after each page rather than accumulating everything
1608         in memory at once. When None (default), all maps are loaded and polled at
1609         once (same behaviour as handle_update_processing).
1610     """
1611     logger = get_logger(logger)
1612 
1613     ret_msgs = []
1614     new_contents = []
1615     new_input_output_maps = {}
1616 
1617     request_id = processing['request_id']
1618     transform_id = processing['transform_id']
1619     workload_id = processing['workload_id']
1620 
1621     proc = processing['processing_metadata']['processing']
1622     work = proc.work
1623     work.set_agent_attributes(agent_attributes, processing)
1624 
1625     if processing['command'] in [CommandType.AbortProcessing]:
1626         handle_abort_processing(processing, agent_attributes=agent_attributes, sync=False, logger=logger, log_prefix=log_prefix)
1627     if processing['command'] in [CommandType.ResumeProcessing]:
1628         handle_resume_processing(processing, agent_attributes=agent_attributes, logger=logger, log_prefix=log_prefix)
1629 
1630     num_inputs = None
1631     if hasattr(work, "num_inputs"):
1632         num_inputs = work.num_inputs
1633 
1634     if hasattr(work, 'es') and work.es:
1635         max_jobs_per_round = None   # disable paging for ES jobs
1636         logger.debug(log_prefix + "handle_update_processing_new: ES job detected, paging disabled")
1637 
1638     input_output_maps = None
1639 
1640     num_input_output_maps = core_catalog.get_input_output_map_count(request_id, transform_id)
1641     logger.debug(log_prefix + "get_input_output_map_count: %s" % num_input_output_maps)
1642     if work.has_external_content_id() and not has_external_content_id(request_id, transform_id):
1643         external_content_ids = work.get_external_content_ids(processing, log_prefix=log_prefix)
1644         name_to_id_map = core_catalog.get_content_name_to_id_map(request_id, transform_id, es=work.es)
1645         update_external_content_ids = get_update_external_content_ids_from_name_map(
1646             name_to_id_map, external_content_ids, request_id)
1647         core_catalog.update_contents(update_external_content_ids)
1648 
1649     has_new_inputs = work.has_new_inputs if (processing["num_unmapped"] is None) else False
1650     if (processing["num_unmapped"] is not None and processing["num_unmapped"] > 0) or has_new_inputs or (num_inputs is not None and num_inputs > num_input_output_maps):
1651         logger.debug(log_prefix + f"handle_update_processing_new: checking for new input_output_maps with num_inputs: {num_inputs}, num_input_output_maps: {num_input_output_maps}, "
1652                      f"processing['num_unmapped']: {processing['num_unmapped']}, work.has_new_inputs: {work.has_new_inputs}")
1653         input_output_maps = get_input_output_maps(request_id, transform_id, work, with_deps=False)
1654         logger.debug(log_prefix + "get_input_output_maps: len: %s" % len(input_output_maps))
1655         new_input_output_maps = work.get_new_input_output_maps(input_output_maps)
1656         logger.debug(log_prefix + "get_new_input_output_maps: len: %s" % len(new_input_output_maps))
1657         logger.debug(log_prefix + "get_new_input_output_maps.keys[:3]: %s" % str(list(new_input_output_maps.keys())[:3]))
1658 
1659     if num_inputs:
1660         processing["num_unmapped"] = num_inputs - num_input_output_maps
1661 
1662     # For full-map mode: ensure input_output_maps is loaded for the polling loop.
1663     if not max_jobs_per_round and input_output_maps is None:
1664         input_output_maps = get_input_output_maps(request_id, transform_id, work, with_deps=False)
1665         logger.debug(log_prefix + "get_input_output_maps: len: %s" % len(input_output_maps))
1666 
1667     # Map any newly submitted panda jobs not yet recorded in content_metadata.
1668     maps_for_unmatch = input_output_maps
1669     if maps_for_unmatch is None:
1670         maps_for_unmatch = get_input_output_maps(request_id, transform_id, work, with_deps=False)
1671     unmatched_updates = get_unmatched_panda_id_updates(processing, request_id, transform_id, work,
1672                                                        maps_for_unmatch, logger=logger, log_prefix=log_prefix)
1673     if unmatched_updates:
1674         logger.debug(log_prefix + "get_unmatched_panda_id_updates: updating %s contents" % len(unmatched_updates))
1675         core_catalog.update_contents(unmatched_updates)
1676 
1677     final_terminated_status = [ContentStatus.Available, ContentStatus.FakeAvailable,
1678                                ContentStatus.FinalFailed, ContentStatus.Missing,
1679                                ContentStatus.FinalSubAvailable]
1680     status_filter = [s for s in ContentStatus if s not in final_terminated_status]
1681 
1682     if max_jobs_per_round:
1683         maps_for_unmatch = None  # free if we loaded it above
1684         input_output_maps = None  # free full map if we loaded it above, to save memory for polling loop
1685     else:
1686         # reload ones filtered with panda_id to avoid keeping two full maps in memory at once
1687         # ES jobs track panda IDs in contents_ext, not content_metadata, so use with_panda_id=False
1688         panda_id_filter = False if (hasattr(work, 'es') and work.es) else True
1689         logger.debug(log_prefix + "Reloading input_output_maps with panda_id filter for polling loop")
1690         input_output_maps = get_input_output_maps(request_id, transform_id, work, with_deps=False, with_panda_id=panda_id_filter, status=status_filter, match_content_ext=True)
1691 
1692     if hasattr(work, 'input_dependency_coll_ids'):
1693         input_dependency_coll_ids = work.input_dependency_coll_ids
1694     else:
1695         input_dependency_coll_ids = []
1696 
1697     ret_futures = set()
1698 
1699     # Load ext content IDs once (lightweight) for reuse across all pages.
1700     contents_ext = []
1701     if work.require_ext_contents():
1702         contents_ext = get_ext_content_ids(request_id, transform_id, work)
1703 
1704     # --- Polling phase ---
1705     # When max_jobs_per_round is set, iterate through pages of maps, flushing
1706     # updates to DB after each page to keep peak memory bounded.
1707     # When max_jobs_per_round is None, run a single iteration using the full map.
1708     process_status = None
1709     parameters = {}
1710     new_input_output_maps_from_poll = {}
1711     page_num = 0
1712 
1713     logger.debug(log_prefix + f"Starting polling loop with max_jobs_per_round={max_jobs_per_round}")
1714 
1715     while True:
1716         if max_jobs_per_round:
1717             maps_page = get_input_output_maps(request_id, transform_id, work, with_deps=False,
1718                                               page_num=page_num, page_size=max_jobs_per_round,
1719                                               with_panda_id=True, status=status_filter, match_content_ext=True)
1720             logger.debug(log_prefix + "handle_update_processing_new: polling page %d with %d maps"
1721                          % (page_num, len(maps_page)))
1722         else:
1723             maps_page = input_output_maps  # full map, single iteration
1724 
1725         # Poll job status for this page/batch
1726         if work.require_ext_contents():
1727             job_info_maps = core_catalog.get_contents_ext_maps()
1728             ret_poll = work.poll_processing_updates(processing, maps_page, contents_ext=contents_ext,
1729                                                     job_info_maps=job_info_maps, executors=executors, log_prefix=log_prefix)
1730             process_status, content_updates, new_input_output_maps1, updated_contents_full, parameters, new_contents_ext, update_contents_ext = ret_poll
1731         else:
1732             ret_poll = work.poll_processing_updates(processing, maps_page, log_prefix=log_prefix)
1733             new_contents_ext, update_contents_ext = [], []
1734             process_status, content_updates, new_input_output_maps1, updated_contents_full, parameters = ret_poll
1735 
1736         new_input_output_maps_from_poll.update(new_input_output_maps1)
1737 
1738         logger.debug(log_prefix + "poll_processing_updates process_status: %s" % process_status)
1739         logger.debug(log_prefix + "poll_processing_updates content_updates[:3]: %s" % content_updates[:3])
1740         logger.debug(log_prefix + "poll_processing_updates new_input_output_maps1.keys[:3]: %s" % (list(new_input_output_maps1.keys())[:3]))
1741         logger.debug(log_prefix + "poll_processing_updates updated_contents_full[:3]: %s" % (updated_contents_full[:3]))
1742         logger.debug(log_prefix + f"poll_processing_updates parameters: {parameters}")
1743 
1744         # Poll missing outputs for this page and flush to DB immediately
1745         # (messages are flushed immediately; ret_msgs at return is always [])
1746         content_updates_missing_chunks = poll_missing_outputs(maps_page, contents_ext=contents_ext,
1747                                                               max_updates_per_round=max_updates_per_round,
1748                                                               process_status=process_status)
1749         for content_updates_missing_chunk in content_updates_missing_chunks:
1750             content_updates_missing, updated_contents_full_missing = content_updates_missing_chunk
1751             msgs = []
1752             if updated_contents_full_missing:
1753                 msgs = generate_messages(request_id, transform_id, workload_id, work, msg_type='file',
1754                                          files=updated_contents_full_missing, relation_type='output')
1755             if executors is None:
1756                 logger.debug(log_prefix + "handle_update_processing_new: update %s missing contents" % (len(content_updates_missing)))
1757                 core_processings.update_processing_contents(update_processing=None,
1758                                                             update_contents=content_updates_missing,
1759                                                             request_id=request_id,
1760                                                             use_bulk_update_mappings=False,
1761                                                             messages=msgs)
1762             else:
1763                 log_msg = "handle_update_processing_new thread: update %s missing contents" % (len(content_updates_missing))
1764                 kwargs = {'update_processing': None,
1765                           'request_id': request_id,
1766                           'update_contents': content_updates_missing,
1767                           'use_bulk_update_mappings': False,
1768                           'messages': msgs}
1769                 f = executors.submit(update_processing_contents_thread, logger, log_prefix, log_msg, kwargs)
1770                 ret_futures.add(f)
1771 
1772         # Save messages for fully-updated contents
1773         if updated_contents_full:
1774             updated_contents_full_chunks = get_list_chunks(updated_contents_full, bulk_size=max_updates_per_round)
1775             for updated_contents_full_chunk in updated_contents_full_chunks:
1776                 msgs = generate_messages(request_id, transform_id, workload_id, work, msg_type='file',
1777                                          files=updated_contents_full_chunk, relation_type='output')
1778                 if executors is None:
1779                     log_msg = "handle_update_processing_new: update %s messages" % (len(msgs))
1780                     logger.debug(log_prefix + log_msg)
1781                     core_processings.update_processing_contents(update_processing=None,
1782                                                                 request_id=request_id,
1783                                                                 messages=msgs)
1784                 else:
1785                     log_msg = "handle_update_processing_new thread: update %s messages" % (len(msgs))
1786                     kwargs = {'update_processing': None,
1787                               'request_id': request_id,
1788                               'messages': msgs}
1789                     f = executors.submit(update_processing_contents_thread, logger, log_prefix, log_msg, kwargs)
1790                     ret_futures.add(f)
1791 
1792         if new_contents_ext:
1793             new_contents_ext_chunks = get_list_chunks(new_contents_ext, bulk_size=max_updates_per_round)
1794             for new_contents_ext_chunk in new_contents_ext_chunks:
1795                 if executors is None:
1796                     log_msg = "handle_update_processing_new: add %s ext contents" % (len(new_contents_ext_chunk))
1797                     logger.debug(log_prefix + log_msg)
1798                     core_processings.update_processing_contents(update_processing=None,
1799                                                                 request_id=request_id,
1800                                                                 new_contents_ext=new_contents_ext_chunk)
1801                 else:
1802                     log_msg = "handle_update_processing_new thread: add %s ext contents" % (len(new_contents_ext_chunk))
1803                     kwargs = {'update_processing': None,
1804                               'request_id': request_id,
1805                               'new_contents_ext': new_contents_ext_chunk}
1806                     f = executors.submit(update_processing_contents_thread, logger, log_prefix, log_msg, kwargs)
1807                     ret_futures.add(f)
1808 
1809         if update_contents_ext:
1810             update_contents_ext_chunks = get_list_chunks(update_contents_ext, bulk_size=max_updates_per_round)
1811             for update_contents_ext_chunk in update_contents_ext_chunks:
1812                 if executors is None:
1813                     log_msg = "handle_update_processing_new: update %s ext contents" % (len(update_contents_ext_chunk))
1814                     logger.debug(log_prefix + log_msg)
1815                     core_processings.update_processing_contents(update_processing=None,
1816                                                                 request_id=request_id,
1817                                                                 update_contents_ext=update_contents_ext_chunk)
1818                 else:
1819                     log_msg = "handle_update_processing_new thread: update %s ext contents" % (len(update_contents_ext_chunk))
1820                     kwargs = {'update_processing': None,
1821                               'request_id': request_id,
1822                               'update_contents_ext': update_contents_ext_chunk}
1823                     f = executors.submit(update_processing_contents_thread, logger, log_prefix, log_msg, kwargs)
1824                     ret_futures.add(f)
1825 
1826         if content_updates:
1827             content_updates_chunks = get_list_chunks(content_updates, bulk_size=max_updates_per_round)
1828             for content_updates_chunk in content_updates_chunks:
1829                 if executors is None:
1830                     log_msg = "handle_update_processing_new: update %s contents" % (len(content_updates_chunk))
1831                     logger.debug(log_prefix + log_msg)
1832                     core_processings.update_processing_contents(update_processing=None,
1833                                                                 request_id=request_id,
1834                                                                 use_bulk_update_mappings=use_bulk_update_mappings,
1835                                                                 update_contents=content_updates_chunk)
1836                 else:
1837                     log_msg = "handle_update_processing_new thread: update %s contents" % (len(content_updates_chunk))
1838                     kwargs = {'update_processing': None,
1839                               'request_id': request_id,
1840                               'use_bulk_update_mappings': use_bulk_update_mappings,
1841                               'update_contents': content_updates_chunk}
1842                     f = executors.submit(update_processing_contents_thread, logger, log_prefix, log_msg, kwargs)
1843                     ret_futures.add(f)
1844 
1845         # Stop after first iteration for full-map mode, or when last page is reached
1846         if not max_jobs_per_round or len(maps_page) < max_jobs_per_round:
1847             break
1848         page_num += 1
1849 
1850     # Merge new maps discovered during polling and save new content records
1851     new_input_output_maps.update(new_input_output_maps_from_poll)
1852 
1853     ret_new_contents_chunks = get_new_contents(request_id, transform_id, workload_id, new_input_output_maps,
1854                                                input_dependency_coll_ids=input_dependency_coll_ids,
1855                                                max_updates_per_round=max_updates_per_round)
1856     for ret_new_contents in ret_new_contents_chunks:
1857         new_input_contents, new_output_contents, new_log_contents, new_input_dependency_contents = ret_new_contents
1858 
1859         ret_msgs = []
1860         # for new_input_dependency_contents, names are already converted to content_dep_id, so they don't need to be separated
1861         new_contents = new_input_contents + new_output_contents + new_log_contents + new_input_dependency_contents
1862 
1863         if executors is None:
1864             logger.debug(log_prefix + "handle_update_processing_new: add %s new contents" % (len(new_contents)))
1865             core_processings.update_processing_contents(update_processing=None,
1866                                                         new_contents=new_contents,
1867                                                         request_id=request_id,
1868                                                         use_bulk_update_mappings=use_bulk_update_mappings,
1869                                                         messages=ret_msgs)
1870         else:
1871             log_msg = "handle_update_processing_new thread: add %s new contents" % (len(new_contents))
1872             kwargs = {'update_processing': None,
1873                       'request_id': request_id,
1874                       'new_contents': new_contents,
1875                       'use_bulk_update_mappings': use_bulk_update_mappings,
1876                       'messages': ret_msgs}
1877             f = executors.submit(update_processing_contents_thread, logger, log_prefix, log_msg, kwargs)
1878             ret_futures.add(f)
1879 
1880     if len(ret_futures) > 0:
1881         wait_futures_finish(ret_futures, "handle_update_processing_new", logger, log_prefix)
1882 
1883     if not parameters:
1884         parameters = {}
1885     parameters["num_unmapped"] = processing["num_unmapped"]
1886 
1887     # return process_status, new_contents, new_input_dependency_contents, ret_msgs, content_updates + content_updates_missing, parameters, new_contents_ext, update_contents_ext
1888     return process_status, [], [], ret_msgs, [], parameters, [], []
1889 
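# Editor's illustration (not part of the original source): the chunking helper used
# throughout this module. Assuming idds.common.utils.get_list_chunks follows the
# slicing idiom written out inline later in this file, it behaves like:
#
#     def get_list_chunks(lst, bulk_size=2000):
#         # consecutive sublists of at most bulk_size items each
#         return [lst[i:i + bulk_size] for i in range(0, len(lst), bulk_size)]
#
#     get_list_chunks([1, 2, 3, 4, 5], bulk_size=2)  # -> [[1, 2], [3, 4], [5]]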
1890 
1891 def get_transform_id_dependency_map(transform_id, logger=None, log_prefix=''):
1892     cache = get_redis_cache()
1893     transform_id_dependency_map_key = "transform_id_dependency_map_%s" % transform_id
1894     transform_id_dependency_map = cache.get(transform_id_dependency_map_key, default=[])
1895     return transform_id_dependency_map
1896 
1897 
1898 def set_transform_id_dependency_map(transform_id, transform_id_dependency_map, logger=None, log_prefix=''):
1899     cache = get_redis_cache()
1900     transform_id_dependency_map_key = "transform_id_dependency_map_%s" % transform_id
1901     cache.set(transform_id_dependency_map_key, transform_id_dependency_map)
1902 
1903 
1904 def get_updated_transforms_by_content_status(request_id=None, transform_id=None, logger=None, log_prefix=''):
1905     logger = get_logger(logger)
1906     logger.debug("get_updated_transforms_by_content_status starts")
1907 
1908     update_transforms = get_transform_id_dependency_map(transform_id=transform_id, logger=logger, log_prefix=log_prefix)
1909     if not update_transforms:
1910         update_transforms = core_catalog.get_updated_transforms_by_content_status(request_id=request_id,
1911                                                                                   transform_id=transform_id)
1912         set_transform_id_dependency_map(transform_id, update_transforms, logger=logger, log_prefix=log_prefix)
1913     logger.debug("get_updated_transforms_by_content_status ends")
1914     return update_transforms
1915 
1916 
1917 def update_contents_thread(logger, log_prefix, log_msg, kwargs):
1918     try:
1919         logger = get_logger(logger)
1920         logger.debug(log_prefix + log_msg)
1921         core_catalog.update_contents(**kwargs)
1922         logger.debug(log_prefix + " end")
1923     except Exception as ex:
1924         logger.error(log_prefix + "update_contents_thread: %s" % str(ex))
1925         logger.error(traceback.format_exc())
1926         raise
1929 
1930 
1931 def handle_trigger_processing(processing, agent_attributes, trigger_new_updates=False, max_updates_per_round=2000, executors=None, logger=None, log_prefix=''):
1932     logger = get_logger(logger)
1933 
1934     has_updates = False
1935     ret_msgs = []
1936     content_updates = []
1937     ret_update_transforms = []
1938     new_update_contents = []
1939 
1940     request_id = processing['request_id']
1941     transform_id = processing['transform_id']
1942     workload_id = processing['workload_id']
1943     processing_id = processing['processing_id']
1944 
1945     proc = processing['processing_metadata']['processing']
1946     work = proc.work
1947     work.set_agent_attributes(agent_attributes, processing)
1948 
1949     num_dependencies = None
1950     num_inputs = None
1951     default_input_dep_page_size = 500
1952     min_input_dep_page_size = 100
1953     max_dependencies = 5000
1954     try:
1955         num_inputs = work.num_inputs
1956         num_dependencies = work.num_dependencies
1957         if num_inputs is not None and num_dependencies is not None and num_dependencies > 0:
1958             input_dep_page_size = int(max_dependencies * num_inputs / num_dependencies)
1959             if input_dep_page_size < default_input_dep_page_size:
1960                 log_info = f"input_dep_page_size ({input_dep_page_size}) is smaller than default_input_dep_page_size ({default_input_dep_page_size}), "
1961                 log_info += "update default_input_dep_page_size from input_dep_page_size"
1962                 logger.info(log_info)
1963                 default_input_dep_page_size = input_dep_page_size
1964             if default_input_dep_page_size < min_input_dep_page_size:
1965                 log_info = f"default_input_dep_page_size ({default_input_dep_page_size}) is smaller than min_input_dep_page_size ({min_input_dep_page_size}), "
1966                 log_info += "update default_input_dep_page_size from min_input_dep_page_size"
1967                 logger.info(log_info)
1968                 default_input_dep_page_size = min_input_dep_page_size
1969     except Exception as ex:
1970         logger.warning(f"request_id ({request_id}) transform_id ({transform_id}) processing_id ({processing_id}) failed to get num_dependencies: {ex}")
1971 
1972     if (not work.use_dependency_to_release_jobs()) or workload_id is None:
1973         return processing['substatus'], [], [], {}, {}, {}, [], [], has_updates
1974     else:
1975         if trigger_new_updates:
1976             # delete information in the contents_update table, to invoke the trigger.
1977             # ret_update_transforms = core_catalog.delete_contents_update(request_id=request_id, transform_id=transform_id)
1978             # logger.debug(log_prefix + "delete_contents_update: %s" % str(ret_update_transforms))
1979             pass
1980 
1981         logger.debug(log_prefix + "sync contents_update to contents")
1982         core_catalog.set_fetching_contents_update(request_id=request_id, transform_id=transform_id, fetch=True)
1983         contents_update_list = core_catalog.get_contents_update(request_id=request_id, transform_id=transform_id, fetch=True)
1984         new_contents_update_list = []
1985         # contents_id_list = []
1986         for con in contents_update_list:
1987             has_updates = True
1988             if not work.es or con['substatus'] in [ContentStatus.Available]:
1989                 con_dict = {'content_id': con['content_id'],
1990                             'request_id': con['request_id'],
1991                             'substatus': con['substatus'],
1992                             'status': con['substatus']}
1993                 if 'content_metadata' in con and con['content_metadata']:
1994                     con_dict['content_metadata'] = con['content_metadata']
1995                 new_contents_update_list.append(con_dict)
1996                 # contents_id_list.append(con['content_id'])
1997         new_contents_update_list_chunks = [new_contents_update_list[i:i + max_updates_per_round] for i in range(0, len(new_contents_update_list), max_updates_per_round)]
1998         ret_futures = set()
1999         for chunk in new_contents_update_list_chunks:
2000             has_updates = True
2001             if executors is None:
2002                 logger.debug(log_prefix + "new_contents_update chunk[:3](total: %s): %s" % (len(chunk), str(chunk[:3])))
2003                 # core_catalog.update_contents(chunk, request_id=request_id, transform_id=transform_id, use_bulk_update_mappings=False)
2004                 core_catalog.update_contents(chunk, request_id=request_id, transform_id=transform_id, use_bulk_update_mappings=True)
2005             else:
2006                 log_msg = "new_contents_update thread chunk[:3](total: %s): %s" % (len(chunk), str(chunk[:3]))
2007                 kwargs = {'parameters': chunk,
2008                           'request_id': request_id,
2009                           'transform_id': transform_id,
2010                           'use_bulk_update_mappings': True}
2011                 f = executors.submit(update_contents_thread, logger, log_prefix, log_msg, kwargs)
2012                 ret_futures.add(f)
2013         if len(ret_futures) > 0:
2014             wait_futures_finish(ret_futures, "new_contents_update", logger, log_prefix)
2015 
2016         # core_catalog.delete_contents_update(contents=contents_id_list)
2017         core_catalog.delete_contents_update(request_id=request_id, transform_id=transform_id, fetch=True)
2018         logger.debug(log_prefix + "sync contents_update to contents done")
2019 
2020         """
2021         logger.debug(log_prefix + "update_contents_from_others_by_dep_id")
2022         # core_catalog.update_contents_from_others_by_dep_id(request_id=request_id, transform_id=transform_id)
2023         to_triggered_contents = core_catalog.get_update_contents_from_others_by_dep_id(request_id=request_id, transform_id=transform_id)
2024         to_triggered_contents_chunks = [to_triggered_contents[i:i + max_updates_per_round] for i in range(0, len(to_triggered_contents), max_updates_per_round)]
2025 
2026         ret_futures = set()
2027         for chunk in to_triggered_contents_chunks:
2028             has_updates = True
2029             if executors is None:
2030                 logger.debug(log_prefix + "update_contents_from_others_by_dep_id chunk[:3](total: %s): %s" % (len(chunk), str(chunk[:3])))
2031                 core_catalog.update_contents(chunk, request_id=request_id, transform_id=transform_id, use_bulk_update_mappings=False)
2032             else:
2033                 log_msg = "update_contents_from_others_by_dep_id thread chunk[:3](total: %s): %s" % (len(chunk), str(chunk[:3]))
2034                 kwargs = {'parameters': chunk,
2035                           'request_id': request_id,
2036                           'transform_id': transform_id,
2037                           'use_bulk_update_mappings': False}
2038                 f = executors.submit(update_contents_thread, logger, log_prefix, log_msg, kwargs)
2039                 ret_futures.add(f)
2040         if len(ret_futures) > 0:
2041             wait_futures_finish(ret_futures, "update_contents_from_others_by_dep_id", logger, log_prefix)
2042 
2043         logger.debug(log_prefix + "update_contents_from_others_by_dep_id done")
2044         """
2045 
2046         logger.debug(log_prefix + "update_contents_from_others_by_dep_id_pages")
2047         status_not_to_check = [ContentStatus.Available, ContentStatus.FakeAvailable,
2048                                ContentStatus.FinalFailed, ContentStatus.Missing]
2049         core_catalog.update_contents_from_others_by_dep_id_pages(request_id=request_id, transform_id=transform_id,
2050                                                                  page_size=2000, status_not_to_check=status_not_to_check,
2051                                                                  logger=logger, log_prefix=log_prefix)
2052         logger.debug(log_prefix + "update_contents_from_others_by_dep_id_pages done")
2053 
2054         terminated_processing = False
2055         terminated_status = [ProcessingStatus.Finished, ProcessingStatus.Failed, ProcessingStatus.SubFinished,
2056                              ProcessingStatus.Terminating, ProcessingStatus.Cancelled]
2057         if processing['status'] in terminated_status or processing['substatus'] in terminated_status:
2058             terminated_processing = True
2059 
2060         logger.debug(log_prefix + "update_input_contents_by_dependency_pages")
2061         status_not_to_check = [ContentStatus.Available, ContentStatus.FakeAvailable,
2062                                ContentStatus.FinalFailed, ContentStatus.Missing]
2063         core_catalog.update_input_contents_by_dependency_pages(request_id=request_id, transform_id=transform_id,
2064                                                                page_size=default_input_dep_page_size,
2065                                                                terminated=terminated_processing,
2066                                                                batch_size=2000, status_not_to_check=status_not_to_check,
2067                                                                logger=logger, log_prefix=log_prefix)
2068         logger.debug(log_prefix + "update_input_contents_by_dependency_pages done")
2069 
2070         with_deps = False
2071         input_output_maps = get_input_output_maps(request_id, transform_id, work, with_deps=with_deps)
2072         logger.debug(log_prefix + "input_output_maps.keys[:2]: %s" % str(list(input_output_maps.keys())[:2]))
2073 
2074         updated_contents_ret_chunks = get_updated_contents_by_input_output_maps(input_output_maps=input_output_maps,
2075                                                                                 terminated=terminated_processing,
2076                                                                                 max_updates_per_round=max_updates_per_round,
2077                                                                                 es=work.es,
2078                                                                                 with_deps=with_deps,
2079                                                                                 logger=logger,
2080                                                                                 log_prefix=log_prefix)
2081 
2082         ret_futures = set()
2083         for updated_contents_ret in updated_contents_ret_chunks:
2084             updated_contents, updated_contents_full_input, updated_contents_full_output, updated_contents_full_input_deps, new_update_contents = updated_contents_ret
2085 
2086             if updated_contents_full_input:
2087                 # if the content was updated by the receiver, this is the place to broadcast the messages
2088                 msgs = generate_messages(request_id, transform_id, workload_id, work, msg_type='file',
2089                                          files=updated_contents_full_input, relation_type='input')
2090                 ret_msgs = ret_msgs + msgs
2091             if updated_contents_full_output:
2092                 # if the content was updated by the receiver, this is the place to broadcast the messages
2093                 msgs = generate_messages(request_id, transform_id, workload_id, work, msg_type='file',
2094                                          files=updated_contents_full_output, relation_type='output')
2095                 ret_msgs = ret_msgs + msgs
2096 
2097             # content_updates = content_updates + updated_contents
2098 
2099             if updated_contents or new_update_contents:
2100                 has_updates = True
2101 
2102             if executors is None:
2103                 logger.debug(log_prefix + "handle_trigger_processing: updated_contents[:3] (total: %s): %s" % (len(updated_contents), updated_contents[:3]))
2104                 core_processings.update_processing_contents(update_processing=None,
2105                                                             update_contents=updated_contents,
2106                                                             # new_update_contents=new_update_contents,
2107                                                             messages=ret_msgs,
2108                                                             request_id=request_id,
2109                                                             # transform_id=transform_id,
2110                                                             use_bulk_update_mappings=False)
2111             else:
2112                 log_msg = "handle_trigger_processing thread: updated_contents[:3] (total: %s): %s" % (len(updated_contents), updated_contents[:3])
2113                 kwargs = {'update_processing': None,
2114                           'request_id': request_id,
2115                           'update_contents': updated_contents,
2116                           'messages': ret_msgs,
2117                           'use_bulk_update_mappings': False}
2118                 f = executors.submit(update_processing_contents_thread, logger, log_prefix, log_msg, kwargs)
2119                 ret_futures.add(f)
2120 
2121             updated_contents = []
2122             new_update_contents = []
2123             ret_msgs = []
2124         if len(ret_futures) > 0:
2125             wait_futures_finish(ret_futures, "handle_trigger_processing", logger, log_prefix)
2126 
2127         if has_updates:
2128             ret_update_transforms = get_updated_transforms_by_content_status(request_id=request_id,
2129                                                                              transform_id=transform_id,
2130                                                                              logger=logger,
2131                                                                              log_prefix=log_prefix)
2132     # update_dep_contents_status_name = {}
2133     # update_dep_contents_status = {}
2134     # for content in new_update_contents:
2135     #     if content['substatus'] not in update_dep_contents_status_name:
2136     #         update_dep_contents_status_name[content['substatus'].name] = content['substatus']
2137     #         update_dep_contents_status[content['substatus'].name] = []
2138     #     update_dep_contents_status[content['substatus'].name].append(content['content_id'])
2139 
2140     # return processing['substatus'], content_updates, ret_msgs, {}, {}, {}, new_update_contents, ret_update_transforms
2141     # return processing['substatus'], content_updates, ret_msgs, {}, update_dep_contents_status_name, update_dep_contents_status, [], ret_update_transforms
2142     # return processing['substatus'], content_updates, ret_msgs, {}, {}, {}, [], ret_update_transforms
2143     return processing['substatus'], content_updates, ret_msgs, {}, {}, {}, new_update_contents, ret_update_transforms, has_updates
2144 
2145 
2146 def get_content_status_from_panda_msg_status(status):
2147     status_map = {'starting': ContentStatus.New,
2148                   'activated': ContentStatus.Activated,
2149                   'running': ContentStatus.Processing,
2150                   'finished': ContentStatus.Available,
2151                   'failed': ContentStatus.Failed}
2152     return status_map.get(status, ContentStatus.New)
2155 
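# Illustrative mapping (editor's addition), following status_map above:
#
#     get_content_status_from_panda_msg_status('finished')  # -> ContentStatus.Available
#     get_content_status_from_panda_msg_status('whatever')  # -> ContentStatus.New (fallback)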
2156 
2157 def get_collection_id_transform_id_map(coll_id, request_id, request_ids=[]):
2158     cache = get_redis_cache()
2159     coll_tf_id_map_key = "collection_id_transform_id_map"
2160     coll_tf_id_map = cache.get(coll_tf_id_map_key, default={})
2161 
2162     if coll_id is None or coll_id not in coll_tf_id_map:
2163         if not request_ids:
2164             request_ids = []
2165         if request_id not in request_ids:
2166             request_ids.append(request_id)
2167         colls = core_catalog.get_collections_by_request_ids(request_ids)
2168         for coll in colls:
2169             coll_tf_id_map[coll['coll_id']] = (coll['request_id'], coll['transform_id'], coll['workload_id'])
2170 
2171         cache.set(coll_tf_id_map_key, coll_tf_id_map)
2172 
2173     if coll_id is None or coll_id not in coll_tf_id_map:
2174         return None, None, None
2175     return coll_tf_id_map[coll_id]
2176 
2177 
2178 workload_id_lock = threading.Lock()
2179 
2180 
2181 def get_workload_id_transform_id_map(workload_id, logger=None, log_prefix=''):
2182     cache = get_redis_cache()
2183     workload_id_transform_id_map_key = "all_workloadid2transformid_map"
2184     workload_id_transform_id_map = cache.get(workload_id_transform_id_map_key, default={})
2185 
2186     workload_id_transform_id_map_notexist_key = "all_workloadid2transformid_map_notexist"
2187     workload_id_transform_id_map_notexist = cache.get(workload_id_transform_id_map_notexist_key, default={})
2188 
2189     if isinstance(workload_id_transform_id_map_notexist, (list, tuple)):
2190         workload_id_transform_id_map_notexist = {}
2191 
2192     workload_id_str = str(workload_id)
2193     if workload_id_str in workload_id_transform_id_map:
2194         return workload_id_transform_id_map[workload_id_str]
2195 
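    # Negative-cache behaviour: a workload_id marked as missing is still re-queried
    # during its first 600 seconds (the task may simply not be registered yet);
    # once the entry is older than 600 seconds the lookup returns None without
    # re-querying, until the entry is purged (after 7200 seconds, see below).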
2196     if workload_id_str in workload_id_transform_id_map_notexist and workload_id_transform_id_map_notexist[workload_id_str] + 600 < time.time():
2197         return None
2198 
2199     # lock area
2200     workload_id_lock.acquire()
2201 
2202     workload_id_transform_id_map = cache.get(workload_id_transform_id_map_key, default={})
2203     workload_id_transform_id_map_notexist = cache.get(workload_id_transform_id_map_notexist_key, default={})
2204 
2205     if isinstance(workload_id_transform_id_map_notexist, (list, tuple)):
2206         workload_id_transform_id_map_notexist = {}
2207 
2208     request_ids = []
2209     if not workload_id_transform_id_map or workload_id_str not in workload_id_transform_id_map or len(workload_id_transform_id_map[workload_id_str]) < 5:
2210         processing_status = [ProcessingStatus.New,
2211                              ProcessingStatus.Submitting, ProcessingStatus.Submitted,
2212                              ProcessingStatus.Running, ProcessingStatus.FinishedOnExec,
2213                              ProcessingStatus.Cancel, ProcessingStatus.FinishedOnStep,
2214                              ProcessingStatus.ToCancel, ProcessingStatus.Cancelling,
2215                              ProcessingStatus.ToSuspend, ProcessingStatus.Suspending,
2216                              ProcessingStatus.ToResume, ProcessingStatus.Resuming,
2217                              ProcessingStatus.ToExpire, ProcessingStatus.Expiring,
2218                              ProcessingStatus.ToFinish, ProcessingStatus.ToForceFinish]
2219 
2220         procs = core_processings.get_processings_by_status(status=processing_status)
2221         for proc in procs:
2222             processing = proc['processing_metadata']['processing']
2223             work = processing.work
2224             if work.use_dependency_to_release_jobs():
2225                 workload_id_transform_id_map[str(proc['workload_id'])] = (proc['request_id'],
2226                                                                           proc['transform_id'],
2227                                                                           proc['processing_id'],
2228                                                                           proc['status'].value,
2229                                                                           proc['substatus'].value)
2230                 if proc['request_id'] not in request_ids:
2231                     request_ids.append(proc['request_id'])
2232 
2233         cache.set(workload_id_transform_id_map_key, workload_id_transform_id_map)
2234 
2235         for key in workload_id_transform_id_map:
2236             if key in workload_id_transform_id_map_notexist:
2237                 del workload_id_transform_id_map_notexist[key]
2238 
2239     # renew the collection to transform map
2240     if request_ids:
2241         get_collection_id_transform_id_map(coll_id=None, request_id=request_ids[0], request_ids=request_ids)
2242 
2243     keys = list(workload_id_transform_id_map_notexist.keys())
2244     for key in keys:
2245         if workload_id_transform_id_map_notexist[key] + 7200 < time.time():
2246             del workload_id_transform_id_map_notexist[key]
2247 
2248     cache.set(workload_id_transform_id_map_notexist_key, workload_id_transform_id_map_notexist)
2249     # for tasks running in some other instances
2250     if workload_id_str not in workload_id_transform_id_map:
2251         if workload_id_str not in workload_id_transform_id_map_notexist:
2252             workload_id_transform_id_map_notexist[workload_id_str] = time.time()
2253             cache.set(workload_id_transform_id_map_notexist_key, workload_id_transform_id_map_notexist)
2254 
2255         workload_id_lock.release()
2256         return None
2257     else:
2258         workload_id_lock.release()
2259 
2260     return workload_id_transform_id_map[workload_id_str]
2261 
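# Editor's note: values cached in the workload-id map above are 5-tuples of
#     (request_id, transform_id, processing_id, status_value, substatus_value)
# which handle_messages_processing below unpacks as:
#     req_id, tf_id, processing_id, r_status, r_substatus = ret_req_tf_pr_id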
2262 
2263 content_id_lock = threading.Lock()
2264 
2265 
2266 def get_input_name_content_id_map(request_id, workload_id, transform_id):
2267     cache = get_redis_cache()
2268     input_name_content_id_map_key = "transform_input_contentid_map_%s" % transform_id
2269     input_name_content_id_map = cache.get(input_name_content_id_map_key, default={})
2270 
2271     if not input_name_content_id_map:
2272         content_id_lock.acquire()
2273 
2274         contents = core_catalog.get_contents_by_request_transform(request_id=request_id, transform_id=transform_id)
2275         input_name_content_id_map = {}
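        # Contents with relation type Output are indexed by both logical name and
        # path, so that panda job inputs can later be matched back to content_ids.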
2276         for content in contents:
2277             if content['content_relation_type'] == ContentRelationType.Output:
2278                 if content['name'] not in input_name_content_id_map:
2279                     input_name_content_id_map[content['name']] = []
2280                 input_name_content_id_map[content['name']].append(content['content_id'])
2281                 if content['path']:
2282                     if content['path'] not in input_name_content_id_map:
2283                         input_name_content_id_map[content['path']] = []
2284                     input_name_content_id_map[content['path']].append(content['content_id'])
2285 
2286         cache.set(input_name_content_id_map_key, input_name_content_id_map)
2287 
2288         content_id_lock.release()
2289     return input_name_content_id_map
2290 
2291 
2292 def get_jobid_content_id_map(request_id, workload_id, transform_id, job_id, inputs):
2293     cache = get_redis_cache()
2294     jobid_content_id_map_key = "transform_jobid_contentid_map_%s" % transform_id
2295     jobid_content_id_map = cache.get(jobid_content_id_map_key, default={})
2296 
2297     to_update_jobid = False
2298     job_id = str(job_id)
2299     if not jobid_content_id_map or job_id not in jobid_content_id_map:
2300         to_update_jobid = True
2301         input_name_content_id_map = get_input_name_content_id_map(request_id, workload_id, transform_id)
2302         for ip in inputs:
2303             if ':' in ip:
2304                 pos = ip.find(":")
2305                 ip = ip[pos + 1:]
2306             if ip in input_name_content_id_map:
2307                 content_ids = input_name_content_id_map[ip]
2308                 jobid_content_id_map[job_id] = content_ids
2309                 break
2310 
2311         cache.set(jobid_content_id_map_key, jobid_content_id_map)
2312     return jobid_content_id_map, to_update_jobid
2313 
2314 
2315 def get_content_id_from_job_id(request_id, workload_id, transform_id, job_id, inputs):
2316     jobid_content_id_map, to_update_jobid = get_jobid_content_id_map(request_id, workload_id, transform_id, job_id, inputs)
2317 
2318     if str(job_id) in jobid_content_id_map:
2319         content_ids = jobid_content_id_map[str(job_id)]
2320     else:
2321         content_ids = None
2322     return content_ids, to_update_jobid
2323 
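# Illustrative example (editor's addition; the file name is hypothetical):
# job inputs may carry a "scope:" prefix, which get_jobid_content_id_map strips
# before the lookup:
#
#     "mc23:EVNT.12345._000001.pool.root"  ->  "EVNT.12345._000001.pool.root"
#
# The bare name (or path) is then resolved to content_ids via the cached map.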
2324 
2325 pending_lock = threading.Lock()
2326 
2327 
2328 def whether_to_process_pending_workload_id(workload_id, logger=None, log_prefix=''):
2329     cache = get_redis_cache()
2330     processed_pending_workload_id_map_key = "processed_pending_workload_id_map"
2331     processed_pending_workload_id_map = cache.get(processed_pending_workload_id_map_key, default={})
2332     processed_pending_workload_id_map_time_key = "processed_pending_workload_id_map_time"
2333     processed_pending_workload_id_map_time = cache.get(processed_pending_workload_id_map_time_key, default=None)
2334 
2335     workload_id = str(workload_id)
2336     if workload_id in processed_pending_workload_id_map:
2337         return False
2338 
2339     # lock area
2340     pending_lock.acquire()
2341     processed_pending_workload_id_map = cache.get(processed_pending_workload_id_map_key, default={})
2342 
2343     processed_pending_workload_id_map[workload_id] = time.time()
2344     if processed_pending_workload_id_map_time is None or processed_pending_workload_id_map_time + 86400 < time.time():
2345         cache.set(processed_pending_workload_id_map_time_key, int(time.time()), expire_seconds=86400)
2346 
2347         keys = list(processed_pending_workload_id_map.keys())
2348         for wid in keys:
2349             if processed_pending_workload_id_map[wid] + 86400 < time.time():
2350                 del processed_pending_workload_id_map[wid]
2351 
2352     cache.set(processed_pending_workload_id_map_key, processed_pending_workload_id_map, expire_seconds=86400)
2353     pending_lock.release()
2354     return True
2355 
2356 
2357 update_processing_lock = threading.Lock()
2358 
2359 
2360 def whether_to_update_processing(processing_id, interval=300):
2361     cache = get_redis_cache()
2362     ret = False
2363 
2364     update_processing_lock.acquire()
2365     update_processing_map_key = "update_processing_map"
2366     update_processing_map = cache.get(update_processing_map_key, default={})
2367 
2368     processing_id_str = str(processing_id)
2369     if processing_id_str not in update_processing_map or update_processing_map[processing_id_str] + interval < time.time():
2370         update_processing_map[processing_id_str] = time.time()
2371         ret = True
2372 
2373     keys = list(update_processing_map.keys())
2374     for key in keys:
2375         if update_processing_map[key] + 86400 < time.time():
2376             del update_processing_map[key]
2377 
2378     cache.set(update_processing_map_key, update_processing_map, expire_seconds=86400)
2379     update_processing_lock.release()
2380     return ret
2381 
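# Illustrative usage (editor's addition): whether_to_update_processing acts as a
# per-processing rate limiter backed by the shared redis cache:
#
#     whether_to_update_processing(123, interval=300)  # True on the first call
#     whether_to_update_processing(123, interval=300)  # False for the next 300 seconds
#
# Entries untouched for more than a day are pruned on every call.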
2382 
2383 def handle_messages_processing(messages, logger=None, log_prefix='', update_processing_interval=300):
2384     logger = get_logger(logger)
2385     if not log_prefix:
2386         log_prefix = "<Message>"
2387 
2388     update_processings = []
2389     update_processings_by_job = []
2390     terminated_processings = []
2391     update_contents = []
2392 
2393     for ori_msg in messages:
2394         if isinstance(ori_msg, dict):
2395             msg = ori_msg
2396         else:
2397             msg = json.loads(ori_msg)
2398         if 'taskid' not in msg or not msg['taskid']:
2399             continue
2400 
2401         if msg['msg_type'] in ['task_status']:
2402             workload_id = msg['taskid']
2403             status = msg['status']
2404             if status in ['pending1']:   # 'prepared'
2405                 logger.debug(log_prefix + "Received message: %s" % str(ori_msg))
2406 
2407                 ret_req_tf_pr_id = get_workload_id_transform_id_map(workload_id, logger=logger, log_prefix=log_prefix)
2408                 if not ret_req_tf_pr_id:
2409                     # request was submitted by another instance
2410                     logger.debug(log_prefix + "No matched workload_id, discard message: %s" % str(ori_msg))
2411                     continue
2412 
2413                 logger.debug(log_prefix + "(request_id, transform_id, processing_id, status, substatus): %s" % str(ret_req_tf_pr_id))
2414                 req_id, tf_id, processing_id, r_status, r_substatus = ret_req_tf_pr_id
2415                 if whether_to_process_pending_workload_id(workload_id, logger=logger, log_prefix=log_prefix):
2416                     # new_processings.append((req_id, tf_id, processing_id, workload_id, status))
2417                     if processing_id not in update_processings:
2418                         update_processings.append(processing_id)
2419                         logger.debug(log_prefix + "Add to update processing: %s" % str(processing_id))
2420                 else:
2421                     logger.debug(log_prefix + "Processing %s is already processed, not adding it to update processing" % (str(processing_id)))
2422             elif status in ['finished', 'done']:
2423                 logger.debug(log_prefix + "Received message: %s" % str(ori_msg))
2424 
2425                 ret_req_tf_pr_id = get_workload_id_transform_id_map(workload_id, logger=logger, log_prefix=log_prefix)
2426                 if not ret_req_tf_pr_id:
2427                     # request was submitted by another instance
2428                     logger.debug(log_prefix + "No matched workload_id, discard message: %s" % str(ori_msg))
2429                     continue
2430 
2431                 logger.debug(log_prefix + "(request_id, transform_id, processing_id, status, substatus): %s" % str(ret_req_tf_pr_id))
2432                 req_id, tf_id, processing_id, r_status, r_substatus = ret_req_tf_pr_id
2433                 # update_processings.append((processing_id, status))
2434                 if processing_id not in update_processings and processing_id not in terminated_processings:
2435                     terminated_processings.append(processing_id)
2436                     logger.debug(log_prefix + "Add to terminated processing: %s" % str(processing_id))
2437 
2438         if msg['msg_type'] in ['job_status']:
2439             workload_id = msg['taskid']
2440             job_id = msg['jobid']
2441             status = msg['status']
2442             inputs = msg['inputs']
2443             # if inputs and status in ['finished']:
2444             # add activated
2445             if inputs and status in ['finished', 'activated']:
2446                 logger.debug(log_prefix + "Received message: %s" % str(ori_msg))
2447 
2448                 ret_req_tf_pr_id = get_workload_id_transform_id_map(workload_id, logger=logger, log_prefix=log_prefix)
2449                 if not ret_req_tf_pr_id:
2450                     # request was submitted by another instance
2451                     logger.debug(log_prefix + "No matched workload_id, discard message: %s" % str(ori_msg))
2452                     continue
2453 
2454                 logger.debug(log_prefix + "(request_id, transform_id, processing_id, status, substatus): %s" % str(ret_req_tf_pr_id))
2455 
2456                 req_id, tf_id, processing_id, r_status, r_substatus = ret_req_tf_pr_id
2457                 content_ids, to_update_jobid = get_content_id_from_job_id(req_id, workload_id, tf_id, job_id, inputs)
2458                 if content_ids:
2459                     for content_id in content_ids:
2460                         if to_update_jobid:
2461                             u_content = {'content_id': content_id,
2462                                          'request_id': req_id,
2463                                          'transform_id': tf_id,
2464                                          'workload_id': workload_id,
2465                                          # 'status': get_content_status_from_panda_msg_status(status),
2466                                          'substatus': get_content_status_from_panda_msg_status(status),
2467                                          'content_metadata': {'panda_id': job_id}}
2468                         else:
2469                             u_content = {'content_id': content_id,
2470                                          'request_id': req_id,
2471                                          'transform_id': tf_id,
2472                                          'workload_id': workload_id,
2473                                          'substatus': get_content_status_from_panda_msg_status(status)}
2474                             #             # 'status': get_content_status_from_panda_msg_status(status)}
2475 
2476                         update_contents.append(u_content)
2477                         # if processing_id not in update_processings:
2478                         # if processing_id not in update_processings and whether_to_update_processing(processing_id, update_processing_interval):
2479                         if processing_id not in update_processings_by_job:
2480                             update_processings_by_job.append(processing_id)
2481                             logger.debug(log_prefix + "Add to update processing by job: %s" % str(processing_id))
2482 
2483     return update_processings, update_processings_by_job, terminated_processings, update_contents, []
2484 
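# Illustrative message shapes (editor's addition; all field values hypothetical)
# for the handler above:
#
#     {"msg_type": "task_status", "taskid": 123, "status": "finished"}
#     {"msg_type": "job_status", "taskid": 123, "jobid": 4567,
#      "status": "finished", "inputs": ["mc23:EVNT.12345._000001.pool.root"]}
#
# Messages without a non-empty "taskid" are skipped.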
2485 
2486 def sync_collection_status(request_id, transform_id, workload_id, work, input_output_maps=None, log_prefix='',
2487                            close_collection=False, force_close_collection=False, abort=False, terminate=False):
2488     logger = get_logger()
2489 
2490     logger.info(log_prefix + "sync_collection_status")
2491 
2492     if input_output_maps is None:
2493         input_output_maps = get_input_output_maps(request_id, transform_id, work, with_deps=False)
2494 
2495     all_updates_flushed = True
2496     coll_status = {}
2497     messages = []
2498     for map_id in input_output_maps:
2499         inputs = input_output_maps[map_id]['inputs'] if 'inputs' in input_output_maps[map_id] else []
2500         # inputs_dependency = input_output_maps[map_id]['inputs_dependency'] if 'inputs_dependency' in input_output_maps[map_id] else []
2501         outputs = input_output_maps[map_id]['outputs'] if 'outputs' in input_output_maps[map_id] else []
2502         logs = input_output_maps[map_id]['logs'] if 'logs' in input_output_maps[map_id] else []
2503 
2504         for content in inputs + outputs + logs:
2505             if content['coll_id'] not in coll_status:
2506                 coll_status[content['coll_id']] = {'total_files': 0, 'processed_files': 0, 'processing_files': 0, 'bytes': 0,
2507                                                    'new_files': 0, 'activated_files': 0, 'failed_files': 0, 'missing_files': 0,
2508                                                    'ext_files': 0, 'processed_ext_files': 0, 'failed_ext_files': 0,
2509                                                    'preprocessing_files': 0, 'missing_ext_files': 0}
2510             coll_status[content['coll_id']]['total_files'] += 1
2511 
2512             if content['status'] in [ContentStatus.Available, ContentStatus.Mapped,
2513                                      ContentStatus.Available.value, ContentStatus.Mapped.value,
2514                                      ContentStatus.FakeAvailable, ContentStatus.FakeAvailable.value]:
2515                 coll_status[content['coll_id']]['processed_files'] += 1
2516                 coll_status[content['coll_id']]['bytes'] += content['bytes']
2517             elif content['status'] in [ContentStatus.New]:
2518                 coll_status[content['coll_id']]['new_files'] += 1
2519             elif content['status'] in [ContentStatus.Failed, ContentStatus.FinalFailed,
2520                                        ContentStatus.SubAvailable, ContentStatus.FinalSubAvailable]:
2521                 coll_status[content['coll_id']]['failed_files'] += 1
2522             elif content['status'] in [ContentStatus.Lost, ContentStatus.Deleted, ContentStatus.Missing]:
2523                 coll_status[content['coll_id']]['missing_files'] += 1
2524             elif content['status'] in [ContentStatus.Processing]:
2525                 coll_status[content['coll_id']]['processing_files'] += 1
2526             elif content['status'] in [ContentStatus.Activated]:
2527                 coll_status[content['coll_id']]['activated_files'] += 1
2528             else:
2529                 coll_status[content['coll_id']]['preprocessing_files'] += 1
2530 
2531             if content['status'] != content['substatus']:
2532                 all_updates_flushed = False
2533 
2534     all_ext_updated = True
2535     if work.require_ext_contents():
2536         all_ext_updated = False
2537         contents_ext = core_catalog.get_contents_ext(request_id=request_id, transform_id=transform_id)
2538         for content in contents_ext:
2539             coll_status[content['coll_id']]['ext_files'] += 1
2540 
2541             if content['status'] in [ContentStatus.Available, ContentStatus.Mapped,
2542                                      ContentStatus.Available.value, ContentStatus.Mapped.value,
2543                                      ContentStatus.FakeAvailable, ContentStatus.FakeAvailable.value]:
2544                 coll_status[content['coll_id']]['processed_ext_files'] += 1
2545             # elif content['status'] in [ContentStatus.Failed, ContentStatus.FinalFailed]:
2546             elif content['status'] in [ContentStatus.Failed, ContentStatus.FinalFailed,
2547                                        ContentStatus.SubAvailable, ContentStatus.FinalSubAvailable]:
2548                 coll_status[content['coll_id']]['failed_ext_files'] += 1
2549             elif content['status'] in [ContentStatus.Lost, ContentStatus.Deleted, ContentStatus.Missing]:
2550                 coll_status[content['coll_id']]['missing_ext_files'] += 1
2551 
2552     logger.info(log_prefix + f"sync_collection_status, coll_status: {coll_status}")
2553 
2554     input_collections = work.get_input_collections(poll_externel=True)
2555     output_collections = work.get_output_collections()
2556     log_collections = work.get_log_collections()
2557 
2558     update_collections = []
2559     for coll in input_collections + output_collections + log_collections:
2560         if coll.coll_id in coll_status:
2561             if 'total_files' in coll.coll_metadata and coll.coll_metadata['total_files']:
2562                 coll.total_files = coll.coll_metadata['total_files']
2563             else:
2564                 coll.total_files = coll_status[coll.coll_id]['total_files']
2565             coll.processed_files = coll_status[coll.coll_id]['processed_files']
2566             coll.processing_files = coll_status[coll.coll_id]['processing_files']
2567             coll.preprocessing_files = coll_status[coll.coll_id]['preprocessing_files']
2568             coll.activated_files = coll_status[coll.coll_id]['activated_files']
2569             coll.bytes = coll_status[coll.coll_id]['bytes']
2570             coll.new_files = coll_status[coll.coll_id]['new_files']
2571             coll.failed_files = coll_status[coll.coll_id]['failed_files']
2572             coll.missing_files = coll_status[coll.coll_id]['missing_files']
2573             coll.ext_files = coll_status[coll.coll_id]['ext_files']
2574             coll.processed_ext_files = coll_status[coll.coll_id]['processed_ext_files']
2575             coll.failed_ext_files = coll_status[coll.coll_id]['failed_ext_files']
2576             coll.missing_ext_files = coll_status[coll.coll_id]['missing_ext_files']
2577         else:
2578             if 'total_files' in coll.coll_metadata and coll.coll_metadata['total_files']:
2579                 coll.total_files = coll.coll_metadata['total_files']
2580             else:
2581                 coll.total_files = 0
2582             if 'availability' in coll.coll_metadata and coll.coll_metadata['availability']:
2583                 coll.processed_files = coll.coll_metadata['availability']
2584             else:
2585                 coll.processed_files = 0
2586             if 'stuck' in coll.coll_metadata and coll.coll_metadata['stuck']:
2587                 coll.failed_files = coll.coll_metadata['stuck']
2588             else:
2589                 coll.failed_files = 0
2590             if 'processing' in coll.coll_metadata and coll.coll_metadata['processing']:
2591                 coll.processing_files = coll.coll_metadata['processing']
2592             else:
2593                 coll.processing_files = coll.total_files - coll.processed_files - coll.failed_files
2594             coll.new_files = 0
2595             coll.preprocessing_files = 0
2596             coll.activated_files = 0
2597             coll.missing_files = 0
2598             coll.ext_files = 0
2599             coll.processed_ext_files = 0
2600             coll.failed_ext_files = 0
2601             coll.missing_ext_files = 0
2602 
2603         u_coll = {'coll_id': coll.coll_id,
2604                   'total_files': coll.total_files,
2605                   'processed_files': coll.processed_files,
2606                   'processing_files': coll.processing_files,
2607                   'activated_files': coll.activated_files,
2608                   'preprocessing_files': coll.preprocessing_files,
2609                   'new_files': coll.new_files,
2610                   'failed_files': coll.failed_files,
2611                   'missing_files': coll.missing_files,
2612                   'bytes': coll.bytes,
2613                   'ext_files': coll.ext_files,
2614                   'processed_ext_files': coll.processed_ext_files,
2615                   'failed_ext_files': coll.failed_ext_files,
2616                   'missing_ext_files': coll.missing_ext_files}
2617 
2618         if (not work.generating_new_inputs()) and (coll in input_collections and (workload_id is not None)):
2619             if coll.total_files == coll.processed_files + coll.failed_files + coll.missing_files:
2620                 coll_db = core_catalog.get_collection(coll_id=coll.coll_id)
2621                 coll.status = coll_db['status']
2622                 if coll.status is not None and coll.status != CollectionStatus.Closed:
2623                     u_coll['status'] = CollectionStatus.Closed
2624                     u_coll['substatus'] = CollectionStatus.Closed
2625                     coll.status = CollectionStatus.Closed
2626                     coll.substatus = CollectionStatus.Closed
2627 
2628                     msgs = generate_messages(request_id, transform_id, workload_id, work, msg_type='collection', files=[coll], relation_type='input')
2629                     messages += msgs
2630 
2631         if terminate:
2632             all_files_monitored = False
2633             if coll.total_files == coll.processed_files + coll.failed_files + coll.missing_files:
2634                 all_files_monitored = True
2635 
2636             if abort:
2637                 u_coll['status'] = CollectionStatus.Closed
2638                 u_coll['substatus'] = CollectionStatus.Closed
2639                 coll.status = CollectionStatus.Closed
2640                 coll.substatus = CollectionStatus.Closed
2641             elif coll in output_collections:
2642                 if (not work.require_ext_contents() or (work.require_ext_contents()
2643                     and coll.processed_files <= coll.processed_ext_files and coll.failed_files <= coll.failed_ext_files)):     # noqa E129, W503
2644                     all_ext_updated = True
2645                 if (force_close_collection or (close_collection and all_updates_flushed and all_ext_updated and all_files_monitored)
2646                    or coll.status == CollectionStatus.Closed):        # noqa W503
2647                     u_coll['status'] = CollectionStatus.Closed
2648                     u_coll['substatus'] = CollectionStatus.Closed
2649                     coll.status = CollectionStatus.Closed
2650                     coll.substatus = CollectionStatus.Closed
2651             elif force_close_collection or (close_collection and all_updates_flushed and all_files_monitored) or coll.status == CollectionStatus.Closed:
2652                 u_coll['status'] = CollectionStatus.Closed
2653                 u_coll['substatus'] = CollectionStatus.Closed
2654                 coll.status = CollectionStatus.Closed
2655                 coll.substatus = CollectionStatus.Closed
2656 
2657         update_collections.append(u_coll)
2658 
2659     logger.info(log_prefix + f"sync_collection_status, update_collections: {update_collections}")
2660 
2661     return update_collections, all_updates_flushed, messages
2662 
2663 
2664 def sync_collection_status_new(request_id, transform_id, workload_id, work, log_prefix='',
2665                                close_collection=False, force_close_collection=False, abort=False, terminate=False):
2666     """
2667     Synchronise collection file-count statistics using pure SQL aggregate queries,
2668     avoiding loading all input_output_maps rows into Python memory.
2669 
2670     The function issues two GROUP-BY queries to the database:
2671       1. ``contents`` table  → count + bytes per (coll_id, status), plus a flag
2672          indicating whether any row still has status != substatus.
2673       2. ``contents_ext`` table (only when the work requires ext contents) →
2674          count per (coll_id, status).
2675 
2676     The results are mapped to the same ``update_collections`` / ``all_updates_flushed``
2677     / ``messages`` return shape that ``sync_collection_status`` produces, so the two
2678     functions are interchangeable at call sites.
2679     """
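    # Editor's sketch (an assumption, not the actual implementation): the first
    # aggregate behind get_content_status_statistics_by_coll can be pictured as
    #
    #     SELECT coll_id, status, COUNT(*) AS count, SUM(bytes) AS bytes
    #     FROM contents
    #     WHERE request_id = :request_id AND transform_id = :transform_id
    #     GROUP BY coll_id, status
    #
    # plus a per-collection flag marking rows where status != substatus.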
2680     logger = get_logger()
2681     logger.info(log_prefix + "sync_collection_status_new")
2682 
2683     # ------------------------------------------------------------------
2684     # 1.  Aggregate contents by (coll_id, status) via SQL
2685     # ------------------------------------------------------------------
2686     coll_stats_raw = core_catalog.get_content_status_statistics_by_coll(
2687         request_id=request_id, transform_id=transform_id, with_deps=False
2688     )
2689     # coll_stats_raw: {coll_id: {status_enum: {'count': N, 'bytes': B}, 'has_unsynced': bool}}
2690 
2691     # Status sets used for classification (mirror sync_collection_status logic)
2692     _processed_statuses = {
2693         ContentStatus.Available, ContentStatus.Mapped,
2694         ContentStatus.FakeAvailable,
2695         ContentStatus.Available.value, ContentStatus.Mapped.value,
2696         ContentStatus.FakeAvailable.value,
2697     }
2698     _failed_statuses = {
2699         ContentStatus.Failed, ContentStatus.FinalFailed,
2700         ContentStatus.SubAvailable, ContentStatus.FinalSubAvailable,
2701     }
2702     _missing_statuses = {
2703         ContentStatus.Lost, ContentStatus.Deleted, ContentStatus.Missing,
2704     }
2705 
2706     all_updates_flushed = True
2707     coll_status = {}
2708 
2709     for coll_id, by_status in coll_stats_raw.items():
2710         entry = {
2711             'total_files': 0,
2712             'processed_files': 0,
2713             'processing_files': 0,
2714             'bytes': 0,
2715             'new_files': 0,
2716             'activated_files': 0,
2717             'failed_files': 0,
2718             'missing_files': 0,
2719             'preprocessing_files': 0,
2720             'ext_files': 0,
2721             'processed_ext_files': 0,
2722             'failed_ext_files': 0,
2723             'missing_ext_files': 0,
2724         }
2725 
2726         if by_status.get('has_unsynced', False):
2727             all_updates_flushed = False
2728 
2729         for status, stat in by_status.items():
2730             if status == 'has_unsynced':
2731                 continue
2732             cnt = stat['count']
2733             byt = stat['bytes']
2734             entry['total_files'] += cnt
2735 
2736             if status in _processed_statuses:
2737                 entry['processed_files'] += cnt
2738                 entry['bytes'] += byt
2739             elif status == ContentStatus.New or status == ContentStatus.New.value:
2740                 entry['new_files'] += cnt
2741             elif status in _failed_statuses or status in {s.value for s in _failed_statuses}:
2742                 entry['failed_files'] += cnt
2743             elif status in _missing_statuses or status in {s.value for s in _missing_statuses}:
2744                 entry['missing_files'] += cnt
2745             elif status == ContentStatus.Processing or status == ContentStatus.Processing.value:
2746                 entry['processing_files'] += cnt
2747             elif status == ContentStatus.Activated or status == ContentStatus.Activated.value:
2748                 entry['activated_files'] += cnt
2749             else:
2750                 entry['preprocessing_files'] += cnt
2751 
2752         coll_status[coll_id] = entry
2753 
2754     # ------------------------------------------------------------------
2755     # 2.  Aggregate contents_ext by (coll_id, status) via SQL (optional)
2756     # ------------------------------------------------------------------
2757     all_ext_updated = True
2758     if work.require_ext_contents():
2759         all_ext_updated = False
2760         ext_stats_raw = core_catalog.get_content_ext_status_statistics_by_coll(
2761             request_id=request_id, transform_id=transform_id
2762         )
2763         # ext_stats_raw: {coll_id: {status_enum: count}}
2764         for coll_id, by_status in ext_stats_raw.items():
2765             if coll_id not in coll_status:
2766                 coll_status[coll_id] = {
2767                     'total_files': 0, 'processed_files': 0, 'processing_files': 0,
2768                     'bytes': 0, 'new_files': 0, 'activated_files': 0,
2769                     'failed_files': 0, 'missing_files': 0, 'preprocessing_files': 0,
2770                     'ext_files': 0, 'processed_ext_files': 0, 'failed_ext_files': 0,
2771                     'missing_ext_files': 0,
2772                 }
2773             for status, cnt in by_status.items():
2774                 coll_status[coll_id]['ext_files'] += cnt
2775                 if status in _processed_statuses:
2776                     coll_status[coll_id]['processed_ext_files'] += cnt
2777                 elif status in _failed_statuses or status in {s.value for s in _failed_statuses}:
2778                     coll_status[coll_id]['failed_ext_files'] += cnt
2779                 elif status in _missing_statuses or status in {s.value for s in _missing_statuses}:
2780                     coll_status[coll_id]['missing_ext_files'] += cnt
2781 
2782     logger.info(log_prefix + f"sync_collection_status_new, coll_status: {coll_status}")
2783 
2784     # ------------------------------------------------------------------
2785     # 3.  Build update_collections – identical logic to sync_collection_status
2786     # ------------------------------------------------------------------
2787     input_collections = work.get_input_collections(poll_externel=True)
2788     output_collections = work.get_output_collections()
2789     log_collections = work.get_log_collections()
2790 
2791     messages = []
2792     update_collections = []
2793 
2794     for coll in input_collections + output_collections + log_collections:
2795         if coll.coll_id in coll_status:
2796             st = coll_status[coll.coll_id]
2797             if 'total_files' in coll.coll_metadata and coll.coll_metadata['total_files']:
2798                 coll.total_files = coll.coll_metadata['total_files']
2799             else:
2800                 coll.total_files = st['total_files']
2801             coll.processed_files = st['processed_files']
2802             coll.processing_files = st['processing_files']
2803             coll.preprocessing_files = st['preprocessing_files']
2804             coll.activated_files = st['activated_files']
2805             coll.bytes = st['bytes']
2806             coll.new_files = st['new_files']
2807             coll.failed_files = st['failed_files']
2808             coll.missing_files = st['missing_files']
2809             coll.ext_files = st['ext_files']
2810             coll.processed_ext_files = st['processed_ext_files']
2811             coll.failed_ext_files = st['failed_ext_files']
2812             coll.missing_ext_files = st['missing_ext_files']
2813         else:
2814             if 'total_files' in coll.coll_metadata and coll.coll_metadata['total_files']:
2815                 coll.total_files = coll.coll_metadata['total_files']
2816             else:
2817                 coll.total_files = 0
2818             if 'availability' in coll.coll_metadata and coll.coll_metadata['availability']:
2819                 coll.processed_files = coll.coll_metadata['availability']
2820             else:
2821                 coll.processed_files = 0
2822             if 'stuck' in coll.coll_metadata and coll.coll_metadata['stuck']:
2823                 coll.failed_files = coll.coll_metadata['stuck']
2824             else:
2825                 coll.failed_files = 0
2826             if 'processing' in coll.coll_metadata and coll.coll_metadata['processing']:
2827                 coll.processing_files = coll.coll_metadata['processing']
2828             else:
2829                 coll.processing_files = coll.total_files - coll.processed_files - coll.failed_files
2830             coll.new_files = 0
2831             coll.preprocessing_files = 0
2832             coll.activated_files = 0
2833             coll.missing_files = 0
2834             coll.ext_files = 0
2835             coll.processed_ext_files = 0
2836             coll.failed_ext_files = 0
2837             coll.missing_ext_files = 0
2838 
        u_coll = {
            'coll_id': coll.coll_id,
            'total_files': coll.total_files,
            'processed_files': coll.processed_files,
            'processing_files': coll.processing_files,
            'activated_files': coll.activated_files,
            'preprocessing_files': coll.preprocessing_files,
            'new_files': coll.new_files,
            'failed_files': coll.failed_files,
            'missing_files': coll.missing_files,
            'bytes': coll.bytes,
            'ext_files': coll.ext_files,
            'processed_ext_files': coll.processed_ext_files,
            'failed_ext_files': coll.failed_ext_files,
            'missing_ext_files': coll.missing_ext_files,
        }

        if (not work.generating_new_inputs()) and (coll in input_collections and (workload_id is not None)):
            if coll.total_files == coll.processed_files + coll.failed_files + coll.missing_files:
                coll_db = core_catalog.get_collection(coll_id=coll.coll_id)
                coll.status = coll_db['status']
                if coll.status is not None and coll.status != CollectionStatus.Closed:
                    u_coll['status'] = CollectionStatus.Closed
                    u_coll['substatus'] = CollectionStatus.Closed
                    coll.status = CollectionStatus.Closed
                    coll.substatus = CollectionStatus.Closed

                    msgs = generate_messages(request_id, transform_id, workload_id, work, msg_type='collection', files=[coll], relation_type='input')
                    messages += msgs

        if terminate:
            all_files_monitored = False
            if coll.total_files == coll.processed_files + coll.failed_files + coll.missing_files:
                all_files_monitored = True

            if abort:
                u_coll['status'] = CollectionStatus.Closed
                u_coll['substatus'] = CollectionStatus.Closed
                coll.status = CollectionStatus.Closed
                coll.substatus = CollectionStatus.Closed
            elif coll in output_collections:
                # reset per collection, so a stale (or unbound) value from a
                # previous iteration can never leak into the close decision
                all_ext_updated = False
                if (not work.require_ext_contents() or (work.require_ext_contents()
                    and coll.processed_files <= coll.processed_ext_files and coll.failed_files <= coll.failed_ext_files)):     # noqa E129, W503
                    all_ext_updated = True
                if (force_close_collection or (close_collection and all_updates_flushed and all_ext_updated and all_files_monitored)
                   or coll.status == CollectionStatus.Closed):        # noqa W503
                    u_coll['status'] = CollectionStatus.Closed
                    u_coll['substatus'] = CollectionStatus.Closed
                    coll.status = CollectionStatus.Closed
                    coll.substatus = CollectionStatus.Closed
            elif force_close_collection or (close_collection and all_updates_flushed and all_files_monitored) or coll.status == CollectionStatus.Closed:
                u_coll['status'] = CollectionStatus.Closed
                u_coll['substatus'] = CollectionStatus.Closed
                coll.status = CollectionStatus.Closed
                coll.substatus = CollectionStatus.Closed

        update_collections.append(u_coll)

    logger.info(log_prefix + f"sync_collection_status_new, update_collections: {update_collections}")

    return update_collections, all_updates_flushed, messages


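# ---------------------------------------------------------------------------
# Illustrative sketch (not called by the agent): the close decision applied to
# an output collection above, reduced to a pure function over plain counters.
# The function name and default arguments are hypothetical, for documentation
# only.
# ---------------------------------------------------------------------------
def _sketch_can_close_output_collection(total_files, processed_files, failed_files, missing_files,
                                        processed_ext_files, failed_ext_files,
                                        require_ext_contents=False, close_collection=True,
                                        force_close_collection=False, all_updates_flushed=True):
    # every file has reached a terminal counter
    all_files_monitored = total_files == processed_files + failed_files + missing_files
    # ext bookkeeping has caught up with the plain counters (or is not required)
    all_ext_updated = (not require_ext_contents
                       or (processed_files <= processed_ext_files and failed_files <= failed_ext_files))
    return force_close_collection or (close_collection and all_updates_flushed
                                      and all_ext_updated and all_files_monitored)

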
def sync_work_status(request_id, transform_id, workload_id, work, substatus=None, log_prefix=""):
    logger = get_logger()

    input_collections = work.get_input_collections()
    output_collections = work.get_output_collections()
    log_collections = work.get_log_collections()

    is_all_collections_closed = True
    is_all_files_processed = True
    is_all_files_failed = True
    has_files = False
    for coll in input_collections + output_collections + log_collections:
        if coll.status != CollectionStatus.Closed:
            is_all_collections_closed = False
    for coll in output_collections:
        if coll.total_files > 0:
            has_files = True
        if coll.total_files != coll.processed_files:
            is_all_files_processed = False
        if coll.processed_files > 0 or coll.total_files == coll.processed_files:
            is_all_files_failed = False

    if is_all_collections_closed:
        logger.debug(log_prefix + "has_files: %s, is_all_files_processed: %s, is_all_files_failed: %s, substatus: %s" % (has_files,
                                                                                                                         is_all_files_processed,
                                                                                                                         is_all_files_failed,
                                                                                                                         substatus))
        if has_files:
            if is_all_files_processed:
                work.status = WorkStatus.Finished
            elif is_all_files_failed:
                work.status = WorkStatus.Failed
            else:
                work.status = WorkStatus.SubFinished
        else:
            if substatus:
                work.status = get_work_status_from_transform_processing_status(substatus)
            else:
                work.status = WorkStatus.Failed
    elif substatus and substatus in [ProcessingStatus.Broken]:
        work.status = get_work_status_from_transform_processing_status(substatus)
    logger.debug(log_prefix + "work status: %s, substatus: %s" % (str(work.status), substatus))


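# Illustrative restatement (documentation only) of the decision above, assuming
# all collections are already closed and ignoring the substatus fallbacks; the
# helper name is hypothetical.
def _sketch_closed_work_status(has_files, is_all_files_processed, is_all_files_failed):
    if not has_files:
        return WorkStatus.Failed
    if is_all_files_processed:
        return WorkStatus.Finished
    if is_all_files_failed:
        return WorkStatus.Failed
    return WorkStatus.SubFinished

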
def sync_processing(processing, agent_attributes, terminate=False, abort=False, logger=None, log_prefix=""):
    # honour a caller-supplied logger, as the other handlers in this module do
    logger = get_logger(logger)

    terminated_status = [ProcessingStatus.Finished, ProcessingStatus.Failed, ProcessingStatus.SubFinished,
                         ProcessingStatus.Terminating, ProcessingStatus.Cancelled]

    request_id = processing['request_id']
    transform_id = processing['transform_id']
    workload_id = processing['workload_id']

    proc = processing['processing_metadata']['processing']
    work = proc.work
    work.set_agent_attributes(agent_attributes, processing)

    messages = []
    # input_output_maps = get_input_output_maps(request_id, transform_id, work, with_deps=False)
    if processing['status'] in terminated_status or processing['substatus'] in terminated_status:
        terminate = True
    update_collections, all_updates_flushed, msgs = sync_collection_status_new(request_id, transform_id, workload_id, work,
                                                                               log_prefix=log_prefix,
                                                                               close_collection=True, abort=abort, terminate=terminate)

    messages += msgs

    sync_work_status(request_id, transform_id, workload_id, work, processing['substatus'], log_prefix)
    logger.info(log_prefix + "sync_processing: work status: %s" % work.get_status())
    if terminate and work.is_terminated() and all_updates_flushed:
        msgs = generate_messages(request_id, transform_id, workload_id, work, msg_type='work')
        messages += msgs
        if work.is_finished():
            processing['status'] = ProcessingStatus.Finished
            # processing['status'] = processing['substatus']
        elif work.is_subfinished():
            processing['status'] = ProcessingStatus.SubFinished
        elif work.is_failed():
            processing['status'] = ProcessingStatus.Failed
        else:
            processing['status'] = ProcessingStatus.SubFinished

        # if work.require_ext_contents():
        if work.dispatch_ext_content:
            input_output_maps = get_input_output_maps(request_id, transform_id, work, with_deps=False)
            logger.info(f"{log_prefix} generating messages for ext contents")
            contents_ext = core_catalog.get_contents_ext(request_id=request_id, transform_id=transform_id)
            msgs = generate_messages(request_id, transform_id, workload_id, work, msg_type='content_ext', files=contents_ext,
                                     relation_type='output', input_output_maps=input_output_maps)
            messages += msgs

        if processing['status'] == ProcessingStatus.Terminating and is_process_terminated(processing['substatus']):
            processing['status'] = processing['substatus']

    return processing, update_collections, messages


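# Minimal sketch (documentation only) of the terminal status selection applied
# in sync_processing() above; the helper name is hypothetical and the helper is
# not used by the agent.
def _sketch_terminal_processing_status(work):
    if work.is_finished():
        return ProcessingStatus.Finished
    if work.is_subfinished():
        return ProcessingStatus.SubFinished
    if work.is_failed():
        return ProcessingStatus.Failed
    # fall-through: partially successful work is reported as SubFinished
    return ProcessingStatus.SubFinished

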
def handle_abort_processing(processing, agent_attributes, logger=None, sync=True, log_prefix=''):
    logger = get_logger(logger)

    # request_id = processing['request_id']
    # transform_id = processing['transform_id']
    # workload_id = processing['workload_id']

    proc = processing['processing_metadata']['processing']
    work = proc.work
    work.set_agent_attributes(agent_attributes, processing)

    work.abort_processing(processing, log_prefix=log_prefix)

    # input_collections = work.get_input_collections()
    # output_collections = work.get_output_collections()
    # log_collections = work.get_log_collections()

    # input_output_maps = get_input_output_maps(transform_id, work)
    # update_collections, all_updates_flushed = sync_collection_status(request_id, transform_id, workload_id, work,
    #                                                                  input_output_maps=None, close_collection=True,
    #                                                                  force_close_collection=True)

    # for coll in input_collections + output_collections + log_collections:
    #     coll.status = CollectionStatus.Closed
    #     coll.substatus = CollectionStatus.Closed
    update_contents = []
    if sync:
        processing, update_collections, messages = sync_processing(processing, agent_attributes, terminate=True, abort=True, logger=logger, log_prefix=log_prefix)
    else:
        update_collections = []
        messages = []

    # processing['status'] = ProcessingStatus.Cancelled
    return processing, update_collections, update_contents, messages


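# Example call (illustrative; nothing here is executed or persisted):
#
#     processing, update_collections, update_contents, messages = \
#         handle_abort_processing(processing, agent_attributes, log_prefix="[abort] ")
#
# With sync=True (the default) the abort is followed by a full sync_processing()
# pass, so closed collections and terminal messages are produced in one step.

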
def reactive_contents(request_id, transform_id, workload_id, work, input_output_maps):
    # Reactivate contents: anything that has not reached a usable terminal state
    # (Available, Mapped or FakeAvailable, whether as enum or raw value) is reset
    # to New so that it can be scheduled again.
    updated_contents = []
    contents = core_catalog.get_contents_by_request_transform(request_id=request_id, transform_id=transform_id)
    for content in contents:
        if content['status'] not in [ContentStatus.Available, ContentStatus.Mapped,
                                     ContentStatus.Available.value, ContentStatus.Mapped.value,
                                     ContentStatus.FakeAvailable, ContentStatus.FakeAvailable.value]:
            u_content = {'content_id': content['content_id'],
                         'request_id': content['request_id'],
                         'substatus': ContentStatus.New,
                         'status': ContentStatus.New}
            updated_contents.append(u_content)
    return updated_contents


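# Illustrative predicate (documentation only) equivalent to the filter inside
# reactive_contents(); the function name is hypothetical.
def _sketch_should_reactivate(status):
    keep = [ContentStatus.Available, ContentStatus.Mapped, ContentStatus.FakeAvailable]
    return status not in keep and status not in [s.value for s in keep]

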
def handle_resume_processing(processing, agent_attributes, logger=None, log_prefix=''):
    logger = get_logger(logger)

    request_id = processing['request_id']
    transform_id = processing['transform_id']
    workload_id = processing['workload_id']

    proc = processing['processing_metadata']['processing']
    work = proc.work
    work.set_agent_attributes(agent_attributes, processing)

    work.resume_processing(processing, log_prefix=log_prefix)

    input_collections = work.get_input_collections()
    output_collections = work.get_output_collections()
    log_collections = work.get_log_collections()

    update_collections = []
    for coll in input_collections + output_collections + log_collections:
        coll.status = CollectionStatus.Open
        coll.substatus = CollectionStatus.Open
        u_collection = {'coll_id': coll.coll_id,
                        'status': CollectionStatus.Open,
                        'substatus': CollectionStatus.Open}
        update_collections.append(u_collection)

    input_output_maps = get_input_output_maps(request_id, transform_id, work, with_deps=False)
    update_contents = reactive_contents(request_id, transform_id, workload_id, work, input_output_maps)

    processing['status'] = ProcessingStatus.Running
    return processing, update_collections, update_contents
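

# Example resume flow (illustrative; persisting the returned updates is the
# caller's responsibility and the variable names below are hypothetical):
#
#     processing, update_collections, update_contents = \
#         handle_resume_processing(processing, agent_attributes, log_prefix="[resume] ")
#
# Collections are reopened, unfinished contents are reset to New via
# reactive_contents(), and the processing itself is marked Running again.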