File indexing completed on 2026-04-09 07:58:19
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012 """
0013 operations related to Processings.
0014 """
0015
0016 from idds.orm.base.session import read_session, transactional_session
0017 from idds.common.constants import CommandType, ProcessingStatus, ProcessingType, GranularityType, ContentRelationType
0018 from idds.common.utils import get_list_chunks
0019 from idds.orm import (processings as orm_processings,
0020 collections as orm_collections,
0021 contents as orm_contents,
0022 messages as orm_messages,
0023 transforms as orm_transforms)
0024
0025
@transactional_session
def add_processing(request_id, workload_id, transform_id, status, submitter=None,
                   substatus=ProcessingStatus.New, granularity=None,
                   granularity_type=GranularityType.File,
                   processing_type=ProcessingType.Workflow,
                   site=None,
                   command=CommandType.NoneCommand,
                   new_poll_period=1, update_poll_period=10,
                   internal_id=None, parent_internal_id=None, loop_index=None,
                   new_retries=0, update_retries=0, max_new_retries=3, max_update_retries=0,
                   expired_at=None, processing_metadata=None, session=None):
    """
    Add a processing.

    Every argument is forwarded unchanged, by keyword, to
    ``orm_processings.add_processing``.

    :param request_id: The request id.
    :param workload_id: The workload id.
    :param transform_id: Transform id.
    :param status: Processing status.
    :param submitter: Submitter name.
    :param substatus: Processing substatus.
    :param granularity: Granularity size.
    :param granularity_type: Granularity type.
    :param processing_type: Processing type.
    :param site: Site name.
    :param command: Command attached to the processing.
    :param new_poll_period: Poll period for new processings.
    :param update_poll_period: Poll period for updates.
    :param internal_id: Internal id.
    :param parent_internal_id: Parent internal id.
    :param loop_index: Loop index.
    :param new_retries: Current number of new-retries.
    :param update_retries: Current number of update-retries.
    :param max_new_retries: Maximum number of new-retries.
    :param max_update_retries: Maximum number of update-retries.
    :param expired_at: The datetime when it expires.
    :param processing_metadata: The metadata as json.
    :param session: The database session in use.

    :raises DuplicatedObject: If a processing with the same name exists (from the ORM layer).
    :raises DatabaseException: If there is a database error (from the ORM layer).

    :returns: What the ORM layer returns (documented as the processing id).
    """
    # Collect the columns in one mapping so the ORM call stays short.
    columns = dict(request_id=request_id, workload_id=workload_id, transform_id=transform_id,
                   status=status, substatus=substatus, submitter=submitter,
                   granularity=granularity, granularity_type=granularity_type,
                   site=site,
                   new_poll_period=new_poll_period, update_poll_period=update_poll_period,
                   new_retries=new_retries, update_retries=update_retries,
                   max_new_retries=max_new_retries, max_update_retries=max_update_retries,
                   processing_type=processing_type,
                   command=command,
                   internal_id=internal_id, parent_internal_id=parent_internal_id,
                   loop_index=loop_index,
                   expired_at=expired_at, processing_metadata=processing_metadata)
    return orm_processings.add_processing(session=session, **columns)
0070
0071
@read_session
def get_processing(processing_id=None, request_id=None, transform_id=None, to_json=False, session=None):
    """
    Get a single processing.

    The lookup keys are forwarded unchanged to ``orm_processings.get_processing``.

    :param processing_id: Processing id.
    :param request_id: Request id.
    :param transform_id: Transform id.
    :param to_json: Return json format.
    :param session: The database session in use.

    :raises NoObject: If no processing is found (raised by the ORM layer).

    :returns: Processing.
    """
    lookup = {
        'processing_id': processing_id,
        'request_id': request_id,
        'transform_id': transform_id,
    }
    return orm_processings.get_processing(to_json=to_json, session=session, **lookup)
