Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-09 07:58:20

0001 #!/usr/bin/env python
0002 #
0003 # Licensed under the Apache License, Version 2.0 (the "License");
0004 # You may not use this file except in compliance with the License.
0005 # You may obtain a copy of the License at
0006 # http://www.apache.org/licenses/LICENSE-2.0
0007 #
0008 # Authors:
0009 # - Wen Guan, <wen.guan@cern.ch>, 2019 - 2025
0010 
0011 
0012 """
0013 operations related to Requests.
0014 """
0015 
0016 import datetime
0017 import hashlib
0018 import time
0019 import traceback
0020 
0021 from enum import Enum
0022 from collections import defaultdict
0023 
0024 import sqlalchemy
0025 from sqlalchemy import and_, or_
0026 from sqlalchemy import func
0027 from sqlalchemy.exc import DatabaseError, IntegrityError
0028 # from sqlalchemy.orm import aliased
0029 from sqlalchemy.sql import exists, select, expression, update
0030 from sqlalchemy.sql.expression import asc
0031 
0032 from idds.common import exceptions
0033 from idds.common.constants import (ContentType, ContentStatus, ContentLocking,
0034                                    ContentFetchStatus, ContentRelationType)
0035 from idds.common.utils import group_list
0036 from idds.common.utils import json_dumps
0037 from idds.orm.base.session import read_session, transactional_session
0038 from idds.orm.base import models
0039 
0040 
def create_content(request_id, workload_id, transform_id, coll_id, map_id, scope, name,
                   min_id, max_id, content_type=ContentType.File,
                   status=ContentStatus.New, content_relation_type=ContentRelationType.Input,
                   bytes=0, md5=None, adler32=None, processing_id=None, storage_id=None, retries=0,
                   locking=ContentLocking.Idle, path=None, expired_at=None, content_metadata=None):
    """
    Build a ``models.Content`` object from the given attributes.

    The object is only constructed here; nothing in this function saves it.

    :param request_id: The request id.
    :param workload_id: The workload id.
    :param transform_id: The transform id.
    :param coll_id: The collection id.
    :param map_id: The id to map inputs to outputs.
    :param scope: The scope of the content data.
    :param name: The name of the content data (must be a string, it is hashed).
    :param min_id: The minimal id of the content.
    :param max_id: The maximal id of the content.
    :param content_type: The type of the content.
    :param status: The content status.
    :param content_relation_type: The relation type of the content.
    :param bytes: The size of the content.
    :param md5: The md5 checksum.
    :param adler32: The adler32 checksum.
    :param processing_id: The processing id.
    :param storage_id: The storage id.
    :param retries: The number of retries.
    :param locking: The locking state of the content.
    :param path: The content path.
    :param expired_at: The datetime when it expires.
    :param content_metadata: The metadata as json.

    :returns: the new ``models.Content`` instance.
    """
    # NOTE(review): 'scope_name_md5' is computed from the name alone, exactly
    # like 'name_md5'; the scope does not contribute to it. Confirm whether
    # this is intended before changing it, since stored values depend on it.
    name_digest = hashlib.md5(name.encode("utf-8")).hexdigest()

    attributes = {
        'request_id': request_id,
        'workload_id': workload_id,
        'transform_id': transform_id,
        'coll_id': coll_id,
        'map_id': map_id,
        'scope': scope,
        'name': name,
        'min_id': min_id,
        'max_id': max_id,
        'content_type': content_type,
        'content_relation_type': content_relation_type,
        'status': status,
        'bytes': bytes,
        'md5': md5,
        'name_md5': name_digest,
        'scope_name_md5': name_digest,
        'adler32': adler32,
        'processing_id': processing_id,
        'storage_id': storage_id,
        'retries': retries,
        'path': path,
        'expired_at': expired_at,
        'locking': locking,
        'content_metadata': content_metadata,
    }
    return models.Content(**attributes)
0084 
0085 
@transactional_session
def add_content(request_id, workload_id, transform_id, coll_id, map_id, scope, name, min_id=0, max_id=0,
                content_type=ContentType.File, status=ContentStatus.New, content_relation_type=ContentRelationType.Input,
                bytes=0, md5=None, adler32=None, processing_id=None, storage_id=None, retries=0,
                locking=ContentLocking.Idle, path=None, expired_at=None, content_metadata=None, session=None):
    """
    Create a content and save it through the given session.

    :param request_id: The request id.
    :param workload_id: The workload id.
    :param transform_id: The transform id.
    :param coll_id: The collection id.
    :param map_id: The id to map inputs to outputs.
    :param scope: The scope of the content data.
    :param name: The name of the content data.
    :param min_id: The minimal id of the content.
    :param max_id: The maximal id of the content.
    :param content_type: The type of the content.
    :param status: The content status.
    :param content_relation_type: The relation type of the content.
    :param bytes: The size of the content.
    :param md5: The md5 checksum.
    :param adler32: The adler32 checksum.
    :param processing_id: The processing id.
    :param storage_id: The storage id.
    :param retries: The number of retries.
    :param locking: The locking state of the content.
    :param path: The content path.
    :param expired_at: The datetime when it expires.
    :param content_metadata: The metadata as json.
    :param session: The database session in use.

    :raises DuplicatedObject: If saving the content hits an integrity error.
    :raises DatabaseException: If there is a database error.

    :returns: content id.
    """

    try:
        # 'locking' is forwarded explicitly; previously it was accepted by this
        # function but dropped, so create_content always used its own default.
        new_content = create_content(request_id=request_id, workload_id=workload_id, transform_id=transform_id,
                                     coll_id=coll_id, map_id=map_id, content_relation_type=content_relation_type,
                                     scope=scope, name=name, min_id=min_id, max_id=max_id,
                                     content_type=content_type, status=status, bytes=bytes, md5=md5,
                                     adler32=adler32, processing_id=processing_id, storage_id=storage_id,
                                     retries=retries, locking=locking, path=path, expired_at=expired_at,
                                     content_metadata=content_metadata)
        new_content.save(session=session)
        return new_content.content_id
    except IntegrityError as error:
        raise exceptions.DuplicatedObject('Content transform_id:map_id(%s:%s) already exists!: %s' %
                                          (transform_id, map_id, error)) from error
    except DatabaseError as error:
        raise exceptions.DatabaseException(error) from error
0137 
0138 
@transactional_session
def add_contents(contents, bulk_size=10000, session=None):
    """
    Insert many contents in batches.

    Each item of ``contents`` is completed in place: keys that are missing get
    a default value, and 'name_md5' / 'scope_name_md5' are always (re)computed
    from the item's 'name'.

    :param contents: list of dicts, one per content; each must hold a string 'name'.
    :param bulk_size: maximum number of contents handed to one bulk insert.
    :param session: The database session in use.

    :raises DuplicatedObject: If the insert hits an integrity error.
    :raises DatabaseException: If there is a database error.

    :returns: a list of None with one entry per content (the generated ids
              are not reported back by the bulk insert).
    """
    # Key order is kept as-is: missing keys are appended to each content dict
    # in this order.
    fallback_values = {'request_id': None, 'workload_id': None,
                       'transform_id': None, 'coll_id': None, 'map_id': None,
                       'scope': None, 'name': None, 'min_id': 0, 'max_id': 0,
                       'content_type': ContentType.File, 'status': ContentStatus.New,
                       'locking': ContentLocking.Idle, 'content_relation_type': ContentRelationType.Input,
                       'bytes': 0, 'md5': None, 'adler32': None, 'processing_id': None,
                       'storage_id': None, 'retries': 0, 'path': None,
                       'name_md5': None, 'scope_name_md5': None,
                       'expired_at': datetime.datetime.utcnow() + datetime.timedelta(days=30),
                       'content_metadata': None}

    for content in contents:
        for key, value in fallback_values.items():
            content.setdefault(key, value)
        # NOTE(review): both digests are computed from the name only; the scope
        # is not part of 'scope_name_md5'. Confirm this is intended.
        name_digest = hashlib.md5(content['name'].encode("utf-8")).hexdigest()
        content['name_md5'] = name_digest
        content['scope_name_md5'] = name_digest

    batches = [contents[start:start + bulk_size]
               for start in range(0, len(contents), bulk_size)]

    try:
        for batch in batches:
            # session.bulk_insert_mappings(models.Content, batch)
            custom_bulk_insert_mappings(models.Content, batch, session=session)
        return [None] * len(contents)
    except IntegrityError as error:
        raise exceptions.DuplicatedObject('Duplicated objects: %s' % (error))
    except DatabaseError as error:
        raise exceptions.DatabaseException(error)
0182 
0183 
@read_session
def get_content(content_id=None, coll_id=None, scope=None, name=None, content_type=None, min_id=0, max_id=0, to_json=False, session=None):
    """
    Look up a single content.

    When ``content_id`` is truthy it is the only criterion. Otherwise the
    content is searched by (coll_id, scope, name); for the File content type
    the content type is added to the criteria, for any other type min_id and
    max_id are matched too (plus the content type when one is given).

    :param content_id: content id.
    :param coll_id: collection id.
    :param scope: The scope of the content data.
    :param name: The name of the content data.
    :param content_type: The type of the content.
    :param min_id: The minimal id of the content.
    :param max_id: The maximal id of the content.
    :param to_json: return json format.
    :param session: The database session in use.

    :raises NoObject: If the query raises NoResultFound.

    :returns: the first matching content as a dict, or None when nothing matches.
    """

    try:
        query = session.query(models.Content)
        if content_id:
            query = query.filter(models.Content.content_id == content_id)
        else:
            # Criteria shared by the File and non-File lookups.
            query = query.filter(models.Content.coll_id == coll_id)\
                         .filter(models.Content.scope == scope)\
                         .filter(models.Content.name == name)
            if content_type in [ContentType.File, ContentType.File.value]:
                query = query.filter(models.Content.content_type == content_type)
            else:
                query = query.filter(models.Content.min_id == min_id)\
                             .filter(models.Content.max_id == max_id)
                if content_type:
                    query = query.filter(models.Content.content_type == content_type)

        record = query.first()
        if not record:
            return None
        return record.to_dict_json() if to_json else record.to_dict()
    except sqlalchemy.orm.exc.NoResultFound as error:
        raise exceptions.NoObject('content(content_id: %s, coll_id: %s, scope: %s, name: %s, content_type: %s, min_id: %s, max_id: %s) cannot be found: %s' %
                                  (content_id, coll_id, scope, name, content_type, min_id, max_id, error))
