Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-09 07:58:19

0001 #!/usr/bin/env python
0002 #
0003 # Licensed under the Apache License, Version 2.0 (the "License");
0004 # You may not use this file except in compliance with the License.
0005 # You may obtain a copy of the License at
0006 # http://www.apache.org/licenses/LICENSE-2.0
0007 #
0008 # Authors:
0009 # - Wen Guan, <wen.guan@cern.ch>, 2019 - 2025
0010 
0011 
0012 """
0013 operations related to Catalog(Collections and Contents).
0014 """
0015 
0016 
0017 from idds.common import exceptions
0018 from idds.common.constants import (CollectionType, CollectionStatus, CollectionLocking,
0019                                    CollectionRelationType, ContentType, ContentStatus,
0020                                    ContentLocking, ContentRelationType)
0021 from idds.orm.base.session import read_session, transactional_session
0022 from idds.orm import (transforms as orm_transforms,
0023                       collections as orm_collections,
0024                       contents as orm_contents,
0025                       messages as orm_messages)
0026 
0027 
@transactional_session
def get_collections_by_status(status, relation_type=CollectionRelationType.Input, time_period=None,
                              locking=False, bulk_size=None, to_json=False, session=None):
    """
    Fetch collections by status and relation type, optionally flagging them as locked.

    :param status: The collection status to select.
    :param relation_type: The relation_type of the collection to the transform.
    :param time_period: time period in seconds since last update.
    :param locking: When true, every returned collection is updated with
                    ``CollectionLocking.Locking`` in the same session.
    :param bulk_size: Forwarded unchanged to the ORM query
                      (presumably a row limit -- confirm in idds.orm.collections).
    :param to_json: return json format.
    :param session: The database session in use.

    :raises NoObject: Per the original documentation, if no collections are found
                      (raised by the ORM layer; not visible in this function).

    :returns: list of Collections.
    """
    found = orm_collections.get_collections_by_status(
        status=status, relation_type=relation_type, bulk_size=bulk_size,
        time_period=time_period, locking=locking, to_json=to_json, session=session)

    if not locking:
        return found

    # Mark every selected collection as locked within the current transaction.
    lock_params = {'locking': CollectionLocking.Locking}
    for item in found:
        orm_collections.update_collection(coll_id=item['coll_id'], parameters=lock_params, session=session)
    return found
0055 
0056 
@read_session
def get_collections(scope=None, name=None, request_id=None, workload_id=None, transform_id=None,
                    relation_type=None, to_json=False, session=None):
    """
    Look up collections through the ORM layer using the given filters.

    :param scope: scope of the collection.
    :param name: name of the collection.
    :param request_id: the request id.
    :param workload_id: The workload_id of the request.
    :param transform_id: The transform id related to this collection.
    :param relation_type: The relation between this collection and its transform,
                          such as Input, Output, Log and so on.
    :param to_json: return json format.
    :param session: The database session in use.

    :returns: whatever ``orm_collections.get_collections`` returns
              (callers in this module iterate it as a sequence of dict-like collections).
    """
    filters = dict(scope=scope, name=name, request_id=request_id,
                   workload_id=workload_id, transform_id=transform_id,
                   to_json=to_json, relation_type=relation_type)
    return orm_collections.get_collections(session=session, **filters)
0080 
0081 
@read_session
def get_collections_by_request_ids(request_ids, session=None):
    """
    Get collections by a list of request ids.

    :param request_ids: list of request ids.
    :param session: The database session in use.

    :return collections: list of collections.
    """
    # Forward the session so the query runs in the caller's session, like every
    # other wrapper in this module, instead of silently dropping it.
    return orm_collections.get_collections_by_request_ids(request_ids, session=session)
0092 
0093 
@transactional_session
def add_collection(request_id, workload_id, scope, name, coll_type=CollectionType.Dataset, transform_id=None,
                   relation_type=CollectionRelationType.Input, bytes=0, status=CollectionStatus.New,
                   total_files=0, new_files=0, processing_files=0, processed_files=0, retries=0,
                   expired_at=None, coll_metadata=None, session=None):
    """
    Add a collection.

    :param request_id: The request id.
    :param workload_id: The workload id.
    :param scope: The scope of the request data.
    :param name: The name of the request data.
    :param coll_type: The type of dataset as dataset or container.
    :param transform_id: The transform id related to this collection.
    :param relation_type: The relation between this collection and its transform,
                          such as Input, Output, Log and so on.
    :param bytes: The size of the collection.
    :param status: The status.
    :param total_files: Number of total files.
    :param new_files: Number of new files.
    :param processing_files: Number of files being processed.
    :param processed_files: Number of processed files.
    :param retries: Number of retries.
    :param expired_at: The datetime when it expires.
    :param coll_metadata: The metadata as json.
    :param session: The database session in use.

    :raises DuplicatedObject: If a collection with the same name exists.
    :raises DatabaseException: If there is a database error.

    :returns: the value returned by ``orm_collections.add_collection``
              (documented as the collection id).
    """
    # Previously the ORM result was discarded, so callers always received None
    # even though the documentation promises the collection id.
    return orm_collections.add_collection(request_id=request_id, workload_id=workload_id,
                                          scope=scope, name=name, coll_type=coll_type,
                                          transform_id=transform_id, relation_type=relation_type,
                                          bytes=bytes, status=status, total_files=total_files,
                                          new_files=new_files, processing_files=processing_files,
                                          processed_files=processed_files, retries=retries,
                                          expired_at=expired_at, coll_metadata=coll_metadata,
                                          session=session)