0087
0088
@read_session
def get_processings(request_id=None, workload_id=None, transform_id=None, loop_index=None, internal_ids=None,
                    site=None, parent_internal_ids=None, to_json=False, session=None):
    """
    Get processings matching the given filters.

    ``request_id``, ``workload_id``, ``transform_id``, ``loop_index``,
    ``internal_ids`` and ``site`` are passed to the ORM query.
    ``parent_internal_ids`` is applied afterwards, in Python. A processing's
    ``parent_internal_id`` value is treated as a comma-separated list of ids.
    The processing is kept when any of those ids appears in ``parent_internal_ids``.
    Processings with an empty ``parent_internal_id`` are dropped by this filter.

    :param request_id: Request id.
    :param workload_id: Workload id.
    :param transform_id: Transform id.
    :param loop_index: Loop index.
    :param internal_ids: Internal ids to match.
    :param site: Site name.
    :param parent_internal_ids: List/tuple of parent internal ids, or a
        comma-separated string of them. When falsy, no parent filtering is done.
    :param to_json: Return json format.
    :param session: The database session in use.

    :returns: The processings from the ORM layer, filtered by parent internal id
        when requested. A falsy ORM result is returned unchanged.
    """
    prs = orm_processings.get_processings(
        request_id=request_id,
        workload_id=workload_id,
        transform_id=transform_id,
        loop_index=loop_index,
        internal_ids=internal_ids,
        site=site,
        to_json=to_json, session=session
    )
    if not prs or not parent_internal_ids:
        return prs

    if not isinstance(parent_internal_ids, (list, tuple)):
        parent_internal_ids = parent_internal_ids.split(",")
    # Build the lookup set once so each membership test is O(1).
    wanted_parent_ids = set(parent_internal_ids)

    ret_prs = []
    for pr in prs:
        pr_parent_internal_id = pr['parent_internal_id']
        if not pr_parent_internal_id:
            continue
        if any(pid in wanted_parent_ids for pid in pr_parent_internal_id.split(",")):
            ret_prs.append(pr)
    return ret_prs
0126
0127
@read_session
def get_processings_by_transform_id(transform_id=None, to_json=False, session=None):
    """
    Get all processings that belong to one transform.

    :param transform_id: Transform id.
    :param to_json: Return json format.
    :param session: The database session in use.

    :raises NoObject: If no processing is found (raised by the ORM layer).

    :returns: Processings.
    """
    processings = orm_processings.get_processings_by_transform_id(
        transform_id=transform_id,
        to_json=to_json,
        session=session,
    )
    return processings
0142
0143
@transactional_session
def get_processing_by_id_status(processing_id, status=None, exclude_status=None, locking=False, to_lock=False, lock_period=None, session=None):
    """
    Get a processing by id, optionally constrained by status.

    :param processing_id: Processing id.
    :param status: Status (or statuses) to match; forwarded to the ORM layer.
    :param exclude_status: Status (or statuses) to exclude; forwarded to the ORM layer.
    :param locking: Forwarded to the ORM layer.
    :param to_lock: Forwarded to the ORM layer.
    :param lock_period: Accepted but NOT forwarded to the ORM call.
        NOTE(review): confirm whether this is intentional.
    :param session: The database session in use.

    :returns: The processing returned by the ORM layer.
    """
    return orm_processings.get_processing_by_id_status(
        processing_id=processing_id,
        status=status,
        exclude_status=exclude_status,
        locking=locking,
        to_lock=to_lock,
        session=session,
    )
0151
0152
@transactional_session
def get_processings_by_status(status, time_period=None, locking=False, bulk_size=None, to_json=False, by_substatus=False,
                              not_lock=False, next_poll_at=None, for_poller=False, only_return_id=False,
                              min_request_id=None, locking_for_update=False, new_poll=False, update_poll=False, session=None):
    """
    Get processings by status.

    This is a thin wrapper around ``orm_processings.get_processings_by_status``.
    Note that ``time_period`` is passed to it under the name ``period``.

    :param status: Processing status or list of processing statuses.
    :param time_period: Time period in seconds.
    :param locking: Whether to retrieve only unlocked items and lock them.
    :param bulk_size: Maximum number of items; forwarded to the ORM layer.
    :param to_json: Return json format.
    :param by_substatus: Forwarded to the ORM layer.
    :param not_lock: Forwarded to the ORM layer.
    :param next_poll_at: Accepted but NOT forwarded to the ORM call.
        NOTE(review): confirm whether this is intentional.
    :param for_poller: Forwarded to the ORM layer.
    :param only_return_id: Forwarded to the ORM layer.
    :param min_request_id: Forwarded to the ORM layer.
    :param locking_for_update: Forwarded to the ORM layer.
    :param new_poll: Forwarded to the ORM layer.
    :param update_poll: Forwarded to the ORM layer.
    :param session: The database session in use.

    :returns: Processings.
    """
    query_options = {
        'period': time_period,
        'locking': locking,
        'bulk_size': bulk_size,
        'to_json': to_json,
        'locking_for_update': locking_for_update,
        'new_poll': new_poll,
        'update_poll': update_poll,
        'only_return_id': only_return_id,
        'min_request_id': min_request_id,
        'not_lock': not_lock,
        'by_substatus': by_substatus,
        'for_poller': for_poller,
    }
    return orm_processings.get_processings_by_status(status=status, session=session, **query_options)
