Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-09 07:58:19

0001 #!/usr/bin/env python
0002 #
0003 # Licensed under the Apache License, Version 2.0 (the "License");
0004 # You may not use this file except in compliance with the License.
0005 # You may obtain a copy of the License at
0006 # http://www.apache.org/licenses/LICENSE-2.0
0007 #
0008 # Authors:
0009 # - Wen Guan, <wen.guan@cern.ch>, 2019 - 2024
0010 
0011 
0012 """
0013 operations related to Transform.
0014 """
0015 
0016 import logging
0017 
0018 # from idds.common import exceptions
0019 
0020 from idds.common.constants import (TransformStatus, ContentRelationType, ContentStatus,
0021                                    TransformLocking, CollectionRelationType, CommandType)
0022 from idds.orm.base.session import read_session, transactional_session
0023 from idds.orm import (transforms as orm_transforms,
0024                       collections as orm_collections,
0025                       contents as orm_contents,
0026                       messages as orm_messages,
0027                       processings as orm_processings)
0028 
0029 
@transactional_session
def add_transform(request_id, workload_id, transform_type, transform_tag=None, priority=0, name=None,
                  status=TransformStatus.New, substatus=TransformStatus.New, locking=TransformLocking.Idle,
                  new_poll_period=1, update_poll_period=10, retries=0, expired_at=None, transform_metadata=None,
                  new_retries=0, update_retries=0, max_new_retries=3, max_update_retries=0,
                  parent_transform_id=None, previous_transform_id=None, current_processing_id=None,
                  internal_id=None, has_previous_conditions=None, loop_index=None,
                  parent_internal_id=None, command=CommandType.NoneCommand,
                  cloned_from=None, triggered_conditions=None, untriggered_conditions=None,
                  site=None, workprogress_id=None, session=None):
    """
    Add a transform by delegating to ``orm_transforms.add_transform``.

    Every argument is forwarded unchanged, by keyword, to the ORM function.

    :param request_id: The request id.
    :param workload_id: The workload id.
    :param transform_type: Transform type.
    :param transform_tag: Transform tag.
    :param priority: priority.
    :param name: Transform name.
    :param status: Transform status.
    :param substatus: Transform substatus.
    :param locking: Transform locking.
    :param new_poll_period: Forwarded as ``new_poll_period``.
    :param update_poll_period: Forwarded as ``update_poll_period``.
    :param retries: The number of retries.
    :param expired_at: The datetime when it expires.
    :param transform_metadata: The metadata as json.
    :param new_retries: Forwarded as ``new_retries``.
    :param update_retries: Forwarded as ``update_retries``.
    :param max_new_retries: Forwarded as ``max_new_retries``.
    :param max_update_retries: Forwarded as ``max_update_retries``.
    :param parent_transform_id: Forwarded as ``parent_transform_id``.
    :param previous_transform_id: Forwarded as ``previous_transform_id``.
    :param current_processing_id: Forwarded as ``current_processing_id``.
    :param internal_id: Forwarded as ``internal_id``.
    :param has_previous_conditions: Forwarded as ``has_previous_conditions``.
    :param loop_index: Forwarded as ``loop_index``.
    :param parent_internal_id: Forwarded as ``parent_internal_id``.
    :param command: Forwarded as ``command``.
    :param cloned_from: Forwarded as ``cloned_from``.
    :param triggered_conditions: Forwarded as ``triggered_conditions``.
    :param untriggered_conditions: Forwarded as ``untriggered_conditions``.
    :param site: Forwarded as ``site``.
    :param workprogress_id: Forwarded as ``workprogress_id``.
    :param session: The database session in use.

    :raises DuplicatedObject: If a transform with the same name exists.
    :raises DatabaseException: If there is a database error.

    :returns: transform id.
    """
    transform_fields = {
        'request_id': request_id,
        'workload_id': workload_id,
        'transform_type': transform_type,
        'transform_tag': transform_tag,
        'priority': priority,
        'name': name,
        'status': status,
        'substatus': substatus,
        'locking': locking,
        'retries': retries,
        'new_poll_period': new_poll_period,
        'update_poll_period': update_poll_period,
        'new_retries': new_retries,
        'update_retries': update_retries,
        'max_new_retries': max_new_retries,
        'max_update_retries': max_update_retries,
        'parent_transform_id': parent_transform_id,
        'previous_transform_id': previous_transform_id,
        'current_processing_id': current_processing_id,
        'expired_at': expired_at,
        'transform_metadata': transform_metadata,
        'site': site,
        'command': command,
        'internal_id': internal_id,
        'parent_internal_id': parent_internal_id,
        'has_previous_conditions': has_previous_conditions,
        'loop_index': loop_index,
        'cloned_from': cloned_from,
        'triggered_conditions': triggered_conditions,
        'untriggered_conditions': untriggered_conditions,
        'workprogress_id': workprogress_id,
    }
    return orm_transforms.add_transform(session=session, **transform_fields)