0239 
0240 
@read_session
def get_match_contents(coll_id, scope, name, content_type=None, min_id=None, max_id=None, to_json=False, session=None):
    """
    Get the contents of a collection whose name matches a pattern.

    :param coll_id: collection id.
    :param scope: The scope of the content data.
    :param name: The name pattern; every '*' is turned into the SQL LIKE wildcard '%'.
    :param content_type: The type of the content; not filtered on when None.
    :param min_id: When not None, only contents with min_id <= this value match.
    :param max_id: When not None, only contents with max_id >= this value match.
    :param to_json: return json format.
    :param session: The database session in use.

    :raises NoObject: If the query raises NoResultFound.

    :returns: list of contents as dicts (empty when nothing matches).
    """

    try:
        query = session.query(models.Content)\
                       .filter(models.Content.coll_id == coll_id)\
                       .filter(models.Content.scope == scope)\
                       .filter(models.Content.name.like(name.replace('*', '%')))

        if content_type is not None:
            # Fixed: the attribute was misspelled 'content_tye', which raised
            # AttributeError whenever a content type was supplied.
            query = query.filter(models.Content.content_type == content_type)
        if min_id is not None:
            query = query.filter(models.Content.min_id <= min_id)
        if max_id is not None:
            query = query.filter(models.Content.max_id >= max_id)

        return [row.to_dict_json() if to_json else row.to_dict() for row in query.all()]
    except sqlalchemy.orm.exc.NoResultFound as error:
        raise exceptions.NoObject('No match contents for (coll_id: %s, scope: %s, name: %s, content_type: %s, min_id: %s, max_id: %s): %s' %
                                  (coll_id, scope, name, content_type, min_id, max_id, error)) from error
0288 
0289 
@read_session
def get_contents(scope=None, name=None, request_id=None, transform_id=None, workload_id=None, coll_id=None, status=None,
                 relation_type=None, without_content_dep_id=False, to_json=False, session=None):
    """
    Get the contents matching the given criteria, ordered by map_id.

    Every criterion is optional; the ones left at their falsy default are not
    applied (``status`` is applied whenever it is not None).

    :param scope: The scope of the content data.
    :param name: The name pattern; every '*' is turned into the SQL LIKE wildcard '%'.
    :param request_id: The request id.
    :param transform_id: The transform id.
    :param workload_id: The workload id.
    :param coll_id: A collection id or a list/tuple of collection ids.
    :param status: A content status or a list/tuple of statuses.
    :param relation_type: The content relation type.
    :param without_content_dep_id: If True, keep only contents whose
                                   content_dep_id is NULL or 0.
    :param to_json: return json format.
    :param session: The database session in use.

    :raises NoObject: If the query raises NoResultFound.

    :returns: list of contents as dicts.
    """

    def _as_in_values(values):
        # Wrap scalars in a list and duplicate a lone element, so that the IN
        # clause never gets exactly one value.
        # NOTE(review): presumably a workaround for single-element IN handling
        # in some database backend - TODO confirm.
        if not isinstance(values, (tuple, list)):
            values = [values]
        if len(values) == 1:
            values = [values[0], values[0]]
        return values

    try:
        if status is not None:
            status = _as_in_values(status)
        if coll_id is not None:
            coll_id = _as_in_values(coll_id)

        criteria = []
        if request_id:
            criteria.append(models.Content.request_id == request_id)
        if transform_id:
            criteria.append(models.Content.transform_id == transform_id)
        if workload_id:
            criteria.append(models.Content.workload_id == workload_id)
        if coll_id:
            criteria.append(models.Content.coll_id.in_(coll_id))
        if scope:
            criteria.append(models.Content.scope == scope)
        if name:
            criteria.append(models.Content.name.like(name.replace('*', '%')))
        if status is not None:
            criteria.append(models.Content.status.in_(status))
        if relation_type:
            criteria.append(models.Content.content_relation_type == relation_type)
        if without_content_dep_id:
            criteria.append(or_(models.Content.content_dep_id == None, models.Content.content_dep_id == 0))  # noqa: E711

        query = session.query(models.Content)
        for criterion in criteria:
            query = query.filter(criterion)
        query = query.order_by(asc(models.Content.map_id))

        return [row.to_dict_json() if to_json else row.to_dict() for row in query.all()]
    except sqlalchemy.orm.exc.NoResultFound as error:
        raise exceptions.NoObject('No record can be found with (coll_id=%s, name=%s): %s' %
                                  (coll_id, name, error))
0357 
0358 
@read_session
def get_contents_by_request_transform(request_id=None, transform_id=None, workload_id=None, status=None, map_id=None, status_updated=False, with_deps=True, page_num=None, page_size=None, by_map=False, match_content_ext=False, session=None):
    """
    Get contents by request/transform, ordered by (request_id, transform_id, map_id).

    :param request_id: request id.
    :param transform_id: transform id.
    :param workload_id: workload id.
    :param status: a value or list/tuple of values matched against the
                   content's ``substatus`` column.
    :param map_id: map id.
    :param status_updated: if True, keep only contents whose status differs
                           from their substatus.
    :param with_deps: if False, contents with content_relation_type 3 are left out.
    :param page_num: page number (0-based) for paginated retrieval.
    :param page_size: number of distinct map_ids per page. Pagination is only
                      applied when both page_num and page_size are not None.
    :param by_map: if True, the status / content-ext condition selects map_ids
                   (at least one content of the map must satisfy it) and all
                   contents of these map_ids are returned; if False, the
                   condition is applied to each content directly.
    :param match_content_ext: if True, also select contents that have no
                              Content_ext row or whose Content_ext status
                              differs from the content substatus. It is
                              OR-ed with the status condition.
    :param session: The database session in use.

    :raises NoObject: If the query raises NoResultFound.

    :returns: list of contents as dicts.
    """

    try:
        if status is not None and not isinstance(status, (tuple, list)):
            status = [status]

        query = session.query(models.Content)
        if request_id:
            query = query.filter(models.Content.request_id == request_id)
        if transform_id:
            query = query.filter(models.Content.transform_id == transform_id)
        if workload_id:
            query = query.filter(models.Content.workload_id == workload_id)
        if map_id:
            query = query.filter(models.Content.map_id == map_id)
        if status_updated:
            query = query.filter(models.Content.status != models.Content.substatus)
        if not with_deps:
            # NOTE(review): 3 is presumably ContentRelationType.InputDependency
            # (other with_deps filters in this module use that name) - TODO confirm.
            query = query.filter(models.Content.content_relation_type != 3)

        if status is not None or match_content_ext:
            # Alternatives are OR-ed: substatus match OR content-ext mismatch.
            alternatives = []
            if status is not None:
                alternatives.append(models.Content.substatus.in_(status))
            if match_content_ext:
                alternatives.append(or_(
                    models.Content_ext.content_id == None,   # noqa E711
                    models.Content_ext.status != models.Content.substatus
                ))
            if len(alternatives) > 1:
                selection = or_(*alternatives)
            else:
                selection = alternatives[0]

            ext_join_on = models.Content.content_id == models.Content_ext.content_id
            if by_map:
                # Select the map_ids having at least one content that satisfies
                # the selection, then return ALL contents of those map_ids by
                # joining on the subquery.
                matching_maps = session.query(models.Content.map_id.label('map_id')).distinct()
                if request_id:
                    matching_maps = matching_maps.filter(models.Content.request_id == request_id)
                if transform_id:
                    matching_maps = matching_maps.filter(models.Content.transform_id == transform_id)
                if match_content_ext:
                    matching_maps = matching_maps.outerjoin(models.Content_ext, ext_join_on)
                matching_maps = matching_maps.filter(selection).subquery('qualifying_maps')
                query = query.join(matching_maps, models.Content.map_id == matching_maps.c.map_id)
            else:
                if match_content_ext:
                    query = query.outerjoin(models.Content_ext, ext_join_on)
                query = query.filter(selection)

        query = query.order_by(asc(models.Content.request_id), asc(models.Content.transform_id), asc(models.Content.map_id))

        if page_num is not None and page_size is not None:
            # A page is a window of page_size distinct map_ids (ordered by
            # map_id); the main query is then restricted to these map_ids.
            page_query = session.query(models.Content.map_id).distinct()
            if request_id:
                page_query = page_query.filter(models.Content.request_id == request_id)
            if transform_id:
                page_query = page_query.filter(models.Content.transform_id == transform_id)
            if not with_deps:
                page_query = page_query.filter(models.Content.content_relation_type != 3)
            page_query = page_query.order_by(asc(models.Content.map_id))
            page_query = page_query.offset(page_num * page_size).limit(page_size)
            map_ids_in_page = [row[0] for row in page_query.all()]
            if not map_ids_in_page:
                return []
            query = query.filter(models.Content.map_id.in_(map_ids_in_page))

        return [row.to_dict() for row in query.all()]
    except sqlalchemy.orm.exc.NoResultFound as error:
        raise exceptions.NoObject('No record can be found with (transform_id=%s): %s' %
                                  (transform_id, error))
0454 
0455 
@read_session
def get_input_output_map_count(request_id, transform_id, session=None):
    """
    Count the distinct (map_id, sub_map_id) pairs among the Input contents of a transform.

    The distinct pairs are selected in a subquery and counted by the database,
    so no content rows are loaded.

    :param request_id: request id (part of the filter; the original note says
                       the database uses it for virtual table partitioning).
    :param transform_id: transform id.
    :param session: The database session in use.

    :returns: the number of distinct pairs, 0 when the count is empty.
    """
    pair_query = session.query(models.Content.map_id, models.Content.sub_map_id)
    pair_query = pair_query.filter(models.Content.request_id == request_id)
    pair_query = pair_query.filter(models.Content.transform_id == transform_id)
    pair_query = pair_query.filter(models.Content.content_relation_type == ContentRelationType.Input)
    distinct_pairs = pair_query.distinct().subquery()

    total = session.query(func.count()).select_from(distinct_pairs).scalar()
    return total or 0
