Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-09 07:58:19

0001 #!/usr/bin/env python
0002 #
0003 # Licensed under the Apache License, Version 2.0 (the "License");
0004 # you may not use this file except in compliance with the License.
0005 # You may obtain a copy of the License at
0006 # http://www.apache.org/licenses/LICENSE-2.0
0007 #
0008 # Authors:
0009 # - Wen Guan, <wen.guan@cern.ch>, 2019 - 2025
0010 
0011 
0012 """
0013 operations related to Processings.
0014 """
0015 
0016 from idds.orm.base.session import read_session, transactional_session
0017 from idds.common.constants import CommandType, ProcessingStatus, ProcessingType, GranularityType, ContentRelationType
0018 from idds.common.utils import get_list_chunks
0019 from idds.orm import (processings as orm_processings,
0020                       collections as orm_collections,
0021                       contents as orm_contents,
0022                       messages as orm_messages,
0023                       transforms as orm_transforms)
0024 
0025 
@transactional_session
def add_processing(request_id, workload_id, transform_id, status, submitter=None,
                   substatus=ProcessingStatus.New, granularity=None,
                   granularity_type=GranularityType.File,
                   processing_type=ProcessingType.Workflow,
                   site=None,
                   command=CommandType.NoneCommand,
                   new_poll_period=1, update_poll_period=10,
                   internal_id=None, parent_internal_id=None, loop_index=None,
                   new_retries=0, update_retries=0, max_new_retries=3, max_update_retries=0,
                   expired_at=None, processing_metadata=None, session=None):
    """
    Add a processing by delegating to ``orm_processings.add_processing``.

    Every argument is forwarded unchanged to the ORM layer.

    :param request_id: The request id.
    :param workload_id: The workload id.
    :param transform_id: Transform id.
    :param status: Processing status.
    :param submitter: Submitter name.
    :param substatus: Processing substatus (default ``ProcessingStatus.New``).
    :param granularity: Granularity size.
    :param granularity_type: Granularity type (default ``GranularityType.File``).
    :param processing_type: Processing type (default ``ProcessingType.Workflow``).
    :param site: Site name.
    :param command: Command attached to the processing (default ``CommandType.NoneCommand``).
    :param new_poll_period: Poll period stored for the "new" phase.
    :param update_poll_period: Poll period stored for the "update" phase.
    :param internal_id: Internal id.
    :param parent_internal_id: Parent internal id.
    :param loop_index: Loop index.
    :param new_retries: Initial "new" retry counter.
    :param update_retries: Initial "update" retry counter.
    :param max_new_retries: Maximum "new" retries.
    :param max_update_retries: Maximum "update" retries.
    :param expired_at: The datetime when it expires.
    :param processing_metadata: The metadata as json.
    :param session: The database session in use.

    :raises DuplicatedObject: If a processing with the same name exists.
    :raises DatabaseException: If there is a database error.

    :returns: processing id.
    """
    fields = dict(
        request_id=request_id,
        workload_id=workload_id,
        transform_id=transform_id,
        status=status,
        substatus=substatus,
        submitter=submitter,
        granularity=granularity,
        granularity_type=granularity_type,
        site=site,
        new_poll_period=new_poll_period,
        update_poll_period=update_poll_period,
        new_retries=new_retries,
        update_retries=update_retries,
        max_new_retries=max_new_retries,
        max_update_retries=max_update_retries,
        processing_type=processing_type,
        command=command,
        internal_id=internal_id,
        parent_internal_id=parent_internal_id,
        loop_index=loop_index,
        expired_at=expired_at,
        processing_metadata=processing_metadata,
    )
    return orm_processings.add_processing(session=session, **fields)
0070 
0071 
@read_session
def get_processing(processing_id=None, request_id=None, transform_id=None, to_json=False, session=None):
    """
    Get a single processing or raise a NoObject exception.

    :param processing_id: Processing id.
    :param request_id: Request id, forwarded to the ORM lookup.
    :param transform_id: Transform id, forwarded to the ORM lookup.
    :param to_json: Return json format.
    :param session: The database session in use.

    :raises NoObject: If no processing is found.

    :returns: Processing.
    """
    lookup = dict(processing_id=processing_id, request_id=request_id, transform_id=transform_id)
    return orm_processings.get_processing(to_json=to_json, session=session, **lookup)