0083 
0084 
@read_session
def get_transform(transform_id, request_id=None, to_json=False, session=None):
    """
    Fetch a single transform through the ORM layer.

    :param transform_id: Transform id.
    :param request_id: Request id, forwarded to the ORM lookup.
    :param to_json: return json format.
    :param session: The database session in use.

    :raises NoObject: If no transform is found.

    :returns: Transform.
    """
    transform = orm_transforms.get_transform(
        transform_id=transform_id,
        request_id=request_id,
        to_json=to_json,
        session=session,
    )
    return transform
0099 
0100 
@transactional_session
def get_transform_by_id_status(transform_id, status=None, locking=False, session=None):
    """
    Fetch a transform by id and status through the ORM layer.

    :param transform_id: Transform id.
    :param status: Transform status, forwarded to the ORM lookup.
    :param locking: Locking flag, forwarded to the ORM lookup.
    :param session: The database session in use.

    :returns: Whatever ``orm_transforms.get_transform_by_id_status`` returns.
    """
    return orm_transforms.get_transform_by_id_status(
        transform_id=transform_id,
        status=status,
        locking=locking,
        session=session,
    )
0105 
0106 
@read_session
def get_transforms_with_input_collection(transform_type, transform_tag, coll_scope, coll_name, to_json=False, session=None):
    """
    Fetch the transforms matching a type/tag and an input collection.

    :param transform_type: Transform type.
    :param transform_tag: Transform tag.
    :param coll_scope: The collection scope.
    :param coll_name: The collection name.
    :param to_json: return json format.
    :param session: The database session in use.

    :raises NoObject: If no transform is found.

    :returns: Transforms.
    """
    # These four values are handed to the ORM positionally, in this order.
    selector = (transform_type, transform_tag, coll_scope, coll_name)
    transforms = orm_transforms.get_transforms_with_input_collection(*selector, to_json=to_json, session=session)
    return transforms
0125 
0126 
@read_session
def get_transform_ids(workprogress_id, request_id=None, workload_id=None, transform_id=None, session=None):
    """
    Fetch transform ids through the ORM layer.

    :param workprogress_id: Workprogress id.
    :param request_id: Request id, forwarded to the ORM lookup.
    :param workload_id: Workload id, forwarded to the ORM lookup.
    :param transform_id: Transform id, forwarded to the ORM lookup.
    :param session: The database session in use.

    :raises NoObject: If no transform is found.

    :returns: list of transform ids.
    """
    filters = {
        'workprogress_id': workprogress_id,
        'request_id': request_id,
        'workload_id': workload_id,
        'transform_id': transform_id,
    }
    return orm_transforms.get_transform_ids(session=session, **filters)