0474 
0475 
@read_session
def get_content_name_to_id_map(request_id, transform_id, es=False, session=None):
    """
    Build a {key: [content_id, ...]} mapping for the Input and Output contents of a transform.

    Only the key column and content_id are selected, not full content rows.

    :param request_id: request id (part of the filter; the original note says
                       the database uses it for virtual table partitioning).
    :param transform_id: transform id.
    :param es: if True the 'path' column is used as key, otherwise 'name'.
    :param session: The database session in use.

    :returns: dict mapping each key to the list of content ids carrying it.
    """
    key_column = models.Content.path if es else models.Content.name
    wanted_relations = [ContentRelationType.Input, ContentRelationType.Output]

    query = session.query(key_column, models.Content.content_id)
    query = query.filter(models.Content.request_id == request_id)
    query = query.filter(models.Content.transform_id == transform_id)
    query = query.filter(models.Content.content_relation_type.in_(wanted_relations))

    ids_by_key = {}
    for key, content_id in query.all():
        ids_by_key.setdefault(key, []).append(content_id)
    return ids_by_key
0500 
0501 
@read_session
def has_input_contents_without_external_id(request_id, transform_id, session=None):
    """
    Tell whether a transform has Input contents whose external_content_id is NULL.

    A single COUNT query is issued; no content rows are loaded.

    :param request_id: request id (part of the filter; the original note says
                       the database uses it for virtual table partitioning).
    :param transform_id: transform id.
    :param session: The database session in use.

    :returns: True if at least one such Input content exists, False otherwise.
    """
    missing_query = session.query(func.count(models.Content.content_id))
    missing_query = missing_query.filter(models.Content.request_id == request_id)
    missing_query = missing_query.filter(models.Content.transform_id == transform_id)
    missing_query = missing_query.filter(models.Content.content_relation_type == ContentRelationType.Input)
    missing_query = missing_query.filter(models.Content.external_content_id.is_(None))

    missing = missing_query.scalar()
    return missing > 0
0521 
0522 
@read_session
def get_content_status_statistics(coll_id=None, transform_ids=None, session=None):
    """
    Count contents per status.

    :param coll_id: Collection id; not filtered on when falsy.
    :param transform_ids: A transform id or a list/tuple of transform ids;
                          not filtered on when falsy.
    :param session: The database session in use.

    :returns: dict {status: number of contents}.
    """
    if transform_ids:
        if not isinstance(transform_ids, (list, tuple)):
            transform_ids = [transform_ids]
        if len(transform_ids) == 1:
            # A lone id is duplicated so the IN clause never gets one value.
            # NOTE(review): presumably a database workaround - TODO confirm.
            transform_ids = [transform_ids[0], transform_ids[0]]

    query = session.query(models.Content.status, func.count(models.Content.content_id))
    if coll_id:
        query = query.filter(models.Content.coll_id == coll_id)
    if transform_ids:
        query = query.filter(models.Content.transform_id.in_(transform_ids))
    query = query.group_by(models.Content.status)

    return {status: count for status, count in query.all()}
0554 
0555 
@read_session
def get_content_status_statistics_by_relation_type(transform_ids=None, session=None):
    """
    Count contents per (status, content_relation_type, transform_id).

    :param transform_ids: A transform id or a list/tuple of transform ids;
                          not filtered on when falsy.
    :param session: The database session in use.

    :returns: the query result rows, each being
              (status, content_relation_type, transform_id, count).
    """
    if transform_ids:
        if not isinstance(transform_ids, (list, tuple)):
            transform_ids = [transform_ids]
        if len(transform_ids) == 1:
            # A lone id is duplicated so the IN clause never gets one value.
            # NOTE(review): presumably a database workaround - TODO confirm.
            transform_ids = [transform_ids[0], transform_ids[0]]

    grouping = (models.Content.status, models.Content.content_relation_type, models.Content.transform_id)
    query = session.query(*grouping, func.count(models.Content.content_id))
    if transform_ids:
        query = query.filter(models.Content.transform_id.in_(transform_ids))

    return query.group_by(*grouping).all()
0581 
0582 
@read_session
def get_content_status_statistics_by_coll(request_id=None, transform_id=None, with_deps=True, session=None):
    """
    Get content statistics grouped by (coll_id, status), with the bytes summed.

    The result is keyed by coll_id. Each value maps a status to
    {'count': N, 'bytes': B} and additionally holds the key 'has_unsynced',
    which is True when at least one row of that collection has
    status != substatus.

    :param request_id: request id; not filtered on when None.
    :param transform_id: transform id; not filtered on when None.
    :param with_deps: if False, InputDependency contents are left out.
    :param session: The database session in use.

    :returns: dict {coll_id: {status: {'count': N, 'bytes': B}, 'has_unsynced': bool}}
    """
    # 1 for every row whose status and substatus differ, 0 otherwise; summed
    # per group to learn whether the group holds unsynced rows.
    unsynced_flag = sqlalchemy.case(
        (models.Content.status != models.Content.substatus, 1),
        else_=0
    )
    query = session.query(
        models.Content.coll_id,
        models.Content.status,
        func.count(models.Content.content_id).label('cnt'),
        func.sum(models.Content.bytes).label('total_bytes'),
        func.sum(unsynced_flag).label('unsynced_count')
    )
    if request_id is not None:
        query = query.filter(models.Content.request_id == request_id)
    if transform_id is not None:
        query = query.filter(models.Content.transform_id == transform_id)
    if not with_deps:
        query = query.filter(models.Content.content_relation_type != ContentRelationType.InputDependency)
    query = query.group_by(models.Content.coll_id, models.Content.status)

    statistics = {}
    for coll_id, status, cnt, total_bytes, unsynced_count in query.all():
        per_coll = statistics.setdefault(coll_id, {'has_unsynced': False})
        # SUM yields NULL/None for a group without byte values; report 0 then.
        per_coll[status] = {'count': cnt, 'bytes': total_bytes or 0}
        if unsynced_count and unsynced_count > 0:
            per_coll['has_unsynced'] = True
    return statistics
0635 
0636 
@read_session
def get_content_ext_status_statistics_by_coll(request_id=None, transform_id=None, session=None):
    """
    Get Content_ext statistics grouped by (coll_id, status).

    :param request_id: request id; not filtered on when None.
    :param transform_id: transform id; not filtered on when None.
    :param session: The database session in use.

    :returns: dict {coll_id: {status: count}}
    """
    query = session.query(
        models.Content_ext.coll_id,
        models.Content_ext.status,
        func.count(models.Content_ext.content_id).label('cnt')
    )
    if request_id is not None:
        query = query.filter(models.Content_ext.request_id == request_id)
    if transform_id is not None:
        query = query.filter(models.Content_ext.transform_id == transform_id)
    query = query.group_by(models.Content_ext.coll_id, models.Content_ext.status)

    statistics = {}
    for coll_id, status, cnt in query.all():
        statistics.setdefault(coll_id, {})[status] = cnt
    return statistics
0672 
0673 
@transactional_session
def update_content(content_id, parameters, session=None):
    """
    Update the content with the given id.

    'updated_at' is always set to the current UTC time, overriding any value
    given in ``parameters``. The caller's dictionary is left untouched.

    :param content_id: the content id.
    :param parameters: A dictionary of column values to set.
    :param session: The database session in use.

    :raises NoObject: If the update raises NoResultFound.
                      NOTE(review): a bulk query update is not expected to
                      raise it; an unknown content_id presumably just updates
                      no row - TODO confirm.
    """
    # Work on a copy: adding 'updated_at' to the caller's dict would leak into
    # any later reuse of that dict.
    values = dict(parameters)
    values['updated_at'] = datetime.datetime.utcnow()

    try:
        session.query(models.Content).filter_by(content_id=content_id)\
               .update(values, synchronize_session=False)
    except sqlalchemy.orm.exc.NoResultFound as error:
        raise exceptions.NoObject('Content %s cannot be found: %s' % (content_id, error)) from error
0694 
0695 
@transactional_session
def custom_bulk_insert_mappings_real(model, parameters, session=None):
    """
    Insert rows for ``model`` in bulk with a raw ``INSERT ... ON CONFLICT DO NOTHING``.

    Every row is normalised to plain bind values first (see ``get_row_value``).
    For the 'contents' table the ``content_id`` column is not taken from the
    rows; it is filled from the ``CONTENT_ID_SEQ`` sequence via ``nextval``.

    :param model: the mapped model class to insert into.
    :param parameters: list of dictionaries, one per row, keyed by column name.
    :param session: The database session in use.
    """
    if not parameters:
        return

    try:
        schema = model.metadata.schema
        if schema:
            table_name = f"{schema}.{model.__tablename__}"
            sequence_name = f'"{schema}"."CONTENT_ID_SEQ"'
        else:
            table_name = f"{model.__tablename__}"
            sequence_name = '"CONTENT_ID_SEQ"'

        # Columns that fall back to 0 / "now" when the row carries no value.
        zero_filled_columns = ('map_id', 'fetch_status', 'sub_map_id', 'dep_sub_map_id', 'content_relation_type')
        timestamp_columns = ('created_at', 'updated_at', 'accessed_at')

        def get_row_value(row, column):
            """Return the bind value for ``column`` taken from ``row``."""
            value = row.get(column.name, None)

            if value is not None:
                # Convert python-side types into values usable as SQL binds.
                if isinstance(value, Enum):
                    return value.value
                if isinstance(value, datetime.datetime):
                    return value.isoformat()
                if isinstance(value, dict):
                    return json_dumps(value)
                return value

            # The row has no value for this column: pick a fallback.
            if column.name in zero_filled_columns:
                return 0
            if column.name in timestamp_columns:
                return datetime.datetime.utcnow().isoformat()
            if column.default is None or column.primary_key:
                return None

            default_value = column.default.arg
            if callable(default_value):
                try:
                    return default_value()
                except TypeError:
                    # the default cannot be called without arguments
                    return None
            if isinstance(default_value, expression.ClauseElement):
                # SQL expression defaults cannot be evaluated on the client side
                return None
            return default_value

        is_contents_table = model.__tablename__.lower() == 'contents'
        excluded_names = ['content_id'] if is_contents_table else []

        columns = [col for col in model.__mapper__.columns if col.name not in excluded_names]
        column_names = [col.name for col in columns]

        column_key_sql = ", ".join(column_names)
        column_value_sql = ", ".join([f":{name}" for name in column_names])

        # One dictionary of bind values per input row
        updated_parameters = []
        for row in parameters:
            updated_parameters.append({col.name: get_row_value(row, col) for col in columns})

        if is_contents_table:
            sql = f"""
               INSERT INTO {table_name} (content_id, {column_key_sql})
               VALUES (nextval('{sequence_name}'), {column_value_sql})
               ON CONFLICT DO NOTHING
            """
        else:
            sql = f"""
               INSERT INTO {table_name} ({column_key_sql})
               VALUES ({column_value_sql})
               ON CONFLICT DO NOTHING
            """

        session.execute(sqlalchemy.text(sql), updated_parameters)
    except Exception as ex:
        print(f"custom_bulk_insert_mappings Exception: {ex}")
        print(traceback.format_exc())
        raise ex