0087 
0088 
@read_session
def get_processings(request_id=None, workload_id=None, transform_id=None, loop_index=None, internal_ids=None,
                    site=None, parent_internal_ids=None, to_json=False, session=None):
    """
    Get processings matching the given filters.

    All filters except ``parent_internal_ids`` are forwarded to
    ``orm_processings.get_processings``. ``parent_internal_ids`` is applied
    afterwards in Python: a processing is kept when at least one id in its
    comma-separated ``parent_internal_id`` field is among the requested ids.

    :param request_id: Request id.
    :param workload_id: Workload id.
    :param transform_id: Transform id.
    :param loop_index: Loop index.
    :param internal_ids: Internal ids.
    :param site: Site name.
    :param parent_internal_ids: Parent internal ids, as a list/tuple or a comma-separated string.
    :param to_json: Return json format.
    :param session: The database session in use.

    :returns: The ORM result unchanged when it is empty or no parent filter is given;
              otherwise a new list holding only the matching processings.
    """
    prs = orm_processings.get_processings(
        request_id=request_id,
        workload_id=workload_id,
        transform_id=transform_id,
        loop_index=loop_index,
        internal_ids=internal_ids,
        site=site,
        to_json=to_json, session=session
    )
    if not prs or not parent_internal_ids:
        return prs

    # Build the lookup set once so the filter is linear in the number of processings.
    if isinstance(parent_internal_ids, (list, tuple)):
        wanted = set(parent_internal_ids)
    else:
        # Tolerate spaces after commas, e.g. "a, b".
        wanted = {pid.strip() for pid in parent_internal_ids.split(",")}

    def _has_wanted_parent(pr):
        """Return True if any parent id of ``pr`` is in ``wanted``."""
        raw_parents = pr['parent_internal_id']
        if not raw_parents:
            return False
        return any(pid.strip() in wanted for pid in raw_parents.split(","))

    return [pr for pr in prs if _has_wanted_parent(pr)]
0126 
0127 
@read_session
def get_processings_by_transform_id(transform_id=None, to_json=False, session=None):
    """
    Get all processings of a transform or raise a NoObject exception.

    :param transform_id: Transform id.
    :param to_json: Return json format.
    :param session: The database session in use.

    :raises NoObject: If no processing is found.

    :returns: Processings.
    """
    processings = orm_processings.get_processings_by_transform_id(
        transform_id=transform_id,
        to_json=to_json,
        session=session,
    )
    return processings
0142 
0143 
@transactional_session
def get_processing_by_id_status(processing_id, status=None, exclude_status=None, locking=False, to_lock=False, lock_period=None, session=None):
    """
    Get a processing by id, optionally filtered by status.

    :param processing_id: Processing id.
    :param status: Status filter, forwarded to the ORM query.
    :param exclude_status: Excluded-status filter, forwarded to the ORM query.
    :param locking: Forwarded to the ORM query.
    :param to_lock: Forwarded to the ORM query.
    :param lock_period: Accepted for interface compatibility but NOT forwarded
                        to the ORM layer; it currently has no effect.
    :param session: The database session in use.

    :returns: Whatever ``orm_processings.get_processing_by_id_status`` returns.
    """
    return orm_processings.get_processing_by_id_status(
        processing_id=processing_id,
        status=status,
        exclude_status=exclude_status,
        locking=locking,
        to_lock=to_lock,
        session=session,
    )
0151 
0152 
@transactional_session
def get_processings_by_status(status, time_period=None, locking=False, bulk_size=None, to_json=False, by_substatus=False,
                              not_lock=False, next_poll_at=None, for_poller=False, only_return_id=False,
                              min_request_id=None, locking_for_update=False, new_poll=False, update_poll=False, session=None):
    """
    Get processings by status or raise a NoObject exception.

    :param status: Processing status or list of processing status.
    :param time_period: Time period in seconds (passed to the ORM as ``period``).
    :param locking: Whether to retrieve only unlocked items and lock them.
    :param bulk_size: Maximum number of items, forwarded to the ORM query.
    :param to_json: Return json format.
    :param by_substatus: Forwarded to the ORM query.
    :param not_lock: Forwarded to the ORM query.
    :param next_poll_at: Accepted for interface compatibility but NOT forwarded
                         to the ORM layer; it currently has no effect.
    :param for_poller: Forwarded to the ORM query.
    :param only_return_id: Forwarded to the ORM query.
    :param min_request_id: Forwarded to the ORM query.
    :param locking_for_update: Forwarded to the ORM query.
    :param new_poll: Forwarded to the ORM query.
    :param update_poll: Forwarded to the ORM query.
    :param session: The database session in use.

    :raises NoObject: If no processing is found.

    :returns: Processings.
    """
    query_options = dict(
        status=status,
        period=time_period,
        locking=locking,
        bulk_size=bulk_size,
        to_json=to_json,
        locking_for_update=locking_for_update,
        new_poll=new_poll,
        update_poll=update_poll,
        only_return_id=only_return_id,
        min_request_id=min_request_id,
        not_lock=not_lock,
        by_substatus=by_substatus,
        for_poller=for_poller,
    )
    return orm_processings.get_processings_by_status(session=session, **query_options)