0141 
0142 
@read_session
def get_transforms(request_id=None, workload_id=None, transform_id=None, loop_index=None, internal_ids=None, to_json=False, session=None):
    """
    Fetch transforms through the ORM layer.

    :param request_id: Request id, forwarded to the ORM lookup.
    :param workload_id: Workload id, forwarded to the ORM lookup.
    :param transform_id: Transform id, forwarded to the ORM lookup.
    :param loop_index: Loop index, forwarded to the ORM lookup.
    :param internal_ids: Internal ids, forwarded to the ORM lookup.
    :param to_json: return json format.
    :param session: The database session in use.

    :raises NoObject: If no transform is found.

    :returns: list of transform.
    """
    filters = {
        'request_id': request_id,
        'workload_id': workload_id,
        'transform_id': transform_id,
        'loop_index': loop_index,
        'internal_ids': internal_ids,
    }
    return orm_transforms.get_transforms(to_json=to_json, session=session, **filters)
0162 
0163 
@transactional_session
def get_transforms_by_status(status, period=None, locking=False, bulk_size=None, to_json=False, by_substatus=False,
                             new_poll=False, update_poll=False, only_return_id=False, min_request_id=None,
                             order_by_fifo=False, not_lock=False, next_poll_at=None, session=None):
    """
    Fetch transforms by status through the ORM layer.

    :param status: Transform status or list of transform status.
    :param period: Forwarded as ``period``.
    :param locking: Whether to lock retrieved items.
    :param bulk_size: Forwarded as ``bulk_size``.
    :param to_json: return json format.
    :param by_substatus: Forwarded as ``by_substatus``.
    :param new_poll: Forwarded as ``new_poll``.
    :param update_poll: Forwarded as ``update_poll``.
    :param only_return_id: Forwarded as ``only_return_id``.
    :param min_request_id: Forwarded as ``min_request_id``.
    :param order_by_fifo: Forwarded as ``order_by_fifo``.
    :param not_lock: Forwarded as ``not_lock``.
    :param next_poll_at: Accepted but not forwarded to the ORM call.
    :param session: The database session in use.

    :raises NoObject: If no transform is found.

    :returns: list of transform.
    """
    query_options = {
        'status': status,
        'period': period,
        'locking': locking,
        # This wrapper always asks the ORM not to lock for update.
        'locking_for_update': False,
        'order_by_fifo': order_by_fifo,
        'bulk_size': bulk_size,
        'to_json': to_json,
        'new_poll': new_poll,
        'update_poll': update_poll,
        'only_return_id': only_return_id,
        'min_request_id': min_request_id,
        'not_lock': not_lock,
        'by_substatus': by_substatus,
    }
    return orm_transforms.get_transforms_by_status(session=session, **query_options)
0189 
0190 
@transactional_session
def update_transform(transform_id, parameters, session=None):
    """
    Update a transform through the ORM layer.

    :param transform_id: the transform id.
    :param parameters: A dictionary of parameters.
    :param session: The database session in use.

    :raises NoObject: If no content is found.
    :raises DatabaseException: If there is a database error.
    """
    orm_transforms.update_transform(
        session=session,
        transform_id=transform_id,
        parameters=parameters,
    )
0205 
0206 
def _register_new_collections(collections, session):
    """
    Insert new collections and write each generated id back to its collection object.

    :param collections: Iterable of dicts describing new collections, or None/empty.
    :param session: The database session in use.
    """
    for coll in collections or []:
        # The optional 'collection' entry is removed from the dict (in place, as before)
        # so that it is not forwarded to the ORM as a keyword argument.
        collection = coll.pop('collection', None)
        coll_id = orm_collections.add_collection(**coll, session=session)
        if collection:
            collection.coll_id = coll_id