0771 
0772 
@transactional_session
def custom_bulk_insert_mappings(model, parameters, batch_size=1000, session=None):
    """
    Insert rows for ``model`` in batches of ``batch_size``.

    On a 'postgresql' dialect each batch goes through
    ``custom_bulk_insert_mappings_real``; on any other dialect it goes
    through ``session.bulk_insert_mappings``.

    :param model: the mapped model class to insert into.
    :param parameters: list of dictionaries, one per row.
    :param batch_size: maximum number of rows per batch.
    :param session: The database session in use.
    """
    if not parameters:
        return

    use_custom_insert = session.bind.dialect.name == 'postgresql'

    for start in range(0, len(parameters), batch_size):
        chunk = parameters[start: start + batch_size]
        if use_custom_insert:
            custom_bulk_insert_mappings_real(model=model, parameters=chunk, session=session)
        else:
            session.bulk_insert_mappings(model, chunk)
0792 
0793 
@transactional_session
def custom_bulk_update_mappings_real(model, parameters, batch_size=1000, session=None):
    """
    Update rows of ``model`` in bulk with one raw ``UPDATE`` statement.

    The statement is derived from the keys of the FIRST row only, so every
    row in ``parameters`` must carry the same keys. Keys among
    'content_id', 'request_id' and 'transform_id' form the WHERE clause;
    all other keys form the SET clause. Key names are interpolated into the
    SQL text, so they must come from trusted code.

    :param model: the mapped model class to update.
    :param parameters: list of dictionaries, one per row, all with the same keys.
    :param batch_size: maximum number of rows sent per execute call.
    :param session: The database session in use.

    :raises ValueError: If 'content_id' is missing or there is no column to update.
    """
    if not parameters:
        return

    select_keys = ('content_id', 'request_id', 'transform_id')

    first_row = parameters[0]
    if 'content_id' not in first_row:
        raise ValueError("'content_id' is required to select the rows to update.")

    select_key_sql = [f"{key} = :{key}" for key in first_row if key in select_keys]
    update_key_sql = [f"{key} = :{key}" for key in first_row if key not in select_keys]
    if not update_key_sql:
        raise ValueError("No updatable columns found.")

    # The statement only depends on the key set, so build it once for all batches.
    schema_prefix = f"{model.metadata.schema}." if model.metadata.schema else ""
    sql = f"""
        UPDATE {schema_prefix}{model.__tablename__}
        SET {", ".join(update_key_sql)}
        WHERE {" AND ".join(select_key_sql)}
    """
    stmt = sqlalchemy.text(sql)

    def to_bind_value(value):
        """Convert Enum/datetime/dict values into plain SQL bind values."""
        if isinstance(value, Enum):
            return value.value
        if isinstance(value, datetime.datetime):
            return value.isoformat()
        if isinstance(value, dict):
            return json_dumps(value)
        return value

    for start in range(0, len(parameters), batch_size):
        batch = parameters[start: start + batch_size]
        updated_parameters = [
            {key: to_bind_value(value) for key, value in row.items()}
            for row in batch
        ]
        session.execute(stmt, updated_parameters)
0841 
0842 
@transactional_session
def custom_bulk_update_mappings(model, parameters, batch_size=1000, session=None):
    """
    Update rows of ``model`` in bulk, one statement per distinct key set.

    Rows are grouped by their sorted key names, and each group is passed to
    ``custom_bulk_update_mappings_real`` so that all rows in one call share
    the same keys.

    :param model: the mapped model class to update.
    :param parameters: list of dictionaries, one per row.
    :param batch_size: batch size forwarded to the real update function.
    :param session: The database session in use.
    """
    if not parameters:
        return

    rows_by_signature = {}
    for row in parameters:
        # sorted so that the key order of a row does not matter
        signature = ','.join(sorted(row.keys()))
        if signature not in rows_by_signature:
            rows_by_signature[signature] = []
        rows_by_signature[signature].append(row)

    for same_key_rows in rows_by_signature.values():
        custom_bulk_update_mappings_real(model, same_key_rows, batch_size=batch_size, session=session)
0859 
0860 
@transactional_session
def update_contents(parameters, use_bulk_update_mappings=True, grouping=True, request_id=None, transform_id=None, session=None):
    """
    Update several contents using one of three strategies.

    * ``use_bulk_update_mappings``: stamp every row with 'updated_at' (the
      caller's dictionaries are modified in place) and send the rows through
      ``custom_bulk_update_mappings``. request_id / transform_id are not used.
    * otherwise, if ``grouping``: group the rows with ``group_list`` on
      'content_id' and issue one UPDATE per group, optionally restricted by
      request_id and transform_id.
    * otherwise: ``session.bulk_update_mappings``.

    :param parameters: list of dictionary of parameters.
    :param use_bulk_update_mappings: select the custom bulk update strategy.
    :param grouping: select the grouped update strategy (second choice).
    :param request_id: optional request id filter for the grouped strategy.
    :param transform_id: optional transform id filter for the grouped strategy.
    :param session: The database session in use.

    :raises NoObject: If the underlying query raises NoResultFound.
    """
    try:
        if use_bulk_update_mappings:
            for row in parameters:
                row['updated_at'] = datetime.datetime.utcnow()

            custom_bulk_update_mappings(models.Content, parameters, session=session)
            return

        if not grouping:
            session.bulk_update_mappings(models.Content, parameters)
            return

        grouped = group_list(parameters, key='content_id')
        for group_key in grouped:
            entry = grouped[group_key]
            content_ids = entry['keys']
            new_values = entry['items']
            new_values['updated_at'] = datetime.datetime.utcnow()

            target = session.query(models.Content)
            if request_id:
                target = target.filter(models.Content.request_id == request_id)
            if transform_id:
                target = target.filter(models.Content.transform_id == transform_id)
            target = target.filter(models.Content.content_id.in_(content_ids))
            target.update(new_values, synchronize_session=False)
    except sqlalchemy.orm.exc.NoResultFound as error:
        raise exceptions.NoObject('Content cannot be found: %s' % (error))
0898 
0899 
@transactional_session
def update_dep_contents(request_id, content_dep_ids, status, bulk_size=10000, session=None):
    """
    Set the substatus of the given contents of one request.

    The ids are processed in slices of ``bulk_size`` so that each UPDATE
    carries a bounded IN list.

    :param request_id: The request id the contents belong to.
    :param content_dep_ids: list of content dependency id.
    :param status: Content status, written to the 'substatus' column.
    :param bulk_size: maximum number of ids per UPDATE statement.
    :param session: The database session in use.

    :raises NoObject: If the underlying query raises NoResultFound.
    """
    try:
        new_values = {'substatus': status}
        id_slices = []
        for offset in range(0, len(content_dep_ids), bulk_size):
            id_slices.append(content_dep_ids[offset:offset + bulk_size])

        for id_slice in id_slices:
            target = session.query(models.Content)
            target = target.filter(models.Content.request_id == request_id)
            target = target.filter(models.Content.content_id.in_(id_slice))
            target.update(new_values, synchronize_session=False)
    except sqlalchemy.orm.exc.NoResultFound as error:
        raise exceptions.NoObject('Content cannot be found: %s' % (error))
0923 
0924 
@transactional_session
def delete_content(content_id=None, session=None):
    """
    Delete the content row with the given content_id.

    :param content_id: The id of the content.
    :param session: The database session in use.

    :raises NoObject: If the underlying query raises NoResultFound.
    """
    try:
        target = session.query(models.Content).filter_by(content_id=content_id)
        target.delete()
    except sqlalchemy.orm.exc.NoResultFound as error:
        raise exceptions.NoObject('Content %s cannot be found: %s' % (content_id, error))
0940 
0941 
@transactional_session
def update_contents_to_others_by_dep_id(request_id=None, transform_id=None, session=None):
    """
    Call the ``update_contents_to_others`` stored procedure.

    The procedure is looked up in the schema given by ``session.schema`` and
    receives request_id and transform_id as bind parameters.

    :param request_id: The Request id.
    :param transform_id: The transform id.
    :param session: The database session in use.
    """
    call_sql = "CALL %s.update_contents_to_others(:request_id, :transform_id)" % session.schema
    bind_values = {"request_id": request_id, "transform_id": transform_id}
    session.execute(sqlalchemy.text(call_sql), bind_values)
0955 
0956 
@transactional_session
def update_contents_from_others_by_dep_id(request_id=None, transform_id=None, session=None):
    """
    Call the ``update_contents_from_others`` stored procedure.

    The procedure is looked up in the schema given by ``session.schema`` and
    receives request_id and transform_id as bind parameters.

    :param request_id: The Request id.
    :param transform_id: The transform id.
    :param session: The database session in use.
    """
    call_sql = "CALL %s.update_contents_from_others(:request_id, :transform_id)" % session.schema
    bind_values = {"request_id": request_id, "transform_id": transform_id}
    session.execute(sqlalchemy.text(call_sql), bind_values)