0179
0180
@transactional_session
def update_processing(processing_id, parameters, session=None):
    """
    Update a processing.

    :param processing_id: The processing id.
    :param parameters: A dictionary of column names to new values.
    :param session: The database session in use.

    :raises NoObject: If no processing is found (from the ORM layer).
    :raises DatabaseException: If there is a database error (from the ORM layer).

    :returns: What the ORM layer returns.
    """
    result = orm_processings.update_processing(processing_id=processing_id,
                                               parameters=parameters,
                                               session=session)
    return result
0195
0196
@transactional_session
def abort_resume_processings(transform_id=None, request_id=None, processing_id=None, abort=False, resume=False, session=None):
    """
    Abort or resume processings.

    The selection and the abort/resume flags are forwarded to
    ``orm_processings.abort_resume_processings``. This function returns None.

    :param transform_id: The id of the transform.
    :param request_id: The request id.
    :param processing_id: The processing id.
    :param abort: Whether to abort.
    :param resume: Whether to resume.
    :param session: The database session in use.

    :raises NoObject: If nothing matches (from the ORM layer).
    :raises DatabaseException: If there is a database error (from the ORM layer).
    """
    selection = {'transform_id': transform_id, 'request_id': request_id, 'processing_id': processing_id}
    orm_processings.abort_resume_processings(abort=abort, resume=resume, session=session, **selection)
0212
0213
@transactional_session
def delete_processing(processing_id=None, session=None):
    """
    Delete a processing.

    :param processing_id: The id of the processing.
    :param session: The database session in use.

    :raises NoObject: If no processing is found (from the ORM layer).
    :raises DatabaseException: If there is a database error (from the ORM layer).

    :returns: What the ORM layer returns.
    """
    deleted = orm_processings.delete_processing(processing_id=processing_id, session=session)
    return deleted
0226
0227
def _add_message_from_content(msg_con, session=None, **extra):
    """Add one message whose fields are read from the dict ``msg_con``."""
    orm_messages.add_message(msg_type=msg_con['msg_type'],
                             status=msg_con['status'],
                             source=msg_con['source'],
                             transform_id=msg_con['transform_id'],
                             num_contents=msg_con['num_contents'],
                             msg_content=msg_con['msg_content'],
                             session=session,
                             **extra)


@transactional_session
def update_processing_with_collection_contents(updated_processing, new_processing=None, updated_collection=None,
                                               updated_files=None, new_files=None,
                                               coll_msg_content=None, file_msg_content=None, transform_updates=None,
                                               message_bulk_size=1000, session=None):
    """
    Update a processing together with its collection, contents, messages and transform.

    Each step runs only when its argument is truthy, in this order:
    update contents, add contents, add file messages, update collection,
    add collection message, update processing, add processing, update transform.

    :param updated_processing: dict with 'processing_id' and 'parameters'.
    :param new_processing: dict of keyword arguments for ``orm_processings.add_processing``.
    :param updated_collection: dict with 'coll_id' and 'parameters'.
    :param updated_files: list of contents to update.
    :param new_files: list of contents to add.
    :param coll_msg_content: message dict with collection info.
    :param file_msg_content: message dict, or list/tuple of message dicts, with files info.
    :param transform_updates: dict with 'transform_id' and 'parameters'.
    :param message_bulk_size: bulk size forwarded when adding the file messages.
    :param session: The database session in use.
    """
    if updated_files:
        orm_contents.update_contents(updated_files, session=session)
    if new_files:
        orm_contents.add_contents(contents=new_files, session=session)
    if file_msg_content:
        if not isinstance(file_msg_content, (list, tuple)):
            file_msg_content = [file_msg_content]
        for file_msg_con in file_msg_content:
            _add_message_from_content(file_msg_con, session=session, bulk_size=message_bulk_size)
    if updated_collection:
        orm_collections.update_collection(coll_id=updated_collection['coll_id'],
                                          parameters=updated_collection['parameters'],
                                          session=session)
    if coll_msg_content:
        # NOTE(review): unlike file messages, no bulk_size is passed here; kept as-is.
        _add_message_from_content(coll_msg_content, session=session)
    if updated_processing:
        orm_processings.update_processing(processing_id=updated_processing['processing_id'],
                                          parameters=updated_processing['parameters'],
                                          session=session)
    if new_processing:
        orm_processings.add_processing(**new_processing, session=session)
    if transform_updates:
        orm_transforms.update_transform(transform_id=transform_updates['transform_id'],
                                        parameters=transform_updates['parameters'],
                                        session=session)