@transactional_session
def add_transform_outputs(transform, transform_parameters, input_collections=None, output_collections=None, log_collections=None,
                          update_input_collections=None, update_output_collections=None, update_log_collections=None,
                          new_contents=None, update_contents=None, new_processing=None, update_processing=None,
                          messages=None, update_messages=None, message_bulk_size=10000, session=None):
    """
    For input contents, add corresponding output contents.

    Persists new/updated collections, contents, processings and messages, then
    updates the transform itself with ``transform_parameters``.

    :param transform: the transform (a dict with 'transform_id' and 'transform_metadata').
                      When falsy, the final transform update is skipped.
    :param transform_parameters: Parameters used to update the transform. When a new
                                 processing is added, 'current_processing_id' is set on it.
    :param input_collections: The new input collections.
    :param output_collections: The new output collections.
    :param log_collections: The new log collections.
    :param update_input_collections: The updated input collections.
    :param update_output_collections: The updated output collections.
    :param update_log_collections: The updated log collections.
    :param new_contents: The new contents.
    :param update_contents: The updated contents.
    :param new_processing: The new processing (keyword arguments for add_processing).
    :param update_processing: Mapping of processing id to the parameters to update.
    :param messages: Messages (a single message or a list/tuple of messages).
    :param update_messages: Messages to update.
    :param message_bulk_size: The message bulk size.
    :param session: The database session in use.

    :raises DatabaseException: If there is a database error.

    :returns: tuple (ids of newly added processings, ids of updated processings).
    """
    # Resolve the work object up front so a malformed transform still fails before any
    # write, but only when there is a transform to read it from.
    work = transform['transform_metadata']['work'] if transform else None

    new_pr_ids, update_pr_ids = [], []

    for new_collections in (input_collections, output_collections, log_collections):
        _register_new_collections(new_collections, session)

    for updated_collections in (update_input_collections, update_output_collections, update_log_collections):
        if updated_collections:
            colls = [coll.collection for coll in updated_collections]
            orm_collections.update_collections(colls, session=session)

    if new_contents:
        orm_contents.add_contents(new_contents, session=session)
    if update_contents:
        orm_contents.update_contents(update_contents, session=session)

    processing_id = None
    if new_processing:
        processing_id = orm_processings.add_processing(**new_processing, session=session)
        new_pr_ids.append(processing_id)
        transform_parameters['current_processing_id'] = processing_id
    if update_processing:
        for proc_id in update_processing:
            orm_processings.update_processing(processing_id=proc_id, parameters=update_processing[proc_id], session=session)
            update_pr_ids.append(proc_id)

    if messages:
        if not isinstance(messages, (list, tuple)):
            messages = [messages]
        logging.debug("message_bulk_size: %s", message_bulk_size)
        orm_messages.add_messages(messages, bulk_size=message_bulk_size, session=session)
    if update_messages:
        orm_messages.update_messages(update_messages, bulk_size=message_bulk_size, session=session)

    if transform:
        if processing_id:
            # Let the work object record the id of the processing that was just created.
            if hasattr(work, 'set_processing_id'):
                work.set_processing_id(new_processing['processing_metadata']['processing'], processing_id)
        if hasattr(work, 'refresh_work'):
            work.refresh_work()
        orm_transforms.update_transform(transform_id=transform['transform_id'],
                                        parameters=transform_parameters,
                                        session=session)
    return new_pr_ids, update_pr_ids
0322 
0323 
@transactional_session
def abort_resume_transforms(transform_id=None, request_id=None, abort=False, resume=False, session=None):
    """
    Abort or resume transforms through the ORM layer.

    :param transform_id: The id of the transform.
    :param request_id: The request id.
    :param abort: Abort flag, forwarded to the ORM call.
    :param resume: Resume flag, forwarded to the ORM call.
    :param session: The database session in use.

    :raises NoObject: If no content is found.
    :raises DatabaseException: If there is a database error.
    """
    orm_transforms.abort_resume_transforms(
        transform_id=transform_id,
        request_id=request_id,
        abort=abort,
        resume=resume,
        session=session,
    )
0337 
0338 
@transactional_session
def delete_transform(transform_id=None, session=None):
    """
    Delete a transform through the ORM layer.

    :param transform_id: The id of the transform.
    :param session: The database session in use.

    :raises NoObject: If no content is found.
    :raises DatabaseException: If there is a database error.
    """
    orm_transforms.delete_transform(
        session=session,
        transform_id=transform_id,
    )