0970 
0971 
@transactional_session
def update_contents_from_others_by_dep_id_pages_old(request_id=None, transform_id=None, page_size=1000, batch_size=500,
                                                    status_not_to_check=None, logger=None, log_prefix=None, session=None):
    """
    Update contents from others by content_dep_id, with pages

    InputDependency contents are read in pages ordered by content_id. For
    each page, rows whose substatus differs from the substatus of the Output
    content referenced by content_dep_id get both 'status' and 'substatus'
    set to that Output substatus.

    :param request_id: The Request id; when falsy, rows are not filtered by request.
    :param transform_id: The transform id; when falsy, dependency rows are not filtered by transform.
    :param page_size: number of dependency rows examined per page.
    :param batch_size: number of rows per bulk update call.
    :param status_not_to_check: optional list of substatus values; dependency rows in one of them are skipped.
    :param logger: optional logger used for a debug message per page.
    :param log_prefix: optional prefix for the debug message.
    :param session: The database session in use.
    """
    try:
        if log_prefix is None:
            log_prefix = ""

        # Define alias for Content model to avoid conflicts
        # content_alias = aliased(models.Content)

        # Create the main subquery (dependent contents):
        # Output contents whose substatus is no longer New.
        main_subquery = session.query(
            models.Content.content_id,
            models.Content.substatus
        )

        if request_id:
            main_subquery = main_subquery.filter(models.Content.request_id == request_id)

        main_subquery = main_subquery.filter(
            models.Content.content_relation_type == ContentRelationType.Output,
            models.Content.substatus != ContentStatus.New
        ).subquery()

        # dep query: the InputDependency contents that may need an update
        dep_subquery = session.query(
            models.Content.content_id.label("content_id"),
            models.Content.content_dep_id.label("content_dep_id"),
            models.Content.substatus.label("substatus")
        )

        if request_id:
            dep_subquery = dep_subquery.filter(models.Content.request_id == request_id)
        if transform_id:
            dep_subquery = dep_subquery.filter(models.Content.transform_id == transform_id)
        if status_not_to_check:
            dep_subquery = dep_subquery.filter(~models.Content.substatus.in_(status_not_to_check))

        dep_subquery = dep_subquery.filter(models.Content.content_relation_type == ContentRelationType.InputDependency)

        # Paginated Update Loop (keyset pagination on content_id)
        last_id = None
        while True:
            paginated_query = dep_subquery.order_by(models.Content.content_id)
            if last_id is not None:
                paginated_query = paginated_query.filter(models.Content.content_id > last_id)
            else:
                # paginated_query = dep_subquery
                # it makes paginated_query and dep_subquery the same object
                # updates in paginated_query will also be in dep_subquery
                pass

            paginated_query = paginated_query.limit(page_size)
            paginated_query = paginated_query.subquery()

            # Keep only the rows of this page whose substatus differs from
            # the substatus of the content they depend on.
            paginated_query_deps_query = session.query(
                paginated_query.c.content_id.label('dep_content_id'),
                main_subquery.c.substatus,
            ).join(
                paginated_query,
                and_(
                    paginated_query.c.content_dep_id == main_subquery.c.content_id,
                    paginated_query.c.substatus != main_subquery.c.substatus
                )
            )

            # from sqlalchemy.dialects import postgresql
            # query_deps_sql = paginated_query_deps_query.subquery().compile(dialect=postgresql.dialect(), compile_kwargs={"literal_binds": True})
            # if logger:
            #     logger.debug(f"{log_prefix}query_update_sql: {query_deps_sql}")

            # NOTE(review): last_id below only advances from rows returned by
            # this join, so a page in which no row needs an update produces an
            # empty result and ends the loop even if later pages exist.
            results = paginated_query_deps_query.all()
            if not results:
                break

            update_data = []
            for row in results:
                content_id = row[0]
                substatus = row[1]
                # both status and substatus take the substatus of the content depended on
                to_update = {"content_id": content_id, "substatus": substatus, "status": substatus}
                update_data.append(to_update)
                if last_id is None or content_id > last_id:
                    last_id = content_id

            for i in range(0, len(update_data), batch_size):
                # session.bulk_update_mappings(models.Content, update_data[i:i + batch_size])
                # session.commit()
                custom_bulk_update_mappings(models.Content, update_data[i:i + batch_size], session=session)

            if logger:
                logger.debug(f"{log_prefix}update_contents_from_others_by_dep_id_pages: last_id: {last_id}")
    except Exception as ex:
        raise ex
1071 
1072 
@transactional_session
def update_contents_from_others_by_dep_id_pages_with_coll_id(request_id, transform_id, coll_id, page_size=5000, batch_size=2000,
                                                             status_not_to_check=None, logger=None, log_prefix=None, session=None):
    """
    Update the dependency contents of one collection with a single UPDATE.

    InputDependency contents of (request_id, transform_id, coll_id) whose
    substatus differs from the Output content referenced by content_dep_id
    get 'status' and 'substatus' set to that Output substatus. Only Output
    contents of the same request and coll_id with a substatus other than New
    are used as source. The session is committed after the statement.

    page_size, batch_size, status_not_to_check and log_prefix are accepted
    but are not used by the statement or the log messages.

    :param request_id: The Request id.
    :param transform_id: The transform id.
    :param coll_id: The collection id.
    :param logger: optional logger for the result messages.
    :param session: The database session in use.

    :returns: the rowcount reported for the UPDATE.
    """
    started_at = time.time()

    # Output contents that already left the New state
    source = (
        session.query(
            models.Content.content_id,
            models.Content.substatus.label("substatus")
        )
        .filter(models.Content.request_id == request_id)
        .filter(models.Content.coll_id == coll_id)
        .filter(models.Content.content_relation_type == ContentRelationType.Output)
        .filter(models.Content.substatus != ContentStatus.New)
        .subquery()
    )

    statement = update(models.Content).values(status=source.c.substatus, substatus=source.c.substatus)
    conditions = (
        models.Content.request_id == request_id,
        models.Content.transform_id == transform_id,
        models.Content.coll_id == coll_id,
        models.Content.content_relation_type == ContentRelationType.InputDependency,
        models.Content.content_dep_id == source.c.content_id,
        models.Content.substatus != source.c.substatus,
    )
    for condition in conditions:
        statement = statement.where(condition)

    result = session.execute(statement)
    session.commit()

    time_used = time.time() - started_at
    if logger:
        logger.debug(f"Updated {result.rowcount} rows (with skip locked)")
        logger.info(f"Update request_id {request_id} transform_id {transform_id} coll_id {coll_id} rows {result.rowcount} with {time_used} seconds")
    return result.rowcount
1124 
1125 
@transactional_session
def update_contents_from_others_by_dep_id_pages(request_id=None, transform_id=None, page_size=5000, batch_size=5000, status_not_to_check=None,
                                                logger=None, log_prefix=None, session=None):
    """
    Update contents from others by content_dep_id, one collection at a time.

    1. Find the (request_id, transform_id, coll_id) groups that still have
       InputDependency contents to check.
    2. Among those coll_ids, find the ones that have Output contents (when
       ``status_not_to_check`` is given, only Output contents whose substatus
       is in that list).
    3. For every group whose coll_id is in both sets, call
       ``update_contents_from_others_by_dep_id_pages_with_coll_id`` with the
       request and transform id of that group.

    :param request_id: The Request id; when falsy, all requests are considered.
    :param transform_id: The transform id; when falsy, all transforms are considered.
    :param page_size: forwarded to the per-collection update.
    :param batch_size: forwarded to the per-collection update.
    :param status_not_to_check: optional list of substatus values. Dependency
        contents in one of them are skipped; source Output contents must be in one of them.
    :param logger: optional logger, forwarded to the per-collection update.
    :param log_prefix: optional log prefix, forwarded to the per-collection update.
    :param session: The database session in use.
    """
    if log_prefix is None:
        log_prefix = ""

    # get coll_id with unterminated contents
    coll_query = session.query(
        models.Content.request_id,
        models.Content.transform_id,
        models.Content.coll_id
    )

    if request_id:
        coll_query = coll_query.filter(models.Content.request_id == request_id)
    if transform_id:
        coll_query = coll_query.filter(models.Content.transform_id == transform_id)
    if status_not_to_check:
        coll_query = coll_query.filter(~models.Content.substatus.in_(status_not_to_check))

    coll_query = coll_query.filter(models.Content.content_relation_type == ContentRelationType.InputDependency)
    coll_query = coll_query.group_by(models.Content.request_id, models.Content.transform_id, models.Content.coll_id)
    coll_rows = coll_query.all()
    if not coll_rows:
        # nothing to check, so no need to query the source collections
        return

    coll_req_tf_coll = [(row.request_id, row.transform_id, row.coll_id) for row in coll_rows]
    # de-duplicated (order preserved): one coll_id can appear in several groups
    coll_ids = list(dict.fromkeys(row.coll_id for row in coll_rows))

    # get depended tasks' coll_id with terminated contents
    src_coll_query = session.query(
        models.Content.coll_id
    )
    if request_id:
        src_coll_query = src_coll_query.filter(models.Content.request_id == request_id)
    src_coll_query = src_coll_query.filter(models.Content.coll_id.in_(coll_ids))
    src_coll_query = src_coll_query.filter(models.Content.content_relation_type == ContentRelationType.Output)
    if status_not_to_check:
        src_coll_query = src_coll_query.filter(models.Content.substatus.in_(status_not_to_check))
    # a set gives constant-time membership tests in the loop below
    src_coll_ids = {row.coll_id for row in src_coll_query.distinct()}

    # coll_ids with terminated jobs in depended tasks and with unterminated jobs in current tasks
    for req_id, tf_id, coll_id in coll_req_tf_coll:
        if coll_id not in src_coll_ids:
            continue
        # Use the ids of this group, not the (possibly None) function
        # arguments: the per-collection update filters on equality with them.
        update_contents_from_others_by_dep_id_pages_with_coll_id(request_id=req_id,
                                                                 transform_id=tf_id,
                                                                 coll_id=coll_id,
                                                                 page_size=page_size,
                                                                 batch_size=batch_size,
                                                                 status_not_to_check=status_not_to_check,
                                                                 logger=logger,
                                                                 log_prefix=log_prefix,
                                                                 session=session)