0280
0281
def resolve_input_dependency_id(new_input_dependency_contents, request_id=None, session=None):
    """
    Fill in ``content_dep_id`` for new input-dependency contents.

    Missing or None ``sub_map_id`` values are set to 0.

    Each content without a truthy ``content_dep_id`` is looked up among Output
    contents with the same ``coll_id`` and ``name``. When a match is found, its
    ``content_dep_id`` is set to that Output content's id, and its ``name`` is
    replaced by ``str(content_dep_id)``. Contents that already have a
    ``content_dep_id`` are left untouched and never count as missing.
    The dicts are modified in place.

    :param new_input_dependency_contents: list of content dicts with at least
        'content_dep_id', 'coll_id' and 'name' keys.
    :param request_id: request id used to scope the Output-content query.
    :param session: The database session in use.

    :returns: tuple ``(new_input_dependency_contents, has_missing_dep)``.
        ``has_missing_dep`` is True when at least one content still lacks a
        dependency id after resolution.
    """
    has_missing_dep = False
    # A dict gives order-preserving de-duplication without O(n) list scans.
    coll_ids = {}
    for content in new_input_dependency_contents:
        if content.get('sub_map_id') is None:
            content['sub_map_id'] = 0
        if not content['content_dep_id']:
            coll_ids[content['coll_id']] = None
    if not coll_ids:
        return new_input_dependency_contents, has_missing_dep

    contents = orm_contents.get_contents(coll_id=list(coll_ids), request_id=request_id,
                                         relation_type=ContentRelationType.Output, session=session)
    # coll_id -> {name: content_id}; a later duplicate name overwrites an earlier one.
    content_name_id_map = {}
    for content in contents:
        content_name_id_map.setdefault(content['coll_id'], {})[content['name']] = content['content_id']

    for content in new_input_dependency_contents:
        if content['content_dep_id']:
            # Already resolved by the caller; previously this was wrongly flagged as missing.
            continue
        name_id_map = content_name_id_map.get(content['coll_id'], {})
        if content['name'] in name_id_map:
            content_dep_id = name_id_map[content['name']]
            content['content_dep_id'] = content_dep_id
            content['name'] = str(content_dep_id)
        else:
            has_missing_dep = True
    return new_input_dependency_contents, has_missing_dep
0317
0318
def fix_input_dependency_contents(request_id=None, transform_id=None, session=None):
    """
    Compute ``content_dep_id`` updates for stored input-dependency contents.

    The function loads the InputDependency contents of the request/transform
    that have no ``content_dep_id``. It then matches each of them, by
    ``coll_id`` and ``name``, against the Output contents of the same
    collections. Nothing is written to the database here.

    :param request_id: The request id.
    :param transform_id: The transform id.
    :param session: The database session in use.

    :returns: list of ``{'content_id': ..., 'content_dep_id': ...}`` dicts, one
        per resolvable content; ``[]`` when there are no such contents.
    """
    dep_contents = orm_contents.get_contents(request_id=request_id, transform_id=transform_id,
                                             relation_type=ContentRelationType.InputDependency,
                                             without_content_dep_id=True, session=session)
    # Order-preserving de-duplication of collection ids.
    coll_ids = list(dict.fromkeys(dep['coll_id'] for dep in dep_contents))
    if not coll_ids:
        return []

    output_contents = orm_contents.get_contents(coll_id=coll_ids, request_id=request_id,
                                                relation_type=ContentRelationType.Output, session=session)
    name_to_id_by_coll = {}
    for out in output_contents:
        name_to_id_by_coll.setdefault(out['coll_id'], {})[out['name']] = out['content_id']

    updates = []
    for dep in dep_contents:
        lacks_dep_id = ('content_dep_id' not in dep
                        or dep['content_dep_id'] is None
                        or dep['content_dep_id'] == 0)
        if not lacks_dep_id:
            continue
        name_to_id = name_to_id_by_coll.get(dep['coll_id'])
        if name_to_id is None or dep['name'] not in name_to_id:
            continue
        updates.append({'content_id': dep['content_id'], 'content_dep_id': name_to_id[dep['name']]})
    return updates