0351 
0352 
@transactional_session
def clean_locking(time_period=3600, min_request_id=None, health_items=None, force=False, hostname=None, pid=None, session=None):
    """
    Clean locking which is older than time period.

    :param time_period: in seconds.
    :param min_request_id: Forwarded as ``min_request_id``.
    :param health_items: Forwarded as ``health_items``; an empty list is used when omitted.
    :param force: Forwarded as ``force``.
    :param hostname: Forwarded as ``hostname``.
    :param pid: Forwarded as ``pid``.
    :param session: The database session in use.
    """
    # A fresh list per call: a list literal as the default would be one shared object
    # handed to the ORM on every call.
    if health_items is None:
        health_items = []
    orm_transforms.clean_locking(time_period=time_period, min_request_id=min_request_id, health_items=health_items,
                                 force=force, hostname=hostname, pid=pid, session=session)
0362 
0363 
@transactional_session
def clean_next_poll_at(status, session=None):
    """
    Clean next_poll_at through the ORM layer.

    :param status: status of the transform
    :param session: The database session in use.
    """
    orm_transforms.clean_next_poll_at(
        session=session,
        status=status,
    )
0372 
0373 
def _relation_bucket(relation_type):
    """Return the result key under which a content with this relation type is stored."""
    if relation_type == ContentRelationType.Input:
        return 'inputs'
    if relation_type == ContentRelationType.InputDependency:
        return 'inputs_dependency'
    if relation_type == ContentRelationType.Output:
        return 'outputs'
    if relation_type == ContentRelationType.Log:
        return 'logs'
    return 'others'


def _new_content_buckets():
    """Return a fresh, empty set of per-relation-type content lists."""
    return {'inputs_dependency': [], 'inputs': [], 'outputs': [], 'logs': [], 'others': []}


@read_session
def get_transform_input_output_maps(request_id, transform_id, input_coll_ids, output_coll_ids, log_coll_ids=None, with_sub_map_id=False, is_es=False, with_deps=True, page_num=None, page_size=None, status=None, match_content_ext=False, session=None):
    """
    Get transform input output maps.

    Contents are grouped by 'map_id' and, inside each group, by their
    'content_relation_type' into 'inputs', 'inputs_dependency', 'outputs', 'logs'
    and 'others'.

    :param request_id: request id (used for virtual table partitioning).
    :param transform_id: transform id.
    :param input_coll_ids: Not used by the current implementation.
    :param output_coll_ids: Not used by the current implementation.
    :param log_coll_ids: Not used by the current implementation.
    :param with_sub_map_id: When True, the result is ``{map_id: {sub_map_id: buckets}}``
                            instead of ``{map_id: buckets}``.
    :param is_es: Only honoured when ``with_sub_map_id`` is False. Each map entry then also
                  carries 'es_name' (the 'path' of the first content seen for that map) and
                  'sub_maps', a dict of sub_map_id to the list of output contents.
    :param with_deps: Forwarded to the ORM query.
    :param page_num: page number (0-based) for paginated retrieval.
    :param page_size: number of distinct map_ids per page.
    :param status: Forwarded to the ORM query.
    :param match_content_ext: Forwarded to the ORM query.
    :param session: The database session in use.

    :returns: dict keyed by map_id, shaped as described above.
    """
    contents = orm_contents.get_contents_by_request_transform(request_id=request_id, transform_id=transform_id, with_deps=with_deps, page_num=page_num, page_size=page_size, status=status, by_map=True, match_content_ext=match_content_ext, session=session)
    ret = {}
    for content in contents:
        map_id = content['map_id']
        sub_map_id = content['sub_map_id']
        bucket = _relation_bucket(content['content_relation_type'])

        if with_sub_map_id:
            sub_maps = ret.setdefault(map_id, {})
            if sub_map_id not in sub_maps:
                sub_maps[sub_map_id] = _new_content_buckets()
            sub_maps[sub_map_id][bucket].append(content)
            continue

        if map_id not in ret:
            entry = _new_content_buckets()
            if is_es:
                entry['es_name'] = content['path']
                entry['sub_maps'] = {}
            ret[map_id] = entry
        ret[map_id][bucket].append(content)

        if is_es and bucket == 'outputs':
            # Membership is checked on the 'sub_maps' dict itself; indexing by sub_map_id
            # before the key exists would raise KeyError on the first output.
            ret[map_id]['sub_maps'].setdefault(sub_map_id, []).append(content)
    return ret