1187 
1188 
@transactional_session
def update_input_contents_by_dependency_pages(request_id=None, transform_id=None, page_size=2000, batch_size=2000, logger=None,
                                              log_prefix=None, terminated=False, status_not_to_check=None, session=None):
    """
    Update the substatus of input contents from their dependencies, with pages.

    Input contents (content_relation_type 0) are matched to dependency
    contents (content_relation_type 3) on
    (request_id, transform_id, map_id, sub_map_id). Inputs that still have a
    dependency in substatus New are excluded. The remaining inputs are read
    in pages ordered by content_id, and for each input the substatus values
    of its dependencies are reduced to one new substatus:

    * no dependency rows                          -> Available
    * all Available / FakeAvailable               -> Available
    * all in a final terminated status            -> Missing
    * ``terminated`` and all in a terminated status (also includes Failed) -> Missing
    * anything else                               -> New

    The session is committed after every batch of updates.

    :param request_id: The Request id; when falsy, all requests are considered.
    :param transform_id: The transform id; when falsy, all transforms are considered.
    :param page_size: number of input contents examined per page.
    :param batch_size: number of rows per bulk update call.
    :param logger: optional logger for debug messages.
    :param log_prefix: optional prefix for the debug messages.
    :param terminated: when True, Failed dependencies also count as terminated.
    :param status_not_to_check: optional list of substatus values; inputs in one of them are skipped.
    :param session: The database session in use.
    """
    if log_prefix is None:
        log_prefix = ""

    # Contents to be excluded by map_id and sub_map_id
    query_ex = session.query(
        models.Content.request_id.label("request_id"),
        models.Content.transform_id.label("transform_id"),
        models.Content.map_id.label("map_id"),
        models.Content.sub_map_id.label("sub_map_id"),
    )

    if request_id:
        query_ex = query_ex.filter(models.Content.request_id == request_id)
    if transform_id:
        query_ex = query_ex.filter(models.Content.transform_id == transform_id)

    # NOTE(review): 3 is presumably ContentRelationType.InputDependency - confirm
    query_ex = query_ex.filter(
        and_(
            models.Content.content_relation_type == 3,
            models.Content.substatus == ContentStatus.New        # dependencies not ready
        )
    ).distinct().subquery()

    # query dependencies
    query_deps = session.query(
        models.Content.request_id.label("request_id"),
        models.Content.transform_id.label("transform_id"),
        models.Content.map_id.label("map_id"),
        models.Content.sub_map_id.label("sub_map_id"),
        models.Content.content_id.label("content_id"),
        models.Content.substatus.label("substatus")
    )

    if request_id:
        query_deps = query_deps.filter(models.Content.request_id == request_id)
    if transform_id:
        query_deps = query_deps.filter(models.Content.transform_id == transform_id)

    query_deps = query_deps.filter(
        models.Content.content_relation_type == 3
    )

    query_deps = query_deps.subquery()

    # Define the main query with the necessary filters
    main_query = session.query(
        models.Content.content_id,
        models.Content.request_id,
        models.Content.transform_id,
        models.Content.map_id,
        models.Content.sub_map_id
    )

    if request_id:
        main_query = main_query.filter(models.Content.request_id == request_id)

    if transform_id:
        main_query = main_query.filter(models.Content.transform_id == transform_id)

    if status_not_to_check:
        main_query = main_query.filter(~(models.Content.substatus.in_(status_not_to_check)))

    # NOTE(review): 0 is presumably ContentRelationType.Input - confirm
    main_query = main_query.filter(models.Content.content_relation_type == 0)

    # Drop inputs for which a not-ready dependency exists in query_ex
    main_query = main_query.filter(
        ~exists(
            select(
                models.Content.request_id,
                models.Content.transform_id,
                models.Content.map_id,
                models.Content.sub_map_id
            ).where(
                models.Content.request_id == query_ex.c.request_id,
                models.Content.transform_id == query_ex.c.transform_id,
                models.Content.map_id == query_ex.c.map_id,
                models.Content.sub_map_id == query_ex.c.sub_map_id
            )
        )
    )

    # Status groups used by the aggregation; built once instead of per key.
    # available_status = (ContentStatus.Available, ContentStatus.FakeAvailable, ContentStatus.FinalSubAvailable)
    available_status = (ContentStatus.Available, ContentStatus.FakeAvailable)
    final_terminated_status = (ContentStatus.Available, ContentStatus.FakeAvailable,
                               ContentStatus.FinalFailed, ContentStatus.Missing,
                               ContentStatus.FinalSubAvailable)
    terminated_status = (ContentStatus.Available, ContentStatus.FakeAvailable,
                         ContentStatus.Failed, ContentStatus.FinalFailed,
                         ContentStatus.Missing, ContentStatus.FinalSubAvailable)

    # Aggregation function to determine final status
    def custom_aggregation(key, values, terminated=False):
        input_request_id, request_id, transform_id, map_id, sub_map_id, input_content_id = key
        if request_id is None:
            # left join, without right values
            # no dependencies
            if logger:
                # logger is optional (defaults to None)
                logger.debug(f"{log_prefix}custom_aggregation, no dependencies")
            return ContentStatus.Available

        if all(v in available_status for v in values):
            return ContentStatus.Available
        elif all(v in final_terminated_status for v in values):
            return ContentStatus.Missing
        elif terminated and all(v in terminated_status for v in values):
            return ContentStatus.Missing
        return ContentStatus.New

    # Paginated Update Loop
    last_id = None
    while True:
        # Fetch next batch using keyset pagination
        paginated_query = main_query.order_by(models.Content.content_id)
        if last_id is not None:
            paginated_query = paginated_query.filter(models.Content.content_id > last_id)

        paginated_query = paginated_query.limit(page_size)
        paginated_query = paginated_query.subquery()

        # Outer join: an input without dependency rows still yields one row,
        # with the dependency columns set to None.
        paginated_query_deps_query = session.query(
            paginated_query.c.content_id.label('input_content_id'),
            paginated_query.c.request_id.label('input_request_id'),
            query_deps.c.request_id,
            query_deps.c.transform_id,
            query_deps.c.map_id,
            query_deps.c.sub_map_id,
            query_deps.c.content_id,
            query_deps.c.substatus,
        ).outerjoin(
            query_deps,
            and_(
                paginated_query.c.request_id == query_deps.c.request_id,
                paginated_query.c.transform_id == query_deps.c.transform_id,
                paginated_query.c.map_id == query_deps.c.map_id,
                paginated_query.c.sub_map_id == query_deps.c.sub_map_id
            )
        ).order_by(
            query_deps.c.request_id, query_deps.c.transform_id,
            query_deps.c.map_id, query_deps.c.sub_map_id
        )

        paginated_query_deps = paginated_query_deps_query.all()

        if not paginated_query_deps:
            break  # No more rows to process

        # Aggregate results: collect the dependency substatus values per input content
        grouped_data = defaultdict(list)
        for row in paginated_query_deps:
            input_content_id, input_request_id, request_id, transform_id, map_id, sub_map_id, content_id, status = row
            grouped_data[(input_request_id, request_id, transform_id, map_id, sub_map_id, input_content_id)].append(status)

            if last_id is None or input_content_id > last_id:
                last_id = input_content_id

        aggregated_results = {key: custom_aggregation(key, values, terminated=terminated) for key, values in grouped_data.items()}

        update_data = [
            {
                "content_id": key[5],    # input_content_id
                "request_id": key[0],    # input_request_id
                "substatus": value
            }
            for key, value in aggregated_results.items()
        ]

        for i in range(0, len(update_data), batch_size):
            custom_bulk_update_mappings(models.Content, update_data[i:i + batch_size], session=session)
            session.commit()

        if logger:
            logger.debug(f"{log_prefix}update_input_contents_by_dependency_pages: last_id {last_id}")
1386 
1387 
@read_session
def get_update_contents_from_others_by_dep_id(request_id=None, transform_id=None, session=None):
    """
    Find dependency contents whose substatus differs from the content they depend on.

    A subquery collects (content_id, substatus) of contents with
    content_relation_type == 1 whose substatus is not ContentStatus.New.
    Contents with content_relation_type == 3 are then joined to it through
    content_dep_id, keeping only rows whose own substatus differs from the
    substatus of the content they point to.
    NOTE(review): 1 and 3 presumably correspond to ContentRelationType.Output
    and ContentRelationType.InputDependency - confirm against the enum.

    :param request_id: The request id; when set it restricts both sides of the join.
    :param transform_id: The transform id; when set it restricts only the dependency contents.
    :param session: The database session in use.

    :returns: list of dicts keyed by the selected column names
              (content_id of the dependency content, substatus from the subquery side).
    """
    upstream = session.query(models.Content.content_id,
                             models.Content.substatus)
    if request_id:
        upstream = upstream.filter(models.Content.request_id == request_id)
    upstream = upstream.filter(models.Content.content_relation_type == 1)
    upstream = upstream.filter(models.Content.substatus != ContentStatus.New)
    upstream = upstream.subquery()

    selected = (models.Content.content_id, upstream.c.substatus)
    names = [col.name for col in selected]

    dep_query = session.query(*selected)
    if request_id:
        dep_query = dep_query.filter(models.Content.request_id == request_id)
    if transform_id:
        dep_query = dep_query.filter(models.Content.transform_id == transform_id)
    dep_query = dep_query.filter(models.Content.content_relation_type == 3)

    # Only rows that are out of sync with the content they depend on survive the join.
    out_of_sync = and_(models.Content.content_dep_id == upstream.c.content_id,
                       models.Content.substatus != upstream.c.substatus)
    dep_query = dep_query.join(upstream, out_of_sync)

    return [dict(zip(names, row)) for row in dep_query.distinct()]