0346
0347
def _apply_in_chunks(func, items, **kwargs):
    """Call ``func(chunk, **kwargs)`` for every chunk yielded by ``get_list_chunks(items)``."""
    for chunk in get_list_chunks(items):
        func(chunk, **kwargs)


@transactional_session
def update_processing_contents(update_processing, update_contents=None, update_messages=None, new_contents=None,
                               update_dep_contents=None, update_collections=None, messages=None,
                               new_update_contents=None, new_input_dependency_contents=None,
                               new_contents_ext=None, update_contents_ext=None,
                               request_id=None, transform_id=None, use_bulk_update_mappings=True,
                               fix_missing_content_dep_id=False, message_bulk_size=2000, session=None):
    """
    Update a processing together with its contents, messages and collections.

    The steps run in this order, each only when its argument is truthy:
    add contents_ext, resolve and add input-dependency contents, add
    contents_update rows, add messages, add contents, fix missing
    ``content_dep_id``, update contents_ext, update dependency contents,
    update contents, update collections, update messages, update processing.

    :param update_processing: dict with 'processing_id' and 'parameters'.
    :param update_contents: list of content updates.
    :param update_messages: list of message updates.
    :param new_contents: list of contents to add.
    :param update_dep_contents: tuple ``(request_id, {status_name: status},
        {status_name: [content_id, ...]})``.
    :param update_collections: collections passed to ``orm_collections.update_collections``.
    :param messages: a message or a list/tuple of messages to add.
    :param new_update_contents: rows for ``orm_contents.add_contents_update``
        (counted as updated contents).
    :param new_input_dependency_contents: input-dependency contents to resolve
        with ``resolve_input_dependency_id`` and then add.
    :param new_contents_ext: contents_ext rows to add.
    :param update_contents_ext: contents_ext updates.
    :param request_id: request id used to scope the update calls.
    :param transform_id: transform id used to scope the update calls.
    :param use_bulk_update_mappings: forwarded to the contents/messages update calls.
    :param fix_missing_content_dep_id: force the missing ``content_dep_id`` fix pass.
    :param message_bulk_size: bulk size for adding/updating messages.
    :param session: The database session in use.

    :returns: tuple ``(num_added_contents, num_updated_contents,
        num_added_messages, num_updated_messages)``.
    """
    num_added_contents = 0
    num_updated_contents = 0
    num_added_messages = 0
    num_updated_messages = 0

    if new_contents_ext:
        _apply_in_chunks(orm_contents.add_contents_ext, new_contents_ext, session=session)
        num_added_contents += len(new_contents_ext)

    has_missing_dep = False
    if new_input_dependency_contents:
        new_input_dependency_contents, has_missing_dep = resolve_input_dependency_id(
            new_input_dependency_contents, request_id=request_id, session=session)
        _apply_in_chunks(orm_contents.add_contents, new_input_dependency_contents, session=session)
        num_added_contents += len(new_input_dependency_contents)

    if new_update_contents:
        _apply_in_chunks(orm_contents.add_contents_update, new_update_contents, session=session)
        num_updated_contents += len(new_update_contents)

    if messages:
        if not isinstance(messages, (list, tuple)):
            messages = [messages]
        orm_messages.add_messages(messages, bulk_size=message_bulk_size, session=session)
        num_added_messages += len(messages)

    if new_contents:
        _apply_in_chunks(orm_contents.add_contents, new_contents, session=session)
        num_added_contents += len(new_contents)

    # Placed after the inserts above, presumably so the lookup sees rows added in this transaction.
    if has_missing_dep or fix_missing_content_dep_id:
        to_update_input_dependency_contents = fix_input_dependency_contents(
            request_id=request_id, transform_id=transform_id, session=session)
        if to_update_input_dependency_contents:
            _apply_in_chunks(orm_contents.update_contents, to_update_input_dependency_contents,
                             request_id=request_id, transform_id=transform_id,
                             use_bulk_update_mappings=False, grouping=False, session=session)
            num_updated_contents += len(to_update_input_dependency_contents)

    if update_contents_ext:
        _apply_in_chunks(orm_contents.update_contents_ext, update_contents_ext,
                         request_id=request_id, transform_id=transform_id,
                         use_bulk_update_mappings=use_bulk_update_mappings, session=session)
        num_updated_contents += len(update_contents_ext)

    if update_dep_contents:
        dep_request_id, update_dep_contents_status_name, update_dep_contents_status = update_dep_contents
        # Previously the tuple's request id overwrote the caller's explicit request_id
        # for every later update; now it is only used when none was supplied.
        if request_id is None:
            request_id = dep_request_id
        for status_name, status in update_dep_contents_status_name.items():
            status_content_ids = update_dep_contents_status[status_name]
            if status_content_ids:
                # update_dep_contents takes the chunk as its second positional argument.
                for chunk in get_list_chunks(status_content_ids):
                    orm_contents.update_dep_contents(dep_request_id, chunk, status, session=session)
                num_updated_contents += len(status_content_ids)

    if update_contents:
        _apply_in_chunks(orm_contents.update_contents, update_contents,
                         request_id=request_id, transform_id=transform_id,
                         use_bulk_update_mappings=use_bulk_update_mappings, session=session)
        num_updated_contents += len(update_contents)

    if update_collections:
        orm_collections.update_collections(update_collections, session=session)

    if update_messages:
        _apply_in_chunks(orm_messages.update_messages, update_messages,
                         bulk_size=message_bulk_size, request_id=request_id, transform_id=transform_id,
                         use_bulk_update_mappings=use_bulk_update_mappings, session=session)
        num_updated_messages += len(update_messages)

    if update_processing:
        orm_processings.update_processing(processing_id=update_processing['processing_id'],
                                          parameters=update_processing['parameters'],
                                          session=session)
    return num_added_contents, num_updated_contents, num_added_messages, num_updated_messages