0129 
0130 
@transactional_session
def update_collection(coll_id, parameters, msg=None, session=None):
    """
    Update a collection and, when a message is supplied, record it in the same session.

    :param coll_id: the collection id.
    :param parameters: A dictionary of parameters to apply to the collection.
    :param msg: Optional dict with the keys ``msg_type``, ``status``, ``source``,
                ``transform_id``, ``num_contents`` and ``msg_content``; it is passed
                to ``orm_messages.add_message`` when truthy.
    :param session: The database session in use.

    :raises NoObject: Per the original documentation, if the target is not found
                      (raised by the ORM layer; not visible here).
    :raises DatabaseException: If there is a database error.
    """
    orm_collections.update_collection(coll_id=coll_id, parameters=parameters, session=session)

    if not msg:
        return

    # Pull the required fields in a fixed order; a missing key raises KeyError
    # before anything is written to the message table.
    wanted = ('msg_type', 'status', 'source', 'transform_id', 'num_contents', 'msg_content')
    message_fields = {field: msg[field] for field in wanted}
    orm_messages.add_message(session=session, **message_fields)
0155 
0156 
@read_session
def get_collection(coll_id=None, transform_id=None, relation_type=None, to_json=False, session=None):
    """
    Retrieve a single collection through the ORM layer.

    :param coll_id: The id of the collection.
    :param transform_id: The transform id related to this collection.
    :param relation_type: The relation between this collection and its transform,
                          such as Input, Output, Log and so on.
    :param to_json: return json format.
    :param session: The database session in use.

    :raises NoObject: Per the original documentation, if nothing is found
                      (raised by the ORM layer; not visible here).

    :returns: Collection.
    """
    collection = orm_collections.get_collection(
        coll_id=coll_id,
        transform_id=transform_id,
        relation_type=relation_type,
        to_json=to_json,
        session=session,
    )
    return collection
0176 
0177 
@transactional_session
def add_content(request_id, workload_id, transform_id, coll_id, map_id, scope, name, min_id=0, max_id=0,
                content_type=ContentType.File, status=ContentStatus.New, content_relation_type=ContentRelationType.Input,
                bytes=0, md5=None, adler32=None, processing_id=None, storage_id=None, retries=0,
                locking=ContentLocking.Idle, path=None, expired_at=None, content_metadata=None, session=None):
    """
    Add a single content by forwarding every field to ``orm_contents.add_content``.

    :param request_id: The request id.
    :param workload_id: The workload id.
    :param transform_id: transform id.
    :param coll_id: collection id.
    :param map_id: The id to map inputs to outputs.
    :param scope: The scope of the request data.
    :param name: The name of the request data.
    :param min_id: The minimal id of the content.
    :param max_id: The maximal id of the content.
    :param content_type: The type of the content.
    :param status: content status.
    :param content_relation_type: The relation type of the content (defaults to Input).
    :param bytes: The size of the content.
    :param md5: md5 checksum.
    :param adler32: adler32 checksum.
    :param processing_id: The processing id.
    :param storage_id: The storage id.
    :param retries: The number of retries.
    :param locking: The initial locking state (defaults to ``ContentLocking.Idle``).
    :param path: The content path.
    :param expired_at: The datetime when it expires.
    :param content_metadata: The metadata as json.
    :param session: The database session in use.

    :raises DuplicatedObject: If a content with the same name exists (per the original documentation).
    :raises DatabaseException: If there is a database error.

    :returns: content id.
    """
    fields = dict(
        request_id=request_id, workload_id=workload_id, transform_id=transform_id,
        coll_id=coll_id, map_id=map_id, scope=scope, name=name,
        min_id=min_id, max_id=max_id, content_type=content_type, status=status,
        content_relation_type=content_relation_type, bytes=bytes, md5=md5, adler32=adler32,
        processing_id=processing_id, storage_id=storage_id, retries=retries,
        locking=locking, path=path, expired_at=expired_at, content_metadata=content_metadata,
    )
    return orm_contents.add_content(session=session, **fields)
0219 
0220 
@transactional_session
def add_contents(contents, bulk_size=1000, session=None):
    """
    Add several contents in one call via ``orm_contents.add_contents``.

    :param contents: the contents to insert (the original documentation calls this a dict of contents).
    :param bulk_size: bulk per insert to db.
    :param session: The database session in use.

    :raises DuplicatedObject: If a content with the same name exists (per the original documentation).
    :raises DatabaseException: If there is a database error.

    :returns: whatever ``orm_contents.add_contents`` returns.
    """
    result = orm_contents.add_contents(contents=contents, bulk_size=bulk_size, session=session)
    return result