0179 
0180 
@transactional_session
def update_processing(processing_id, parameters, session=None):
    """
    Update a processing.

    :param processing_id: The processing id.
    :param parameters: A dictionary of parameters to set.
    :param session: The database session in use.

    :raises NoObject: If no processing is found.
    :raises DatabaseException: If there is a database error.
    """
    return orm_processings.update_processing(
        processing_id=processing_id,
        parameters=parameters,
        session=session,
    )
0195 
0196 
@transactional_session
def abort_resume_processings(transform_id=None, request_id=None, processing_id=None, abort=False, resume=False, session=None):
    """
    Abort or resume processings selected by transform, request or processing id.

    :param transform_id: The id of the transform.
    :param request_id: The request id.
    :param processing_id: The processing id.
    :param abort: Forwarded to the ORM call as ``abort``.
    :param resume: Forwarded to the ORM call as ``resume``.
    :param session: The database session in use.

    :raises NoObject: If no content is found.
    :raises DatabaseException: If there is a database error.
    """
    selector = dict(transform_id=transform_id, request_id=request_id, processing_id=processing_id)
    orm_processings.abort_resume_processings(abort=abort, resume=resume, session=session, **selector)
0212 
0213 
@transactional_session
def delete_processing(processing_id=None, session=None):
    """
    Delete a processing.

    :param processing_id: The id of the processing.
    :param session: The database session in use.

    :raises NoObject: If no processing is found.
    :raises DatabaseException: If there is a database error.
    """
    result = orm_processings.delete_processing(processing_id=processing_id, session=session)
    return result
0226 
0227 
@transactional_session
def update_processing_with_collection_contents(updated_processing, new_processing=None, updated_collection=None,
                                               updated_files=None, new_files=None,
                                               coll_msg_content=None, file_msg_content=None, transform_updates=None,
                                               message_bulk_size=1000, session=None):
    """
    Update processing with collection, contents, file messages and collection messages.

    Steps run in this order and each one is skipped when its argument is falsy:
    update files, add new files, add file messages, update the collection,
    add the collection message, update the processing, add a new processing,
    update the transform.

    :param updated_processing: dict with 'processing_id' and 'parameters'.
    :param new_processing: dict of keyword arguments for ``orm_processings.add_processing``.
    :param updated_collection: dict with 'coll_id' and 'parameters'.
    :param updated_files: list of content updates.
    :param new_files: list of contents to add.
    :param coll_msg_content: message dict with collection info.
    :param file_msg_content: message dict, or list/tuple of message dicts, with files info.
    :param transform_updates: dict with 'transform_id' and 'parameters'.
    :param message_bulk_size: bulk size used when adding file messages.
    :param session: The database session in use.
    """
    if updated_files:
        orm_contents.update_contents(updated_files, session=session)
    if new_files:
        orm_contents.add_contents(contents=new_files, session=session)
    if file_msg_content:
        # Accept a single message dict as well as a sequence of them
        # (isinstance also accepts list/tuple subclasses instead of wrapping them).
        if not isinstance(file_msg_content, (list, tuple)):
            file_msg_content = [file_msg_content]
        for file_msg_con in file_msg_content:
            orm_messages.add_message(msg_type=file_msg_con['msg_type'],
                                     status=file_msg_con['status'],
                                     source=file_msg_con['source'],
                                     transform_id=file_msg_con['transform_id'],
                                     num_contents=file_msg_con['num_contents'],
                                     msg_content=file_msg_con['msg_content'],
                                     bulk_size=message_bulk_size,
                                     session=session)
    if updated_collection:
        orm_collections.update_collection(coll_id=updated_collection['coll_id'],
                                          parameters=updated_collection['parameters'],
                                          session=session)
    if coll_msg_content:
        # NOTE(review): unlike file messages, no bulk_size is passed here, so the
        # ORM default applies — confirm this is intended.
        orm_messages.add_message(msg_type=coll_msg_content['msg_type'],
                                 status=coll_msg_content['status'],
                                 source=coll_msg_content['source'],
                                 transform_id=coll_msg_content['transform_id'],
                                 num_contents=coll_msg_content['num_contents'],
                                 msg_content=coll_msg_content['msg_content'],
                                 session=session)
    if updated_processing:
        orm_processings.update_processing(processing_id=updated_processing['processing_id'],
                                          parameters=updated_processing['parameters'],
                                          session=session)
    if new_processing:
        orm_processings.add_processing(**new_processing, session=session)
    if transform_updates:
        orm_transforms.update_transform(transform_id=transform_updates['transform_id'],
                                        parameters=transform_updates['parameters'],
                                        session=session)