0443 
0444 
def release_inputs(to_release_inputs):
    """
    Build content updates for the input dependencies of the released contents.

    For every released item, the contents with the same request id, collection id and
    name are looked up; each one whose relation type is InputDependency gets an update
    carrying the released item's status and substatus.

    :param to_release_inputs: Iterable of dicts with 'request_id', 'coll_id', 'name',
                              'status' and 'substatus'.

    :returns: list of dicts with 'content_id', 'substatus' and 'status'.
    """
    released = []
    for item in to_release_inputs:
        candidates = orm_contents.get_input_contents(request_id=item['request_id'],
                                                     coll_id=item['coll_id'],
                                                     name=item['name'])
        released.extend(
            {'content_id': candidate['content_id'],
             'substatus': item['substatus'],
             'status': item['status']}
            for candidate in candidates
            if candidate['content_relation_type'] == ContentRelationType.InputDependency
        )
    return released
0458 
0459 
def release_inputs_by_collection_old(to_release_inputs):
    """
    Build content updates for input dependencies, one collection at a time (older variant).

    Released contents are classified by the first of Available, FakeAvailable, FinalFailed
    and Missing that matches their status or substatus. Every InputDependency content of
    the collection whose name was classified, and whose status differs from that target,
    gets an update setting both status and substatus to the target.

    :param to_release_inputs: Mapping of collection id to a list of released content dicts
                              with 'request_id', 'coll_id', 'name', 'status' and 'substatus'.

    :returns: list of dicts with 'content_id', 'substatus' and 'status'.
    """
    # Order matters: it is the precedence used when several targets could apply.
    release_order = (ContentStatus.Available, ContentStatus.FakeAvailable,
                     ContentStatus.FinalFailed, ContentStatus.Missing)

    update_contents = []
    for coll_id in to_release_inputs:
        to_release_contents = to_release_inputs[coll_id]
        if not to_release_contents:
            continue

        # One name set per target status; sets keep the per-content membership test below
        # constant-time (assumes content names are hashable, e.g. strings).
        names_by_status = [(target, set()) for target in release_order]
        for to_release_content in to_release_contents:
            for target, names in names_by_status:
                if to_release_content['status'] == target or to_release_content['substatus'] == target:
                    names.add(to_release_content['name'])
                    break

        to_release = to_release_contents[0]
        contents = orm_contents.get_input_contents(request_id=to_release['request_id'],
                                                   coll_id=to_release['coll_id'],
                                                   name=None)

        for content in contents:
            if content['content_relation_type'] != ContentRelationType.InputDependency:
                continue
            for target, names in names_by_status:
                if content['status'] != target and content['name'] in names:
                    update_contents.append({'content_id': content['content_id'],
                                            'substatus': target,
                                            'status': target})
                    break
    return update_contents