0237 
0238 
@transactional_session
def update_input_collection_with_contents(coll, parameters, contents, bulk_size=1000, session=None):
    """
    Insert the contents that are not yet registered for a collection and refresh its counters.

    Contents are identified by the tuple (scope, name, min_id, max_id). Contents already
    stored for the collection are left untouched; only unknown ones are added.

    :param coll: the collection, a mapping providing at least ``coll_id`` (and ``total_files``
                 when ``parameters`` does not carry it).
    :param parameters: A dictionary of parameters. It is modified in place: ``processed_files``
                       and ``new_files`` are set, and ``status`` is set to
                       ``CollectionStatus.Closed`` when all files are processed.
    :param contents: iterable of content dicts with ``scope``, ``name``, ``min_id`` and ``max_id``.
    :param bulk_size: bulk per insert to db.
    :param session: The database session in use.

    :raises NoObject: Per the original documentation (raised by lower layers; not visible here).
    :raises DatabaseException: If there is a database error.

    :returns: list of the contents that were newly added.
    """
    def _identity(item):
        return '%s:%s:%s:%s' % (item['scope'], item['name'], item['min_id'], item['max_id'])

    mapped_states = (ContentStatus.Mapped, ContentStatus.Mapped.value)
    new_states = (ContentStatus.New, ContentStatus.New.value)

    # Counters are taken from the contents already stored, before any insert below.
    known_keys = set()
    mapped_count = 0
    new_count = 0
    for existing in orm_contents.get_contents(coll_id=coll['coll_id'], session=session):
        known_keys.add(_identity(existing))
        if existing['status'] in mapped_states:
            mapped_count += 1
        if existing['status'] in new_states:
            new_count += 1

    # Existing contents are deliberately not updated; only unseen ones are inserted.
    missing = [item for item in contents if _identity(item) not in known_keys]
    if missing:
        add_contents(missing, bulk_size=bulk_size, session=session)

    total_files = parameters['total_files'] if 'total_files' in parameters else coll['total_files']
    parameters['processed_files'] = mapped_count
    parameters['new_files'] = new_count
    if mapped_count == total_files:
        parameters['status'] = CollectionStatus.Closed

    update_collection(coll['coll_id'], parameters, session=session)
    return missing
0305 
0306 
@read_session
def get_input_output_map_count(request_id, transform_id, session=None):
    """
    Return the number of distinct map_ids (jobs) for the given transform.

    :param request_id: the request id.
    :param transform_id: the transform id.
    :param session: The database session in use.

    :returns: the count reported by ``orm_contents.get_input_output_map_count``.
    """
    count = orm_contents.get_input_output_map_count(
        request_id=request_id, transform_id=transform_id, session=session)
    return count
0311 
0312 
@read_session
def get_content_name_to_id_map(request_id, transform_id, es=False, session=None):
    """
    Return a lightweight {name: [content_id, ...]} map for Input and Output contents.

    :param request_id: the request id.
    :param transform_id: the transform id.
    :param es: forwarded unchanged to the ORM call (meaning not visible here -- confirm in idds.orm.contents).
    :param session: The database session in use.

    :returns: the mapping built by ``orm_contents.get_content_name_to_id_map``.
    """
    name_map = orm_contents.get_content_name_to_id_map(
        request_id=request_id, transform_id=transform_id, es=es, session=session)
    return name_map
0317 
0318 
@read_session
def has_input_contents_without_external_id(request_id, transform_id, session=None):
    """
    Check Input contents of a transform for a missing external_content_id.

    The result is produced entirely by ``orm_contents.has_input_contents_without_external_id``.

    NOTE(review): the previous documentation stated "Returns True if all Input contents have
    external_content_id set, False otherwise", which reads inverted relative to the function
    name -- confirm the actual sense against the ORM implementation.

    :param request_id: the request id; per the original documentation it is included because
                       the database uses it for virtual table partitioning.
    :param transform_id: the transform id.
    :param session: The database session in use.

    :returns: the boolean reported by the ORM layer.
    """
    outcome = orm_contents.has_input_contents_without_external_id(
        request_id=request_id, transform_id=transform_id, session=session)
    return outcome
0328 
0329 
@transactional_session
def update_contents(parameters, request_id=None, transform_id=None, use_bulk_update_mappings=True, session=None):
    """
    Update several contents at once.

    :param parameters: list of dictionary of parameters.
    :param request_id: the request id, forwarded to the ORM call.
    :param transform_id: the transform id, forwarded to the ORM call.
    :param use_bulk_update_mappings: forwarded unchanged to the ORM call
                                     (presumably selects a bulk-update code path -- confirm in orm).
    :param session: The database session in use.

    :raises NoObject: If no content is found (per the original documentation).
    :raises DatabaseException: If there is a database error.

    :returns: whatever ``orm_contents.update_contents`` returns.
    """
    options = dict(request_id=request_id, transform_id=transform_id,
                   use_bulk_update_mappings=use_bulk_update_mappings)
    return orm_contents.update_contents(parameters, session=session, **options)
0344 
0345 
@transactional_session
def update_content(content_id, parameters, session=None):
    """
    Update one content.

    :param content_id: the content id.
    :param parameters: A dictionary of parameters.
    :param session: The database session in use.

    :raises NoObject: If no content is found (per the original documentation).
    :raises DatabaseException: If there is a database error.

    :returns: whatever ``orm_contents.update_content`` returns.
    """
    outcome = orm_contents.update_content(content_id, parameters, session=session)
    return outcome