0450
0451
@transactional_session
def clean_locking(time_period=3600, min_request_id=None, health_items=None, force=False, hostname=None, pid=None, session=None):
    """
    Clean processing locks older than the given time period.

    All arguments are forwarded to ``orm_processings.clean_locking``.

    :param time_period: Age threshold in seconds.
    :param min_request_id: Forwarded to the ORM layer.
    :param health_items: Forwarded to the ORM layer; defaults to a fresh empty list.
    :param force: Forwarded to the ORM layer.
    :param hostname: Forwarded to the ORM layer.
    :param pid: Forwarded to the ORM layer.
    :param session: The database session in use.

    :returns: What the ORM layer returns.
    """
    if health_items is None:
        # A new list per call avoids the shared mutable-default pitfall of `health_items=[]`.
        health_items = []
    return orm_processings.clean_locking(time_period=time_period, min_request_id=min_request_id, health_items=health_items,
                                         force=force, hostname=hostname, pid=pid, session=session)
0461
0462
@transactional_session
def clean_next_poll_at(status, session=None):
    """
    Clean next_poll_at for processings in the given status.

    Delegates to ``orm_processings.clean_next_poll_at``; returns None.

    :param status: Status of the processings.
    :param session: The database session in use.
    """
    orm_processings.clean_next_poll_at(session=session, status=status)
0471
0472
@read_session
def get_num_active_processings(active_status=None, session=None):
    """
    Get the number of active processings.

    :param active_status: Statuses regarded as active; forwarded to the ORM layer.
    :param session: The database session in use.

    :returns: What ``orm_processings.get_num_active_processings`` returns.
    """
    num_active = orm_processings.get_num_active_processings(active_status=active_status, session=session)
    return num_active
0476
0477
@read_session
def get_active_processings(active_status=None, session=None):
    """
    Get the active processings.

    :param active_status: Statuses regarded as active; forwarded to the ORM layer.
    :param session: The database session in use.

    :returns: What ``orm_processings.get_active_processings`` returns.
    """
    active = orm_processings.get_active_processings(active_status=active_status, session=session)
    return active