0514 
0515 
def release_inputs_by_collection(to_release_inputs, final=False):
    """
    Build content updates for input dependencies, one collection at a time.

    For each collection, the released contents give a name -> status map (taking the
    status, or else the substatus, when it is one of Available, FakeAvailable, FinalFailed
    or Missing). InputDependency contents of that collection whose status is not yet one
    of those four, and whose name is in the map, get an update to the mapped status.

    :param to_release_inputs: Mapping of collection id to a list of released content dicts
                              with 'request_id', 'coll_id', 'name', 'status' and 'substatus'.
    :param final: Not used by the current implementation.

    :returns: list of dicts with 'content_id', 'substatus' and 'status'.
    """
    terminal_statuses = [ContentStatus.Available, ContentStatus.FakeAvailable, ContentStatus.FinalFailed, ContentStatus.Missing]
    updates = []
    for coll_id in to_release_inputs:
        released_contents = to_release_inputs[coll_id]
        if not released_contents:
            continue

        # name -> status to propagate; a later entry with the same name overrides an earlier one.
        status_by_name = {}
        for released in released_contents:
            if released['status'] in terminal_statuses:
                status_by_name[released['name']] = released['status']
            elif released['substatus'] in terminal_statuses:
                status_by_name[released['name']] = released['substatus']

        head = released_contents[0]
        contents = orm_contents.get_input_contents(request_id=head['request_id'],
                                                   coll_id=head['coll_id'],
                                                   name=None)

        # name -> dependency contents that have not reached one of the four statuses yet.
        pending_by_name = {}
        for content in contents:
            is_dependency = content['content_relation_type'] == ContentRelationType.InputDependency
            if is_dependency and content['status'] not in terminal_statuses:
                pending = {'content_id': content['content_id'], 'status': content['status']}
                pending_by_name.setdefault(content['name'], []).append(pending)

        common_names = list(status_by_name.keys() & pending_by_name.keys())
        logging.debug("release_inputs_by_collection(coll_id: %s): intersection_keys[:10]: %s" % (coll_id, str(common_names[:10])))

        for name in common_names:
            new_status = status_by_name[name]
            updates.extend(
                {'content_id': pending['content_id'],
                 'substatus': new_status,
                 'status': new_status}
                for pending in pending_by_name[name]
                if pending['status'] != new_status
            )

    return updates
0560 
0561 
def _get_release_status_by_name(contents):
    """
    Map content names to the status that should be propagated to dependents.

    Only contents whose relation type is Output are considered. The content's
    status is used when it is one of Available, FakeAvailable, FinalFailed or
    Missing; otherwise its substatus is used when that is one of those values.

    :param contents: iterable of content dicts providing 'content_relation_type',
                     'status', 'substatus' and 'name'.

    :returns: dict mapping content name to the selected status.
    """
    status_to_check = (ContentStatus.Available, ContentStatus.FakeAvailable,
                       ContentStatus.FinalFailed, ContentStatus.Missing)
    release_status = {}
    for content in contents:
        if content['content_relation_type'] != ContentRelationType.Output:
            continue
        if content['status'] in status_to_check:
            release_status[content['name']] = content['status']
        elif content['substatus'] in status_to_check:
            release_status[content['name']] = content['substatus']
    return release_status


def poll_inputs_dependency_by_collection(unfinished_inputs):
    """
    Compute status updates for unfinished dependency contents, per collection.

    For every collection in ``unfinished_inputs`` the contents returned by
    ``orm_contents.get_input_contents`` are queried with the 'request_id' and
    'coll_id' of the first unfinished entry. Every unfinished content whose name
    matches an Output content that reached Available, FakeAvailable, FinalFailed
    or Missing, and whose own status differs, gets an update entry.

    :param unfinished_inputs: dict mapping a collection id to a list of content
                              dicts providing 'request_id', 'coll_id', 'name',
                              'content_id' and 'status'. Collections with an
                              empty list are skipped.

    :returns: list of dicts with keys 'content_id', 'substatus' and 'status'.
    """
    update_contents = []
    for coll_id, unfinished_contents in unfinished_inputs.items():
        if not unfinished_contents:
            # Nothing to update, and there is no first entry to read the
            # request_id/coll_id from (indexing [0] would raise IndexError).
            continue

        first_content = unfinished_contents[0]
        contents = orm_contents.get_input_contents(request_id=first_content['request_id'],
                                                   coll_id=first_content['coll_id'],
                                                   name=None)

        logging.debug("poll_inputs_dependency_by_collection(coll_id: %s): unfinished_contents[:10]: %s",
                      coll_id, unfinished_contents[:10])

        to_release_status = _get_release_status_by_name(contents)

        # Several unfinished contents may share one name; keep all of them.
        unfinished_contents_dict = {}
        for content in unfinished_contents:
            content_short = {'content_id': content['content_id'], 'status': content['status']}
            unfinished_contents_dict.setdefault(content['name'], []).append(content_short)

        # Only names present on both sides can produce an update.
        intersection_keys = list(to_release_status.keys() & unfinished_contents_dict.keys())
        logging.debug("poll_inputs_dependency_by_collection(coll_id: %s): intersection_keys[:10]: %s",
                      coll_id, intersection_keys[:10])

        for name in intersection_keys:
            matched_content_status = to_release_status[name]
            update_contents.extend(
                {'content_id': matched_content['content_id'],
                 'substatus': matched_content_status,
                 'status': matched_content_status}
                for matched_content in unfinished_contents_dict[name]
                if matched_content['status'] != matched_content_status
            )

    return update_contents