0360 
0361 
@read_session
def get_contents(coll_scope=None, coll_name=None, coll_id=None, request_id=None, workload_id=None, transform_id=None,
                 relation_type=None, content_relation_type=None, status=None, to_json=False, session=None):
    """
    Get contents with collection scope, collection name, request id, workload id and relation type.

    :param coll_scope: scope of the collection.
    :param coll_name: name of the collection.
    :param coll_id: collection id(s) to query directly. When empty or None, the collections are
                    looked up from the other filters and their ids are used instead.
    :param request_id: the request id.
    :param workload_id: The workload_id of the request.
    :param transform_id: The transform id related to this collection.
    :param relation_type: The relation type between the collection and transform: input, output, logs and etc.
    :param content_relation_type: The relation type of the contents. When None it is derived from
                                  ``relation_type`` for Input, Output and Log; otherwise it stays None.
    :param status: content status filter forwarded to the ORM call.
    :param to_json: return json format.
    :param session: The database session in use.

    :returns: list of contents; an empty list when no collection matches.
    """
    # ``coll_id`` used to default to a shared mutable list; None is treated the same way
    # as an empty list here, so callers are unaffected.
    if coll_id:
        coll_ids = coll_id
    else:
        collections = get_collections(scope=coll_scope, name=coll_name, request_id=request_id,
                                      workload_id=workload_id, transform_id=transform_id,
                                      relation_type=relation_type, to_json=to_json, session=session)
        coll_ids = [coll['coll_id'] for coll in collections]

    if not coll_ids:
        return []

    # Derive the content relation type from the collection relation type when not given.
    if content_relation_type is None and relation_type is not None:
        if relation_type == CollectionRelationType.Output:
            content_relation_type = ContentRelationType.Output
        elif relation_type == CollectionRelationType.Input:
            content_relation_type = ContentRelationType.Input
        elif relation_type == CollectionRelationType.Log:
            content_relation_type = ContentRelationType.Log

    return orm_contents.get_contents(request_id=request_id, transform_id=transform_id, coll_id=coll_ids,
                                     status=status, to_json=to_json, relation_type=content_relation_type,
                                     session=session)
0404 
0405 
@read_session
def get_contents_by_request_transform(request_id=None, workload_id=None, transform_id=None, status=None, map_id=None, status_updated=False, by_map=False, match_content_ext=False, session=None):
    """
    Get contents with request id, workload id and transform id.

    Every argument is forwarded unchanged to ``orm_contents.get_contents_by_request_transform``.

    :param request_id: the request id.
    :param workload_id: The workload_id of the request.
    :param transform_id: The transform id related to this collection.
    :param status: content status filter.
    :param map_id: map id filter.
    :param status_updated: forwarded to the ORM call (semantics not visible here).
    :param by_map: forwarded to the ORM call (semantics not visible here).
    :param match_content_ext: forwarded to the ORM call (semantics not visible here).
    :param session: The database session in use.

    :returns: list of contents
    """
    query = dict(request_id=request_id, transform_id=transform_id, workload_id=workload_id,
                 status=status, map_id=map_id, status_updated=status_updated,
                 by_map=by_map, match_content_ext=match_content_ext)
    return orm_contents.get_contents_by_request_transform(session=session, **query)
0423 
0424 
@read_session
def get_contents_by_content_ids(content_ids, request_id=None, session=None):
    """
    Get contents by their content ids.

    :param content_ids: list of content id.
    :param request_id: request id.
    :param session: The database session in use.

    :raises NoObject: If no content is found (per the original documentation;
                      raised by the ORM layer, not visible here).

    :returns: list of contents.
    """
    return orm_contents.get_contents_by_content_ids(
        content_ids=content_ids,
        request_id=request_id,
        session=session,
    )
0442 
0443 
@read_session
def get_contents_by_coll_id_status(coll_id, status=None, to_json=False, session=None):
    """
    Get the contents of a collection, optionally filtered by status.

    :param coll_id: Collection id.
    :param status: Content status or list of content status.
    :param to_json: return json format.
    :param session: The database session in use.

    :raises NoObject: If no content is found (per the original documentation;
                      raised by the ORM layer, not visible here).

    :returns: list of contents.
    """
    matched = orm_contents.get_contents(coll_id=coll_id, status=status, to_json=to_json, session=session)
    return matched
0459 
0460 
@transactional_session
def register_output_contents(coll_scope, coll_name, contents, request_id=None, workload_id=None,
                             relation_type=CollectionRelationType.Output, session=None):
    """
    register contents with collection scope, collection name, request id, workload id and contents.

    Exactly one collection must match the given filters. Each content is matched against an
    existing content of that collection by (scope, name, min_id, max_id) and then updated.

    NOTE: the dicts in ``contents`` are modified in place: the identifying keys
    (scope, name, min_id, max_id) are removed and ``content_id`` is added before
    the list is passed to ``orm_contents.update_contents``.

    :param coll_scope: scope of the collection.
    :param coll_name: name of the collection.
    :param contents: list of contents [{'scope': <scope>, 'name': <name>, 'min_id': min_id, 'max_id': max_id,
                                        'status': <status>, 'path': <path>}].
    :param request_id: the request id.
    :param workload_id: The workload_id of the request.
    :param relation_type: The relation type of the collection to search (defaults to Output).
    :param session: The database session in use.

    :raises WrongParameterException: If the number of matching collections is not exactly one,
                                     or a content has no existing counterpart in the collection.
    """
    transform_ids = orm_transforms.get_transform_ids(request_id=request_id,
                                                     workload_id=workload_id,
                                                     session=session)

    if transform_ids:
        collections = orm_collections.get_collections(scope=coll_scope, name=coll_name, transform_id=transform_ids,
                                                      relation_type=relation_type, session=session)
    else:
        collections = []

    coll_def = "request_id=%s, workload_id=%s, coll_scope=%s" % (request_id, workload_id, coll_scope)
    coll_def += ", coll_name=%s, relation_type: %s" % (coll_name, relation_type)

    if len(collections) != 1:
        # The collection description used to be glued directly onto the sentence;
        # separate it so the message is readable.
        msg = "There should be only one collection matched. However there are %s collections (%s)" % (
            len(collections), coll_def)
        raise exceptions.WrongParameterException(msg)

    coll_id = collections[0]['coll_id']

    keys = ['scope', 'name', 'min_id', 'max_id']
    for content in contents:
        ex_content = orm_contents.get_content(coll_id=coll_id, scope=content['scope'],
                                              name=content['name'], min_id=content['min_id'],
                                              max_id=content['max_id'], session=session)

        if not ex_content:
            content_def = "scope: %s, name: %s, min_id: %s, max_id: %s" % (content['scope'],
                                                                           content['name'],
                                                                           content['min_id'],
                                                                           content['max_id'])
            msg = "No matched content in collection(%s) with content(%s)" % (coll_def, content_def)
            raise exceptions.WrongParameterException(msg)

        # Replace the identifying fields with the id of the stored content so the
        # remaining entries are exactly the fields to update.
        for key in keys:
            content.pop(key, None)
        content['content_id'] = ex_content['content_id']

    orm_contents.update_contents(contents, session=session)