0280 
0281 
def resolve_input_dependency_id(new_input_dependency_contents, request_id=None, session=None):
    """
    Fill ``content_dep_id`` of new input-dependency contents from existing output contents.

    The content dicts are modified in place:
    - a missing or None ``sub_map_id`` is set to 0;
    - each content without ``content_dep_id`` whose (coll_id, name) matches an
      output content gets ``content_dep_id`` set to that output's ``content_id``,
      and its ``name`` replaced by ``str(content_dep_id)``.

    Contents that already carry a ``content_dep_id`` are left untouched and do
    not count as missing.

    :param new_input_dependency_contents: list of content dicts; each needs 'coll_id',
        and unresolved ones also need 'name'.
    :param request_id: request id used to scope the output-content query.
    :param session: The database session in use.

    :returns: tuple (new_input_dependency_contents, has_missing_dep), where
        has_missing_dep is True when at least one content could not be resolved.
    """
    has_missing_dep = False
    coll_ids = []
    seen_coll_ids = set()
    for content in new_input_dependency_contents:
        if content.get('sub_map_id') is None:
            content['sub_map_id'] = 0
        if not content.get('content_dep_id') and content['coll_id'] not in seen_coll_ids:
            seen_coll_ids.add(content['coll_id'])
            coll_ids.append(content['coll_id'])
    if not coll_ids:
        # Every content is already resolved; nothing to look up.
        return new_input_dependency_contents, has_missing_dep

    contents = orm_contents.get_contents(coll_id=coll_ids, request_id=request_id,
                                         relation_type=ContentRelationType.Output, session=session)
    # coll_id -> name -> content_id of the output content (last one wins on duplicates).
    content_name_id_map = {}
    for content in contents:
        content_name_id_map.setdefault(content['coll_id'], {})[content['name']] = content['content_id']

    for content in new_input_dependency_contents:
        if content.get('content_dep_id'):
            # Already resolved by the caller: not a missing dependency.
            continue
        names = content_name_id_map.get(content['coll_id'])
        if names is not None and content['name'] in names:
            content_dep_id = names[content['name']]
            content['content_dep_id'] = content_dep_id
            content['name'] = str(content_dep_id)
        else:
            has_missing_dep = True
    return new_input_dependency_contents, has_missing_dep
0317 
0318 
def fix_input_dependency_contents(request_id=None, transform_id=None, session=None):
    """
    Compute ``content_dep_id`` updates for input-dependency contents that still lack one.

    Input-dependency contents of the request/transform are queried with
    ``without_content_dep_id=True``. Each one whose ``content_dep_id`` is absent,
    None or 0 is matched by (coll_id, name) against the output contents of the
    same collections.

    :param request_id: Request id used to scope both queries.
    :param transform_id: Transform id used to scope the input-dependency query.
    :param session: The database session in use.

    :returns: list of ``{'content_id': ..., 'content_dep_id': ...}`` dicts;
              empty when there is nothing to fix.
    """
    unresolved = orm_contents.get_contents(request_id=request_id, transform_id=transform_id,
                                           relation_type=ContentRelationType.InputDependency,
                                           without_content_dep_id=True, session=session)
    # Distinct collection ids, in first-seen order.
    coll_ids = list(dict.fromkeys(item['coll_id'] for item in unresolved))
    if not coll_ids:
        return []

    outputs = orm_contents.get_contents(coll_id=coll_ids, request_id=request_id,
                                        relation_type=ContentRelationType.Output, session=session)
    # coll_id -> name -> content_id (last one wins on duplicates).
    output_ids = {}
    for out in outputs:
        output_ids.setdefault(out['coll_id'], {})[out['name']] = out['content_id']

    updates = []
    for item in unresolved:
        if item.get('content_dep_id') not in (None, 0):
            continue
        by_name = output_ids.get(item['coll_id'])
        if by_name is None or item['name'] not in by_name:
            continue
        updates.append({'content_id': item['content_id'], 'content_dep_id': by_name[item['name']]})
    return updates