0631 
0632 
def get_work_name_to_coll_map(request_id):
    """
    Build a map from work name to the input/output collections of its transforms.

    Transforms without a non-empty 'work_name' in their 'transform_metadata'
    are ignored. Collections whose relation type is neither Input nor Output
    are ignored as well.

    :param request_id: request id passed to ``orm_transforms.get_transforms``
                       and ``orm_collections.get_collections``.

    :returns: dict of the form ``{work_name: {'inputs': [...], 'outputs': [...]}}``
              where every list item is a dict with the keys 'coll_id',
              'transform_id', 'workload_id', 'scope' and 'name'.
    """
    tfs = orm_transforms.get_transforms(request_id=request_id)
    colls = orm_collections.get_collections(request_id=request_id)

    # Index the collections by transform once, so that each transform only
    # looks at its own collections instead of rescanning the whole list.
    colls_by_transform = {}
    for coll in colls:
        colls_by_transform.setdefault(coll['transform_id'], []).append(coll)

    work_name_to_coll_map = {}
    for tf in tfs:
        metadata = tf['transform_metadata'] if 'transform_metadata' in tf else None
        if not metadata or 'work_name' not in metadata or not metadata['work_name']:
            continue

        work_name = metadata['work_name']
        # Transforms sharing a work name accumulate into the same entry.
        work_colls = work_name_to_coll_map.setdefault(work_name, {'inputs': [], 'outputs': []})

        for coll in colls_by_transform.get(tf['transform_id'], ()):
            if coll['relation_type'] == CollectionRelationType.Input:
                target = work_colls['inputs']
            elif coll['relation_type'] == CollectionRelationType.Output:
                target = work_colls['outputs']
            else:
                continue
            target.append({'coll_id': coll['coll_id'], 'transform_id': coll['transform_id'],
                           'workload_id': coll['workload_id'],
                           'scope': coll['scope'], 'name': coll['name']})
    return work_name_to_coll_map
0655 
0656 
@read_session
def get_num_active_transforms(active_status=None, session=None):
    """
    Delegate to ``orm_transforms.get_num_active_transforms``.

    :param active_status: forwarded unchanged to the ORM call.
    :param session: database session, forwarded unchanged to the ORM call.

    :returns: whatever the ORM call returns.
    """
    num_active = orm_transforms.get_num_active_transforms(active_status=active_status,
                                                          session=session)
    return num_active
0660 
0661 
@read_session
def get_active_transforms(active_status=None, session=None):
    """
    Delegate to ``orm_transforms.get_active_transforms``.

    :param active_status: forwarded unchanged to the ORM call.
    :param session: database session, forwarded unchanged to the ORM call.

    :returns: whatever the ORM call returns.
    """
    active_transforms = orm_transforms.get_active_transforms(active_status=active_status,
                                                             session=session)
    return active_transforms