0516 
0517 
@read_session
def get_match_contents(coll_scope, coll_name, scope, name, min_id=None, max_id=None,
                       request_id=None, workload_id=None, relation_type=None,
                       only_return_best_match=False, to_json=False, session=None):
    """
    Get matched contents with collection scope, collection name, scope, name, min_id, max_id,
    request id, workload id and only_return_best_match.

    :param coll_scope: scope of the collection.
    :param coll_name: name of the collection.
    :param scope: scope of the content.
    :param name: name of the content.
    :param min_id: min_id of the content.
    :param max_id: max_id of the content.
    :param request_id: the request id.
    :param workload_id: The workload_id of the request.
    :param relation_type: The relation type of the collection to search.
    :param only_return_best_match: only return best matched content if it's true. The best match
                                   is the content with the smallest ``max_id - min_id`` range
                                   (the first one wins on ties).
    :param to_json: return json format.
    :param session: The database session in use.

    :raises WrongParameterException: If the number of matching collections is not exactly one.

    :returns: list of contents
    """
    transform_ids = orm_transforms.get_transform_ids(request_id=request_id,
                                                     workload_id=workload_id,
                                                     session=session)

    if transform_ids:
        collections = orm_collections.get_collections(scope=coll_scope, name=coll_name, transform_id=transform_ids,
                                                      relation_type=relation_type, session=session)
    else:
        collections = []

    if len(collections) != 1:
        coll_def = "request_id=%s, workload_id=%s, coll_scope=%s" % (request_id, workload_id, coll_scope)
        coll_def += ", coll_name=%s, relation_type: %s" % (coll_name, relation_type)
        # The collection description used to be glued directly onto the sentence;
        # separate it so the message is readable.
        msg = "There should be only one collection matched. However there are %s collections (%s)" % (
            len(collections), coll_def)
        raise exceptions.WrongParameterException(msg)

    coll_id = collections[0]['coll_id']

    contents = orm_contents.get_match_contents(coll_id=coll_id, scope=scope, name=name, min_id=min_id, max_id=max_id,
                                               to_json=to_json, session=session)

    # Nothing to narrow down with zero or one match. Previously an empty result
    # combined with only_return_best_match produced [None].
    if not only_return_best_match or len(contents) <= 1:
        return contents

    # min() keeps the first element among equals, matching the original strict comparison.
    best = min(contents, key=lambda row: row['max_id'] - row['min_id'])
    return [best]
0573 
0574 
@read_session
def get_content_status_statistics(coll_id=None, transform_ids=None, session=None):
    """
    Get content statistics grouped by status.

    :param coll_id: Collection id.
    :param transform_ids: transform ids, forwarded to the ORM call.
    :param session: The database session in use.

    :returns: statistics group by status, as a dict.
    """
    stats = orm_contents.get_content_status_statistics(
        coll_id=coll_id, transform_ids=transform_ids, session=session)
    return stats
0586 
0587 
@read_session
def get_content_status_statistics_by_relation_type(transform_ids, bulk_size=500, session=None):
    """
    Get content status statistics by relation type for one or more transforms.

    A scalar ``transform_ids`` is wrapped into a list, and a single id is duplicated so the
    ORM call always receives at least two entries. Inputs longer than ``bulk_size`` are
    queried in slices of ``bulk_size`` and the partial results are concatenated.

    NOTE(review): the single-id duplication is not applied to a trailing slice of length one
    when chunking -- confirm whether the ORM query needs it there too.

    :param transform_ids: a transform id or a list/tuple of transform ids.
    :param bulk_size: maximum number of transform ids per ORM query.
    :param session: The database session in use.

    :returns: the ORM result, or the concatenation of the per-slice results as a list.
    """
    fetch = orm_contents.get_content_status_statistics_by_relation_type

    # Empty / None input is handed to the ORM layer untouched.
    if not transform_ids:
        return fetch(transform_ids, session=session)

    if not isinstance(transform_ids, (list, tuple)):
        transform_ids = [transform_ids]
    if len(transform_ids) == 1:
        transform_ids = [transform_ids[0], transform_ids[0]]

    if len(transform_ids) <= bulk_size:
        return fetch(transform_ids, session=session)

    merged = []
    for start in range(0, len(transform_ids), bulk_size):
        merged += fetch(transform_ids[start:start + bulk_size], session=session)
    return merged
0612 
0613 
@read_session
def get_content_status_statistics_by_coll(request_id=None, transform_id=None, with_deps=True, session=None):
    """
    Get content statistics grouped by (coll_id, status) with sum of bytes.

    :param request_id: request id.
    :param transform_id: transform id.
    :param with_deps: forwarded unchanged to the ORM call (semantics not visible here).
    :param session: The database session in use.

    :returns: dict {coll_id: {status: {'count': N, 'bytes': B}, 'has_unsynced': bool}}
              (shape as stated by the original documentation).
    """
    stats = orm_contents.get_content_status_statistics_by_coll(
        request_id=request_id,
        transform_id=transform_id,
        with_deps=with_deps,
        session=session,
    )
    return stats