1426 
1427 
@read_session
def get_updated_transforms_by_content_status(request_id=None, transform_id=None, check_substatus=False, session=None):
    """
    List the transforms owning dependency contents that point at the selected contents.

    A subquery selects (content_id, substatus) of contents with
    content_relation_type == 1, optionally limited by request_id and
    transform_id. Contents with content_relation_type == 3 are joined to it
    through content_dep_id, and the distinct
    (request_id, transform_id, workload_id, coll_id) combinations are returned.
    NOTE(review): 1 and 3 presumably correspond to ContentRelationType.Output
    and ContentRelationType.InputDependency - confirm against the enum.

    :param request_id: The request id; when set it restricts both sides of the join.
    :param transform_id: The transform id; when set it restricts only the subquery side.
    :param check_substatus: When true, keep only dependency contents whose
                            substatus differs from the content they depend on.
    :param session: The database session in use.

    :returns: list of dicts with keys request_id, transform_id, workload_id, coll_id.
    """
    source_query = session.query(models.Content.content_id,
                                 models.Content.substatus)
    if request_id:
        source_query = source_query.filter(models.Content.request_id == request_id)
    if transform_id:
        source_query = source_query.filter(models.Content.transform_id == transform_id)
    source = source_query.filter(models.Content.content_relation_type == 1).subquery()

    selected = (models.Content.request_id,
                models.Content.transform_id,
                models.Content.workload_id,
                models.Content.coll_id)
    names = [col.name for col in selected]

    dep_query = session.query(*selected)
    if request_id:
        dep_query = dep_query.filter(models.Content.request_id == request_id)
    dep_query = dep_query.filter(models.Content.content_relation_type == 3)

    # The dependency link is always required; the substatus mismatch is optional.
    join_terms = [models.Content.content_dep_id == source.c.content_id]
    if check_substatus:
        join_terms.append(models.Content.substatus != source.c.substatus)
    dep_query = dep_query.join(source, and_(*join_terms))

    return [dict(zip(names, row)) for row in dep_query.distinct()]
1474 
1475 
def get_contents_ext_items():
    """
    Build the default job-attribute dictionary for extended contents.

    Every key is a job attribute name and every value is None. A new dict is
    created on each call, so callers may modify the result freely.

    The RAM unit key is 'minRamUnit', matching the 'min_ram_unit' entry of
    get_contents_ext_maps(); it was previously misspelled 'maxRamUnit'.

    :returns: dict mapping job attribute names to None.
    """
    default_params = {'pandaID': None, 'jobDefinitionID': None, 'schedulerID': None,
                      'pilotID': None, 'creationTime': None, 'modificationTime': None,
                      'startTime': None, 'endTime': None, 'prodSourceLabel': None,
                      'prodUserID': None, 'assignedPriority': None, 'currentPriority': None,
                      'attemptNr': None, 'maxAttempt': None, 'maxCpuCount': None,
                      'maxCpuUnit': None, 'maxDiskCount': None, 'maxDiskUnit': None,
                      'minRamCount': None, 'minRamUnit': None, 'cpuConsumptionTime': None,
                      'cpuConsumptionUnit': None, 'jobStatus': None, 'jobName': None,
                      'transExitCode': None, 'pilotErrorCode': None, 'pilotErrorDiag': None,
                      'exeErrorCode': None, 'exeErrorDiag': None, 'supErrorCode': None,
                      'supErrorDiag': None, 'ddmErrorCode': None, 'ddmErrorDiag': None,
                      'brokerageErrorCode': None, 'brokerageErrorDiag': None,
                      'jobDispatcherErrorCode': None, 'jobDispatcherErrorDiag': None,
                      'taskBufferErrorCode': None, 'taskBufferErrorDiag': None,
                      'computingSite': None, 'computingElement': None,
                      'grid': None, 'cloud': None, 'cpuConversion': None, 'taskID': None,
                      'vo': None, 'pilotTiming': None, 'workingGroup': None,
                      'processingType': None, 'prodUserName': None, 'coreCount': None,
                      'nInputFiles': None, 'reqID': None, 'jediTaskID': None,
                      'actualCoreCount': None, 'maxRSS': None, 'maxVMEM': None,
                      'maxSWAP': None, 'maxPSS': None, 'avgRSS': None, 'avgVMEM': None,
                      'avgSWAP': None, 'avgPSS': None, 'maxWalltime': None, 'diskIO': None,
                      'failedAttempt': None, 'hs06': None, 'hs06sec': None,
                      'memory_leak': None, 'memory_leak_x2': None, 'job_label': None}
    return default_params
1502 
1503 
def get_contents_ext_maps():
    """
    Build the mapping from extended-content field names to job attribute names.

    Keys are the snake_case field names; values are the corresponding job
    attribute names. A new dict is created on each call.

    :returns: dict mapping field name to job attribute name.
    """
    field_to_attribute = (
        ('panda_id', 'PandaID'),
        ('job_definition_id', 'jobDefinitionID'),
        ('scheduler_id', 'schedulerID'),
        ('pilot_id', 'pilotID'),
        ('creation_time', 'creationTime'),
        ('modification_time', 'modificationTime'),
        ('start_time', 'startTime'),
        ('end_time', 'endTime'),
        ('prod_source_label', 'prodSourceLabel'),
        ('prod_user_id', 'prodUserID'),
        ('assigned_priority', 'assignedPriority'),
        ('current_priority', 'currentPriority'),
        ('attempt_nr', 'attemptNr'),
        ('max_attempt', 'maxAttempt'),
        ('max_cpu_count', 'maxCpuCount'),
        ('max_cpu_unit', 'maxCpuUnit'),
        ('max_disk_count', 'maxDiskCount'),
        ('max_disk_unit', 'maxDiskUnit'),
        ('min_ram_count', 'minRamCount'),
        ('min_ram_unit', 'minRamUnit'),
        ('cpu_consumption_time', 'cpuConsumptionTime'),
        ('cpu_consumption_unit', 'cpuConsumptionUnit'),
        ('job_status', 'jobStatus'),
        ('job_name', 'jobName'),
        ('trans_exit_code', 'transExitCode'),
        ('pilot_error_code', 'pilotErrorCode'),
        ('pilot_error_diag', 'pilotErrorDiag'),
        ('exe_error_code', 'exeErrorCode'),
        ('exe_error_diag', 'exeErrorDiag'),
        ('sup_error_code', 'supErrorCode'),
        ('sup_error_diag', 'supErrorDiag'),
        ('ddm_error_code', 'ddmErrorCode'),
        ('ddm_error_diag', 'ddmErrorDiag'),
        ('brokerage_error_code', 'brokerageErrorCode'),
        ('brokerage_error_diag', 'brokerageErrorDiag'),
        ('job_dispatcher_error_code', 'jobDispatcherErrorCode'),
        ('job_dispatcher_error_diag', 'jobDispatcherErrorDiag'),
        ('task_buffer_error_code', 'taskBufferErrorCode'),
        ('task_buffer_error_diag', 'taskBufferErrorDiag'),
        ('computing_site', 'computingSite'),
        ('computing_element', 'computingElement'),
        ('grid', 'grid'),
        ('cloud', 'cloud'),
        ('cpu_conversion', 'cpuConversion'),
        ('task_id', 'taskID'),
        ('vo', 'VO'),
        ('pilot_timing', 'pilotTiming'),
        ('working_group', 'workingGroup'),
        ('processing_type', 'processingType'),
        ('prod_user_name', 'prodUserName'),
        ('core_count', 'coreCount'),
        ('n_input_files', 'nInputFiles'),
        ('req_id', 'reqID'),
        ('jedi_task_id', 'jediTaskID'),
        ('actual_core_count', 'actualCoreCount'),
        ('max_rss', 'maxRSS'),
        ('max_vmem', 'maxVMEM'),
        ('max_swap', 'maxSWAP'),
        ('max_pss', 'maxPSS'),
        ('avg_rss', 'avgRSS'),
        ('avg_vmem', 'avgVMEM'),
        ('avg_swap', 'avgSWAP'),
        ('avg_pss', 'avgPSS'),
        ('max_walltime', 'maxWalltime'),
        ('disk_io', 'diskIO'),
        ('failed_attempt', 'failedAttempt'),
        ('hs06', 'hs06'),
        ('hs06sec', 'hs06sec'),
        ('memory_leak', 'memory_leak'),
        ('memory_leak_x2', 'memory_leak_x2'),
        ('job_label', 'job_label'),
    )
    return dict(field_to_attribute)
1530 
1531 
@transactional_session
def add_contents_update(contents, bulk_size=10000, session=None):
    """
    Insert content-update records in batches.

    :param contents: sliceable sequence of content-update mappings to insert.
    :param bulk_size: maximum number of mappings handed to one bulk insert call.
    :param session: The database session in use.

    :raises DuplicatedObject: If the insert raises an IntegrityError.
    :raises DatabaseException: If there is any other database error.

    :returns: list of None placeholders, one per input item
              (no ids are read back from the insert).
    """
    batches = [contents[start:start + bulk_size]
               for start in range(0, len(contents), bulk_size)]

    try:
        for batch in batches:
            custom_bulk_insert_mappings(models.Content_update, batch, session=session)
        return [None] * len(contents)
    except IntegrityError as error:
        raise exceptions.DuplicatedObject('Duplicated objects: %s' % (error))
    except DatabaseError as error:
        raise exceptions.DatabaseException(error)
1557 
1558 
@transactional_session
def set_fetching_contents_update(request_id=None, transform_id=None, fetch=True, session=None):
    """
    Mark content-update records as being fetched.

    Sets fetch_status to ContentFetchStatus.Fetching on every Content_update
    row matching the given filters. Nothing is done when fetch is false.

    :param request_id: The request id; when set, only rows of this request are updated.
    :param transform_id: The transform id; when set, only rows of this transform are updated.
    :param fetch: Whether to perform the update at all.
    :param session: The database session in use.

    :raises NoObject: If the query raises NoResultFound.
    """
    if not fetch:
        return

    try:
        pending = session.query(models.Content_update)
        if request_id:
            pending = pending.filter(models.Content_update.request_id == request_id)
        if transform_id:
            pending = pending.filter(models.Content_update.transform_id == transform_id)
        pending.update({'fetch_status': ContentFetchStatus.Fetching})
    except sqlalchemy.orm.exc.NoResultFound as error:
        raise exceptions.NoObject('No record can be found with (transform_id=%s): %s' %
                                  (transform_id, error))
1579 
1580 
@read_session
def get_contents_update(request_id=None, transform_id=None, fetch=False, session=None):
    """
    Read content-update records.

    :param request_id: The request id; when set, only rows of this request are returned.
    :param transform_id: The transform id; when set, only rows of this transform are returned.
    :param fetch: When true, return only rows whose fetch_status is
                  ContentFetchStatus.Fetching.
    :param session: The database session in use.

    :raises NoObject: If the query raises NoResultFound.

    :returns: list of dicts, one per matching Content_update row.
    """
    try:
        records = session.query(models.Content_update)
        if request_id:
            records = records.filter(models.Content_update.request_id == request_id)
        if transform_id:
            records = records.filter(models.Content_update.transform_id == transform_id)
        if fetch:
            # Only the rows previously flagged as being fetched.
            records = records.filter(models.Content_update.fetch_status == ContentFetchStatus.Fetching)

        return [record.to_dict() for record in records.all()]
    except sqlalchemy.orm.exc.NoResultFound as error:
        raise exceptions.NoObject('No record can be found with (transform_id=%s): %s' %
                                  (transform_id, error))