0346 
0347 
def _apply_in_chunks(func, items, **kwargs):
    """
    Call ``func(chunk, **kwargs)`` for every chunk of ``items`` produced by ``get_list_chunks``.

    :returns: ``len(items)``, so callers can accumulate processed-row counters.
    """
    for chunk in get_list_chunks(items):
        func(chunk, **kwargs)
    return len(items)


@transactional_session
def update_processing_contents(update_processing, update_contents=None, update_messages=None, new_contents=None,
                               update_dep_contents=None, update_collections=None, messages=None,
                               new_update_contents=None, new_input_dependency_contents=None,
                               new_contents_ext=None, update_contents_ext=None,
                               request_id=None, transform_id=None, use_bulk_update_mappings=True,
                               fix_missing_content_dep_id=False, message_bulk_size=2000, session=None):
    """
    Update processing with contents.

    Inserts run first, in this order: new_contents_ext, new_input_dependency_contents,
    new_update_contents, messages, new_contents. ``new_contents`` must be the last
    insert: it is used to check for contents that were not inserted, and a killed
    process could otherwise leave the session inconsistent. Missing content_dep_id
    values are then fixed. Finally the updates run in this order: update_contents_ext,
    update_dep_contents, update_contents, update_collections, update_messages,
    update_processing.

    :param update_processing: dict with 'processing_id' and 'parameters' (falsy to skip).
    :param update_contents: list of content updates.
    :param update_messages: list of message updates.
    :param new_contents: list of contents to add.
    :param update_dep_contents: tuple (request_id, {status_name: status}, {status_name: content_ids}).
    :param update_collections: passed to ``orm_collections.update_collections``.
    :param messages: a message or a list/tuple of messages to add.
    :param new_update_contents: rows passed to ``orm_contents.add_contents_update``.
    :param new_input_dependency_contents: input-dependency contents to resolve and add.
    :param new_contents_ext: extended contents to add.
    :param update_contents_ext: extended content updates.
    :param request_id: request id forwarded to the scoped queries/updates.
    :param transform_id: transform id forwarded to the scoped queries/updates.
    :param use_bulk_update_mappings: forwarded to the update_contents_ext/update_contents/update_messages calls.
    :param fix_missing_content_dep_id: run the content_dep_id fix-up even if no new dependency is missing.
    :param message_bulk_size: bulk size for adding/updating messages.
    :param session: The database session in use.

    :returns: tuple (num_added_contents, num_updated_contents, num_added_messages, num_updated_messages).
    """
    num_added_contents = 0
    num_updated_contents = 0
    num_added_messages = 0
    num_updated_messages = 0

    if new_contents_ext:
        num_added_contents += _apply_in_chunks(orm_contents.add_contents_ext, new_contents_ext, session=session)

    has_missing_dep = False
    if new_input_dependency_contents:
        new_input_dependency_contents, has_missing_dep = resolve_input_dependency_id(
            new_input_dependency_contents, request_id=request_id, session=session)
        num_added_contents += _apply_in_chunks(orm_contents.add_contents, new_input_dependency_contents,
                                               session=session)

    if new_update_contents:
        # Rows are only added here. Adding and then deleting them, to fire the
        # 'update_content_dep_status' trigger, was disabled as too slow.
        num_updated_contents += _apply_in_chunks(orm_contents.add_contents_update, new_update_contents,
                                                 session=session)

    if messages:
        # Accept a single message as well as a sequence of them.
        if not isinstance(messages, (list, tuple)):
            messages = [messages]
        orm_messages.add_messages(messages, bulk_size=message_bulk_size, session=session)
        num_added_messages += len(messages)

    if new_contents:
        num_added_contents += _apply_in_chunks(orm_contents.add_contents, new_contents, session=session)

    # Fix input_dependency_contents without content_dep_id. This happens when the
    # dependency content is added after the input_dependency_contents, i.e. when
    # jobs inside one task depend on each other.
    if has_missing_dep or fix_missing_content_dep_id:
        to_update_input_dependency_contents = fix_input_dependency_contents(
            request_id=request_id, transform_id=transform_id, session=session)
        if to_update_input_dependency_contents:
            num_updated_contents += _apply_in_chunks(orm_contents.update_contents,
                                                     to_update_input_dependency_contents,
                                                     request_id=request_id, transform_id=transform_id,
                                                     use_bulk_update_mappings=False, grouping=False,
                                                     session=session)

    # Update contents, keeping the order.
    if update_contents_ext:
        num_updated_contents += _apply_in_chunks(orm_contents.update_contents_ext, update_contents_ext,
                                                 request_id=request_id, transform_id=transform_id,
                                                 use_bulk_update_mappings=use_bulk_update_mappings,
                                                 session=session)

    if update_dep_contents:
        # NOTE(review): this rebinds the ``request_id`` argument, so the later
        # update_contents/update_messages calls receive the request id carried in
        # ``update_dep_contents``. Preserved as-is — confirm this is intended.
        request_id, update_dep_contents_status_name, update_dep_contents_status = update_dep_contents
        for status_name, status in update_dep_contents_status_name.items():
            status_content_ids = update_dep_contents_status[status_name]
            if status_content_ids:
                for chunk in get_list_chunks(status_content_ids):
                    orm_contents.update_dep_contents(request_id, chunk, status, session=session)
                num_updated_contents += len(status_content_ids)

    if update_contents:
        num_updated_contents += _apply_in_chunks(orm_contents.update_contents, update_contents,
                                                 request_id=request_id, transform_id=transform_id,
                                                 use_bulk_update_mappings=use_bulk_update_mappings,
                                                 session=session)

    if update_collections:
        orm_collections.update_collections(update_collections, session=session)

    if update_messages:
        num_updated_messages += _apply_in_chunks(orm_messages.update_messages, update_messages,
                                                 bulk_size=message_bulk_size,
                                                 request_id=request_id, transform_id=transform_id,
                                                 use_bulk_update_mappings=use_bulk_update_mappings,
                                                 session=session)

    if update_processing:
        orm_processings.update_processing(processing_id=update_processing['processing_id'],
                                          parameters=update_processing['parameters'],
                                          session=session)
    return num_added_contents, num_updated_contents, num_added_messages, num_updated_messages