0626 
0627 
@read_session
def get_content_ext_status_statistics_by_coll(request_id=None, transform_id=None, session=None):
    """
    Get contents_ext statistics grouped by (coll_id, status).

    :param request_id: request id.
    :param transform_id: transform id.
    :param session: The database session in use.

    :returns: dict {coll_id: {status: count}} (shape as stated by the original documentation).
    """
    stats = orm_contents.get_content_ext_status_statistics_by_coll(
        request_id=request_id,
        transform_id=transform_id,
        session=session,
    )
    return stats
0640 
0641 
@transactional_session
def clean_locking(time_period=3600, session=None):
    """
    Clean collection locking which is older than the time period.

    Delegates to ``orm_collections.clean_locking``; nothing is returned.

    :param time_period: age threshold in seconds.
    :param session: The database session in use.
    """
    orm_collections.clean_locking(
        time_period=time_period,
        session=session,
    )
0650 
0651 
@transactional_session
def clean_next_poll_at(status, session=None):
    """
    Clean next_poll_at for collections with the given status.

    Delegates to ``orm_collections.clean_next_poll_at``; nothing is returned.

    :param status: status of the collection.
    :param session: The database session in use.
    """
    orm_collections.clean_next_poll_at(
        status=status,
        session=session,
    )
0660 
0661 
@read_session
def get_output_content_by_request_id_content_name(request_id, content_scope, content_name, transform_id=None,
                                                  content_type=None, min_id=None, max_id=None, to_json=False,
                                                  session=None):
    """
    Get output content by request_id and content name

    The transform is resolved first: when the request has exactly one transform it is used
    (regardless of ``transform_id``); when it has several, ``transform_id`` selects one.
    The content is then looked up in that transform's Output collection.

    :param request_id: request id.
    :param content_scope: The scope of the content.
    :param content_name: The name of the content.
    :param transform_id: The transform id; required when the request has more than one transform.
    :param content_type: The type of the content.
    :param min_id: min_id of the content.
    :param max_id: max_id of the content.
    :param to_json: return json format.
    :param session: The database session in use.

    :raises WrongParameterException: If the request has several transforms and no transform id is given.

    :returns: content of the output collection, or None when no transform/collection is resolved.
    """
    transform_ids = orm_transforms.get_transform_ids(request_id, session=session)

    found_transform_id = None
    if transform_ids:
        if len(transform_ids) == 1:
            found_transform_id = transform_ids[0]
        elif transform_id is None:
            # Raising a plain string is itself a TypeError in Python 3; raise a real
            # exception carrying the intended message instead.
            raise exceptions.WrongParameterException(
                "Number of the transforms(%s) is bigger than 1 and transform id is not provided" % len(transform_ids))
        else:
            found_transform_id = next((tf_id for tf_id in transform_ids if tf_id == transform_id), None)

    coll_id = None
    if found_transform_id:
        coll_id = orm_collections.get_collection_id(transform_id=found_transform_id,
                                                    relation_type=CollectionRelationType.Output,
                                                    session=session)
    content = None
    if coll_id:
        content = orm_contents.get_content(coll_id=coll_id, scope=content_scope, name=content_name,
                                           content_type=content_type, min_id=min_id, max_id=max_id,
                                           to_json=to_json, session=session)
    return content
0700 
0701 
@read_session
def get_output_contents_by_request_id_status(request_id, name, content_status, limit, transform_id=None, to_json=False, session=None):
    """
    Get contents of the output collection by request id and content status.

    The transform is resolved from the request first, then the Output
    collection of that transform. Contents with the given status are fetched,
    optionally filtered by name and truncated to ``limit`` items. Both the
    name filter and the limit are applied here, after the contents are fetched.

    :param request_id: request id.
    :param name: the content name; when set, only contents whose name matches (compared as strings) are kept.
    :param content_status: The content status.
    :param limit: limit number of contents; a falsy value means no limit.
    :param transform_id: transform id, required when the request has more than one transform.
    :param to_json: return json format.
    :param session: The database session in use.

    :raises IDDSException: If the request has several transforms and no transform_id is given.

    :returns: list of contents of the output collection (empty when no
              transform or output collection can be resolved).
    """
    transform_ids = orm_transforms.get_transform_ids(request_id, session=session)
    if not transform_ids:
        return []

    if len(transform_ids) == 1:
        # NOTE: with a single transform, the given transform_id is not checked against it.
        found_transform_id = transform_ids[0]
    elif transform_id is None:
        # Raising a plain string is a TypeError in Python 3; raise a real exception instead.
        raise exceptions.IDDSException("Number of the transforms(%s) is bigger than 1 and transform id is not provided" % len(transform_ids))
    else:
        found_transform_id = next((tf_id for tf_id in transform_ids if tf_id == transform_id), None)

    if not found_transform_id:
        return []

    coll_id = orm_collections.get_collection_id(transform_id=found_transform_id,
                                                relation_type=CollectionRelationType.Output,
                                                session=session)
    if not coll_id:
        return []

    contents = orm_contents.get_contents(coll_id=coll_id, status=content_status, to_json=to_json, session=session)

    if name:
        wanted_name = str(name)
        contents = [content for content in contents if str(content['name']) == wanted_name]

    if contents and limit and len(contents) > limit:
        contents = contents[:limit]
    return contents