1614 
1615 
@transactional_session
def delete_contents_update(request_id=None, transform_id=None, contents=None, bulk_size=1000, fetch=False, session=None):
    """
    Delete content-update records.

    Three modes, chosen in this order:

    * fetch is true: delete the rows whose fetch_status is
      ContentFetchStatus.Fetching (contents is ignored).
    * contents is non-empty: delete the rows whose content_id is in contents,
      issuing one DELETE per chunk of bulk_size ids.
    * otherwise: delete every row matching request_id / transform_id. If
      neither filter is given this deletes all Content_update rows.

    :param request_id: The request id; when set, only rows of this request are deleted.
    :param transform_id: The transform id; when set, only rows of this transform are deleted.
    :param contents: Optional sequence of content ids to delete.
    :param bulk_size: Maximum number of content ids per DELETE statement.
    :param fetch: Whether to delete the rows currently flagged as Fetching.
    :param session: The database session in use.

    :raises NoObject: If any error occurs while deleting (the original
                      exception is attached as the cause).
    """
    def _scoped_query():
        # Base query restricted by whichever of request_id / transform_id is set.
        query = session.query(models.Content_update)
        if request_id:
            query = query.filter(models.Content_update.request_id == request_id)
        if transform_id:
            query = query.filter(models.Content_update.transform_id == transform_id)
        return query

    # The previous implementation called with_for_update() and discarded the
    # result. Query methods return a new Query, so those calls had no effect
    # and are dropped here; the DELETE statements issued are unchanged.
    try:
        if fetch:
            _scoped_query().filter(
                models.Content_update.fetch_status == ContentFetchStatus.Fetching
            ).delete()
        elif contents:
            # Chunk the id list to keep each IN (...) clause bounded.
            for start in range(0, len(contents), bulk_size):
                id_chunk = contents[start:start + bulk_size]
                _scoped_query().filter(
                    models.Content_update.content_id.in_(id_chunk)
                ).delete()
        else:
            _scoped_query().delete()
    except Exception as error:
        raise exceptions.NoObject('Content_update deletion error: %s' % (error)) from error
1659 
1660 
@transactional_session
def add_contents_ext(contents, bulk_size=10000, session=None):
    """
    Insert extended-content records in batches.

    Before inserting, every item is completed in place with the defaults from
    get_contents_ext_items() plus status=ContentStatus.New for any key it does
    not already have. The caller's dicts are therefore modified.

    :param contents: sliceable sequence of extended-content dicts to insert.
    :param bulk_size: maximum number of mappings handed to one bulk insert call.
    :param session: The database session in use.

    :raises DuplicatedObject: If the insert raises an IntegrityError.
    :raises DatabaseException: If there is any other database error.

    :returns: list of None placeholders, one per input item
              (no ids are read back from the insert).
    """
    defaults = get_contents_ext_items()
    defaults['status'] = ContentStatus.New

    for content in contents:
        for field, fallback in defaults.items():
            content.setdefault(field, fallback)

    batches = [contents[start:start + bulk_size]
               for start in range(0, len(contents), bulk_size)]

    try:
        for batch in batches:
            custom_bulk_insert_mappings(models.Content_ext, batch, session=session)
        return [None] * len(contents)
    except IntegrityError as error:
        raise exceptions.DuplicatedObject('Duplicated objects: %s' % (error))
    except DatabaseError as error:
        raise exceptions.DatabaseException(error)
1694 
1695 
@transactional_session
def update_contents_ext(parameters, use_bulk_update_mappings=True, request_id=None, transform_id=None, session=None):
    """
    Update extended-content records.

    With use_bulk_update_mappings (the default) the parameters are handed to
    custom_bulk_update_mappings unchanged; request_id and transform_id are not
    used on that path. Otherwise the parameters are grouped with
    group_list(..., key='content_id') and one UPDATE is issued per group
    against the Content_ext rows whose content_id is in the group's keys.

    The non-bulk path previously filtered on models.Content columns while
    updating models.Content_ext; all filters now use models.Content_ext.

    :param parameters: list of dictionary of parameters.
    :param use_bulk_update_mappings: Whether to use the bulk mapping update.
    :param request_id: The request id; narrows the non-bulk updates when set.
    :param transform_id: The transform id; narrows the non-bulk updates when set.
    :param session: The database session in use.

    :raises NoObject: If the query raises NoResultFound.
    """
    try:
        if use_bulk_update_mappings:
            custom_bulk_update_mappings(models.Content_ext, parameters, session=session)
        else:
            groups = group_list(parameters, key='content_id')
            for group_key in groups:
                group = groups[group_key]
                content_ids = group['keys']
                new_values = group['items']

                query = session.query(models.Content_ext)
                if request_id:
                    query = query.filter(models.Content_ext.request_id == request_id)
                if transform_id:
                    query = query.filter(models.Content_ext.transform_id == transform_id)
                query.filter(models.Content_ext.content_id.in_(content_ids))\
                     .update(new_values, synchronize_session=False)
    except sqlalchemy.orm.exc.NoResultFound as error:
        raise exceptions.NoObject('Content cannot be found: %s' % (error))
1727 
1728 
@read_session
def get_contents_ext(request_id=None, transform_id=None, workload_id=None, coll_id=None, status=None, session=None):
    """
    Read extended-content records matching the given filters.

    Each filter is applied only when its argument is set. Results are ordered
    by request_id, transform_id and map_id, ascending.

    :param request_id: request id.
    :param transform_id: transform id.
    :param workload_id: workload id.
    :param coll_id: collection id.
    :param status: a single status or a list/tuple of statuses to match.
    :param session: The database session in use.

    :raises NoObject: If the query raises NoResultFound.

    :returns: list of dicts, one per matching Content_ext row
              (empty when nothing matches).
    """
    try:
        # Accept a single status as well as a list/tuple of them.
        if status is not None and not isinstance(status, (tuple, list)):
            status = [status]

        ext = models.Content_ext
        records = session.query(ext)
        if request_id:
            records = records.filter(ext.request_id == request_id)
        if transform_id:
            records = records.filter(ext.transform_id == transform_id)
        if workload_id:
            records = records.filter(ext.workload_id == workload_id)
        if coll_id:
            records = records.filter(ext.coll_id == coll_id)
        if status is not None:
            records = records.filter(ext.status.in_(status))
        records = records.order_by(asc(ext.request_id), asc(ext.transform_id), asc(ext.map_id))

        return [record.to_dict() for record in records.all()]
    except sqlalchemy.orm.exc.NoResultFound as error:
        raise exceptions.NoObject('No record can be found with (transform_id=%s): %s' %
                                  (transform_id, error))
1774 
1775 
@read_session
def get_contents_ext_ids(request_id=None, transform_id=None, workload_id=None, coll_id=None, status=None, session=None):
    """
    Read the identifying columns of extended-content records.

    Only request_id, transform_id, workload_id, coll_id, content_id, panda_id
    and status are selected. Each filter is applied only when its argument is
    set. Results are ordered by request_id, transform_id and map_id, ascending.

    :param request_id: request id.
    :param transform_id: transform id.
    :param workload_id: workload id.
    :param coll_id: collection id.
    :param status: a single status or a list/tuple of statuses to match.
    :param session: The database session in use.

    :raises NoObject: If the query raises NoResultFound.

    :returns: list of dicts keyed by the selected column names
              (empty when nothing matches).
    """
    try:
        # Accept a single status as well as a list/tuple of them.
        if status is not None and not isinstance(status, (tuple, list)):
            status = [status]

        ext = models.Content_ext
        selected = (ext.request_id,
                    ext.transform_id,
                    ext.workload_id,
                    ext.coll_id,
                    ext.content_id,
                    ext.panda_id,
                    ext.status)
        names = [col.name for col in selected]

        id_query = session.query(*selected)
        if request_id:
            id_query = id_query.filter(ext.request_id == request_id)
        if transform_id:
            id_query = id_query.filter(ext.transform_id == transform_id)
        if workload_id:
            id_query = id_query.filter(ext.workload_id == workload_id)
        if coll_id:
            id_query = id_query.filter(ext.coll_id == coll_id)
        if status is not None:
            id_query = id_query.filter(ext.status.in_(status))
        id_query = id_query.order_by(asc(ext.request_id), asc(ext.transform_id), asc(ext.map_id))

        return [dict(zip(names, row)) for row in id_query.all()]
    except sqlalchemy.orm.exc.NoResultFound as error:
        raise exceptions.NoObject('No record can be found with (transform_id=%s): %s' %
                                  (transform_id, error))
1830 
1831 
def combine_contents_ext(contents, contents_ext, with_status_name=False):
    """
    Merge contents with their extended records, matched on content_id.

    For a content that has an extended record, that extended dict is updated
    in place with the content's values and returned. For a content without
    one, a dict holding every key of get_contents_ext_maps() set to None is
    created and then updated with the content's values. In both cases the
    content's status, scope and name are written last.

    :param contents: iterable of content dicts; each needs content_id, status,
                     scope and name.
    :param contents_ext: iterable of extended-content dicts; each needs
                         content_id. These dicts are modified in place.
    :param with_status_name: When true, store content['status'].name instead
                             of the status value itself.

    :returns: list of merged dicts, in the order of contents.
    """
    # With duplicate content ids the last extended record wins.
    ext_by_id = {ext['content_id']: ext for ext in contents_ext}

    combined = []
    for content in contents:
        content_id = content['content_id']
        if content_id in ext_by_id:
            merged = ext_by_id[content_id]
        else:
            merged = dict.fromkeys(get_contents_ext_maps())
        merged.update(content)

        status = content['status']
        merged['status'] = status.name if with_status_name else status
        merged['scope'] = content['scope']
        merged['name'] = content['name']

        combined.append(merged)
    return combined