0450 
0451 
@transactional_session
def clean_locking(time_period=3600, min_request_id=None, health_items=None, force=False, hostname=None, pid=None, session=None):
    """
    Clean locking which is older than the time period.

    :param time_period: Age threshold in seconds.
    :param min_request_id: Forwarded to the ORM call.
    :param health_items: Forwarded to the ORM call; defaults to a fresh empty list.
    :param force: Forwarded to the ORM call.
    :param hostname: Forwarded to the ORM call.
    :param pid: Forwarded to the ORM call.
    :param session: The database session in use.

    :returns: Whatever ``orm_processings.clean_locking`` returns.
    """
    if health_items is None:
        # A new list per call; a ``[]`` default would be one object shared by every call.
        health_items = []
    return orm_processings.clean_locking(time_period=time_period, min_request_id=min_request_id, health_items=health_items,
                                         force=force, hostname=hostname, pid=pid, session=session)
0461 
0462 
@transactional_session
def clean_next_poll_at(status, session=None):
    """
    Clean next_poll_at of processings.

    :param status: Status of the processings, forwarded to the ORM call.
    :param session: The database session in use.
    """
    orm_processings.clean_next_poll_at(session=session, status=status)
0471 
0472 
@read_session
def get_num_active_processings(active_status=None, session=None):
    """
    Return the result of ``orm_processings.get_num_active_processings``.

    Presumably the number of processings in ``active_status`` — confirm against the ORM layer.

    :param active_status: Status filter, forwarded to the ORM call.
    :param session: The database session in use.
    """
    return orm_processings.get_num_active_processings(session=session, active_status=active_status)
0476 
0477 
@read_session
def get_active_processings(active_status=None, session=None):
    """
    Return the result of ``orm_processings.get_active_processings``.

    :param active_status: Status filter, forwarded to the ORM call.
    :param session: The database session in use.
    """
    return orm_processings.get_active_processings(session=session, active_status=active_status)