0750 
0751 
@read_session
def get_updated_transforms_by_content_status(request_id=None, transform_id=None, session=None):
    """
    Get updated transform ids by content status.

    Thin wrapper around orm_contents.get_updated_transforms_by_content_status.

    :param request_id: The Request id.
    :param transform_id: The transform id.
    :param session: The database session in use.

    :returns: list, as returned by the ORM layer.
    """
    updated_transforms = orm_contents.get_updated_transforms_by_content_status(
        request_id=request_id,
        transform_id=transform_id,
        session=session,
    )
    return updated_transforms
0763 
0764 
@transactional_session
def update_contents_to_others_by_dep_id(request_id=None, transform_id=None, session=None):
    """
    Update contents to others by content_dep_id.

    Thin wrapper around orm_contents.update_contents_to_others_by_dep_id.

    :param request_id: The Request id.
    :param transform_id: The transform id.
    :param session: The database session in use.

    :returns: whatever the ORM layer returns.
    """
    outcome = orm_contents.update_contents_to_others_by_dep_id(
        request_id=request_id,
        transform_id=transform_id,
        session=session,
    )
    return outcome
0774 
0775 
@transactional_session
def update_contents_from_others_by_dep_id(request_id=None, transform_id=None, session=None):
    """
    Update contents from others by content_dep_id.

    Thin wrapper around orm_contents.update_contents_from_others_by_dep_id.

    :param request_id: The Request id.
    :param transform_id: The transform id.
    :param session: The database session in use.

    :returns: whatever the ORM layer returns.
    """
    outcome = orm_contents.update_contents_from_others_by_dep_id(
        request_id=request_id,
        transform_id=transform_id,
        session=session,
    )
    return outcome
0785 
0786 
@transactional_session
def update_contents_from_others_by_dep_id_pages(request_id=None, transform_id=None, page_size=2000, status_not_to_check=None,
                                                logger=None, log_prefix=None, session=None):
    """
    Update contents from others by content_dep_id, page by page.

    Thin wrapper around orm_contents.update_contents_from_others_by_dep_id_pages;
    all arguments are forwarded unchanged.

    :param request_id: The Request id.
    :param transform_id: The transform id.
    :param page_size: page size handed to the ORM layer.
    :param status_not_to_check: forwarded as is to the ORM layer.
    :param logger: logger handed to the ORM layer.
    :param log_prefix: log prefix handed to the ORM layer.
    :param session: The database session in use.

    :returns: whatever the ORM layer returns.
    """
    forwarded = dict(request_id=request_id,
                     transform_id=transform_id,
                     page_size=page_size,
                     logger=logger,
                     log_prefix=log_prefix,
                     status_not_to_check=status_not_to_check,
                     session=session)
    return orm_contents.update_contents_from_others_by_dep_id_pages(**forwarded)
0799 
0800 
@transactional_session
def update_input_contents_by_dependency_pages(request_id=None, transform_id=None, page_size=2000, batch_size=2000, logger=None, log_prefix=None,
                                              terminated=False, status_not_to_check=None, session=None):
    """
    Update input contents by dependencies, page by page.

    Thin wrapper around orm_contents.update_input_contents_by_dependency_pages;
    all arguments are forwarded unchanged.

    :param request_id: The Request id.
    :param transform_id: The transform id.
    :param page_size: page size handed to the ORM layer.
    :param batch_size: batch size handed to the ORM layer.
    :param logger: logger handed to the ORM layer.
    :param log_prefix: log prefix handed to the ORM layer.
    :param terminated: forwarded as is to the ORM layer.
    :param status_not_to_check: forwarded as is to the ORM layer.
    :param session: The database session in use.

    :returns: whatever the ORM layer returns.
    """
    forwarded = dict(request_id=request_id,
                     transform_id=transform_id,
                     page_size=page_size,
                     batch_size=batch_size,
                     terminated=terminated,
                     logger=logger,
                     log_prefix=log_prefix,
                     status_not_to_check=status_not_to_check,
                     session=session)
    return orm_contents.update_input_contents_by_dependency_pages(**forwarded)
0813 
0814 
@read_session
def get_update_contents_from_others_by_dep_id(request_id=None, transform_id=None, session=None):
    """
    Get the contents update from others by content_dep_id.

    Thin wrapper around orm_contents.get_update_contents_from_others_by_dep_id,
    run in a read session.

    :param request_id: The Request id.
    :param transform_id: The transform id.
    :param session: The database session in use.

    :returns: whatever the ORM layer returns.
    """
    pending_updates = orm_contents.get_update_contents_from_others_by_dep_id(
        request_id=request_id,
        transform_id=transform_id,
        session=session,
    )
    return pending_updates
0824 
0825 
@transactional_session
def add_contents_update(contents, bulk_size=10000, session=None):
    """
    Add contents update.

    Thin wrapper around orm_contents.add_contents_update.

    :param contents: dict of contents.
    :param bulk_size: bulk size handed to the ORM layer.
    :param session: session.

    :raises DuplicatedObject: If a collection with the same name exists.
    :raises DatabaseException: If there is a database error.

    :returns: content ids.
    """
    added = orm_contents.add_contents_update(contents, session=session, bulk_size=bulk_size)
    return added
0840 
0841 
@transactional_session
def set_fetching_contents_update(request_id=None, transform_id=None, fetch=False, session=None):
    """
    Set fetching contents update.

    Thin wrapper around orm_contents.set_fetching_contents_update.

    :param request_id: The Request id.
    :param transform_id: The transform id.
    :param fetch: fetch flag, forwarded as is to the ORM layer.
    :param session: session.

    :returns: whatever the ORM layer returns.
    """
    forwarded = dict(request_id=request_id, transform_id=transform_id, fetch=fetch, session=session)
    return orm_contents.set_fetching_contents_update(**forwarded)
