File indexing completed on 2026-04-09 07:58:18
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011 import concurrent.futures
0012 import json
0013 import logging
0014 import time
0015 import threading
0016 import traceback
0017
0018 from idds.common.constants import (CommandType,
0019 ProcessingStatus,
0020 CollectionStatus,
0021 ContentStatus, ContentType,
0022 ContentRelationType,
0023 WorkStatus,
0024 TransformType,
0025 TransformType2MessageTypeMap,
0026 MessageType, MessageTypeStr,
0027 MessageStatus, MessageSource,
0028 MessageDestination,
0029 get_work_status_from_transform_processing_status)
0030 from idds.common.utils import setup_logging, get_list_chunks
0031 from idds.core import (transforms as core_transforms,
0032 processings as core_processings,
0033 catalog as core_catalog)
0034 from idds.agents.common.cache.redis import get_redis_cache
0035
0036
0037 setup_logging(__name__)
0038
0039
0040 def get_logger(logger=None):
0041 if logger:
0042 return logger
0043 logger = logging.getLogger(__name__)
0044 return logger
0045
0046
0047 def get_new_content(request_id, transform_id, workload_id, map_id, input_content, content_relation_type=ContentRelationType.Input,
0048 content_name_id_map={}, es_name=None, sub_map_id=None, order_id=None):
0049 content = {'transform_id': transform_id,
0050 'coll_id': input_content['coll_id'],
0051 'request_id': request_id,
0052
0053 'map_id': map_id,
0054 'scope': input_content['scope'],
0055 'name': input_content['name'],
0056 'min_id': input_content['min_id'] if 'min_id' in input_content else 0,
0057 'max_id': input_content['max_id'] if 'max_id' in input_content else 0,
0058 'status': input_content['status'] if 'status' in input_content and input_content['status'] is not None else ContentStatus.New,
0059 'substatus': input_content['substatus'] if 'substatus' in input_content and input_content['substatus'] is not None else ContentStatus.New,
0060 'path': input_content['path'] if 'path' in input_content else None,
0061 'content_type': input_content['content_type'] if 'content_type' in input_content else ContentType.File,
0062 'content_relation_type': content_relation_type,
0063 'bytes': input_content['bytes'],
0064 'adler32': input_content['adler32'],
0065 'content_metadata': input_content['content_metadata']}
0066 if content['min_id'] is None:
0067 content['min_id'] = 0
0068 if content['max_id'] is None:
0069 content['max_id'] = 0
0070 if 'sub_map_id' in input_content:
0071 content['sub_map_id'] = input_content['sub_map_id']
0072 if 'dep_sub_map_id' in input_content:
0073 content['dep_sub_map_id'] = input_content['dep_sub_map_id']
0074
0075 if order_id is not None:
0076 content['min_id'] = order_id
0077 content['max_id'] = order_id
0078 if sub_map_id is not None:
0079 content['sub_map_id'] = sub_map_id
0080 if 'sub_map_id' not in content or content['sub_map_id'] is None:
0081 content['sub_map_id'] = 0
0082 if es_name is not None and content_relation_type == ContentRelationType.Output:
0083 content['path'] = es_name
0084
0085
0086 if content_relation_type == ContentRelationType.InputDependency:
0087 if content_name_id_map is None:
0088 content_name_id_map = {}
0089 if input_content['coll_id'] not in content_name_id_map:
0090 content_name_id_map_new = get_output_content_name_to_content_id_map(request_id=request_id, coll_ids=[input_content['coll_id']])
0091 content_name_id_map.update(content_name_id_map_new)
0092 if content['coll_id'] in content_name_id_map and content['name'] in content_name_id_map[content['coll_id']]:
0093 content_dep_id = content_name_id_map[content['coll_id']][content['name']]
0094 content['content_dep_id'] = content_dep_id
0095 content['name'] = str(content_dep_id)
0096
0097 return content
0098
0099
0100 def is_process_terminated(processing_status):
0101 if processing_status in [ProcessingStatus.Finished, ProcessingStatus.Failed,
0102 ProcessingStatus.SubFinished, ProcessingStatus.Cancelled,
0103 ProcessingStatus.Suspended, ProcessingStatus.Expired,
0104 ProcessingStatus.Broken, ProcessingStatus.FinishedOnStep,
0105 ProcessingStatus.FinishedOnExec, ProcessingStatus.FinishedTerm]:
0106 return True
0107 return False
0108
0109
0110 def is_process_finished(processing_status):
0111 if processing_status in [ProcessingStatus.Finished]:
0112 return True
0113 return False
0114
0115
0116 def is_all_contents_available(contents):
0117 for content in contents:
0118 if type(content) is dict:
0119 if content['substatus'] not in [ContentStatus.Available, ContentStatus.FakeAvailable]:
0120 return False
0121 else:
0122
0123 if content[1] not in [ContentStatus.Available, ContentStatus.FakeAvailable]:
0124 return False
0125 return True
0126
0127
0128 def is_all_contents_terminated(contents, terminated=False):
0129 terminated_status = [ContentStatus.Available, ContentStatus.FakeAvailable,
0130 ContentStatus.FinalFailed, ContentStatus.Missing]
0131 if terminated:
0132 terminated_status = [ContentStatus.Available, ContentStatus.FakeAvailable, ContentStatus.Failed,
0133 ContentStatus.FinalFailed, ContentStatus.Missing]
0134
0135 for content in contents:
0136 if type(content) is dict:
0137 if content['substatus'] not in terminated_status:
0138 return False
0139 else:
0140 if content[1] not in terminated_status:
0141 return False
0142 return True
0143
0144
0145 def is_input_dependency_terminated(input_dependency):
0146 if type(input_dependency) is dict:
0147 if input_dependency['substatus'] in [ContentStatus.Available, ContentStatus.FakeAvailable,
0148 ContentStatus.FinalFailed, ContentStatus.Missing]:
0149 return True
0150 else:
0151 if input_dependency[1] in [ContentStatus.Available, ContentStatus.FakeAvailable,
0152 ContentStatus.FinalFailed, ContentStatus.Missing]:
0153 return True
0154 return False
0155
0156
0157 def is_all_contents_terminated_but_not_available(inputs, terminated=False):
0158 terminated_status = [ContentStatus.Available, ContentStatus.FakeAvailable,
0159 ContentStatus.FinalFailed, ContentStatus.Missing]
0160 if terminated:
0161 terminated_status = [ContentStatus.Available, ContentStatus.FakeAvailable, ContentStatus.Failed,
0162 ContentStatus.FinalFailed, ContentStatus.Missing]
0163
0164 all_contents_available = True
0165 for content in inputs:
0166 if type(content) is dict:
0167 if content['substatus'] not in terminated_status:
0168 return False
0169 if content['substatus'] not in [ContentStatus.Available]:
0170 all_contents_available = False
0171 else:
0172 if content[1] not in terminated_status:
0173 return False
0174 if content[1] not in [ContentStatus.Available]:
0175 all_contents_available = False
0176 if all_contents_available:
0177 return False
0178 return True
0179
0180
0181 def is_all_contents_available_with_status_map(inputs_dependency, content_status_map):
0182 for content_id in inputs_dependency:
0183 status = content_status_map[str(content_id)]
0184 if status not in [ContentStatus.Available, ContentStatus.FakeAvailable]:
0185 return False
0186 return True
0187
0188
0189 def is_all_contents_terminated_with_status_map(input_dependency, content_status_map):
0190 for content_id in input_dependency:
0191 status = content_status_map[str(content_id)]
0192 if status not in [ContentStatus.Available, ContentStatus.FakeAvailable,
0193 ContentStatus.FinalFailed, ContentStatus.Missing]:
0194 return False
0195 return True
0196
0197
0198 def get_collection_ids(collections):
0199 coll_ids = [coll.coll_id for coll in collections]
0200 return coll_ids
0201
0202
0203 def get_input_output_maps(request_id, transform_id, work, with_deps=True, page_num=None, page_size=None, with_panda_id=None, status=None, match_content_ext=False):
0204 """
0205 :param with_panda_id: When True, return only map_ids where at least one output content
0206 has a panda_id in content_metadata. When False, return only map_ids where no output
0207 content has a panda_id. When None (default), return all map_ids.
0208 """
0209
0210 input_collections = work.get_input_collections()
0211 output_collections = work.get_output_collections()
0212 log_collections = work.get_log_collections()
0213
0214
0215
0216
0217
0218 input_coll_ids = get_collection_ids(input_collections)
0219 output_coll_ids = get_collection_ids(output_collections)
0220 log_coll_ids = get_collection_ids(log_collections)
0221
0222 mapped_input_output_maps = core_transforms.get_transform_input_output_maps(request_id,
0223 transform_id,
0224 input_coll_ids=input_coll_ids,
0225 output_coll_ids=output_coll_ids,
0226 log_coll_ids=log_coll_ids,
0227 with_sub_map_id=work.with_sub_map_id(),
0228 with_deps=with_deps,
0229 page_num=page_num,
0230 page_size=page_size,
0231 match_content_ext=match_content_ext,
0232 status=status)
0233
0234 if with_panda_id is not None:
0235 filtered = {}
0236 for map_id, map_contents in mapped_input_output_maps.items():
0237 outputs = map_contents.get('outputs', [])
0238 has_panda_id = any(
0239 content.get('content_metadata', {}).get('panda_ids') or
0240 content.get('content_metadata', {}).get('panda_id')
0241 for content in outputs
0242 )
0243 if with_panda_id == has_panda_id:
0244 filtered[map_id] = map_contents
0245 return filtered
0246
0247
0248
0249
0250
0251 return mapped_input_output_maps
0252
0253
0254 def get_ext_content_ids(request_id, transform_id, work):
0255 contents_ids = core_catalog.get_contents_ext_ids(request_id=request_id, transform_id=transform_id)
0256 return contents_ids
0257
0258
0259 def get_output_content_name_to_content_id_map(request_id=None, coll_ids=[]):
0260 contents = core_catalog.get_contents(coll_id=coll_ids, request_id=request_id, content_relation_type=ContentRelationType.Output)
0261 content_name_id_map = {}
0262 for content in contents:
0263 if content['coll_id'] not in content_name_id_map:
0264 content_name_id_map[content['coll_id']] = {}
0265 if content['name'] not in content_name_id_map[content['coll_id']]:
0266 content_name_id_map[content['coll_id']][content['name']] = {}
0267
0268
0269
0270 content_name_id_map[content['coll_id']][content['name']] = content['content_id']
0271 return content_name_id_map
0272
0273
0274 def get_new_contents(request_id, transform_id, workload_id, new_input_output_maps, max_updates_per_round=2000,
0275 input_dependency_coll_ids=[], logger=None, log_prefix=''):
0276 logger = get_logger(logger)
0277
0278 logger.debug(log_prefix + "get_new_contents")
0279 if input_dependency_coll_ids:
0280 content_name_id_map = get_output_content_name_to_content_id_map(request_id=request_id, coll_ids=input_dependency_coll_ids)
0281 else:
0282 content_name_id_map = {}
0283
0284 new_input_contents, new_output_contents, new_log_contents = [], [], []
0285 new_input_dependency_contents = []
0286 new_input_dep_coll_ids = []
0287 chunks = []
0288 for map_id in new_input_output_maps:
0289 if "sub_maps" not in new_input_output_maps[map_id] or not new_input_output_maps[map_id]["sub_maps"]:
0290 inputs = new_input_output_maps[map_id]['inputs'] if 'inputs' in new_input_output_maps[map_id] else []
0291 inputs_dependency = new_input_output_maps[map_id]['inputs_dependency'] if 'inputs_dependency' in new_input_output_maps[map_id] else []
0292 outputs = new_input_output_maps[map_id]['outputs'] if 'outputs' in new_input_output_maps[map_id] else []
0293 logs = new_input_output_maps[map_id]['logs'] if 'logs' in new_input_output_maps[map_id] else []
0294
0295 for input_content in inputs:
0296 content = get_new_content(request_id, transform_id, workload_id, map_id, input_content, content_relation_type=ContentRelationType.Input)
0297 new_input_contents.append(content)
0298 for input_content in inputs_dependency:
0299 content = get_new_content(request_id, transform_id, workload_id, map_id, input_content,
0300 content_relation_type=ContentRelationType.InputDependency,
0301 content_name_id_map=content_name_id_map)
0302 new_input_dependency_contents.append(content)
0303 if content['coll_id'] not in new_input_dep_coll_ids:
0304 new_input_dep_coll_ids.append(content['coll_id'])
0305 for output_content in outputs:
0306 content = get_new_content(request_id, transform_id, workload_id, map_id, output_content, content_relation_type=ContentRelationType.Output)
0307 new_output_contents.append(content)
0308 for log_content in logs:
0309 content = get_new_content(request_id, transform_id, workload_id, map_id, log_content, content_relation_type=ContentRelationType.Log)
0310 new_log_contents.append(content)
0311
0312 total_num_updates = len(new_input_contents) + len(new_output_contents) + len(new_log_contents) + len(new_input_dependency_contents)
0313 if total_num_updates > max_updates_per_round:
0314 chunk = new_input_contents, new_output_contents, new_log_contents, new_input_dependency_contents
0315 chunks.append(chunk)
0316
0317 new_input_contents, new_output_contents, new_log_contents = [], [], []
0318 new_input_dependency_contents = []
0319 else:
0320 sub_maps = new_input_output_maps[map_id]["sub_maps"]
0321 for sub_map in sub_maps:
0322 sub_map_id = sub_map['sub_map_id']
0323 order_id = sub_map['order_id']
0324 inputs = sub_map['inputs'] if 'inputs' in sub_map else []
0325 inputs_dependency = sub_map['inputs_dependency'] if 'inputs_dependency' in sub_map else []
0326 outputs = sub_map['outputs'] if 'outputs' in sub_map else []
0327 logs = sub_map['logs'] if 'logs' in sub_map else []
0328
0329 for input_content in inputs:
0330 content = get_new_content(request_id, transform_id, workload_id, map_id, input_content,
0331 content_relation_type=ContentRelationType.Input,
0332 sub_map_id=sub_map_id, order_id=order_id)
0333 new_input_contents.append(content)
0334 for input_content in inputs_dependency:
0335 content = get_new_content(request_id, transform_id, workload_id, map_id, input_content,
0336 content_relation_type=ContentRelationType.InputDependency,
0337 sub_map_id=sub_map_id, order_id=order_id,
0338 content_name_id_map=content_name_id_map)
0339 new_input_dependency_contents.append(content)
0340 if content['coll_id'] not in new_input_dep_coll_ids:
0341 new_input_dep_coll_ids.append(content['coll_id'])
0342 for output_content in outputs:
0343 content = get_new_content(request_id, transform_id, workload_id, map_id, output_content,
0344 content_relation_type=ContentRelationType.Output,
0345 sub_map_id=sub_map_id, order_id=order_id)
0346 new_output_contents.append(content)
0347 for log_content in logs:
0348 content = get_new_content(request_id, transform_id, workload_id, map_id, log_content,
0349 content_relation_type=ContentRelationType.Log,
0350 sub_map_id=sub_map_id, order_id=order_id)
0351 new_log_contents.append(content)
0352
0353 total_num_updates = len(new_input_contents) + len(new_output_contents) + len(new_log_contents) + len(new_input_dependency_contents)
0354 if total_num_updates > max_updates_per_round:
0355 chunk = new_input_contents, new_output_contents, new_log_contents, new_input_dependency_contents
0356 chunks.append(chunk)
0357
0358 new_input_contents, new_output_contents, new_log_contents = [], [], []
0359 new_input_dependency_contents = []
0360
0361 total_num_updates = len(new_input_contents) + len(new_output_contents) + len(new_log_contents) + len(new_input_dependency_contents)
0362 if total_num_updates > 0:
0363 chunk = new_input_contents, new_output_contents, new_log_contents, new_input_dependency_contents
0364 chunks.append(chunk)
0365
0366
0367 return chunks
0368
0369
0370 def get_update_content(content):
0371 updated_content = {'content_id': content['content_id'],
0372 'request_id': content['request_id'],
0373
0374 'status': content['substatus']}
0375 content['status'] = content['substatus']
0376 return updated_content, content
0377
0378
0379 def get_update_contents(request_id, transform_id, workload_id, input_output_maps):
0380 updated_contents = []
0381 updated_input_contents_full, updated_output_contents_full = [], []
0382
0383 for map_id in input_output_maps:
0384 inputs = input_output_maps[map_id]['inputs'] if 'inputs' in input_output_maps[map_id] else []
0385 inputs_dependency = input_output_maps[map_id]['inputs_dependency'] if 'inputs_dependency' in input_output_maps[map_id] else []
0386 outputs = input_output_maps[map_id]['outputs'] if 'outputs' in input_output_maps[map_id] else []
0387
0388
0389 input_output_sub_maps = get_input_output_sub_maps(inputs, outputs, inputs_dependency)
0390 for sub_map_id in input_output_sub_maps:
0391 inputs_sub = input_output_sub_maps[sub_map_id]['inputs']
0392 outputs_sub = input_output_sub_maps[sub_map_id]['outputs']
0393 inputs_dependency_sub = input_output_sub_maps[sub_map_id]['inputs_dependency']
0394
0395 content_update_status = None
0396 if is_all_contents_available(inputs_dependency_sub):
0397
0398 content_update_status = ContentStatus.Available
0399 elif is_all_contents_terminated(inputs_dependency_sub):
0400
0401 content_update_status = ContentStatus.Missing
0402
0403 if content_update_status:
0404 for content in inputs_sub:
0405 content['substatus'] = content_update_status
0406 if content['status'] != content['substatus']:
0407 updated_content, content = get_update_content(content)
0408 updated_contents.append(updated_content)
0409 updated_input_contents_full.append(content)
0410 if content_update_status in [ContentStatus.Missing]:
0411 for content in outputs_sub:
0412 content['substatus'] = content_update_status
0413 if content['status'] != content['substatus']:
0414 updated_content, content = get_update_content(content)
0415 updated_contents.append(updated_content)
0416 updated_output_contents_full.append(content)
0417
0418 for content in outputs_sub:
0419 if content['status'] != content['substatus']:
0420 updated_content, content = get_update_content(content)
0421 updated_contents.append(updated_content)
0422 updated_output_contents_full.append(content)
0423 return updated_contents, updated_input_contents_full, updated_output_contents_full
0424
0425
0426 def get_message_type(work_type, input_type='file'):
0427 work_type_value = str(work_type.value)
0428 if work_type_value not in TransformType2MessageTypeMap:
0429 return TransformType2MessageTypeMap['0'][input_type]
0430 else:
0431 return TransformType2MessageTypeMap[work_type_value][input_type]
0432
0433
0434 def generate_file_messages(request_id, transform_id, workload_id, work, files, relation_type):
0435 if work:
0436 work_type = work.get_work_type()
0437 else:
0438 work_type = TransformType.Processing
0439
0440 i_msg_type, i_msg_type_str = get_message_type(work_type, input_type='file')
0441 no_dup_files = {}
0442 files_message = []
0443 for file in files:
0444 filename = file['name']
0445 if work and work.es:
0446 filename = file['path']
0447 if filename in no_dup_files:
0448 continue
0449 else:
0450 no_dup_files[filename] = None
0451
0452 file_status = file['substatus'].name
0453 if file['substatus'] == ContentStatus.FakeAvailable:
0454 file_status = ContentStatus.Available.name
0455 file_message = {'scope': file['scope'],
0456 'name': filename,
0457 'path': file['path'],
0458 'map_id': file['map_id'],
0459 'content_id': file['content_id'] if 'content_id' in file else None,
0460 'external_coll_id': file['external_coll_id'] if 'external_coll_id' in file else None,
0461 'external_content_id': file['external_content_id'] if 'external_content_id' in file else None,
0462 'status': file_status}
0463 files_message.append(file_message)
0464 msg_content = {'msg_type': i_msg_type_str.value,
0465 'request_id': request_id,
0466 'transform_id': transform_id,
0467 'workload_id': workload_id,
0468 'relation_type': relation_type,
0469 'files': files_message}
0470 num_msg_content = len(files_message)
0471 return i_msg_type, msg_content, num_msg_content
0472
0473
0474 def generate_content_ext_messages(request_id, transform_id, workload_id, work, files, relation_type, input_output_maps):
0475 i_msg_type = MessageType.ContentExt
0476 i_msg_type_str = MessageTypeStr.ContentExt
0477
0478 output_contents = []
0479 for map_id in input_output_maps:
0480 outputs = input_output_maps[map_id]['outputs'] if 'outputs' in input_output_maps[map_id] else []
0481
0482
0483 output_contents += outputs
0484
0485 files_message = core_catalog.combine_contents_ext(output_contents, files, with_status_name=True)
0486 msg_content = {'msg_type': i_msg_type_str.value,
0487 'request_id': request_id,
0488 'workload_id': workload_id,
0489 'transform_id': transform_id,
0490 'relation_type': relation_type,
0491 'files': files_message}
0492 num_msg_content = len(files_message)
0493 return i_msg_type, msg_content, num_msg_content
0494
0495
0496 def generate_collection_messages(request_id, transform_id, workload_id, work, collection, relation_type):
0497 coll_name = collection.name
0498 if coll_name.endswith(".idds.stagein"):
0499 coll_name = coll_name.replace(".idds.stagein", "")
0500
0501 i_msg_type, i_msg_type_str = get_message_type(work.get_work_type(), input_type='collection')
0502 msg_content = {'msg_type': i_msg_type_str.value,
0503 'request_id': request_id,
0504 'workload_id': workload_id,
0505 'transform_id': transform_id,
0506 'relation_type': relation_type,
0507 'collections': [{'scope': collection.scope,
0508 'name': coll_name,
0509 'status': collection.status.name}],
0510 'output': work.get_output_data(),
0511 'error': work.get_terminated_msg()}
0512 num_msg_content = 1
0513 return i_msg_type, msg_content, num_msg_content
0514
0515
0516 def generate_work_messages(request_id, transform_id, workload_id, work, relation_type):
0517 i_msg_type, i_msg_type_str = get_message_type(work.get_work_type(), input_type='work')
0518 msg_content = {'msg_type': i_msg_type_str.value,
0519 'request_id': request_id,
0520 'workload_id': workload_id,
0521 'transform_id': transform_id,
0522 'relation_type': relation_type,
0523 'status': work.get_status().name,
0524 'output': work.get_output_data(),
0525 'error': work.get_terminated_msg()}
0526 num_msg_content = 1
0527 return i_msg_type, msg_content, num_msg_content
0528
0529
0530 def generate_messages(request_id, transform_id, workload_id, work, msg_type='file', files=[], relation_type='input', input_output_maps=None):
0531 if msg_type == 'file':
0532 i_msg_type, msg_content, num_msg_content = generate_file_messages(request_id, transform_id, workload_id, work, files=files, relation_type=relation_type)
0533 msg = {'msg_type': i_msg_type,
0534 'status': MessageStatus.New,
0535 'source': MessageSource.Carrier,
0536 'destination': MessageDestination.Outside,
0537 'request_id': request_id,
0538 'workload_id': workload_id,
0539 'transform_id': transform_id,
0540 'num_contents': num_msg_content,
0541 'msg_content': msg_content}
0542 return [msg]
0543 elif msg_type == 'content_ext':
0544 i_msg_type, msg_content, num_msg_content = generate_content_ext_messages(request_id, transform_id, workload_id, work, files=files,
0545 relation_type=relation_type,
0546 input_output_maps=input_output_maps)
0547 msg = {'msg_type': i_msg_type,
0548 'status': MessageStatus.New,
0549 'source': MessageSource.Carrier,
0550 'destination': MessageDestination.ContentExt,
0551 'request_id': request_id,
0552 'workload_id': workload_id,
0553 'transform_id': transform_id,
0554 'num_contents': num_msg_content,
0555 'msg_content': msg_content}
0556 return [msg]
0557 elif msg_type == 'collection':
0558 msg_type_contents = []
0559 for coll in files:
0560 msg_type_content = generate_collection_messages(request_id, transform_id, workload_id, work, coll, relation_type=relation_type)
0561 msg_type_contents.append(msg_type_content)
0562
0563 msgs = []
0564 for i_msg_type, msg_content, num_msg_content in msg_type_contents:
0565 msg = {'msg_type': i_msg_type,
0566 'status': MessageStatus.New,
0567 'source': MessageSource.Carrier,
0568 'destination': MessageDestination.Outside,
0569 'request_id': request_id,
0570 'workload_id': workload_id,
0571 'transform_id': transform_id,
0572 'num_contents': num_msg_content,
0573 'msg_content': msg_content}
0574 msgs.append(msg)
0575 return msgs
0576 elif msg_type == 'work':
0577
0578 input_collections = work.get_input_collections()
0579 output_collections = work.get_output_collections()
0580 log_collections = work.get_log_collections()
0581
0582 msg_type_contents = []
0583 msg_type_content = generate_work_messages(request_id, transform_id, workload_id, work, relation_type='input')
0584 msg_type_contents.append(msg_type_content)
0585 for coll in input_collections:
0586 msg_type_content = generate_collection_messages(request_id, transform_id, workload_id, work, coll, relation_type='input')
0587 msg_type_contents.append(msg_type_content)
0588 for coll in output_collections:
0589 msg_type_content = generate_collection_messages(request_id, transform_id, workload_id, work, coll, relation_type='output')
0590 msg_type_contents.append(msg_type_content)
0591 for coll in log_collections:
0592 msg_type_content = generate_collection_messages(request_id, transform_id, workload_id, work, coll, relation_type='log')
0593 msg_type_contents.append(msg_type_content)
0594
0595 msgs = []
0596 for i_msg_type, msg_content, num_msg_content in msg_type_contents:
0597 msg = {'msg_type': i_msg_type,
0598 'status': MessageStatus.New,
0599 'source': MessageSource.Carrier,
0600 'destination': MessageDestination.Outside,
0601 'request_id': request_id,
0602 'workload_id': workload_id,
0603 'transform_id': transform_id,
0604 'num_contents': num_msg_content,
0605 'msg_content': msg_content}
0606 msgs.append(msg)
0607 return msgs
0608
0609
0610 def update_processing_contents_thread(logger, log_prefix, log_msg, kwargs):
0611 try:
0612 logger = get_logger(logger)
0613 logger.debug(log_prefix + log_msg)
0614 core_processings.update_processing_contents(**kwargs)
0615 logger.debug(log_prefix + " end")
0616 except Exception as ex:
0617 logger.error(f"{log_prefix}update_processing_contents_thread: {ex}")
0618 raise Exception(f"update_processing_contents_thread: {ex}")
0619
0620
0621 def wait_futures_finish(ret_futures, func_name, logger, log_prefix, timeout=180):
0622 logger = get_logger(logger)
0623 logger.debug(f"{log_prefix}{func_name}: wait_futures_finish")
0624
0625 steps = 0
0626 ex = None
0627 while True:
0628 steps += 1
0629
0630 completed, ret_futures = concurrent.futures.wait(ret_futures, timeout=180, return_when=concurrent.futures.ALL_COMPLETED)
0631
0632 for c in completed:
0633 try:
0634 _ = c.result()
0635 except Exception as e:
0636 ex = e
0637 logger.error(f"{log_prefix} {func_name}: thread failed: {e}")
0638
0639 if len(ret_futures) > 0:
0640 logger.debug(log_prefix + "%s thread: %s threads has been running for more than %s minutes" % (func_name, len(ret_futures), steps * 3))
0641 else:
0642 break
0643 logger.debug(log_prefix + "%s: wait_futures_finish end" % func_name)
0644 if ex:
0645 raise ex
0646
0647
0648 def handle_new_processing(processing, agent_attributes, func_site_to_cloud=None, max_updates_per_round=2000, executors=None, logger=None, log_prefix=''):
0649 logger = get_logger(logger)
0650
0651 proc = processing['processing_metadata']['processing']
0652 work = proc.work
0653 work.set_agent_attributes(agent_attributes, processing)
0654 request_id = processing['request_id']
0655 transform_id = processing['transform_id']
0656 workload_id = processing['workload_id']
0657
0658 ret_msgs = []
0659 new_contents = []
0660 new_input_dependency_contents = []
0661 update_collections = []
0662
0663 input_output_maps = get_input_output_maps(request_id, transform_id, work, with_deps=False)
0664 new_input_output_maps = work.get_new_input_output_maps(input_output_maps)
0665 if hasattr(work, 'input_dependency_coll_ids'):
0666 input_dependency_coll_ids = work.input_dependency_coll_ids
0667 else:
0668 input_dependency_coll_ids = []
0669 ret_new_contents_chunks = get_new_contents(request_id, transform_id, workload_id, new_input_output_maps,
0670 max_updates_per_round=max_updates_per_round,
0671 input_dependency_coll_ids=input_dependency_coll_ids,
0672 logger=logger, log_prefix=log_prefix)
0673 if not ret_new_contents_chunks:
0674 logger.debug(log_prefix + "handle_new_processing: no new contnets")
0675
0676 if executors is None:
0677 for ret_new_contents in ret_new_contents_chunks:
0678 new_input_contents, new_output_contents, new_log_contents, new_input_dependency_contents = ret_new_contents
0679
0680
0681
0682 new_contents = new_input_contents + new_output_contents + new_log_contents + new_input_dependency_contents
0683
0684
0685
0686
0687
0688
0689
0690
0691 logger.debug(log_prefix + "handle_new_processing: add %s new contents" % (len(new_contents)))
0692 core_processings.update_processing_contents(update_processing=None,
0693 request_id=request_id,
0694 transform_id=transform_id,
0695 new_contents=new_contents,
0696
0697 messages=ret_msgs)
0698 else:
0699 if ret_new_contents_chunks:
0700 ret_futures = set()
0701 for ret_new_contents in ret_new_contents_chunks:
0702 new_input_contents, new_output_contents, new_log_contents, new_input_dependency_contents = ret_new_contents
0703
0704 new_contents = new_input_contents + new_output_contents + new_log_contents + new_input_dependency_contents
0705 log_msg = "handle_new_processing thread: add %s new contents" % (len(new_contents))
0706 kwargs = {'update_processing': None,
0707 'request_id': request_id,
0708 'transform_id': transform_id,
0709 'new_contents': new_contents,
0710
0711 'messages': ret_msgs}
0712 f = executors.submit(update_processing_contents_thread, logger, log_prefix, log_msg, kwargs)
0713 ret_futures.add(f)
0714 wait_futures_finish(ret_futures, "handle_new_processing", logger, log_prefix)
0715
0716
0717 ret_fix = core_processings.update_processing_contents(update_processing=None,
0718 request_id=request_id,
0719 transform_id=transform_id,
0720 fix_missing_content_dep_id=True)
0721 num_added_contents, num_updated_contents, num_added_messages, num_updated_messages = ret_fix
0722 logger.debug(log_prefix + "handle_new_processing: fix missing content_dep_id for input_dependency contents, num_fixed: %s" % num_updated_contents)
0723
0724
0725
0726 input_output_maps = get_input_output_maps(request_id, transform_id, work, with_deps=False)
0727 new_input_output_maps = work.get_new_input_output_maps(input_output_maps)
0728 if not work.has_new_inputs:
0729 logger.debug(log_prefix + "handle_new_processing: no new input_output_maps after reload")
0730 processing["num_unmapped"] = 0
0731 else:
0732 num_unmapped = len(new_input_output_maps) if new_input_output_maps else 1
0733 processing["num_unmapped"] = num_unmapped
0734 core_processings.update_processing(processing['processing_id'],
0735 parameters={'num_unmapped': processing["num_unmapped"]})
0736 logger.debug(log_prefix + f"handle_new_processing: num_unmapped={processing['num_unmapped']}")
0737
0738 logger.debug(log_prefix + "handle_new_processing: finish")
0739
0740
0741 return True, processing, update_collections, [], [], ret_msgs, None
0742
0743
0744 def handle_prepared_processing(processing, agent_attributes, func_site_to_cloud=None, max_updates_per_round=2000, executors=None, logger=None, log_prefix=''):
0745 logger = get_logger(logger)
0746
0747 proc = processing['processing_metadata']['processing']
0748 work = proc.work
0749 work.set_agent_attributes(agent_attributes, processing)
0750
0751 if func_site_to_cloud:
0752 work.set_func_site_to_cloud(func_site_to_cloud)
0753 status, workload_id, errors = work.submit_processing(processing)
0754 logger.info(log_prefix + "submit_processing (status: %s, workload_id: %s, errors: %s)" % (status, workload_id, errors))
0755
0756 if not status:
0757 logger.error(log_prefix + "Failed to submit processing (status: %s, workload_id: %s, errors: %s)" % (status, workload_id, errors))
0758 return False, processing, [], [], [], [], errors
0759
0760 ret_msgs = []
0761 update_collections = []
0762 if proc.workload_id:
0763
0764 input_collections = work.get_input_collections()
0765 output_collections = work.get_output_collections()
0766 log_collections = work.get_log_collections()
0767 for coll in input_collections + output_collections + log_collections:
0768 u_coll = {'coll_id': coll.coll_id, 'workload_id': proc.workload_id}
0769 update_collections.append(u_coll)
0770
0771
0772 return True, processing, update_collections, [], [], ret_msgs, errors
0773
0774
0775 def get_updated_contents_by_request(request_id, transform_id, workload_id, work, terminated=False, input_output_maps=None,
0776 logger=None, log_prefix=''):
0777 logger = get_logger(logger)
0778
0779 status_to_check = [ContentStatus.Available, ContentStatus.FakeAvailable, ContentStatus.FinalFailed,
0780 ContentStatus.Missing, ContentStatus.Failed, ContentStatus.Lost,
0781 ContentStatus.Deleted]
0782 contents = core_catalog.get_contents_by_request_transform(request_id=request_id, transform_id=transform_id,
0783 status=status_to_check, status_updated=True)
0784 updated_contents, updated_contents_full_input, updated_contents_full_output = [], [], []
0785 updated_contents_full_input_deps = []
0786 for content in contents:
0787 if (content['status'] != content['substatus']) and content['substatus'] in status_to_check:
0788 u_content = {'content_id': content['content_id'],
0789 'request_id': content['request_id'],
0790 'status': content['substatus']}
0791 updated_contents.append(u_content)
0792 if content['content_relation_type'] == ContentRelationType.Output:
0793 updated_contents_full_output.append(content)
0794 elif content['content_relation_type'] == ContentRelationType.Input:
0795 updated_contents_full_input.append(content)
0796 elif content['content_relation_type'] == ContentRelationType.InputDependency:
0797 updated_contents_full_input_deps.append(content)
0798
0799 return updated_contents, updated_contents_full_input, updated_contents_full_output, updated_contents_full_input_deps
0800
0801
0802 def get_input_output_sub_maps(inputs, outputs, inputs_dependency, logs=[]):
0803 input_output_sub_maps = {}
0804 for content in inputs:
0805 sub_map_id = content['sub_map_id']
0806 if sub_map_id not in input_output_sub_maps:
0807 input_output_sub_maps[sub_map_id] = {'inputs': [], 'outputs': [], 'logs': [], 'inputs_dependency': []}
0808 input_output_sub_maps[sub_map_id]['inputs'].append(content)
0809 for content in inputs_dependency:
0810 sub_map_id = content['sub_map_id']
0811 if sub_map_id not in input_output_sub_maps:
0812 input_output_sub_maps[sub_map_id] = {'inputs': [], 'outputs': [], 'logs': [], 'inputs_dependency': []}
0813 input_output_sub_maps[sub_map_id]['inputs_dependency'].append(content)
0814 for content in outputs:
0815 sub_map_id = content['sub_map_id']
0816 if sub_map_id not in input_output_sub_maps:
0817 input_output_sub_maps[sub_map_id] = {'inputs': [], 'outputs': [], 'logs': [], 'inputs_dependency': []}
0818 input_output_sub_maps[sub_map_id]['outputs'].append(content)
0819 for content in logs:
0820 sub_map_id = content['sub_map_id']
0821 if sub_map_id not in input_output_sub_maps:
0822 input_output_sub_maps[sub_map_id] = {'inputs': [], 'outputs': [], 'logs': [], 'inputs_dependency': []}
0823 input_output_sub_maps[sub_map_id]['logs'].append(content)
0824 return input_output_sub_maps
0825
0826
0827 def get_updated_contents_by_input_output_maps(input_output_maps=None, terminated=False, max_updates_per_round=2000, with_deps=False, es=False, logger=None, log_prefix=''):
0828 updated_contents, updated_contents_full_input, updated_contents_full_output = [], [], []
0829 updated_contents_full_input_deps = []
0830 new_update_contents = []
0831
0832 chunks = []
0833 status_to_check = [ContentStatus.Available, ContentStatus.FakeAvailable, ContentStatus.FinalFailed,
0834 ContentStatus.Missing, ContentStatus.Failed, ContentStatus.Lost,
0835 ContentStatus.Deleted]
0836 available_status = [ContentStatus.Available, ContentStatus.FakeAvailable]
0837 if terminated:
0838 terminated_status = [ContentStatus.Available, ContentStatus.FakeAvailable,
0839 ContentStatus.FinalFailed, ContentStatus.Missing,
0840 ContentStatus.Lost, ContentStatus.Deleted,
0841 ContentStatus.Failed]
0842 else:
0843 terminated_status = [ContentStatus.Available, ContentStatus.FakeAvailable,
0844 ContentStatus.FinalFailed, ContentStatus.Missing,
0845 ContentStatus.Lost, ContentStatus.Deleted]
0846
0847 for map_id in input_output_maps:
0848 inputs = input_output_maps[map_id]['inputs'] if 'inputs' in input_output_maps[map_id] else []
0849 inputs_dependency = input_output_maps[map_id]['inputs_dependency'] if 'inputs_dependency' in input_output_maps[map_id] else []
0850 outputs = input_output_maps[map_id]['outputs'] if 'outputs' in input_output_maps[map_id] else []
0851
0852
0853
0854
0855 has_updated_inputs, all_inputs_available, all_inputs_terminated = False, True, True
0856 for content in inputs:
0857 if not content['substatus'] in available_status:
0858 all_inputs_available = False
0859 if not content['substatus'] in terminated_status:
0860 all_inputs_terminated = False
0861 if (content['status'] != content['substatus']) and content['substatus'] in status_to_check:
0862 has_updated_inputs = True
0863 u_content = {'content_id': content['content_id'],
0864 'request_id': content['request_id'],
0865 'status': content['substatus']}
0866 updated_contents.append(u_content)
0867 u_content_substatus = {'content_id': content['content_id'],
0868 'substatus': content['substatus'],
0869 'request_id': content['request_id'],
0870 'transform_id': content['transform_id'],
0871 'workload_id': content['workload_id'],
0872 'coll_id': content['coll_id']}
0873 new_update_contents.append(u_content_substatus)
0874 if not es:
0875 updated_contents_full_input.append(content)
0876 if es and has_updated_inputs:
0877
0878
0879 if all_inputs_available:
0880 content_copy = content.copy()
0881 content_copy['name'] = content_copy['path']
0882 content_copy['status'] = ContentStatus.Available
0883 content_copy['substatus'] = ContentStatus.Available
0884 updated_contents_full_input.append(content_copy)
0885 elif all_inputs_terminated:
0886 content_copy = content.copy()
0887 content_copy['name'] = content_copy['path']
0888 content_copy['status'] = ContentStatus.Missing
0889 content_copy['substatus'] = ContentStatus.Missing
0890 updated_contents_full_input.append(content_copy)
0891
0892 for content in outputs:
0893 if (content['status'] != content['substatus']) and content['substatus'] in status_to_check:
0894 u_content = {'content_id': content['content_id'],
0895 'request_id': content['request_id'],
0896 'status': content['substatus']}
0897 updated_contents.append(u_content)
0898 u_content_substatus = {'content_id': content['content_id'],
0899 'substatus': content['substatus'],
0900 'request_id': content['request_id'],
0901 'transform_id': content['transform_id'],
0902 'workload_id': content['workload_id'],
0903 'coll_id': content['coll_id']}
0904 new_update_contents.append(u_content_substatus)
0905 updated_contents_full_output.append(content)
0906
0907 for content in inputs_dependency:
0908 if (content['status'] != content['substatus']) and content['substatus'] in status_to_check:
0909 u_content = {'content_id': content['content_id'],
0910 'request_id': content['request_id'],
0911 'status': content['substatus']}
0912 updated_contents.append(u_content)
0913 updated_contents_full_input_deps.append(content)
0914
0915 if len(updated_contents) > max_updates_per_round:
0916 chunk = updated_contents, updated_contents_full_input, updated_contents_full_output, updated_contents_full_input_deps, new_update_contents
0917 chunks.append(chunk)
0918
0919 updated_contents, updated_contents_full_input, updated_contents_full_output = [], [], []
0920 updated_contents_full_input_deps = []
0921 new_update_contents = []
0922
0923 if len(updated_contents) > 0:
0924 chunk = updated_contents, updated_contents_full_input, updated_contents_full_output, updated_contents_full_input_deps, new_update_contents
0925 chunks.append(chunk)
0926
0927
0928 return chunks
0929
0930
0931 def get_transform_dependency_map(transform_id, logger=None, log_prefix=''):
0932 cache = get_redis_cache()
0933 transform_dependcy_map_key = "transform_dependcy_map_%s" % transform_id
0934 transform_dependcy_map = cache.get(transform_dependcy_map_key, default={})
0935 return transform_dependcy_map
0936
0937
0938 def set_transform_dependency_map(transform_id, transform_dependcy_map, logger=None, log_prefix=''):
0939 cache = get_redis_cache()
0940 transform_dependcy_map_key = "transform_dependcy_map_%s" % transform_id
0941 cache.set(transform_dependcy_map_key, transform_dependcy_map)
0942
0943
0944 def get_content_dependcy_map(request_id, logger=None, log_prefix=''):
0945 cache = get_redis_cache()
0946 content_dependcy_map_key = "request_content_dependcy_map_%s" % request_id
0947 content_dependcy_map = cache.get(content_dependcy_map_key, default={})
0948
0949 request_dependcy_map_key = "request_dependcy_map_%s" % request_id
0950 request_dependcy_map = cache.get(request_dependcy_map_key, default=[])
0951
0952 collection_dependcy_map_key = "request_collections_dependcy_map_%s" % request_id
0953 collection_dependcy_map = cache.get(collection_dependcy_map_key, default=[])
0954
0955 return content_dependcy_map, request_dependcy_map, collection_dependcy_map
0956
0957
0958 def set_content_dependcy_map(request_id, content_dependcy_map, request_dependcy_map,
0959 collection_dependcy_map, logger=None, log_prefix=''):
0960 cache = get_redis_cache()
0961 content_dependcy_map_key = "request_content_dependcy_map_%s" % request_id
0962 cache.set(content_dependcy_map_key, content_dependcy_map)
0963
0964 request_dependcy_map_key = "request_dependcy_map_%s" % request_id
0965 cache.set(request_dependcy_map_key, request_dependcy_map)
0966
0967 collection_dependcy_map_key = "request_collections_dependcy_map_%s" % request_id
0968 cache.set(collection_dependcy_map_key, collection_dependcy_map)
0969
0970
0971 def get_content_status_map(request_id, logger=None, log_prefix=''):
0972 cache = get_redis_cache()
0973 content_status_map_key = "request_content_status_map_%s" % request_id
0974 content_status_map = cache.get(content_status_map_key, default={})
0975 return content_status_map
0976
0977
0978 def set_content_status_map(request_id, content_status_map, logger=None, log_prefix=''):
0979 cache = get_redis_cache()
0980 content_status_map_key = "request_content_status_map_%s" % request_id
0981 cache.set(content_status_map_key, content_status_map)
0982
0983
0984 def get_input_dependency_map_by_request(request_id, transform_id, workload_id, work, logger=None, log_prefix=''):
0985 logger = get_logger(logger)
0986
0987 content_dependcy_map, request_dependcy_map, collection_dependcy_map = get_content_dependcy_map(request_id, logger=logger, log_prefix=log_prefix)
0988 content_status_map = get_content_status_map(request_id, logger=logger, log_prefix=log_prefix)
0989
0990 transform_dependcy_maps = {}
0991
0992 refresh = False
0993 if not content_dependcy_map or not content_status_map:
0994 refresh = True
0995 elif transform_id and transform_id not in request_dependcy_map:
0996 refresh = True
0997 elif work:
0998 output_collections = work.get_output_collections()
0999 for coll in output_collections:
1000 if coll.coll_id not in collection_dependcy_map:
1001 refresh = True
1002
1003 for tf_id in request_dependcy_map:
1004 transform_dependcy_maps[str(tf_id)] = get_transform_dependency_map(tf_id, logger=logger, log_prefix=log_prefix)
1005 if not transform_dependcy_maps[str(tf_id)]:
1006 refresh = True
1007
1008 if refresh:
1009 logger.debug(log_prefix + "refresh content_dependcy_map")
1010 content_dependcy_map = {}
1011 request_dependcy_map = []
1012 collection_dependcy_map = []
1013 content_status_map = {}
1014 transform_dependcy_maps = {}
1015
1016 content_output_name2id = {}
1017 content_input_deps = []
1018
1019 contents = core_catalog.get_contents_by_request_transform(request_id=request_id)
1020
1021 for content in contents:
1022 if content['transform_id'] not in request_dependcy_map:
1023 request_dependcy_map.append(content['transform_id'])
1024 if content['coll_id'] not in collection_dependcy_map:
1025 collection_dependcy_map.append(content['coll_id'])
1026
1027 content_status_map[str(content['content_id'])] = content['substatus'].value
1028
1029 str_tf_id = str(content['transform_id'])
1030 str_map_id = str(content['map_id'])
1031 if str_tf_id not in transform_dependcy_maps:
1032 transform_dependcy_maps[str_tf_id] = get_transform_dependency_map(str_tf_id, logger=logger, log_prefix=log_prefix)
1033 if str_map_id not in transform_dependcy_maps[str_tf_id]:
1034 transform_dependcy_maps[str_tf_id][str_map_id] = {'inputs': [], 'outputs': [], 'input_deps': []}
1035
1036 if content['content_relation_type'] == ContentRelationType.Output:
1037 if content['coll_id'] not in content_output_name2id:
1038 content_output_name2id[content['coll_id']] = {}
1039 collection_dependcy_map.append(content['coll_id'])
1040 content_output_name2id[content['coll_id']][content['name']] = content
1041
1042 transform_dependcy_maps[str_tf_id][str_map_id]['outputs'].append(content['content_id'])
1043 elif content['content_relation_type'] == ContentRelationType.InputDependency:
1044 content_input_deps.append(content)
1045
1046 transform_dependcy_maps[str_tf_id][str_map_id]['input_deps'].append(content['content_id'])
1047 elif content['content_relation_type'] == ContentRelationType.Input:
1048
1049 transform_dependcy_maps[str_tf_id][str_map_id]['inputs'].append(content['content_id'])
1050
1051
1052 for content in content_input_deps:
1053 dep_coll_id = content['coll_id']
1054 if dep_coll_id not in content_output_name2id:
1055 logger.warn(log_prefix + "dep_coll_id: %s contents are not added yet" % dep_coll_id)
1056 else:
1057 dep_content = content_output_name2id[dep_coll_id].get(content['name'], None)
1058 if dep_content:
1059 dep_content_id = str(dep_content['content_id'])
1060 if dep_content_id not in content_dependcy_map:
1061 content_dependcy_map[dep_content_id] = []
1062 content_dependcy_map[dep_content_id].append((content['content_id'], content['transform_id'], content['map_id']))
1063 else:
1064 logger.error(log_prefix + "Failed to find input dependcy for content_id: %s" % content['content_id'])
1065
1066 set_content_dependcy_map(request_id, content_dependcy_map, request_dependcy_map,
1067 collection_dependcy_map, logger=logger, log_prefix=log_prefix)
1068 for str_tf_id in transform_dependcy_maps:
1069 set_transform_dependency_map(str_tf_id, transform_dependcy_maps[str_tf_id], logger=logger, log_prefix=log_prefix)
1070 set_content_status_map(request_id, content_status_map, logger=logger, log_prefix=log_prefix)
1071
1072 return content_dependcy_map, transform_dependcy_maps, content_status_map
1073
1074
1075 def get_content_status_with_status_map(content_ids, content_status_map):
1076 content_id_status = []
1077 for content_id in content_ids:
1078 status_value = content_status_map[str(content_id)]
1079 status = ContentStatus(status_value)
1080 content_id_status.append((content_id, status))
1081 return content_id_status
1082
1083
1084 def trigger_release_inputs_no_deps(request_id, transform_id, workload_id, work, input_output_maps, logger=None, log_prefix=''):
1085 logger = get_logger(logger)
1086
1087 update_contents = []
1088 update_input_contents_full = {}
1089 update_input_contents_full[transform_id] = []
1090
1091 for map_id in input_output_maps:
1092 inputs = input_output_maps[map_id]['inputs'] if 'inputs' in input_output_maps[map_id] else []
1093 inputs_dependency = input_output_maps[map_id]['inputs_dependency'] if 'inputs_dependency' in input_output_maps[map_id] else []
1094 outputs = input_output_maps[map_id]['outputs'] if 'outputs' in input_output_maps[map_id] else []
1095
1096
1097 input_output_sub_maps = get_input_output_sub_maps(inputs, outputs, inputs_dependency)
1098 for sub_map_id in input_output_sub_maps:
1099 inputs_sub = input_output_sub_maps[sub_map_id]['inputs']
1100
1101 inputs_dependency_sub = input_output_sub_maps[sub_map_id]['inputs_dependency']
1102
1103 if not inputs_dependency_sub:
1104 for content in inputs_sub:
1105 if content['substatus'] != ContentStatus.Available:
1106 u_content = {'content_id': content['content_id'],
1107 'request_id': content['request_id'],
1108
1109 'substatus': ContentStatus.Available}
1110 update_contents.append(u_content)
1111 content['status'] = ContentStatus.Available
1112 content['substatus'] = ContentStatus.Available
1113 update_input_contents_full[transform_id].append(content)
1114 return update_contents, update_input_contents_full
1115
1116
1117 def trigger_release_inputs(request_id, transform_id, workload_id, work, updated_contents_full_output, updated_contents_full_input,
1118 updated_contents_full_input_deps, input_output_maps, logger=None, log_prefix=''):
1119 logger = get_logger(logger)
1120
1121 status_to_check = [ContentStatus.Available, ContentStatus.FakeAvailable, ContentStatus.FinalFailed, ContentStatus.Missing]
1122
1123
1124 update_contents = []
1125 update_contents_status = {}
1126 update_contents_status_name = {}
1127 update_input_contents_full = {}
1128 update_input_contents_full[transform_id] = []
1129
1130 for status in status_to_check:
1131 update_contents_status[status.name] = []
1132 update_contents_status_name[status.name] = status
1133
1134 for content in updated_contents_full_output:
1135
1136
1137
1138
1139 if content['substatus'] in status_to_check:
1140 update_contents_status[content['substatus'].name].append(content['content_id'])
1141
1142 for map_id in input_output_maps:
1143 inputs = input_output_maps[map_id]['inputs'] if 'inputs' in input_output_maps[map_id] else []
1144 inputs_dependency = input_output_maps[map_id]['inputs_dependency'] if 'inputs_dependency' in input_output_maps[map_id] else []
1145 outputs = input_output_maps[map_id]['outputs'] if 'outputs' in input_output_maps[map_id] else []
1146
1147
1148 input_output_sub_maps = get_input_output_sub_maps(inputs, outputs, inputs_dependency)
1149 for sub_map_id in input_output_sub_maps:
1150 inputs_sub = input_output_sub_maps[sub_map_id]['inputs']
1151 outputs_sub = input_output_sub_maps[sub_map_id]['outputs']
1152 inputs_dependency_sub = input_output_sub_maps[sub_map_id]['inputs_dependency']
1153
1154 input_content_update_status = None
1155 if is_all_contents_available(inputs_dependency_sub):
1156 input_content_update_status = ContentStatus.Available
1157 elif is_all_contents_terminated(inputs_dependency_sub):
1158 input_content_update_status = ContentStatus.Missing
1159 if input_content_update_status:
1160 for content in inputs_dependency_sub:
1161
1162
1163 pass
1164 for content in inputs_sub:
1165 u_content = {'content_id': content['content_id'],
1166 'request_id': content['request_id'],
1167 'substatus': input_content_update_status}
1168 update_contents.append(u_content)
1169 content['status'] = input_content_update_status
1170 content['substatus'] = input_content_update_status
1171 update_input_contents_full[transform_id].append(content)
1172
1173 output_content_update_status = None
1174 if is_all_contents_available(inputs_sub):
1175
1176
1177
1178
1179 pass
1180 elif is_all_contents_terminated_but_not_available(inputs_sub):
1181
1182
1183
1184 pass
1185 output_content_update_status = ContentStatus.Missing
1186 if output_content_update_status:
1187 for content in outputs_sub:
1188 u_content = {'content_id': content['content_id'],
1189 'request_id': content['request_id'],
1190 'substatus': output_content_update_status}
1191 update_contents.append(u_content)
1192
1193 return update_contents, update_input_contents_full, update_contents_status_name, update_contents_status
1194
1195
1196 def poll_missing_outputs(input_output_maps, contents_ext=[], max_updates_per_round=2000, process_status=None):
1197 content_updates_missing, updated_contents_full_missing = [], []
1198
1199 chunks = []
1200 for map_id in input_output_maps:
1201 inputs = input_output_maps[map_id]['inputs'] if 'inputs' in input_output_maps[map_id] else []
1202 inputs_dependency = input_output_maps[map_id]['inputs_dependency'] if 'inputs_dependency' in input_output_maps[map_id] else []
1203 outputs = input_output_maps[map_id]['outputs'] if 'outputs' in input_output_maps[map_id] else []
1204
1205
1206 input_output_sub_maps = get_input_output_sub_maps(inputs, outputs, inputs_dependency)
1207 for sub_map_id in input_output_sub_maps:
1208 inputs_sub = input_output_sub_maps[sub_map_id]['inputs']
1209 outputs_sub = input_output_sub_maps[sub_map_id]['outputs']
1210
1211
1212 content_update_status = None
1213 if is_all_contents_terminated_but_not_available(inputs_sub) or process_status in [ProcessingStatus.Cancelled]:
1214 content_update_status = ContentStatus.Missing
1215
1216 for content in outputs_sub:
1217 content['substatus'] = content_update_status
1218 if content['status'] != content['substatus']:
1219 u_content = {'content_id': content['content_id'],
1220 'request_id': content['request_id'],
1221 'substatus': content['substatus']}
1222
1223 content_updates_missing.append(u_content)
1224 updated_contents_full_missing.append(content)
1225
1226 if len(content_updates_missing) > max_updates_per_round:
1227 chunk = content_updates_missing, updated_contents_full_missing
1228 chunks.append(chunk)
1229 content_updates_missing, updated_contents_full_missing = [], []
1230 if len(content_updates_missing) > 0:
1231 chunk = content_updates_missing, updated_contents_full_missing
1232 chunks.append(chunk)
1233
1234 return chunks
1235
1236
1237 def has_external_content_id(request_id, transform_id):
1238 """
1239 Return True if all Input contents for the transform have external_content_id set.
1240 Queries the database directly with a COUNT to avoid loading all map data.
1241 request_id is included because the database uses it for virtual table partitioning.
1242 """
1243 return not core_catalog.has_input_contents_without_external_id(request_id, transform_id)
1244
1245
1246 def get_update_external_content_ids(input_output_maps, external_content_ids, es=False):
1247 name_to_id_map = {}
1248 update_contents = []
1249 if not es:
1250 for map_id in input_output_maps:
1251 inputs = input_output_maps[map_id]['inputs'] if 'inputs' in input_output_maps[map_id] else []
1252 outputs = input_output_maps[map_id]['outputs'] if 'outputs' in input_output_maps[map_id] else []
1253 for content in inputs + outputs:
1254 if content['name'] not in name_to_id_map:
1255 name_to_id_map[content['name']] = []
1256 name_to_id_map[content['name']].append(content['content_id'])
1257 else:
1258
1259 for map_id in input_output_maps:
1260 inputs = input_output_maps[map_id]['inputs'] if 'inputs' in input_output_maps[map_id] else []
1261 outputs = input_output_maps[map_id]['outputs'] if 'outputs' in input_output_maps[map_id] else []
1262 for content in inputs + outputs:
1263 if content['path'] not in name_to_id_map:
1264 name_to_id_map[content['path']] = []
1265 name_to_id_map[content['path']].append(content['content_id'])
1266 for dataset in external_content_ids:
1267 dataset_id = dataset['dataset']['id']
1268 files = dataset['files']
1269 for file_item in files:
1270 lfn = file_item['lfn']
1271
1272 pos = lfn.find(":")
1273 if pos >= 0:
1274 lfn = lfn[pos + 1:]
1275 file_id = file_item['id']
1276 content_ids = name_to_id_map.get(lfn, [])
1277 for content_id in content_ids:
1278 update_content = {'content_id': content_id,
1279 'request_id': content['request_id'],
1280 'external_coll_id': dataset_id,
1281 'external_content_id': file_id}
1282 update_contents.append(update_content)
1283 return update_contents
1284
1285
1286 def get_update_external_content_ids_from_name_map(name_to_id_map, external_content_ids, request_id):
1287 """
1288 Build the list of external content ID updates from a pre-built name_to_id_map.
1289 Used by handle_update_processing_new to avoid loading the full input_output_maps.
1290 """
1291 update_contents = []
1292 for dataset in external_content_ids:
1293 dataset_id = dataset['dataset']['id']
1294 files = dataset['files']
1295 for file_item in files:
1296 lfn = file_item['lfn']
1297
1298 pos = lfn.find(":")
1299 if pos >= 0:
1300 lfn = lfn[pos + 1:]
1301 file_id = file_item['id']
1302 content_ids = name_to_id_map.get(lfn, [])
1303 for content_id in content_ids:
1304 update_content = {'content_id': content_id,
1305 'request_id': request_id,
1306 'external_coll_id': dataset_id,
1307 'external_content_id': file_id}
1308 update_contents.append(update_content)
1309 return update_contents
1310
1311
1312 def handle_update_processing(processing, agent_attributes, max_updates_per_round=2000, use_bulk_update_mappings=True, executors=None, logger=None, log_prefix=''):
1313 logger = get_logger(logger)
1314
1315 ret_msgs = []
1316 new_contents = []
1317 new_input_output_maps = {}
1318
1319 request_id = processing['request_id']
1320 transform_id = processing['transform_id']
1321 workload_id = processing['workload_id']
1322
1323 proc = processing['processing_metadata']['processing']
1324 work = proc.work
1325 work.set_agent_attributes(agent_attributes, processing)
1326
1327 if processing['command'] in [CommandType.AbortProcessing]:
1328 handle_abort_processing(processing, agent_attributes=agent_attributes, sync=False, logger=logger, log_prefix=log_prefix)
1329 if processing['command'] in [CommandType.ResumeProcessing]:
1330 handle_resume_processing(processing, agent_attributes=agent_attributes, logger=logger, log_prefix=log_prefix)
1331
1332 input_output_maps = get_input_output_maps(request_id, transform_id, work, with_deps=False)
1333 logger.debug(log_prefix + "get_input_output_maps: len: %s" % len(input_output_maps))
1334 logger.debug(log_prefix + "get_input_output_maps.keys[:3]: %s" % str(list(input_output_maps.keys())[:3]))
1335
1336 if work.has_external_content_id() and not has_external_content_id(request_id, transform_id):
1337 external_content_ids = work.get_external_content_ids(processing, log_prefix=log_prefix)
1338 update_external_content_ids = get_update_external_content_ids(input_output_maps, external_content_ids, es=work.es)
1339 core_catalog.update_contents(update_external_content_ids)
1340
1341 num_inputs = None
1342 if hasattr(work, "num_inputs"):
1343 num_inputs = work.num_inputs
1344 num_input_output_maps = len(input_output_maps)
1345 if processing["num_unmapped"] > 0 or work.has_new_inputs or (num_inputs is not None and num_inputs > num_input_output_maps):
1346 new_input_output_maps = work.get_new_input_output_maps(input_output_maps)
1347 logger.debug(log_prefix + "get_new_input_output_maps: len: %s" % len(new_input_output_maps))
1348 logger.debug(log_prefix + "get_new_input_output_maps.keys[:3]: %s" % str(list(new_input_output_maps.keys())[:3]))
1349 if num_inputs:
1350 processing["num_unmapped"] = num_inputs - num_input_output_maps
1351
1352 contents_ext = []
1353 if work.require_ext_contents():
1354 contents_ext = get_ext_content_ids(request_id, transform_id, work)
1355 job_info_maps = core_catalog.get_contents_ext_maps()
1356 ret_poll_processing = work.poll_processing_updates(processing, input_output_maps, contents_ext=contents_ext,
1357 job_info_maps=job_info_maps, executors=executors, log_prefix=log_prefix)
1358 process_status, content_updates, new_input_output_maps1, updated_contents_full, parameters, new_contents_ext, update_contents_ext = ret_poll_processing
1359 else:
1360 ret_poll_processing = work.poll_processing_updates(processing, input_output_maps, log_prefix=log_prefix)
1361 new_contents_ext, update_contents_ext = [], []
1362 process_status, content_updates, new_input_output_maps1, updated_contents_full, parameters = ret_poll_processing
1363
1364 new_input_output_maps.update(new_input_output_maps1)
1365 logger.debug(log_prefix + "poll_processing_updates process_status: %s" % process_status)
1366 logger.debug(log_prefix + "poll_processing_updates content_updates[:3]: %s" % content_updates[:3])
1367 logger.debug(log_prefix + "poll_processing_updates new_input_output_maps1.keys[:3]: %s" % (list(new_input_output_maps1.keys())[:3]))
1368 logger.debug(log_prefix + "poll_processing_updates updated_contents_full[:3]: %s" % (updated_contents_full[:3]))
1369 logger.debug(log_prefix + f"poll_processing_updates parameters: {parameters}")
1370
1371 ret_futures = set()
1372
1373 if hasattr(work, 'input_dependency_coll_ids'):
1374 input_dependency_coll_ids = work.input_dependency_coll_ids
1375 else:
1376 input_dependency_coll_ids = []
1377
1378 ret_new_contents_chunks = get_new_contents(request_id, transform_id, workload_id, new_input_output_maps,
1379 input_dependency_coll_ids=input_dependency_coll_ids,
1380 max_updates_per_round=max_updates_per_round)
1381 for ret_new_contents in ret_new_contents_chunks:
1382 new_input_contents, new_output_contents, new_log_contents, new_input_dependency_contents = ret_new_contents
1383
1384 ret_msgs = []
1385
1386
1387
1388
1389
1390
1391
1392
1393
1394
1395
1396
1397 new_contents = new_input_contents + new_output_contents + new_log_contents + new_input_dependency_contents
1398
1399 if executors is None:
1400 logger.debug(log_prefix + "handle_update_processing: add %s new contents" % (len(new_contents)))
1401 core_processings.update_processing_contents(update_processing=None,
1402 new_contents=new_contents,
1403
1404 request_id=request_id,
1405
1406 use_bulk_update_mappings=use_bulk_update_mappings,
1407 messages=ret_msgs)
1408 else:
1409 log_msg = "handle_update_processing thread: add %s new contents" % (len(new_contents))
1410 kwargs = {'update_processing': None,
1411 'request_id': request_id,
1412 'new_contents': new_contents,
1413
1414 'use_bulk_update_mappings': use_bulk_update_mappings,
1415 'messages': ret_msgs}
1416 f = executors.submit(update_processing_contents_thread, logger, log_prefix, log_msg, kwargs)
1417 ret_futures.add(f)
1418
1419 ret_msgs = []
1420 content_updates_missing_chunks = poll_missing_outputs(input_output_maps, contents_ext=contents_ext,
1421 max_updates_per_round=max_updates_per_round,
1422 process_status=process_status)
1423 for content_updates_missing_chunk in content_updates_missing_chunks:
1424 content_updates_missing, updated_contents_full_missing = content_updates_missing_chunk
1425 msgs = []
1426 if updated_contents_full_missing:
1427 msgs = generate_messages(request_id, transform_id, workload_id, work, msg_type='file',
1428 files=updated_contents_full, relation_type='output')
1429 if executors is None:
1430 logger.debug(log_prefix + "handle_update_processing: update %s missing contents" % (len(content_updates_missing)))
1431 core_processings.update_processing_contents(update_processing=None,
1432 update_contents=content_updates_missing,
1433 request_id=request_id,
1434
1435
1436 use_bulk_update_mappings=False,
1437 messages=msgs)
1438 else:
1439 log_msg = "handle_update_processing thread: update %s missing contents" % (len(content_updates_missing))
1440 kwargs = {'update_processing': None,
1441 'request_id': request_id,
1442 'update_contents': content_updates_missing,
1443 'use_bulk_update_mappings': False,
1444 'messages': msgs}
1445 f = executors.submit(update_processing_contents_thread, logger, log_prefix, log_msg, kwargs)
1446 ret_futures.add(f)
1447
1448 if updated_contents_full:
1449 updated_contents_full_chunks = get_list_chunks(updated_contents_full, bulk_size=max_updates_per_round)
1450 for updated_contents_full_chunk in updated_contents_full_chunks:
1451 msgs = generate_messages(request_id, transform_id, workload_id, work, msg_type='file',
1452 files=updated_contents_full_chunk, relation_type='output')
1453 if executors is None:
1454 log_msg = "handle_update_processing: update %s messages" % (len(msgs))
1455 logger.debug(log_prefix + log_msg)
1456 core_processings.update_processing_contents(update_processing=None,
1457 request_id=request_id,
1458 messages=msgs)
1459 else:
1460 log_msg = "handle_update_processing thread: update %s messages" % (len(msgs))
1461 kwargs = {'update_processing': None,
1462 'request_id': request_id,
1463 'messages': msgs}
1464 f = executors.submit(update_processing_contents_thread, logger, log_prefix, log_msg, kwargs)
1465 ret_futures.add(f)
1466
1467 if new_contents_ext:
1468 new_contents_ext_chunks = get_list_chunks(new_contents_ext, bulk_size=max_updates_per_round)
1469 for new_contents_ext_chunk in new_contents_ext_chunks:
1470 if executors is None:
1471 log_msg = "handle_update_processing: add %s ext contents" % (len(new_contents_ext_chunk))
1472 logger.debug(log_prefix + log_msg)
1473 core_processings.update_processing_contents(update_processing=None,
1474 request_id=request_id,
1475 new_contents_ext=new_contents_ext_chunk)
1476 else:
1477 log_msg = "handle_update_processing thread: add %s ext contents" % (len(new_contents_ext_chunk))
1478 kwargs = {'update_processing': None,
1479 'request_id': request_id,
1480 'new_contents_ext': new_contents_ext_chunk}
1481 f = executors.submit(update_processing_contents_thread, logger, log_prefix, log_msg, kwargs)
1482 ret_futures.add(f)
1483
1484 if update_contents_ext:
1485 update_contents_ext_chunks = get_list_chunks(update_contents_ext, bulk_size=max_updates_per_round)
1486 for update_contents_ext_chunk in update_contents_ext_chunks:
1487 if executors is None:
1488 log_msg = "handle_update_processing: update %s ext contents" % (len(update_contents_ext_chunk))
1489 logger.debug(log_prefix + log_msg)
1490 core_processings.update_processing_contents(update_processing=None,
1491 request_id=request_id,
1492 update_contents_ext=update_contents_ext_chunk)
1493 else:
1494 log_msg = "handle_update_processing thread: update %s ext contents" % (len(update_contents_ext_chunk))
1495 kwargs = {'update_processing': None,
1496 'request_id': request_id,
1497 'update_contents_ext': update_contents_ext_chunk}
1498 f = executors.submit(update_processing_contents_thread, logger, log_prefix, log_msg, kwargs)
1499 ret_futures.add(f)
1500
1501 if content_updates:
1502 content_updates_chunks = get_list_chunks(content_updates, bulk_size=max_updates_per_round)
1503 for content_updates_chunk in content_updates_chunks:
1504 if executors is None:
1505 log_msg = "handle_update_processing: update %s contents" % (len(content_updates_chunk))
1506 logger.debug(log_prefix + log_msg)
1507 core_processings.update_processing_contents(update_processing=None,
1508 request_id=request_id,
1509
1510 use_bulk_update_mappings=use_bulk_update_mappings,
1511 update_contents=content_updates_chunk)
1512 else:
1513 log_msg = "handle_update_processing thread: update %s contents" % (len(content_updates_chunk))
1514 kwargs = {'update_processing': None,
1515 'request_id': request_id,
1516 'use_bulk_update_mappings': use_bulk_update_mappings,
1517 'update_contents': content_updates_chunk}
1518 f = executors.submit(update_processing_contents_thread, logger, log_prefix, log_msg, kwargs)
1519 ret_futures.add(f)
1520
1521 if len(ret_futures) > 0:
1522 wait_futures_finish(ret_futures, "handle_update_processing", logger, log_prefix)
1523
1524 if not parameters:
1525 parameters = {}
1526 parameters["num_unmapped"] = processing["num_unmapped"]
1527
1528
1529 return process_status, [], [], ret_msgs, [], parameters, [], []
1530
1531
1532 def get_unmatched_panda_id_updates(processing, request_id, transform_id, work, input_output_maps, logger=None, log_prefix=''):
1533 """
1534 Find panda job IDs that are not yet recorded in any output content's content_metadata,
1535 resolve them to input file names via work.get_processing_job_name_to_ids, and return
1536 content updates that set panda_ids on the matching output contents.
1537
1538 :returns: list of content update dicts {content_id, request_id, content_metadata}
1539 """
1540 logger = get_logger(logger)
1541
1542 def _to_id_list(value):
1543 """Normalise a panda_id/panda_ids value to a flat list of IDs."""
1544 if value is None:
1545 return []
1546 if isinstance(value, str):
1547 return [int(v.strip()) for v in value.split(',') if v.strip()]
1548 if isinstance(value, list):
1549 return value
1550 return [value]
1551
1552
1553 known_panda_ids = set()
1554 name_to_outputs = {}
1555 for map_id in input_output_maps:
1556 outputs = input_output_maps[map_id].get('outputs', [])
1557 inputs = input_output_maps[map_id].get('inputs', [])
1558 for content in outputs:
1559 meta = content.get('content_metadata') or {}
1560 existing = meta.get('panda_ids', []) or meta.get('panda_id', [])
1561 known_panda_ids.update(_to_id_list(existing))
1562 for content in inputs:
1563 name = content.get('name')
1564 if name:
1565 if name not in name_to_outputs:
1566 name_to_outputs[name] = []
1567 name_to_outputs[name].extend(outputs)
1568
1569
1570 all_job_ids = work.get_processing_job_ids(processing, log_prefix=log_prefix)
1571 all_job_ids = _to_id_list(all_job_ids)
1572 logger.debug(log_prefix + "get_unmatched_panda_id_updates: all_job_ids: %s, known_panda_ids: %s"
1573 % (len(all_job_ids), len(known_panda_ids)))
1574
1575
1576 unmatched_job_ids = [jid for jid in all_job_ids if jid not in known_panda_ids]
1577 logger.debug(log_prefix + "get_unmatched_panda_id_updates: unmatched_job_ids: %s" % len(unmatched_job_ids))
1578 if not unmatched_job_ids:
1579 return []
1580
1581
1582 job_name_to_ids = work.get_processing_job_name_to_ids(processing, unmatched_job_ids, log_prefix=log_prefix)
1583 logger.debug(log_prefix + "get_unmatched_panda_id_updates: job_name_to_ids: %s" % len(job_name_to_ids))
1584
1585
1586 update_contents = []
1587 for job_name, panda_ids in job_name_to_ids.items():
1588 if job_name not in name_to_outputs or not panda_ids:
1589 continue
1590 for output_content in name_to_outputs[job_name]:
1591 content_metadata = dict(output_content.get('content_metadata') or {})
1592 registered_panda_ids = _to_id_list(content_metadata.get('panda_ids') or content_metadata.get('panda_id'))
1593 merged = sorted(set(registered_panda_ids) | set(panda_ids))
1594 content_metadata['panda_ids'] = merged
1595 update_contents.append({'content_id': output_content['content_id'],
1596 'request_id': request_id,
1597 'content_metadata': content_metadata})
1598 return update_contents
1599
1600
1601 def handle_update_processing_new(processing, agent_attributes, max_updates_per_round=2000, max_jobs_per_round=None, use_bulk_update_mappings=True, executors=None, logger=None, log_prefix=''):
1602 """
1603 Handle update processing with optional paginated job polling to reduce peak memory usage.
1604
1605 :param max_jobs_per_round: maximum number of map_ids (jobs) to poll per round.
1606 When set, input_output_maps are loaded and polled in pages of this size,
1607 flushing results to DB after each page rather than accumulating everything
1608 in memory at once. When None (default), all maps are loaded and polled at
1609 once (same behaviour as handle_update_processing).
1610 """
1611 logger = get_logger(logger)
1612
1613 ret_msgs = []
1614 new_contents = []
1615 new_input_output_maps = {}
1616
1617 request_id = processing['request_id']
1618 transform_id = processing['transform_id']
1619 workload_id = processing['workload_id']
1620
1621 proc = processing['processing_metadata']['processing']
1622 work = proc.work
1623 work.set_agent_attributes(agent_attributes, processing)
1624
1625 if processing['command'] in [CommandType.AbortProcessing]:
1626 handle_abort_processing(processing, agent_attributes=agent_attributes, sync=False, logger=logger, log_prefix=log_prefix)
1627 if processing['command'] in [CommandType.ResumeProcessing]:
1628 handle_resume_processing(processing, agent_attributes=agent_attributes, logger=logger, log_prefix=log_prefix)
1629
1630 num_inputs = None
1631 if hasattr(work, "num_inputs"):
1632 num_inputs = work.num_inputs
1633
1634 if hasattr(work, 'es') and work.es:
1635 max_jobs_per_round = None
1636 logger.debug(log_prefix + "handle_update_processing_new: ES job detected, paging disabled")
1637
1638 input_output_maps = None
1639
1640 num_input_output_maps = core_catalog.get_input_output_map_count(request_id, transform_id)
1641 logger.debug(log_prefix + "get_input_output_map_count: %s" % num_input_output_maps)
1642 if work.has_external_content_id() and not has_external_content_id(request_id, transform_id):
1643 external_content_ids = work.get_external_content_ids(processing, log_prefix=log_prefix)
1644 name_to_id_map = core_catalog.get_content_name_to_id_map(request_id, transform_id, es=work.es)
1645 update_external_content_ids = get_update_external_content_ids_from_name_map(
1646 name_to_id_map, external_content_ids, request_id)
1647 core_catalog.update_contents(update_external_content_ids)
1648
1649 has_new_inputs = work.has_new_inputs if (processing["num_unmapped"] is None) else False
1650 if processing["num_unmapped"] > 0 or has_new_inputs or (num_inputs is not None and num_inputs > num_input_output_maps):
1651 logger.debug(log_prefix + f"handle_update_processing_new: checking for new input_output_maps with num_inputs: {num_inputs}, num_input_output_maps: {num_input_output_maps},"
1652 f"processing['num_unmapped']: {processing['num_unmapped']}, work.has_new_inputs: {work.has_new_inputs}")
1653 input_output_maps = get_input_output_maps(request_id, transform_id, work, with_deps=False)
1654 logger.debug(log_prefix + "get_input_output_maps: len: %s" % len(input_output_maps))
1655 new_input_output_maps = work.get_new_input_output_maps(input_output_maps)
1656 logger.debug(log_prefix + "get_new_input_output_maps: len: %s" % len(new_input_output_maps))
1657 logger.debug(log_prefix + "get_new_input_output_maps.keys[:3]: %s" % str(list(new_input_output_maps.keys())[:3]))
1658
1659 if num_inputs:
1660 processing["num_unmapped"] = num_inputs - num_input_output_maps
1661
1662
1663 if not max_jobs_per_round and input_output_maps is None:
1664 input_output_maps = get_input_output_maps(request_id, transform_id, work, with_deps=False)
1665 logger.debug(log_prefix + "get_input_output_maps: len: %s" % len(input_output_maps))
1666
1667
1668 maps_for_unmatch = input_output_maps
1669 if maps_for_unmatch is None:
1670 maps_for_unmatch = get_input_output_maps(request_id, transform_id, work, with_deps=False)
1671 unmatched_updates = get_unmatched_panda_id_updates(processing, request_id, transform_id, work,
1672 maps_for_unmatch, logger=logger, log_prefix=log_prefix)
1673 if unmatched_updates:
1674 logger.debug(log_prefix + "get_unmatched_panda_id_updates: updating %s contents" % len(unmatched_updates))
1675 core_catalog.update_contents(unmatched_updates)
1676
1677 final_terminated_status = [ContentStatus.Available, ContentStatus.FakeAvailable,
1678 ContentStatus.FinalFailed, ContentStatus.Missing,
1679 ContentStatus.FinalSubAvailable]
1680 status_filter = [s for s in ContentStatus if s not in final_terminated_status]
1681
1682 if max_jobs_per_round:
1683 maps_for_unmatch = None
1684 input_output_maps = None
1685 else:
1686
1687
1688 panda_id_filter = False if (hasattr(work, 'es') and work.es) else True
1689 logger.debug(log_prefix + "Reloading input_output_maps with panda_id filter for polling loop")
1690 input_output_maps = get_input_output_maps(request_id, transform_id, work, with_deps=False, with_panda_id=panda_id_filter, status=status_filter, match_content_ext=True)
1691
1692 if hasattr(work, 'input_dependency_coll_ids'):
1693 input_dependency_coll_ids = work.input_dependency_coll_ids
1694 else:
1695 input_dependency_coll_ids = []
1696
1697 ret_futures = set()
1698
1699
1700 contents_ext = []
1701 if work.require_ext_contents():
1702 contents_ext = get_ext_content_ids(request_id, transform_id, work)
1703
1704
1705
1706
1707
1708 process_status = None
1709 parameters = {}
1710 new_input_output_maps_from_poll = {}
1711 page_num = 0
1712
1713 logger.debug(log_prefix + f"Starting polling loop with max_jobs_per_round={max_jobs_per_round}")
1714
1715 while True:
1716 if max_jobs_per_round:
1717 maps_page = get_input_output_maps(request_id, transform_id, work, with_deps=False,
1718 page_num=page_num, page_size=max_jobs_per_round,
1719 with_panda_id=True, status=status_filter, match_content_ext=True)
1720 logger.debug(log_prefix + "handle_update_processing_new: polling page %d with %d maps"
1721 % (page_num, len(maps_page)))
1722 else:
1723 maps_page = input_output_maps
1724
1725
1726 if work.require_ext_contents():
1727 job_info_maps = core_catalog.get_contents_ext_maps()
1728 ret_poll = work.poll_processing_updates(processing, maps_page, contents_ext=contents_ext,
1729 job_info_maps=job_info_maps, executors=executors, log_prefix=log_prefix)
1730 process_status, content_updates, new_input_output_maps1, updated_contents_full, parameters, new_contents_ext, update_contents_ext = ret_poll
1731 else:
1732 ret_poll = work.poll_processing_updates(processing, maps_page, log_prefix=log_prefix)
1733 new_contents_ext, update_contents_ext = [], []
1734 process_status, content_updates, new_input_output_maps1, updated_contents_full, parameters = ret_poll
1735
1736 new_input_output_maps_from_poll.update(new_input_output_maps1)
1737
1738 logger.debug(log_prefix + "poll_processing_updates process_status: %s" % process_status)
1739 logger.debug(log_prefix + "poll_processing_updates content_updates[:3]: %s" % content_updates[:3])
1740 logger.debug(log_prefix + "poll_processing_updates new_input_output_maps1.keys[:3]: %s" % (list(new_input_output_maps1.keys())[:3]))
1741 logger.debug(log_prefix + "poll_processing_updates updated_contents_full[:3]: %s" % (updated_contents_full[:3]))
1742 logger.debug(log_prefix + f"poll_processing_updates parameters: {parameters}")
1743
1744
1745
1746 content_updates_missing_chunks = poll_missing_outputs(maps_page, contents_ext=contents_ext,
1747 max_updates_per_round=max_updates_per_round,
1748 process_status=process_status)
1749 for content_updates_missing_chunk in content_updates_missing_chunks:
1750 content_updates_missing, updated_contents_full_missing = content_updates_missing_chunk
1751 msgs = []
1752 if updated_contents_full_missing:
1753 msgs = generate_messages(request_id, transform_id, workload_id, work, msg_type='file',
1754 files=updated_contents_full_missing, relation_type='output')
1755 if executors is None:
1756 logger.debug(log_prefix + "handle_update_processing_new: update %s missing contents" % (len(content_updates_missing)))
1757 core_processings.update_processing_contents(update_processing=None,
1758 update_contents=content_updates_missing,
1759 request_id=request_id,
1760 use_bulk_update_mappings=False,
1761 messages=msgs)
1762 else:
1763 log_msg = "handle_update_processing_new thread: update %s missing contents" % (len(content_updates_missing))
1764 kwargs = {'update_processing': None,
1765 'request_id': request_id,
1766 'update_contents': content_updates_missing,
1767 'use_bulk_update_mappings': False,
1768 'messages': msgs}
1769 f = executors.submit(update_processing_contents_thread, logger, log_prefix, log_msg, kwargs)
1770 ret_futures.add(f)
1771
1772
1773 if updated_contents_full:
1774 updated_contents_full_chunks = get_list_chunks(updated_contents_full, bulk_size=max_updates_per_round)
1775 for updated_contents_full_chunk in updated_contents_full_chunks:
1776 msgs = generate_messages(request_id, transform_id, workload_id, work, msg_type='file',
1777 files=updated_contents_full_chunk, relation_type='output')
1778 if executors is None:
1779 log_msg = "handle_update_processing_new: update %s messages" % (len(msgs))
1780 logger.debug(log_prefix + log_msg)
1781 core_processings.update_processing_contents(update_processing=None,
1782 request_id=request_id,
1783 messages=msgs)
1784 else:
1785 log_msg = "handle_update_processing_new thread: update %s messages" % (len(msgs))
1786 kwargs = {'update_processing': None,
1787 'request_id': request_id,
1788 'messages': msgs}
1789 f = executors.submit(update_processing_contents_thread, logger, log_prefix, log_msg, kwargs)
1790 ret_futures.add(f)
1791
1792 if new_contents_ext:
1793 new_contents_ext_chunks = get_list_chunks(new_contents_ext, bulk_size=max_updates_per_round)
1794 for new_contents_ext_chunk in new_contents_ext_chunks:
1795 if executors is None:
1796 log_msg = "handle_update_processing_new: add %s ext contents" % (len(new_contents_ext_chunk))
1797 logger.debug(log_prefix + log_msg)
1798 core_processings.update_processing_contents(update_processing=None,
1799 request_id=request_id,
1800 new_contents_ext=new_contents_ext_chunk)
1801 else:
1802 log_msg = "handle_update_processing_new thread: add %s ext contents" % (len(new_contents_ext_chunk))
1803 kwargs = {'update_processing': None,
1804 'request_id': request_id,
1805 'new_contents_ext': new_contents_ext_chunk}
1806 f = executors.submit(update_processing_contents_thread, logger, log_prefix, log_msg, kwargs)
1807 ret_futures.add(f)
1808
1809 if update_contents_ext:
1810 update_contents_ext_chunks = get_list_chunks(update_contents_ext, bulk_size=max_updates_per_round)
1811 for update_contents_ext_chunk in update_contents_ext_chunks:
1812 if executors is None:
1813 log_msg = "handle_update_processing_new: update %s ext contents" % (len(update_contents_ext_chunk))
1814 logger.debug(log_prefix + log_msg)
1815 core_processings.update_processing_contents(update_processing=None,
1816 request_id=request_id,
1817 update_contents_ext=update_contents_ext_chunk)
1818 else:
1819 log_msg = "handle_update_processing_new thread: update %s ext contents" % (len(update_contents_ext_chunk))
1820 kwargs = {'update_processing': None,
1821 'request_id': request_id,
1822 'update_contents_ext': update_contents_ext_chunk}
1823 f = executors.submit(update_processing_contents_thread, logger, log_prefix, log_msg, kwargs)
1824 ret_futures.add(f)
1825
1826 if content_updates:
1827 content_updates_chunks = get_list_chunks(content_updates, bulk_size=max_updates_per_round)
1828 for content_updates_chunk in content_updates_chunks:
1829 if executors is None:
1830 log_msg = "handle_update_processing_new: update %s contents" % (len(content_updates_chunk))
1831 logger.debug(log_prefix + log_msg)
1832 core_processings.update_processing_contents(update_processing=None,
1833 request_id=request_id,
1834 use_bulk_update_mappings=use_bulk_update_mappings,
1835 update_contents=content_updates_chunk)
1836 else:
1837 log_msg = "handle_update_processing_new thread: update %s contents" % (len(content_updates_chunk))
1838 kwargs = {'update_processing': None,
1839 'request_id': request_id,
1840 'use_bulk_update_mappings': use_bulk_update_mappings,
1841 'update_contents': content_updates_chunk}
1842 f = executors.submit(update_processing_contents_thread, logger, log_prefix, log_msg, kwargs)
1843 ret_futures.add(f)
1844
1845
1846 if not max_jobs_per_round or len(maps_page) < max_jobs_per_round:
1847 break
1848 page_num += 1
1849
1850
1851 new_input_output_maps.update(new_input_output_maps_from_poll)
1852
1853 ret_new_contents_chunks = get_new_contents(request_id, transform_id, workload_id, new_input_output_maps,
1854 input_dependency_coll_ids=input_dependency_coll_ids,
1855 max_updates_per_round=max_updates_per_round)
1856 for ret_new_contents in ret_new_contents_chunks:
1857 new_input_contents, new_output_contents, new_log_contents, new_input_dependency_contents = ret_new_contents
1858
1859 ret_msgs = []
1860
1861 new_contents = new_input_contents + new_output_contents + new_log_contents + new_input_dependency_contents
1862
1863 if executors is None:
1864 logger.debug(log_prefix + "handle_update_processing_new: add %s new contents" % (len(new_contents)))
1865 core_processings.update_processing_contents(update_processing=None,
1866 new_contents=new_contents,
1867 request_id=request_id,
1868 use_bulk_update_mappings=use_bulk_update_mappings,
1869 messages=ret_msgs)
1870 else:
1871 log_msg = "handle_update_processing_new thread: add %s new contents" % (len(new_contents))
1872 kwargs = {'update_processing': None,
1873 'request_id': request_id,
1874 'new_contents': new_contents,
1875 'use_bulk_update_mappings': use_bulk_update_mappings,
1876 'messages': ret_msgs}
1877 f = executors.submit(update_processing_contents_thread, logger, log_prefix, log_msg, kwargs)
1878 ret_futures.add(f)
1879
1880 if len(ret_futures) > 0:
1881 wait_futures_finish(ret_futures, "handle_update_processing_new", logger, log_prefix)
1882
1883 if not parameters:
1884 parameters = {}
1885 parameters["num_unmapped"] = processing["num_unmapped"]
1886
1887
1888 return process_status, [], [], ret_msgs, [], parameters, [], []
1889
1890
1891 def get_transform_id_dependency_map(transform_id, logger=None, log_prefix=''):
1892 cache = get_redis_cache()
1893 transform_id_dependcy_map_key = "transform_id_dependcy_map_%s" % transform_id
1894 transform_id_dependcy_map = cache.get(transform_id_dependcy_map_key, default=[])
1895 return transform_id_dependcy_map
1896
1897
1898 def set_transform_id_dependency_map(transform_id, transform_id_dependcy_map, logger=None, log_prefix=''):
1899 cache = get_redis_cache()
1900 transform_id_dependcy_map_key = "transform_id_dependcy_map_%s" % transform_id
1901 cache.set(transform_id_dependcy_map_key, transform_id_dependcy_map)
1902
1903
1904 def get_updated_transforms_by_content_status(request_id=None, transform_id=None, logger=None, log_prefix=''):
1905 logger = get_logger(logger)
1906 logger.debug("get_updated_transforms_by_content_status starts")
1907
1908 update_transforms = get_transform_id_dependency_map(transform_id=transform_id, logger=logger, log_prefix=log_prefix)
1909 if not update_transforms:
1910 update_transforms = core_catalog.get_updated_transforms_by_content_status(request_id=request_id,
1911 transform_id=transform_id)
1912 set_transform_id_dependency_map(transform_id, update_transforms, logger=logger, log_prefix=log_prefix)
1913 logger.debug("get_updated_transforms_by_content_status ends")
1914 return update_transforms
1915
1916
1917 def update_contents_thread(logger, log_prefix, log_msg, kwargs):
1918 try:
1919 logger = get_logger(logger)
1920 logger.debug(log_prefix + log_msg)
1921 core_catalog.update_contents(**kwargs)
1922 logger.debug(log_prefix + " end")
1923 except Exception as ex:
1924 logger.error(log_prefix + "update_contents_thread: %s" % str(ex))
1925 raise ex
1926 except:
1927 logger.error(traceback.format_exc())
1928 raise Exception("update_contents_thread error")
1929
1930
1931 def handle_trigger_processing(processing, agent_attributes, trigger_new_updates=False, max_updates_per_round=2000, executors=None, logger=None, log_prefix=''):
1932 logger = get_logger(logger)
1933
1934 has_updates = False
1935 ret_msgs = []
1936 content_updates = []
1937 ret_update_transforms = []
1938 new_update_contents = []
1939
1940 request_id = processing['request_id']
1941 transform_id = processing['transform_id']
1942 workload_id = processing['workload_id']
1943 processing_id = processing['processing_id']
1944
1945 proc = processing['processing_metadata']['processing']
1946 work = proc.work
1947 work.set_agent_attributes(agent_attributes, processing)
1948
1949 num_dependencies = None
1950 num_inputs = None
1951 default_input_dep_page_size = 500
1952 min_input_dep_page_size = 100
1953 max_dependencies = 5000
1954 try:
1955 num_inputs = work.num_inputs
1956 num_dependencies = work.num_dependencies
1957 if num_inputs is not None and num_dependencies is not None and num_dependencies > 0:
1958 input_dep_page_size = int(max_dependencies * num_inputs / num_dependencies)
1959 if input_dep_page_size < default_input_dep_page_size:
1960 default_input_dep_page_size = input_dep_page_size
1961 log_info = f"input_dep_page_size ({input_dep_page_size}) is smaller than default_input_dep_page_size ({default_input_dep_page_size}),"
1962 log_info = "update default_input_dep_page_size from input_dep_page_size"
1963 logger.info(log_info)
1964 if default_input_dep_page_size < min_input_dep_page_size:
1965 log_info = f"default_input_dep_page_size ({default_input_dep_page_size}) is smaller than min_input_dep_page_size ({min_input_dep_page_size}),"
1966 log_info = "update default_input_dep_page_size from min_input_dep_page_size"
1967 logger.info(log_info)
1968 default_input_dep_page_size = min_input_dep_page_size
1969 except Exception as ex:
1970 logger.warn(f"request_id ({request_id}) transform_id ({transform_id}) processing_id ({processing_id}) fails to get num_dependencies: {ex}")
1971
1972 if (not work.use_dependency_to_release_jobs()) or workload_id is None:
1973 return processing['substatus'], [], [], {}, {}, {}, [], [], has_updates
1974 else:
1975 if trigger_new_updates:
1976
1977
1978
1979 pass
1980
1981 logger.debug(log_prefix + "sync contents_update to contents")
1982 core_catalog.set_fetching_contents_update(request_id=request_id, transform_id=transform_id, fetch=True)
1983 contents_update_list = core_catalog.get_contents_update(request_id=request_id, transform_id=transform_id, fetch=True)
1984 new_contents_update_list = []
1985
1986 for con in contents_update_list:
1987 has_updates = True
1988 if not work.es or con['substatus'] in [ContentStatus.Available]:
1989 con_dict = {'content_id': con['content_id'],
1990 'request_id': con['request_id'],
1991 'substatus': con['substatus'],
1992 'status': con['substatus']}
1993 if 'content_metadata' in con and con['content_metadata']:
1994 con_dict['content_metadata'] = con['content_metadata']
1995 new_contents_update_list.append(con_dict)
1996
1997 new_contents_update_list_chunks = [new_contents_update_list[i:i + max_updates_per_round] for i in range(0, len(new_contents_update_list), max_updates_per_round)]
1998 ret_futures = set()
1999 for chunk in new_contents_update_list_chunks:
2000 has_updates = True
2001 if executors is None:
2002 logger.debug(log_prefix + "new_contents_update chunk[:3](total: %s): %s" % (len(chunk), str(chunk[:3])))
2003
2004 core_catalog.update_contents(chunk, request_id=request_id, transform_id=transform_id, use_bulk_update_mappings=True)
2005 else:
2006 log_msg = "new_contents_update thread chunk[:3](total: %s): %s" % (len(chunk), str(chunk[:3]))
2007 kwargs = {'parameters': chunk,
2008 'request_id': request_id,
2009 'transform_id': transform_id,
2010 'use_bulk_update_mappings': True}
2011 f = executors.submit(update_contents_thread, logger, log_prefix, log_msg, kwargs)
2012 ret_futures.add(f)
2013 if len(ret_futures) > 0:
2014 wait_futures_finish(ret_futures, "new_contents_update", logger, log_prefix)
2015
2016
2017 core_catalog.delete_contents_update(request_id=request_id, transform_id=transform_id, fetch=True)
2018 logger.debug(log_prefix + "sync contents_update to contents done")
2019
2020 """
2021 logger.debug(log_prefix + "update_contents_from_others_by_dep_id")
2022 # core_catalog.update_contents_from_others_by_dep_id(request_id=request_id, transform_id=transform_id)
2023 to_triggered_contents = core_catalog.get_update_contents_from_others_by_dep_id(request_id=request_id, transform_id=transform_id)
2024 to_triggered_contents_chunks = [to_triggered_contents[i:i + max_updates_per_round] for i in range(0, len(to_triggered_contents), max_updates_per_round)]
2025
2026 ret_futures = set()
2027 for chunk in to_triggered_contents_chunks:
2028 has_updates = True
2029 if executors is None:
2030 logger.debug(log_prefix + "update_contents_from_others_by_dep_id chunk[:3](total: %s): %s" % (len(chunk), str(chunk[:3])))
2031 core_catalog.update_contents(chunk, request_id=request_id, transform_id=transform_id, use_bulk_update_mappings=False)
2032 else:
2033 log_msg = "update_contents_from_others_by_dep_id thread chunk[:3](total: %s): %s" % (len(chunk), str(chunk[:3]))
2034 kwargs = {'parameters': chunk,
2035 'request_id': request_id,
2036 'transform_id': transform_id,
2037 'use_bulk_update_mappings': False}
2038 f = executors.submit(update_contents_thread, logger, log_prefix, log_msg, kwargs)
2039 ret_futures.add(f)
2040 if len(ret_futures) > 0:
2041 wait_futures_finish(ret_futures, "update_contents_from_others_by_dep_id", logger, log_prefix)
2042
2043 logger.debug(log_prefix + "update_contents_from_others_by_dep_id done")
2044 """
2045
2046 logger.debug(log_prefix + "update_contents_from_others_by_dep_id_pages")
2047 status_not_to_check = [ContentStatus.Available, ContentStatus.FakeAvailable,
2048 ContentStatus.FinalFailed, ContentStatus.Missing]
2049 core_catalog.update_contents_from_others_by_dep_id_pages(request_id=request_id, transform_id=transform_id,
2050 page_size=2000, status_not_to_check=status_not_to_check,
2051 logger=logger, log_prefix=log_prefix)
2052 logger.debug(log_prefix + "update_contents_from_others_by_dep_id_pages done")
2053
2054 terminated_processing = False
2055 terminated_status = [ProcessingStatus.Finished, ProcessingStatus.Failed, ProcessingStatus.SubFinished,
2056 ProcessingStatus.Terminating, ProcessingStatus.Cancelled]
2057 if processing['status'] in terminated_status or processing['substatus'] in terminated_status:
2058 terminated_processing = True
2059
2060 logger.debug(log_prefix + "update_input_contents_by_dependency_pages")
2061 status_not_to_check = [ContentStatus.Available, ContentStatus.FakeAvailable,
2062 ContentStatus.FinalFailed, ContentStatus.Missing]
2063 core_catalog.update_input_contents_by_dependency_pages(request_id=request_id, transform_id=transform_id,
2064 page_size=default_input_dep_page_size,
2065 terminated=terminated_processing,
2066 batch_size=2000, status_not_to_check=status_not_to_check,
2067 logger=logger, log_prefix=log_prefix)
2068 logger.debug(log_prefix + "update_input_contents_by_dependency_pages done")
2069
2070 with_deps = False
2071 input_output_maps = get_input_output_maps(request_id, transform_id, work, with_deps=with_deps)
2072 logger.debug(log_prefix + "input_output_maps.keys[:2]: %s" % str(list(input_output_maps.keys())[:2]))
2073
2074 updated_contents_ret_chunks = get_updated_contents_by_input_output_maps(input_output_maps=input_output_maps,
2075 terminated=terminated_processing,
2076 max_updates_per_round=max_updates_per_round,
2077 es=work.es,
2078 with_deps=with_deps,
2079 logger=logger,
2080 log_prefix=log_prefix)
2081
2082 ret_futures = set()
2083 for updated_contents_ret in updated_contents_ret_chunks:
2084 updated_contents, updated_contents_full_input, updated_contents_full_output, updated_contents_full_input_deps, new_update_contents = updated_contents_ret
2085
2086 if updated_contents_full_input:
2087
2088 msgs = generate_messages(request_id, transform_id, workload_id, work, msg_type='file',
2089 files=updated_contents_full_input, relation_type='input')
2090 ret_msgs = ret_msgs + msgs
2091 if updated_contents_full_output:
2092
2093 msgs = generate_messages(request_id, transform_id, workload_id, work, msg_type='file',
2094 files=updated_contents_full_output, relation_type='output')
2095 ret_msgs = ret_msgs + msgs
2096
2097
2098
2099 if updated_contents or new_update_contents:
2100 has_updates = True
2101
2102 if executors is None:
2103 logger.debug(log_prefix + "handle_trigger_processing: updated_contents[:3] (total: %s): %s" % (len(updated_contents), updated_contents[:3]))
2104 core_processings.update_processing_contents(update_processing=None,
2105 update_contents=updated_contents,
2106
2107 messages=ret_msgs,
2108 request_id=request_id,
2109
2110 use_bulk_update_mappings=False)
2111 else:
2112 log_msg = "handle_trigger_processing thread: updated_contents[:3] (total: %s): %s" % (len(updated_contents), updated_contents[:3])
2113 kwargs = {'update_processing': None,
2114 'request_id': request_id,
2115 'update_contents': updated_contents,
2116 'messages': ret_msgs,
2117 'use_bulk_update_mappings': False}
2118 f = executors.submit(update_processing_contents_thread, logger, log_prefix, log_msg, kwargs)
2119 ret_futures.add(f)
2120
2121 updated_contents = []
2122 new_update_contents = []
2123 ret_msgs = []
2124 if len(ret_futures) > 0:
2125 wait_futures_finish(ret_futures, "handle_trigger_processing", logger, log_prefix)
2126
2127 if has_updates:
2128 ret_update_transforms = get_updated_transforms_by_content_status(request_id=request_id,
2129 transform_id=transform_id,
2130 logger=logger,
2131 log_prefix=log_prefix)
2132
2133
2134
2135
2136
2137
2138
2139
2140
2141
2142
2143 return processing['substatus'], content_updates, ret_msgs, {}, {}, {}, new_update_contents, ret_update_transforms, has_updates
2144
2145
2146 def get_content_status_from_panda_msg_status(status):
2147 status_map = {'starting': ContentStatus.New,
2148 'activated': ContentStatus.Activated,
2149 'running': ContentStatus.Processing,
2150 'finished': ContentStatus.Available,
2151 'failed': ContentStatus.Failed}
2152 if status in status_map:
2153 return status_map[status]
2154 return ContentStatus.New
2155
2156
2157 def get_collection_id_transform_id_map(coll_id, request_id, request_ids=[]):
2158 cache = get_redis_cache()
2159 coll_tf_id_map_key = "collection_id_transform_id_map"
2160 coll_tf_id_map = cache.get(coll_tf_id_map_key, default={})
2161
2162 if coll_id is None or coll_id not in coll_tf_id_map:
2163 if not request_ids:
2164 request_ids = []
2165 if request_id not in request_ids:
2166 request_ids.append(request_id)
2167 colls = core_catalog.get_collections_by_request_ids(request_ids)
2168 for coll in colls:
2169 coll_tf_id_map[coll['coll_id']] = (coll['request_id'], coll['transform_id'], coll['workload_id'])
2170
2171 cache.set(coll_tf_id_map_key, coll_tf_id_map)
2172
2173 if coll_id is None or coll_id not in coll_tf_id_map:
2174 return None, None, None
2175 return coll_tf_id_map[coll_id]
2176
2177
2178 workload_id_lock = threading.Lock()
2179
2180
2181 def get_workload_id_transform_id_map(workload_id, logger=None, log_prefix=''):
2182 cache = get_redis_cache()
2183 workload_id_transform_id_map_key = "all_worloadid2transformid_map"
2184 workload_id_transform_id_map = cache.get(workload_id_transform_id_map_key, default={})
2185
2186 workload_id_transform_id_map_notexist_key = "all_worloadid2transformid_map_notexist"
2187 workload_id_transform_id_map_notexist = cache.get(workload_id_transform_id_map_notexist_key, default={})
2188
2189 if type(workload_id_transform_id_map_notexist) in (list, tuple):
2190 workload_id_transform_id_map_notexist = {}
2191
2192 workload_id_str = str(workload_id)
2193 if workload_id_str in workload_id_transform_id_map:
2194 return workload_id_transform_id_map[workload_id_str]
2195
2196 if workload_id_str in workload_id_transform_id_map_notexist and workload_id_transform_id_map_notexist[workload_id_str] + 600 < time.time():
2197 return None
2198
2199
2200 workload_id_lock.acquire()
2201
2202 workload_id_transform_id_map = cache.get(workload_id_transform_id_map_key, default={})
2203 workload_id_transform_id_map_notexist = cache.get(workload_id_transform_id_map_notexist_key, default={})
2204
2205 if type(workload_id_transform_id_map_notexist) in (list, tuple):
2206 workload_id_transform_id_map_notexist = {}
2207
2208 request_ids = []
2209 if not workload_id_transform_id_map or workload_id_str not in workload_id_transform_id_map or len(workload_id_transform_id_map[workload_id_str]) < 5:
2210 processing_status = [ProcessingStatus.New,
2211 ProcessingStatus.Submitting, ProcessingStatus.Submitted,
2212 ProcessingStatus.Running, ProcessingStatus.FinishedOnExec,
2213 ProcessingStatus.Cancel, ProcessingStatus.FinishedOnStep,
2214 ProcessingStatus.ToCancel, ProcessingStatus.Cancelling,
2215 ProcessingStatus.ToSuspend, ProcessingStatus.Suspending,
2216 ProcessingStatus.ToResume, ProcessingStatus.Resuming,
2217 ProcessingStatus.ToExpire, ProcessingStatus.Expiring,
2218 ProcessingStatus.ToFinish, ProcessingStatus.ToForceFinish]
2219
2220 procs = core_processings.get_processings_by_status(status=processing_status)
2221 for proc in procs:
2222 processing = proc['processing_metadata']['processing']
2223 work = processing.work
2224 if work.use_dependency_to_release_jobs():
2225 workload_id_transform_id_map[str(proc['workload_id'])] = (proc['request_id'],
2226 proc['transform_id'],
2227 proc['processing_id'],
2228 proc['status'].value,
2229 proc['substatus'].value)
2230 if proc['request_id'] not in request_ids:
2231 request_ids.append(proc['request_id'])
2232
2233 cache.set(workload_id_transform_id_map_key, workload_id_transform_id_map)
2234
2235 for key in workload_id_transform_id_map:
2236 if key in workload_id_transform_id_map_notexist:
2237 del workload_id_transform_id_map_notexist[key]
2238
2239
2240 if request_ids:
2241 get_collection_id_transform_id_map(coll_id=None, request_id=request_ids[0], request_ids=request_ids)
2242
2243 keys = list(workload_id_transform_id_map_notexist.keys())
2244 for key in keys:
2245 if workload_id_transform_id_map_notexist[key] + 7200 < time.time():
2246 del workload_id_transform_id_map_notexist[key]
2247
2248 cache.set(workload_id_transform_id_map_notexist_key, workload_id_transform_id_map_notexist)
2249
2250 if workload_id_str not in workload_id_transform_id_map:
2251 if workload_id_str not in workload_id_transform_id_map_notexist:
2252 workload_id_transform_id_map_notexist[workload_id_str] = time.time()
2253 cache.set(workload_id_transform_id_map_notexist_key, workload_id_transform_id_map_notexist)
2254
2255 workload_id_lock.release()
2256 return None
2257 else:
2258 workload_id_lock.release()
2259
2260 return workload_id_transform_id_map[workload_id_str]
2261
2262
2263 content_id_lock = threading.Lock()
2264
2265
2266 def get_input_name_content_id_map(request_id, workload_id, transform_id):
2267 cache = get_redis_cache()
2268 input_name_content_id_map_key = "transform_input_contentid_map_%s" % transform_id
2269 input_name_content_id_map = cache.get(input_name_content_id_map_key, default={})
2270
2271 if not input_name_content_id_map:
2272 content_id_lock.acquire()
2273
2274 contents = core_catalog.get_contents_by_request_transform(request_id=request_id, transform_id=transform_id)
2275 input_name_content_id_map = {}
2276 for content in contents:
2277 if content['content_relation_type'] == ContentRelationType.Output:
2278 if content['name'] not in input_name_content_id_map:
2279 input_name_content_id_map[content['name']] = []
2280 input_name_content_id_map[content['name']].append(content['content_id'])
2281 if content['path']:
2282 if content['path'] not in input_name_content_id_map:
2283 input_name_content_id_map[content['path']] = []
2284 input_name_content_id_map[content['path']].append(content['content_id'])
2285
2286 cache.set(input_name_content_id_map_key, input_name_content_id_map)
2287
2288 content_id_lock.release()
2289 return input_name_content_id_map
2290
2291
2292 def get_jobid_content_id_map(request_id, workload_id, transform_id, job_id, inputs):
2293 cache = get_redis_cache()
2294 jobid_content_id_map_key = "transform_jobid_contentid_map_%s" % transform_id
2295 jobid_content_id_map = cache.get(jobid_content_id_map_key, default={})
2296
2297 to_update_jobid = False
2298 job_id = str(job_id)
2299 if not jobid_content_id_map or job_id not in jobid_content_id_map:
2300 to_update_jobid = True
2301 input_name_content_id_map = get_input_name_content_id_map(request_id, workload_id, transform_id)
2302 for ip in inputs:
2303 if ':' in ip:
2304 pos = ip.find(":")
2305 ip = ip[pos + 1:]
2306 if ip in input_name_content_id_map:
2307 content_ids = input_name_content_id_map[ip]
2308 jobid_content_id_map[job_id] = content_ids
2309 break
2310
2311 cache.set(jobid_content_id_map_key, jobid_content_id_map)
2312 return jobid_content_id_map, to_update_jobid
2313
2314
2315 def get_content_id_from_job_id(request_id, workload_id, transform_id, job_id, inputs):
2316 jobid_content_id_map, to_update_jobid = get_jobid_content_id_map(request_id, workload_id, transform_id, job_id, inputs)
2317
2318 if str(job_id) in jobid_content_id_map:
2319 content_ids = jobid_content_id_map[str(job_id)]
2320 else:
2321 content_ids = None
2322 return content_ids, to_update_jobid
2323
2324
2325 pending_lock = threading.Lock()
2326
2327
2328 def whether_to_process_pending_workload_id(workload_id, logger=None, log_prefix=''):
2329 cache = get_redis_cache()
2330 processed_pending_workload_id_map_key = "processed_pending_workload_id_map"
2331 processed_pending_workload_id_map = cache.get(processed_pending_workload_id_map_key, default={})
2332 processed_pending_workload_id_map_time_key = "processed_pending_workload_id_map_time"
2333 processed_pending_workload_id_map_time = cache.get(processed_pending_workload_id_map_time_key, default=None)
2334
2335 workload_id = str(workload_id)
2336 if workload_id in processed_pending_workload_id_map:
2337 return False
2338
2339
2340 pending_lock.acquire()
2341 processed_pending_workload_id_map = cache.get(processed_pending_workload_id_map_key, default={})
2342
2343 processed_pending_workload_id_map[workload_id] = time.time()
2344 if processed_pending_workload_id_map_time is None or processed_pending_workload_id_map_time + 86400 < time.time():
2345 cache.set(processed_pending_workload_id_map_time_key, int(time.time()), expire_seconds=86400)
2346
2347 keys = list(processed_pending_workload_id_map.keys())
2348 for workload_id in keys:
2349 if processed_pending_workload_id_map[workload_id] + 86400 < time.time():
2350 del processed_pending_workload_id_map[workload_id]
2351
2352 cache.set(processed_pending_workload_id_map_key, processed_pending_workload_id_map, expire_seconds=86400)
2353 pending_lock.release()
2354 return True
2355
2356
2357 update_processing_lock = threading.Lock()
2358
2359
2360 def whether_to_update_processing(processing_id, interval=300):
2361 cache = get_redis_cache()
2362 ret = False
2363
2364 update_processing_lock.acquire()
2365 update_processing_map_key = "update_processing_map"
2366 update_processing_map = cache.get(update_processing_map_key, default={})
2367
2368 processing_id_str = str(processing_id)
2369 if processing_id_str not in update_processing_map or update_processing_map[processing_id_str] + interval < time.time():
2370 update_processing_map[processing_id_str] = time.time()
2371 ret = True
2372
2373 keys = list(update_processing_map.keys())
2374 for key in keys:
2375 if update_processing_map[key] + 86400 < time.time():
2376 del update_processing_map[key]
2377
2378 cache.set(update_processing_map_key, update_processing_map, expire_seconds=86400)
2379 update_processing_lock.release()
2380 return ret
2381
2382
2383 def handle_messages_processing(messages, logger=None, log_prefix='', update_processing_interval=300):
2384 logger = get_logger(logger)
2385 if not log_prefix:
2386 log_prefix = "<Message>"
2387
2388 update_processings = []
2389 update_processings_by_job = []
2390 terminated_processings = []
2391 update_contents = []
2392
2393 for ori_msg in messages:
2394 if type(ori_msg) in [dict]:
2395 msg = ori_msg
2396 else:
2397 msg = json.loads(ori_msg)
2398 if 'taskid' not in msg or not msg['taskid']:
2399 continue
2400
2401 if msg['msg_type'] in ['task_status']:
2402 workload_id = msg['taskid']
2403 status = msg['status']
2404 if status in ['pending1']:
2405 logger.debug(log_prefix + "Received message: %s" % str(ori_msg))
2406
2407 ret_req_tf_pr_id = get_workload_id_transform_id_map(workload_id, logger=logger, log_prefix=log_prefix)
2408 if not ret_req_tf_pr_id:
2409
2410 logger.debug(log_prefix + "No matched workload_id, discard message: %s" % str(ori_msg))
2411 continue
2412
2413 logger.debug(log_prefix + "(request_id, transform_id, processing_id, status, substatus): %s" % str(ret_req_tf_pr_id))
2414 req_id, tf_id, processing_id, r_status, r_substatus = ret_req_tf_pr_id
2415 if whether_to_process_pending_workload_id(workload_id, logger=logger, log_prefix=log_prefix):
2416
2417 if processing_id not in update_processings:
2418 update_processings.append(processing_id)
2419 logger.debug(log_prefix + "Add to update processing: %s" % str(processing_id))
2420 else:
2421 logger.debug(log_prefix + "Processing %s is already processed, not add it to update processing" % (str(processing_id)))
2422 elif status in ['finished', 'done']:
2423 logger.debug(log_prefix + "Received message: %s" % str(ori_msg))
2424
2425 ret_req_tf_pr_id = get_workload_id_transform_id_map(workload_id, logger=logger, log_prefix=log_prefix)
2426 if not ret_req_tf_pr_id:
2427
2428 logger.debug(log_prefix + "No matched workload_id, discard message: %s" % str(ori_msg))
2429 continue
2430
2431 logger.debug(log_prefix + "(request_id, transform_id, processing_id, status, substatus): %s" % str(ret_req_tf_pr_id))
2432 req_id, tf_id, processing_id, r_status, r_substatus = ret_req_tf_pr_id
2433
2434 if processing_id not in update_processings:
2435 terminated_processings.append(processing_id)
2436 logger.debug(log_prefix + "Add to terminated processing: %s" % str(processing_id))
2437
2438 if msg['msg_type'] in ['job_status']:
2439 workload_id = msg['taskid']
2440 job_id = msg['jobid']
2441 status = msg['status']
2442 inputs = msg['inputs']
2443
2444
2445 if inputs and status in ['finished', 'activated']:
2446 logger.debug(log_prefix + "Received message: %s" % str(ori_msg))
2447
2448 ret_req_tf_pr_id = get_workload_id_transform_id_map(workload_id, logger=logger, log_prefix=log_prefix)
2449 if not ret_req_tf_pr_id:
2450
2451 logger.debug(log_prefix + "No matched workload_id, discard message: %s" % str(ori_msg))
2452 continue
2453
2454 logger.debug(log_prefix + "(request_id, transform_id, processing_id, status, substatus): %s" % str(ret_req_tf_pr_id))
2455
2456 req_id, tf_id, processing_id, r_status, r_substatus = ret_req_tf_pr_id
2457 content_ids, to_update_jobid = get_content_id_from_job_id(req_id, workload_id, tf_id, job_id, inputs)
2458 if content_ids:
2459 for content_id in content_ids:
2460 if to_update_jobid:
2461 u_content = {'content_id': content_id,
2462 'request_id': req_id,
2463 'transform_id': tf_id,
2464 'workload_id': workload_id,
2465
2466 'substatus': get_content_status_from_panda_msg_status(status),
2467 'content_metadata': {'panda_id': job_id}}
2468 else:
2469 u_content = {'content_id': content_id,
2470 'request_id': req_id,
2471 'transform_id': tf_id,
2472 'workload_id': workload_id,
2473 'substatus': get_content_status_from_panda_msg_status(status)}
2474
2475
2476 update_contents.append(u_content)
2477
2478
2479 if processing_id not in update_processings_by_job:
2480 update_processings_by_job.append(processing_id)
2481 logger.debug(log_prefix + "Add to update processing by job: %s" % str(processing_id))
2482
2483 return update_processings, update_processings_by_job, terminated_processings, update_contents, []
2484
2485
2486 def sync_collection_status(request_id, transform_id, workload_id, work, input_output_maps=None, log_prefix='',
2487 close_collection=False, force_close_collection=False, abort=False, terminate=False):
2488 logger = get_logger()
2489
2490 logger.info(log_prefix + "sync_collection_status")
2491
2492 if input_output_maps is None:
2493 input_output_maps = get_input_output_maps(request_id, transform_id, work, with_deps=False)
2494
2495 all_updates_flushed = True
2496 coll_status = {}
2497 messages = []
2498 for map_id in input_output_maps:
2499 inputs = input_output_maps[map_id]['inputs'] if 'inputs' in input_output_maps[map_id] else []
2500
2501 outputs = input_output_maps[map_id]['outputs'] if 'outputs' in input_output_maps[map_id] else []
2502 logs = input_output_maps[map_id]['logs'] if 'logs' in input_output_maps[map_id] else []
2503
2504 for content in inputs + outputs + logs:
2505 if content['coll_id'] not in coll_status:
2506 coll_status[content['coll_id']] = {'total_files': 0, 'processed_files': 0, 'processing_files': 0, 'bytes': 0,
2507 'new_files': 0, 'activated_files': 0, 'failed_files': 0, 'missing_files': 0,
2508 'ext_files': 0, 'processed_ext_files': 0, 'failed_ext_files': 0,
2509 'preprocessing_files': 0, 'missing_ext_files': 0}
2510 coll_status[content['coll_id']]['total_files'] += 1
2511
2512 if content['status'] in [ContentStatus.Available, ContentStatus.Mapped,
2513 ContentStatus.Available.value, ContentStatus.Mapped.value,
2514 ContentStatus.FakeAvailable, ContentStatus.FakeAvailable.value]:
2515 coll_status[content['coll_id']]['processed_files'] += 1
2516 coll_status[content['coll_id']]['bytes'] += content['bytes']
2517 elif content['status'] in [ContentStatus.New]:
2518 coll_status[content['coll_id']]['new_files'] += 1
2519 elif content['status'] in [ContentStatus.Failed, ContentStatus.FinalFailed,
2520 ContentStatus.SubAvailable, ContentStatus.FinalSubAvailable]:
2521 coll_status[content['coll_id']]['failed_files'] += 1
2522 elif content['status'] in [ContentStatus.Lost, ContentStatus.Deleted, ContentStatus.Missing]:
2523 coll_status[content['coll_id']]['missing_files'] += 1
2524 elif content['status'] in [ContentStatus.Processing]:
2525 coll_status[content['coll_id']]['processing_files'] += 1
2526 elif content['status'] in [ContentStatus.Activated]:
2527 coll_status[content['coll_id']]['activated_files'] += 1
2528 else:
2529 coll_status[content['coll_id']]['preprocessing_files'] += 1
2530
2531 if content['status'] != content['substatus']:
2532 all_updates_flushed = False
2533
2534 all_ext_updated = True
2535 if work.require_ext_contents():
2536 all_ext_updated = False
2537 contents_ext = core_catalog.get_contents_ext(request_id=request_id, transform_id=transform_id)
2538 for content in contents_ext:
2539 coll_status[content['coll_id']]['ext_files'] += 1
2540
2541 if content['status'] in [ContentStatus.Available, ContentStatus.Mapped,
2542 ContentStatus.Available.value, ContentStatus.Mapped.value,
2543 ContentStatus.FakeAvailable, ContentStatus.FakeAvailable.value]:
2544 coll_status[content['coll_id']]['processed_ext_files'] += 1
2545
2546 elif content['status'] in [ContentStatus.Failed, ContentStatus.FinalFailed,
2547 ContentStatus.SubAvailable, ContentStatus.FinalSubAvailable]:
2548 coll_status[content['coll_id']]['failed_ext_files'] += 1
2549 elif content['status'] in [ContentStatus.Lost, ContentStatus.Deleted, ContentStatus.Missing]:
2550 coll_status[content['coll_id']]['missing_ext_files'] += 1
2551
2552 logger.info(log_prefix + f"sync_collection_status, coll_status: {coll_status}")
2553
2554 input_collections = work.get_input_collections(poll_externel=True)
2555 output_collections = work.get_output_collections()
2556 log_collections = work.get_log_collections()
2557
2558 update_collections = []
2559 for coll in input_collections + output_collections + log_collections:
2560 if coll.coll_id in coll_status:
2561 if 'total_files' in coll.coll_metadata and coll.coll_metadata['total_files']:
2562 coll.total_files = coll.coll_metadata['total_files']
2563 else:
2564 coll.total_files = coll_status[coll.coll_id]['total_files']
2565 coll.processed_files = coll_status[coll.coll_id]['processed_files']
2566 coll.processing_files = coll_status[coll.coll_id]['processing_files']
2567 coll.preprocessing_files = coll_status[coll.coll_id]['preprocessing_files']
2568 coll.activated_files = coll_status[coll.coll_id]['activated_files']
2569 coll.bytes = coll_status[coll.coll_id]['bytes']
2570 coll.new_files = coll_status[coll.coll_id]['new_files']
2571 coll.failed_files = coll_status[coll.coll_id]['failed_files']
2572 coll.missing_files = coll_status[coll.coll_id]['missing_files']
2573 coll.ext_files = coll_status[coll.coll_id]['ext_files']
2574 coll.processed_ext_files = coll_status[coll.coll_id]['processed_ext_files']
2575 coll.failed_ext_files = coll_status[coll.coll_id]['failed_ext_files']
2576 coll.missing_ext_files = coll_status[coll.coll_id]['missing_ext_files']
2577 else:
2578 if 'total_files' in coll.coll_metadata and coll.coll_metadata['total_files']:
2579 coll.total_files = coll.coll_metadata['total_files']
2580 else:
2581 coll.total_files = 0
2582 if 'availability' in coll.coll_metadata and coll.coll_metadata['availability']:
2583 coll.processed_files = coll.coll_metadata['availability']
2584 else:
2585 coll.processed_files = 0
2586 if 'stuck' in coll.coll_metadata and coll.coll_metadata['stuck']:
2587 coll.failed_files = coll.coll_metadata['stuck']
2588 else:
2589 coll.failed_files = 0
2590 if 'processing' in coll.coll_metadata and coll.coll_metadata['processing']:
2591 coll.processing_files = coll.coll_metadata['processing']
2592 else:
2593 coll.processing_files = coll.total_files - coll.processed_files - coll.failed_files
2594 coll.new_files = 0
2595 coll.preprocessing_files = 0
2596 coll.activated_files = 0
2597 coll.missing_files = 0
2598 coll.ext_files = 0
2599 coll.processed_ext_files = 0
2600 coll.failed_ext_files = 0
2601 coll.missing_ext_files = 0
2602
2603 u_coll = {'coll_id': coll.coll_id,
2604 'total_files': coll.total_files,
2605 'processed_files': coll.processed_files,
2606 'processing_files': coll.processing_files,
2607 'activated_files': coll.activated_files,
2608 'preprocessing_files': coll.preprocessing_files,
2609 'new_files': coll.new_files,
2610 'failed_files': coll.failed_files,
2611 'missing_files': coll.missing_files,
2612 'bytes': coll.bytes,
2613 'ext_files': coll.ext_files,
2614 'processed_ext_files': coll.processed_ext_files,
2615 'failed_ext_files': coll.failed_ext_files,
2616 'missing_ext_files': coll.missing_ext_files}
2617
2618 if (not work.generating_new_inputs()) and (coll in input_collections and (workload_id is not None)):
2619 if coll.total_files == coll.processed_files + coll.failed_files + coll.missing_files:
2620 coll_db = core_catalog.get_collection(coll_id=coll.coll_id)
2621 coll.status = coll_db['status']
2622 if coll.status is not None and coll.status != CollectionStatus.Closed:
2623 u_coll['status'] = CollectionStatus.Closed
2624 u_coll['substatus'] = CollectionStatus.Closed
2625 coll.status = CollectionStatus.Closed
2626 coll.substatus = CollectionStatus.Closed
2627
2628 msgs = generate_messages(request_id, transform_id, workload_id, work, msg_type='collection', files=[coll], relation_type='input')
2629 messages += msgs
2630
2631 if terminate:
2632 all_files_monitored = False
2633 if coll.total_files == coll.processed_files + coll.failed_files + coll.missing_files:
2634 all_files_monitored = True
2635
2636 if abort:
2637 u_coll['status'] = CollectionStatus.Closed
2638 u_coll['substatus'] = CollectionStatus.Closed
2639 coll.status = CollectionStatus.Closed
2640 coll.substatus = CollectionStatus.Closed
2641 elif coll in output_collections:
2642 if (not work.require_ext_contents() or (work.require_ext_contents()
2643 and coll.processed_files <= coll.processed_ext_files and coll.failed_files <= coll.failed_ext_files)):
2644 all_ext_updated = True
2645 if (force_close_collection or (close_collection and all_updates_flushed and all_ext_updated and all_files_monitored)
2646 or coll.status == CollectionStatus.Closed):
2647 u_coll['status'] = CollectionStatus.Closed
2648 u_coll['substatus'] = CollectionStatus.Closed
2649 coll.status = CollectionStatus.Closed
2650 coll.substatus = CollectionStatus.Closed
2651 elif force_close_collection or (close_collection and all_updates_flushed and all_files_monitored) or coll.status == CollectionStatus.Closed:
2652 u_coll['status'] = CollectionStatus.Closed
2653 u_coll['substatus'] = CollectionStatus.Closed
2654 coll.status = CollectionStatus.Closed
2655 coll.substatus = CollectionStatus.Closed
2656
2657 update_collections.append(u_coll)
2658
2659 logger.info(log_prefix + f"sync_collection_status, update_collections: {update_collections}")
2660
2661 return update_collections, all_updates_flushed, messages
2662
2663
2664 def sync_collection_status_new(request_id, transform_id, workload_id, work, log_prefix='',
2665 close_collection=False, force_close_collection=False, abort=False, terminate=False):
2666 """
2667 Synchronise collection file-count statistics using pure SQL aggregate queries,
2668 avoiding loading all input_output_maps rows into Python memory.
2669
2670 The function issues two GROUP-BY queries to the database:
2671 1. ``contents`` table → count + bytes per (coll_id, status), plus a flag
2672 indicating whether any row still has status != substatus.
2673 2. ``contents_ext`` table (only when the work requires ext contents) →
2674 count per (coll_id, status).
2675
2676 The results are mapped to the same ``update_collections`` / ``all_updates_flushed``
2677 / ``messages`` return shape that ``sync_collection_status`` produces, so the two
2678 functions are interchangeable at call sites.
2679 """
2680 logger = get_logger()
2681 logger.info(log_prefix + "sync_collection_status_new")
2682
2683
2684
2685
2686 coll_stats_raw = core_catalog.get_content_status_statistics_by_coll(
2687 request_id=request_id, transform_id=transform_id, with_deps=False
2688 )
2689
2690
2691
2692 _processed_statuses = {
2693 ContentStatus.Available, ContentStatus.Mapped,
2694 ContentStatus.FakeAvailable,
2695 ContentStatus.Available.value, ContentStatus.Mapped.value,
2696 ContentStatus.FakeAvailable.value,
2697 }
2698 _failed_statuses = {
2699 ContentStatus.Failed, ContentStatus.FinalFailed,
2700 ContentStatus.SubAvailable, ContentStatus.FinalSubAvailable,
2701 }
2702 _missing_statuses = {
2703 ContentStatus.Lost, ContentStatus.Deleted, ContentStatus.Missing,
2704 }
2705
2706 all_updates_flushed = True
2707 coll_status = {}
2708
2709 for coll_id, by_status in coll_stats_raw.items():
2710 entry = {
2711 'total_files': 0,
2712 'processed_files': 0,
2713 'processing_files': 0,
2714 'bytes': 0,
2715 'new_files': 0,
2716 'activated_files': 0,
2717 'failed_files': 0,
2718 'missing_files': 0,
2719 'preprocessing_files': 0,
2720 'ext_files': 0,
2721 'processed_ext_files': 0,
2722 'failed_ext_files': 0,
2723 'missing_ext_files': 0,
2724 }
2725
2726 if by_status.get('has_unsynced', False):
2727 all_updates_flushed = False
2728
2729 for status, stat in by_status.items():
2730 if status == 'has_unsynced':
2731 continue
2732 cnt = stat['count']
2733 byt = stat['bytes']
2734 entry['total_files'] += cnt
2735
2736 if status in _processed_statuses:
2737 entry['processed_files'] += cnt
2738 entry['bytes'] += byt
2739 elif status == ContentStatus.New or status == ContentStatus.New.value:
2740 entry['new_files'] += cnt
2741 elif status in _failed_statuses or status in {s.value for s in _failed_statuses}:
2742 entry['failed_files'] += cnt
2743 elif status in _missing_statuses or status in {s.value for s in _missing_statuses}:
2744 entry['missing_files'] += cnt
2745 elif status == ContentStatus.Processing or status == ContentStatus.Processing.value:
2746 entry['processing_files'] += cnt
2747 elif status == ContentStatus.Activated or status == ContentStatus.Activated.value:
2748 entry['activated_files'] += cnt
2749 else:
2750 entry['preprocessing_files'] += cnt
2751
2752 coll_status[coll_id] = entry
2753
2754
2755
2756
2757 all_ext_updated = True
2758 if work.require_ext_contents():
2759 all_ext_updated = False
2760 ext_stats_raw = core_catalog.get_content_ext_status_statistics_by_coll(
2761 request_id=request_id, transform_id=transform_id
2762 )
2763
2764 for coll_id, by_status in ext_stats_raw.items():
2765 if coll_id not in coll_status:
2766 coll_status[coll_id] = {
2767 'total_files': 0, 'processed_files': 0, 'processing_files': 0,
2768 'bytes': 0, 'new_files': 0, 'activated_files': 0,
2769 'failed_files': 0, 'missing_files': 0, 'preprocessing_files': 0,
2770 'ext_files': 0, 'processed_ext_files': 0, 'failed_ext_files': 0,
2771 'missing_ext_files': 0,
2772 }
2773 for status, cnt in by_status.items():
2774 coll_status[coll_id]['ext_files'] += cnt
2775 if status in _processed_statuses:
2776 coll_status[coll_id]['processed_ext_files'] += cnt
2777 elif status in _failed_statuses or status in {s.value for s in _failed_statuses}:
2778 coll_status[coll_id]['failed_ext_files'] += cnt
2779 elif status in _missing_statuses or status in {s.value for s in _missing_statuses}:
2780 coll_status[coll_id]['missing_ext_files'] += cnt
2781
2782 logger.info(log_prefix + f"sync_collection_status_new, coll_status: {coll_status}")
2783
2784
2785
2786
2787 input_collections = work.get_input_collections(poll_externel=True)
2788 output_collections = work.get_output_collections()
2789 log_collections = work.get_log_collections()
2790
2791 messages = []
2792 update_collections = []
2793
2794 for coll in input_collections + output_collections + log_collections:
2795 if coll.coll_id in coll_status:
2796 st = coll_status[coll.coll_id]
2797 if 'total_files' in coll.coll_metadata and coll.coll_metadata['total_files']:
2798 coll.total_files = coll.coll_metadata['total_files']
2799 else:
2800 coll.total_files = st['total_files']
2801 coll.processed_files = st['processed_files']
2802 coll.processing_files = st['processing_files']
2803 coll.preprocessing_files = st['preprocessing_files']
2804 coll.activated_files = st['activated_files']
2805 coll.bytes = st['bytes']
2806 coll.new_files = st['new_files']
2807 coll.failed_files = st['failed_files']
2808 coll.missing_files = st['missing_files']
2809 coll.ext_files = st['ext_files']
2810 coll.processed_ext_files = st['processed_ext_files']
2811 coll.failed_ext_files = st['failed_ext_files']
2812 coll.missing_ext_files = st['missing_ext_files']
2813 else:
2814 if 'total_files' in coll.coll_metadata and coll.coll_metadata['total_files']:
2815 coll.total_files = coll.coll_metadata['total_files']
2816 else:
2817 coll.total_files = 0
2818 if 'availability' in coll.coll_metadata and coll.coll_metadata['availability']:
2819 coll.processed_files = coll.coll_metadata['availability']
2820 else:
2821 coll.processed_files = 0
2822 if 'stuck' in coll.coll_metadata and coll.coll_metadata['stuck']:
2823 coll.failed_files = coll.coll_metadata['stuck']
2824 else:
2825 coll.failed_files = 0
2826 if 'processing' in coll.coll_metadata and coll.coll_metadata['processing']:
2827 coll.processing_files = coll.coll_metadata['processing']
2828 else:
2829 coll.processing_files = coll.total_files - coll.processed_files - coll.failed_files
2830 coll.new_files = 0
2831 coll.preprocessing_files = 0
2832 coll.activated_files = 0
2833 coll.missing_files = 0
2834 coll.ext_files = 0
2835 coll.processed_ext_files = 0
2836 coll.failed_ext_files = 0
2837 coll.missing_ext_files = 0
2838
2839 u_coll = {
2840 'coll_id': coll.coll_id,
2841 'total_files': coll.total_files,
2842 'processed_files': coll.processed_files,
2843 'processing_files': coll.processing_files,
2844 'activated_files': coll.activated_files,
2845 'preprocessing_files': coll.preprocessing_files,
2846 'new_files': coll.new_files,
2847 'failed_files': coll.failed_files,
2848 'missing_files': coll.missing_files,
2849 'bytes': coll.bytes,
2850 'ext_files': coll.ext_files,
2851 'processed_ext_files': coll.processed_ext_files,
2852 'failed_ext_files': coll.failed_ext_files,
2853 'missing_ext_files': coll.missing_ext_files,
2854 }
2855
2856 if (not work.generating_new_inputs()) and (coll in input_collections and (workload_id is not None)):
2857 if coll.total_files == coll.processed_files + coll.failed_files + coll.missing_files:
2858 coll_db = core_catalog.get_collection(coll_id=coll.coll_id)
2859 coll.status = coll_db['status']
2860 if coll.status is not None and coll.status != CollectionStatus.Closed:
2861 u_coll['status'] = CollectionStatus.Closed
2862 u_coll['substatus'] = CollectionStatus.Closed
2863 coll.status = CollectionStatus.Closed
2864 coll.substatus = CollectionStatus.Closed
2865
2866 msgs = generate_messages(request_id, transform_id, workload_id, work, msg_type='collection', files=[coll], relation_type='input')
2867 messages += msgs
2868
2869 if terminate:
2870 all_files_monitored = False
2871 if coll.total_files == coll.processed_files + coll.failed_files + coll.missing_files:
2872 all_files_monitored = True
2873
2874 if abort:
2875 u_coll['status'] = CollectionStatus.Closed
2876 u_coll['substatus'] = CollectionStatus.Closed
2877 coll.status = CollectionStatus.Closed
2878 coll.substatus = CollectionStatus.Closed
2879 elif coll in output_collections:
2880 if (not work.require_ext_contents() or (work.require_ext_contents()
2881 and coll.processed_files <= coll.processed_ext_files and coll.failed_files <= coll.failed_ext_files)):
2882 all_ext_updated = True
2883 if (force_close_collection or (close_collection and all_updates_flushed and all_ext_updated and all_files_monitored)
2884 or coll.status == CollectionStatus.Closed):
2885 u_coll['status'] = CollectionStatus.Closed
2886 u_coll['substatus'] = CollectionStatus.Closed
2887 coll.status = CollectionStatus.Closed
2888 coll.substatus = CollectionStatus.Closed
2889 elif force_close_collection or (close_collection and all_updates_flushed and all_files_monitored) or coll.status == CollectionStatus.Closed:
2890 u_coll['status'] = CollectionStatus.Closed
2891 u_coll['substatus'] = CollectionStatus.Closed
2892 coll.status = CollectionStatus.Closed
2893 coll.substatus = CollectionStatus.Closed
2894
2895 update_collections.append(u_coll)
2896
2897 logger.info(log_prefix + f"sync_collection_status_new, update_collections: {update_collections}")
2898
2899 return update_collections, all_updates_flushed, messages
2900
2901
2902 def sync_work_status(request_id, transform_id, workload_id, work, substatus=None, log_prefix=""):
2903 logger = get_logger()
2904
2905 input_collections = work.get_input_collections()
2906 output_collections = work.get_output_collections()
2907 log_collections = work.get_log_collections()
2908
2909 is_all_collections_closed = True
2910 is_all_files_processed = True
2911 is_all_files_failed = True
2912 has_files = False
2913 for coll in input_collections + output_collections + log_collections:
2914 if coll.status != CollectionStatus.Closed:
2915 is_all_collections_closed = False
2916 for coll in output_collections:
2917 if coll.total_files > 0:
2918 has_files = True
2919 if coll.total_files != coll.processed_files:
2920 is_all_files_processed = False
2921 if coll.processed_files > 0 or coll.total_files == coll.processed_files:
2922 is_all_files_failed = False
2923
2924 if is_all_collections_closed:
2925 logger.debug(log_prefix + "has_files: %s, is_all_files_processed: %s, is_all_files_failed: %s, substatus: %s" % (has_files,
2926 is_all_files_processed,
2927 is_all_files_failed,
2928 substatus))
2929 if has_files:
2930 if is_all_files_processed:
2931 work.status = WorkStatus.Finished
2932 elif is_all_files_failed:
2933 work.status = WorkStatus.Failed
2934 else:
2935 work.status = WorkStatus.SubFinished
2936 else:
2937 if substatus:
2938 work.status = get_work_status_from_transform_processing_status(substatus)
2939 else:
2940 work.status = WorkStatus.Failed
2941 elif substatus and substatus in [ProcessingStatus.Broken]:
2942 work.status = get_work_status_from_transform_processing_status(substatus)
2943 logger.debug(log_prefix + "work status: %s, substatus: %s" % (str(work.status), substatus))
2944
2945
2946 def sync_processing(processing, agent_attributes, terminate=False, abort=False, logger=None, log_prefix=""):
2947 logger = get_logger()
2948
2949 terminated_status = [ProcessingStatus.Finished, ProcessingStatus.Failed, ProcessingStatus.SubFinished,
2950 ProcessingStatus.Terminating, ProcessingStatus.Cancelled]
2951
2952 request_id = processing['request_id']
2953 transform_id = processing['transform_id']
2954 workload_id = processing['workload_id']
2955
2956 proc = processing['processing_metadata']['processing']
2957 work = proc.work
2958 work.set_agent_attributes(agent_attributes, processing)
2959
2960 messages = []
2961
2962 if processing['substatus'] in terminated_status or processing['substatus'] in terminated_status:
2963 terminate = True
2964 update_collections, all_updates_flushed, msgs = sync_collection_status_new(request_id, transform_id, workload_id, work,
2965 log_prefix=log_prefix,
2966 close_collection=True, abort=abort, terminate=terminate)
2967
2968 messages += msgs
2969
2970 sync_work_status(request_id, transform_id, workload_id, work, processing['substatus'], log_prefix)
2971 logger.info(log_prefix + "sync_processing: work status: %s" % work.get_status())
2972 if terminate and work.is_terminated() and all_updates_flushed:
2973 msgs = generate_messages(request_id, transform_id, workload_id, work, msg_type='work')
2974 messages += msgs
2975 if work.is_finished():
2976 processing['status'] = ProcessingStatus.Finished
2977
2978 elif work.is_subfinished():
2979 processing['status'] = ProcessingStatus.SubFinished
2980 elif work.is_failed():
2981 processing['status'] = ProcessingStatus.Failed
2982 else:
2983 processing['status'] = ProcessingStatus.SubFinished
2984
2985
2986 if work.dispatch_ext_content:
2987 input_output_maps = get_input_output_maps(request_id, transform_id, work, with_deps=False)
2988 logger.info(f"{log_prefix} generating messages for ext contents")
2989 contents_ext = core_catalog.get_contents_ext(request_id=request_id, transform_id=transform_id)
2990 msgs = generate_messages(request_id, transform_id, workload_id, work, msg_type='content_ext', files=contents_ext,
2991 relation_type='output', input_output_maps=input_output_maps)
2992 messages += msgs
2993
2994 if processing['status'] == ProcessingStatus.Terminating and is_process_terminated(processing['substatus']):
2995 processing['status'] = processing['substatus']
2996
2997 return processing, update_collections, messages
2998
2999
3000 def handle_abort_processing(processing, agent_attributes, logger=None, sync=True, log_prefix=''):
3001 logger = get_logger(logger)
3002
3003
3004
3005
3006
3007 proc = processing['processing_metadata']['processing']
3008 work = proc.work
3009 work.set_agent_attributes(agent_attributes, processing)
3010
3011 work.abort_processing(processing, log_prefix=log_prefix)
3012
3013
3014
3015
3016
3017
3018
3019
3020
3021
3022
3023
3024
3025 update_contents = []
3026 if sync:
3027 processing, update_collections, messages = sync_processing(processing, agent_attributes, terminate=True, abort=True, logger=logger, log_prefix=log_prefix)
3028 else:
3029 update_collections = []
3030 messages = []
3031
3032
3033 return processing, update_collections, update_contents, messages
3034
3035
3036 def reactive_contents(request_id, transform_id, workload_id, work, input_output_maps):
3037 updated_contents = []
3038 contents = core_catalog.get_contents_by_request_transform(request_id=request_id, transform_id=transform_id)
3039 for content in contents:
3040 if content['status'] not in [ContentStatus.Available, ContentStatus.Mapped,
3041 ContentStatus.Available.value, ContentStatus.Mapped.value,
3042 ContentStatus.FakeAvailable, ContentStatus.FakeAvailable.value]:
3043 u_content = {'content_id': content['content_id'],
3044 'request_id': content['request_id'],
3045 'substatus': ContentStatus.New,
3046 'status': ContentStatus.New}
3047 updated_contents.append(u_content)
3048 return updated_contents
3049
3050
3051 def handle_resume_processing(processing, agent_attributes, logger=None, log_prefix=''):
3052 logger = get_logger(logger)
3053
3054 request_id = processing['request_id']
3055 transform_id = processing['transform_id']
3056 workload_id = processing['workload_id']
3057
3058 proc = processing['processing_metadata']['processing']
3059 work = proc.work
3060 work.set_agent_attributes(agent_attributes, processing)
3061
3062 work.resume_processing(processing, log_prefix=log_prefix)
3063
3064 input_collections = work.get_input_collections()
3065 output_collections = work.get_output_collections()
3066 log_collections = work.get_log_collections()
3067
3068 update_collections = []
3069 for coll in input_collections + output_collections + log_collections:
3070 coll.status = CollectionStatus.Open
3071 coll.substatus = CollectionStatus.Open
3072 u_collection = {'coll_id': coll.coll_id,
3073 'status': CollectionStatus.Open,
3074 'substatus': CollectionStatus.Open}
3075 update_collections.append(u_collection)
3076
3077 input_output_maps = get_input_output_maps(request_id, transform_id, work, with_deps=False)
3078 update_contents = reactive_contents(request_id, transform_id, workload_id, work, input_output_maps)
3079
3080 processing['status'] = ProcessingStatus.Running
3081 return processing, update_collections, update_contents