0850 
0851 
@read_session
def get_contents_update(request_id=None, transform_id=None, fetch=False, session=None):
    """
    Get contents update.

    Thin wrapper around orm_contents.get_contents_update.

    :param request_id: The Request id.
    :param transform_id: The transform id.
    :param fetch: fetch flag, forwarded as is to the ORM layer.
    :param session: session.

    :returns: whatever the ORM layer returns.
    """
    forwarded = dict(request_id=request_id, transform_id=transform_id, fetch=fetch, session=session)
    return orm_contents.get_contents_update(**forwarded)
0860 
0861 
@transactional_session
def delete_contents_update(request_id=None, transform_id=None, contents=None, fetch=False, session=None):
    """
    Delete contents update.

    Thin wrapper around orm_contents.delete_contents_update.

    :param request_id: The Request id.
    :param transform_id: The transform id.
    :param contents: list of contents forwarded to the ORM layer; None (the default) is treated as an empty list.
    :param fetch: fetch flag, forwarded as is to the ORM layer.
    :param session: The database session in use.

    :raises NoObject: If no content is founded.
    :raises DatabaseException: If there is a database error.

    :returns: whatever the ORM layer returns.
    """
    # Build a fresh list per call: a literal ``[]`` default would be one shared
    # object that persists (and can be mutated downstream) across calls.
    if contents is None:
        contents = []
    return orm_contents.delete_contents_update(request_id=request_id, transform_id=transform_id,
                                               contents=contents, fetch=fetch, session=session)
0873 
0874 
def get_contents_ext_maps():
    """
    Get the contents ext maps.

    Thin wrapper around orm_contents.get_contents_ext_maps; no session is involved.

    :returns: whatever the ORM layer returns.
    """
    ext_maps = orm_contents.get_contents_ext_maps()
    return ext_maps
0877 
0878 
@transactional_session
def add_contents_ext(contents, bulk_size=10000, session=None):
    """
    Add contents ext.

    Thin wrapper around orm_contents.add_contents_ext.

    :param contents: dict of contents.
    :param bulk_size: bulk size handed to the ORM layer.
    :param session: session.

    :raises DuplicatedObject: If a collection with the same name exists.
    :raises DatabaseException: If there is a database error.

    :returns: content ids.
    """
    added = orm_contents.add_contents_ext(contents, session=session, bulk_size=bulk_size)
    return added
0893 
0894 
@transactional_session
def update_contents_ext(parameters, request_id=None, transform_id=None, use_bulk_update_mappings=True, session=None):
    """
    Update contents ext.

    Thin wrapper around orm_contents.update_contents_ext.

    :param parameters: list of dictionary of parameters.
    :param request_id: The Request id.
    :param transform_id: The transform id.
    :param use_bulk_update_mappings: forwarded as is to the ORM layer.
    :param session: The database session in use.

    :raises NoObject: If no content is founded.
    :raises DatabaseException: If there is a database error.

    :returns: whatever the ORM layer returns.
    """
    forwarded = dict(request_id=request_id,
                     transform_id=transform_id,
                     use_bulk_update_mappings=use_bulk_update_mappings,
                     session=session)
    return orm_contents.update_contents_ext(parameters, **forwarded)
0909 
0910 
@read_session
def get_contents_ext(request_id=None, transform_id=None, workload_id=None, coll_id=None, status=None, session=None):
    """
    Get contents ext or raise a NoObject exception.

    Thin wrapper around orm_contents.get_contents_ext.

    :param request_id: request id.
    :param transform_id: transform id.
    :param workload_id: workload id.
    :param coll_id: collection id.
    :param status: status, forwarded as is to the ORM layer.

    :param session: The database session in use.

    :raises NoObject: If no content is founded.

    :returns: list of contents.
    """
    forwarded = dict(request_id=request_id,
                     transform_id=transform_id,
                     workload_id=workload_id,
                     coll_id=coll_id,
                     status=status,
                     session=session)
    return orm_contents.get_contents_ext(**forwarded)
0928 
0929 
@read_session
def get_contents_ext_ids(request_id=None, transform_id=None, workload_id=None, coll_id=None, status=None, session=None):
    """
    Get contents ext ids or raise a NoObject exception.

    Thin wrapper around orm_contents.get_contents_ext_ids.

    :param request_id: request id.
    :param transform_id: transform id.
    :param workload_id: workload id.
    :param coll_id: collection id.
    :param status: status, forwarded as is to the ORM layer.

    :param session: The database session in use.

    :raises NoObject: If no content is founded.

    :returns: list of content ids.
    """
    forwarded = dict(request_id=request_id,
                     transform_id=transform_id,
                     workload_id=workload_id,
                     coll_id=coll_id,
                     status=status,
                     session=session)
    return orm_contents.get_contents_ext_ids(**forwarded)
0947 
0948 
def combine_contents_ext(contents, contents_ext, with_status_name=False):
    """
    Combine contents with contents ext.

    Thin wrapper around orm_contents.combine_contents_ext; no session is involved.

    :param contents: contents, forwarded as is to the ORM layer.
    :param contents_ext: contents ext, forwarded as is to the ORM layer.
    :param with_status_name: forwarded as is to the ORM layer.

    :returns: whatever the ORM layer returns.
    """
    combined = orm_contents.combine_contents_ext(contents, contents_ext, with_status_name=with_status_name)
    return combined