File indexing completed on 2026-04-09 07:58:20
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012 """
0013 operations related to Contents.
0014 """
0015
0016 import datetime
0017 import hashlib
0018 import time
0019 import traceback
0020
0021 from enum import Enum
0022 from collections import defaultdict
0023
0024 import sqlalchemy
0025 from sqlalchemy import and_, or_
0026 from sqlalchemy import func
0027 from sqlalchemy.exc import DatabaseError, IntegrityError
0028
0029 from sqlalchemy.sql import exists, select, expression, update
0030 from sqlalchemy.sql.expression import asc
0031
0032 from idds.common import exceptions
0033 from idds.common.constants import (ContentType, ContentStatus, ContentLocking,
0034 ContentFetchStatus, ContentRelationType)
0035 from idds.common.utils import group_list
0036 from idds.common.utils import json_dumps
0037 from idds.orm.base.session import read_session, transactional_session
0038 from idds.orm.base import models
0039
0040
def create_content(request_id, workload_id, transform_id, coll_id, map_id, scope, name,
                   min_id, max_id, content_type=ContentType.File,
                   status=ContentStatus.New, content_relation_type=ContentRelationType.Input,
                   bytes=0, md5=None, adler32=None, processing_id=None, storage_id=None, retries=0,
                   locking=ContentLocking.Idle, path=None, expired_at=None, content_metadata=None):
    """
    Build a Content model object. Nothing is written to the database here.

    :param request_id: The request id.
    :param workload_id: The workload id.
    :param transform_id: The transform id.
    :param coll_id: The collection id.
    :param map_id: The id to map inputs to outputs.
    :param scope: The scope of the content.
    :param name: The name of the content (must be a string, it is hashed).
    :param min_id: The minimal id of the content.
    :param max_id: The maximal id of the content.
    :param content_type: The type of the content.
    :param status: The content status.
    :param content_relation_type: The relation type of the content.
    :param bytes: The size of the content.
    :param md5: md5 checksum.
    :param adler32: adler32 checksum.
    :param processing_id: The processing id.
    :param storage_id: The storage id.
    :param retries: The number of retries.
    :param locking: The locking state of the content.
    :param path: The content path.
    :param expired_at: The datetime when it expires.
    :param content_metadata: The metadata as json.

    :returns: the new (unsaved) models.Content instance.
    """
    # Both digest columns receive the md5 of the name only.
    # NOTE(review): scope_name_md5 does not include the scope -- confirm intended.
    name_digest = hashlib.md5(name.encode("utf-8")).hexdigest()

    attributes = {
        'request_id': request_id,
        'workload_id': workload_id,
        'transform_id': transform_id,
        'coll_id': coll_id,
        'map_id': map_id,
        'scope': scope,
        'name': name,
        'min_id': min_id,
        'max_id': max_id,
        'content_type': content_type,
        'content_relation_type': content_relation_type,
        'status': status,
        'bytes': bytes,
        'md5': md5,
        'name_md5': name_digest,
        'scope_name_md5': name_digest,
        'adler32': adler32,
        'processing_id': processing_id,
        'storage_id': storage_id,
        'retries': retries,
        'path': path,
        'expired_at': expired_at,
        'locking': locking,
        'content_metadata': content_metadata,
    }
    return models.Content(**attributes)
0084
0085
@transactional_session
def add_content(request_id, workload_id, transform_id, coll_id, map_id, scope, name, min_id=0, max_id=0,
                content_type=ContentType.File, status=ContentStatus.New, content_relation_type=ContentRelationType.Input,
                bytes=0, md5=None, adler32=None, processing_id=None, storage_id=None, retries=0,
                locking=ContentLocking.Idle, path=None, expired_at=None, content_metadata=None, session=None):
    """
    Create a content and save it.

    :param request_id: The request id.
    :param workload_id: The workload id.
    :param transform_id: The transform id.
    :param coll_id: The collection id.
    :param map_id: The id to map inputs to outputs.
    :param scope: The scope of the content.
    :param name: The name of the content.
    :param min_id: The minimal id of the content.
    :param max_id: The maximal id of the content.
    :param content_type: The type of the content.
    :param status: The content status.
    :param content_relation_type: The relation type of the content.
    :param bytes: The size of the content.
    :param md5: md5 checksum.
    :param adler32: adler32 checksum.
    :param processing_id: The processing id.
    :param storage_id: The storage id.
    :param retries: The number of retries.
    :param locking: The locking state of the content.
    :param path: The content path.
    :param expired_at: The datetime when it expires.
    :param content_metadata: The metadata as json.
    :param session: The database session in use.

    :raises DuplicatedObject: If saving the content raises an IntegrityError.
    :raises DatabaseException: If there is a database error.

    :returns: content id.
    """

    try:
        # `locking` is forwarded explicitly; previously it was accepted by this
        # function but never handed to create_content, so the caller's value
        # was silently replaced by the default.
        new_content = create_content(request_id=request_id, workload_id=workload_id, transform_id=transform_id,
                                     coll_id=coll_id, map_id=map_id, content_relation_type=content_relation_type,
                                     scope=scope, name=name, min_id=min_id, max_id=max_id,
                                     content_type=content_type, status=status, bytes=bytes, md5=md5,
                                     adler32=adler32, processing_id=processing_id, storage_id=storage_id,
                                     retries=retries, locking=locking, path=path, expired_at=expired_at,
                                     content_metadata=content_metadata)
        new_content.save(session=session)
        return new_content.content_id
    except IntegrityError as error:
        raise exceptions.DuplicatedObject('Content transform_id:map_id(%s:%s) already exists!: %s' %
                                          (transform_id, map_id, error))
    except DatabaseError as error:
        raise exceptions.DatabaseException(error)
0137
0138
@transactional_session
def add_contents(contents, bulk_size=10000, session=None):
    """
    Insert many contents in bulk.

    Each dictionary in `contents` is completed IN PLACE: missing keys are
    filled with defaults, and 'name_md5' / 'scope_name_md5' are always
    overwritten with the md5 of the content name.

    :param contents: list of dictionaries, one per content.
    :param bulk_size: number of contents handed to the bulk insert per call.
    :param session: The database session in use.

    :raises DuplicatedObject: If the insert raises an IntegrityError.
    :raises DatabaseException: If there is a database error.

    :returns: a list of None with one entry per content (ids are not fetched).
    """
    fallback_values = {
        'request_id': None, 'workload_id': None,
        'transform_id': None, 'coll_id': None, 'map_id': None,
        'scope': None, 'name': None, 'min_id': 0, 'max_id': 0,
        'content_type': ContentType.File, 'status': ContentStatus.New,
        'locking': ContentLocking.Idle, 'content_relation_type': ContentRelationType.Input,
        'bytes': 0, 'md5': None, 'adler32': None, 'processing_id': None,
        'storage_id': None, 'retries': 0, 'path': None,
        'name_md5': None, 'scope_name_md5': None,
        # Same expiry (30 days from this call) is shared by all defaulted rows.
        'expired_at': datetime.datetime.utcnow() + datetime.timedelta(days=30),
        'content_metadata': None,
    }

    for item in contents:
        for field, fallback in fallback_values.items():
            if field not in item:
                item[field] = fallback
        # NOTE(review): scope_name_md5 is hashed from the name only -- confirm intended.
        name_digest = hashlib.md5(item['name'].encode("utf-8")).hexdigest()
        item['name_md5'] = name_digest
        item['scope_name_md5'] = name_digest

    chunk_starts = range(0, len(contents), bulk_size)
    chunks = [contents[start:start + bulk_size] for start in chunk_starts]

    try:
        for chunk in chunks:
            custom_bulk_insert_mappings(models.Content, chunk, session=session)
        return [None] * len(contents)
    except IntegrityError as error:
        raise exceptions.DuplicatedObject('Duplicated objects: %s' % (error))
    except DatabaseError as error:
        raise exceptions.DatabaseException(error)
0182
0183
@read_session
def get_content(content_id=None, coll_id=None, scope=None, name=None, content_type=None, min_id=0, max_id=0, to_json=False, session=None):
    """
    Get a single content.

    When `content_id` is truthy it is the only filter. Otherwise the lookup
    uses coll_id/scope/name, plus content_type for File contents, or
    min_id/max_id (and content_type when given) for all other types.

    :param content_id: content id.
    :param coll_id: collection id.
    :param scope: The scope of the content.
    :param name: The name of the content.
    :param content_type: The type of the content.
    :param min_id: The minimal id of the content.
    :param max_id: The maximal id of the content.
    :param to_json: return json format.
    :param session: The database session in use.

    :raises NoObject: If the query raises NoResultFound.

    :returns: the first matching content as a dictionary, or None if nothing matches.
    """
    content = models.Content

    try:
        query = session.query(content)
        if content_id:
            query = query.filter(content.content_id == content_id)
        else:
            query = query.filter(content.coll_id == coll_id)\
                         .filter(content.scope == scope)\
                         .filter(content.name == name)
            if content_type in [ContentType.File, ContentType.File.value]:
                query = query.filter(content.content_type == content_type)
            else:
                query = query.filter(content.min_id == min_id)\
                             .filter(content.max_id == max_id)
                if content_type:
                    query = query.filter(content.content_type == content_type)

        found = query.first()
        if not found:
            return None
        return found.to_dict_json() if to_json else found.to_dict()
    except sqlalchemy.orm.exc.NoResultFound as error:
        raise exceptions.NoObject('content(content_id: %s, coll_id: %s, scope: %s, name: %s, content_type: %s, min_id: %s, max_id: %s) cannot be found: %s' %
                                  (content_id, coll_id, scope, name, content_type, min_id, max_id, error))
0239
0240
@read_session
def get_match_contents(coll_id, scope, name, content_type=None, min_id=None, max_id=None, to_json=False, session=None):
    """
    Get the contents of a collection whose name matches a pattern.

    :param coll_id: collection id.
    :param scope: The scope of the content.
    :param name: The name pattern; every '*' is translated to the SQL LIKE wildcard '%'.
    :param content_type: The type of the content (no filter when None).
    :param min_id: When not None, only contents with min_id <= this value match.
    :param max_id: When not None, only contents with max_id >= this value match.
    :param to_json: return json format.
    :param session: The database session in use.

    :raises NoObject: If the query raises NoResultFound.

    :returns: list of contents as dictionaries (empty list when nothing matches).
    """

    try:
        query = session.query(models.Content)\
                       .filter(models.Content.coll_id == coll_id)\
                       .filter(models.Content.scope == scope)\
                       .filter(models.Content.name.like(name.replace('*', '%')))

        if content_type is not None:
            # Fixed: the attribute was misspelled 'content_tye', which raised
            # AttributeError whenever a content_type was supplied.
            query = query.filter(models.Content.content_type == content_type)
        if min_id is not None:
            query = query.filter(models.Content.min_id <= min_id)
        if max_id is not None:
            query = query.filter(models.Content.max_id >= max_id)

        return [row.to_dict_json() if to_json else row.to_dict() for row in query.all()]
    except sqlalchemy.orm.exc.NoResultFound as error:
        raise exceptions.NoObject('No match contents for (coll_id: %s, scope: %s, name: %s, content_type: %s, min_id: %s, max_id: %s): %s' %
                                  (coll_id, scope, name, content_type, min_id, max_id, error))
0288
0289
@read_session
def get_contents(scope=None, name=None, request_id=None, transform_id=None, workload_id=None, coll_id=None, status=None,
                 relation_type=None, without_content_dep_id=False, to_json=False, session=None):
    """
    Get the contents matching the given filters, ordered by map_id ascending.

    :param scope: The scope of the content.
    :param name: The name pattern; every '*' is translated to the SQL LIKE wildcard '%'.
    :param request_id: The request id.
    :param transform_id: The transform id.
    :param workload_id: The workload id.
    :param coll_id: A collection id or a list/tuple of collection ids.
    :param status: A content status or a list/tuple of statuses.
    :param relation_type: The content relation type.
    :param without_content_dep_id: keep only contents whose content_dep_id is NULL or 0.
    :param to_json: return json format.
    :param session: The database session in use.

    :raises NoObject: If the query raises NoResultFound.

    :returns: list of contents as dictionaries.
    """

    def _as_in_list(values):
        # Wrap scalars in a list; a single value is duplicated so the IN list
        # always carries at least two entries (the reason is not visible here).
        if not isinstance(values, (tuple, list)):
            values = [values]
        if len(values) == 1:
            values = [values[0]] * 2
        return values

    content = models.Content

    try:
        if status is not None:
            status = _as_in_list(status)
        if coll_id is not None:
            coll_id = _as_in_list(coll_id)

        query = session.query(content)

        if request_id:
            query = query.filter(content.request_id == request_id)
        if transform_id:
            query = query.filter(content.transform_id == transform_id)
        if workload_id:
            query = query.filter(content.workload_id == workload_id)
        if coll_id:
            query = query.filter(content.coll_id.in_(coll_id))
        if scope:
            query = query.filter(content.scope == scope)
        if name:
            query = query.filter(content.name.like(name.replace('*', '%')))
        if status is not None:
            query = query.filter(content.status.in_(status))
        if relation_type:
            query = query.filter(content.content_relation_type == relation_type)
        if without_content_dep_id:
            # '== None' is intentional: SQLAlchemy renders it as IS NULL.
            no_dependency = or_(content.content_dep_id == None, content.content_dep_id == 0)  # noqa: E711
            query = query.filter(no_dependency)

        query = query.order_by(asc(content.map_id))

        return [row.to_dict_json() if to_json else row.to_dict() for row in query.all()]
    except sqlalchemy.orm.exc.NoResultFound as error:
        raise exceptions.NoObject('No record can be found with (coll_id=%s, name=%s): %s' %
                                  (coll_id, name, error))
0357
0358
@read_session
def get_contents_by_request_transform(request_id=None, transform_id=None, workload_id=None, status=None, map_id=None, status_updated=False, with_deps=True, page_num=None, page_size=None, by_map=False, match_content_ext=False, session=None):
    """
    Get contents of a request/transform, ordered by request_id, transform_id, map_id.

    :param request_id: request id.
    :param transform_id: transform id.
    :param workload_id: workload id.
    :param status: a substatus or list/tuple of substatuses to match.
    :param map_id: keep only contents with this map_id.
    :param status_updated: keep only contents whose status differs from substatus.
    :param with_deps: when False, contents with content_relation_type 3 are excluded.
    :param page_num: page number (0-based) for paginated retrieval.
    :param page_size: number of distinct map_ids per page.
    :param by_map: when True, the status / content-ext conditions select map_ids
                   and every content sharing a selected map_id is returned;
                   when False the conditions filter the contents directly.
    :param match_content_ext: also match contents that have no contents_ext row
                              or whose contents_ext status differs from the substatus.
    :param session: The database session in use.

    :raises NoObject: If the query raises NoResultFound.

    :returns: list of contents as dictionaries.
    """
    content = models.Content
    content_ext = models.Content_ext

    try:
        if status is not None and not isinstance(status, (tuple, list)):
            status = [status]

        query = session.query(content)
        if request_id:
            query = query.filter(content.request_id == request_id)
        if transform_id:
            query = query.filter(content.transform_id == transform_id)
        if workload_id:
            query = query.filter(content.workload_id == workload_id)
        if map_id:
            query = query.filter(content.map_id == map_id)
        if status_updated:
            query = query.filter(content.status != content.substatus)
        if not with_deps:
            query = query.filter(content.content_relation_type != 3)

        if status is not None or match_content_ext:
            # The status condition and the content-ext condition are OR-ed
            # together when both are requested.
            conditions = []
            if status is not None:
                conditions.append(content.substatus.in_(status))
            if match_content_ext:
                conditions.append(or_(
                    content_ext.content_id == None,  # noqa: E711 -- rendered as IS NULL
                    content_ext.status != content.substatus
                ))
            combined = conditions[0] if len(conditions) == 1 else or_(*conditions)

            if by_map:
                # Select the map_ids that contain at least one qualifying row,
                # then join back so that all rows of those map_ids are returned.
                map_selector = session.query(content.map_id.label('map_id')).distinct()
                if request_id:
                    map_selector = map_selector.filter(content.request_id == request_id)
                if transform_id:
                    map_selector = map_selector.filter(content.transform_id == transform_id)
                if match_content_ext:
                    map_selector = map_selector.outerjoin(content_ext, content.content_id == content_ext.content_id)
                qualifying_maps = map_selector.filter(combined).subquery('qualifying_maps')
                query = query.join(qualifying_maps, content.map_id == qualifying_maps.c.map_id)
            else:
                if match_content_ext:
                    query = query.outerjoin(content_ext, content.content_id == content_ext.content_id)
                query = query.filter(combined)

        query = query.order_by(asc(content.request_id), asc(content.transform_id), asc(content.map_id))

        if page_num is not None and page_size is not None:
            # A page is a window over the distinct map_ids (ascending); this
            # window only honours request_id, transform_id and with_deps.
            page_query = session.query(content.map_id).distinct()
            if request_id:
                page_query = page_query.filter(content.request_id == request_id)
            if transform_id:
                page_query = page_query.filter(content.transform_id == transform_id)
            if not with_deps:
                page_query = page_query.filter(content.content_relation_type != 3)
            page_query = page_query.order_by(asc(content.map_id))
            page_query = page_query.offset(page_num * page_size).limit(page_size)
            page_map_ids = [record[0] for record in page_query.all()]
            if not page_map_ids:
                return []
            query = query.filter(content.map_id.in_(page_map_ids))

        return [row.to_dict() for row in query.all()]
    except sqlalchemy.orm.exc.NoResultFound as error:
        raise exceptions.NoObject('No record can be found with (transform_id=%s): %s' %
                                  (transform_id, error))
0454
0455
@read_session
def get_input_output_map_count(request_id, transform_id, session=None):
    """
    Count the distinct (map_id, sub_map_id) pairs among the Input contents
    of a transform.

    The counting is done in the database over a DISTINCT subquery, so no
    content rows are loaded.

    :param request_id: request id (also used as a filter).
    :param transform_id: transform id.
    :param session: The database session in use.

    :returns: the number of distinct pairs, 0 when the count is falsy.
    """
    content = models.Content

    distinct_pairs = session.query(content.map_id, content.sub_map_id)
    distinct_pairs = distinct_pairs.filter(content.request_id == request_id)
    distinct_pairs = distinct_pairs.filter(content.transform_id == transform_id)
    distinct_pairs = distinct_pairs.filter(content.content_relation_type == ContentRelationType.Input)
    pair_subquery = distinct_pairs.distinct().subquery()

    total = session.query(func.count()).select_from(pair_subquery).scalar()
    return total if total else 0
0474
0475
@read_session
def get_content_name_to_id_map(request_id, transform_id, es=False, session=None):
    """
    Build a {key: [content_id, ...]} mapping for the Input and Output
    contents of a transform.

    Only the key column and content_id are selected.

    :param request_id: request id (also used as a filter).
    :param transform_id: transform id.
    :param es: when True the 'path' column is the key, otherwise 'name'.
    :param session: The database session in use.

    :returns: dictionary mapping each key to the list of its content ids.
    """
    content = models.Content
    key_column = content.path if es else content.name
    wanted_relations = [ContentRelationType.Input, ContentRelationType.Output]

    records = session.query(key_column, content.content_id)\
                     .filter(content.request_id == request_id)\
                     .filter(content.transform_id == transform_id)\
                     .filter(content.content_relation_type.in_(wanted_relations))\
                     .all()

    ids_by_key = {}
    for key, content_id in records:
        ids_by_key.setdefault(key, []).append(content_id)
    return ids_by_key
0500
0501
@read_session
def has_input_contents_without_external_id(request_id, transform_id, session=None):
    """
    Tell whether a transform has Input contents whose external_content_id is NULL.

    A single COUNT query is issued; no content rows are loaded.

    :param request_id: request id (also used as a filter).
    :param transform_id: transform id.
    :param session: The database session in use.

    :returns: True if at least one such Input content exists, False otherwise.
    """
    content = models.Content

    missing_query = session.query(func.count(content.content_id))
    missing_query = missing_query.filter(content.request_id == request_id)
    missing_query = missing_query.filter(content.transform_id == transform_id)
    missing_query = missing_query.filter(content.content_relation_type == ContentRelationType.Input)
    missing_query = missing_query.filter(content.external_content_id.is_(None))

    missing = missing_query.scalar()
    return missing > 0
0521
0522
@read_session
def get_content_status_statistics(coll_id=None, transform_ids=None, session=None):
    """
    Count contents per status.

    :param coll_id: Collection id (no filter when falsy).
    :param transform_ids: a transform id or a list/tuple of transform ids
                          (no filter when falsy).
    :param session: The database session in use.

    :returns: dictionary {status: count}.
    """
    content = models.Content

    if transform_ids:
        if not isinstance(transform_ids, (list, tuple)):
            transform_ids = [transform_ids]
        if len(transform_ids) == 1:
            # A single id is duplicated so the IN list has two entries.
            transform_ids = [transform_ids[0]] * 2

    query = session.query(content.status, func.count(content.content_id))
    if coll_id:
        query = query.filter(content.coll_id == coll_id)
    if transform_ids:
        query = query.filter(content.transform_id.in_(transform_ids))
    query = query.group_by(content.status)

    return {status: count for status, count in query.all()}
0554
0555
@read_session
def get_content_status_statistics_by_relation_type(transform_ids=None, session=None):
    """
    Count contents per (status, content_relation_type, transform_id).

    :param transform_ids: a transform id or a list/tuple of transform ids
                          (no filter when falsy).
    :param session: The database session in use.

    :returns: list of rows (status, content_relation_type, transform_id, count).
    """
    content = models.Content

    if transform_ids:
        if not isinstance(transform_ids, (list, tuple)):
            transform_ids = [transform_ids]
        if len(transform_ids) == 1:
            # A single id is duplicated so the IN list has two entries.
            transform_ids = [transform_ids[0]] * 2

    grouping = (content.status, content.content_relation_type, content.transform_id)
    query = session.query(*grouping, func.count(content.content_id))
    if transform_ids:
        query = query.filter(content.transform_id.in_(transform_ids))

    return query.group_by(*grouping).all()
0581
0582
@read_session
def get_content_status_statistics_by_coll(request_id=None, transform_id=None, with_deps=True, session=None):
    """
    Get content statistics grouped by (coll_id, status), with total bytes summed.

    Each coll_id entry also carries the key 'has_unsynced', which becomes True
    as soon as one of its groups contains a row with status != substatus.

    :param request_id: request id (no filter when None).
    :param transform_id: transform id (no filter when None).
    :param with_deps: when False, InputDependency contents are excluded.
    :param session: The database session in use.

    :returns: dict {coll_id: {status: {'count': N, 'bytes': B}, 'has_unsynced': bool}}
    """
    content = models.Content

    # 1 for every row whose status and substatus differ, 0 otherwise.
    unsynced_flag = sqlalchemy.case(
        (content.status != content.substatus, 1),
        else_=0
    )
    query = session.query(
        content.coll_id,
        content.status,
        func.count(content.content_id).label('cnt'),
        func.sum(content.bytes).label('total_bytes'),
        func.sum(unsynced_flag).label('unsynced_count')
    )
    if request_id is not None:
        query = query.filter(content.request_id == request_id)
    if transform_id is not None:
        query = query.filter(content.transform_id == transform_id)
    if not with_deps:
        query = query.filter(content.content_relation_type != ContentRelationType.InputDependency)
    query = query.group_by(content.coll_id, content.status)

    statistics = {}
    for coll_id, status, cnt, total_bytes, unsynced_count in query.all():
        per_coll = statistics.setdefault(coll_id, {'has_unsynced': False})
        per_coll[status] = {'count': cnt, 'bytes': total_bytes or 0}
        if unsynced_count and unsynced_count > 0:
            per_coll['has_unsynced'] = True
    return statistics
0635
0636
@read_session
def get_content_ext_status_statistics_by_coll(request_id=None, transform_id=None, session=None):
    """
    Get contents_ext statistics grouped by (coll_id, status).

    :param request_id: request id (no filter when None).
    :param transform_id: transform id (no filter when None).
    :param session: The database session in use.

    :returns: dict {coll_id: {status: count}}
    """
    content_ext = models.Content_ext

    query = session.query(
        content_ext.coll_id,
        content_ext.status,
        func.count(content_ext.content_id).label('cnt')
    )
    if request_id is not None:
        query = query.filter(content_ext.request_id == request_id)
    if transform_id is not None:
        query = query.filter(content_ext.transform_id == transform_id)
    query = query.group_by(content_ext.coll_id, content_ext.status)

    statistics = {}
    for coll_id, status, cnt in query.all():
        statistics.setdefault(coll_id, {})[status] = cnt
    return statistics
0672
0673
@transactional_session
def update_content(content_id, parameters, session=None):
    """
    Update one content.

    `parameters` is modified in place: 'updated_at' is set to the current
    UTC time before the update is issued.

    :param content_id: the content id.
    :param parameters: A dictionary of column values to set.
    :param session: The database session in use.

    :raises NoObject: If the update raises NoResultFound.
    """
    try:
        parameters['updated_at'] = datetime.datetime.utcnow()

        target = session.query(models.Content).filter_by(content_id=content_id)
        target.update(parameters, synchronize_session=False)
    except sqlalchemy.orm.exc.NoResultFound as error:
        raise exceptions.NoObject('Content %s cannot be found: %s' % (content_id, error))
0694
0695
@transactional_session
def custom_bulk_insert_mappings_real(model, parameters, session=None):
    """
    Insert rows in bulk through a textual INSERT ... ON CONFLICT DO NOTHING.

    For the 'contents' table the content_id is not taken from the rows; it is
    generated with nextval() on the CONTENT_ID_SEQ sequence.

    :param model: the mapped model class whose table receives the rows.
    :param parameters: list of dictionaries, one per row (nothing is done when empty).
    :param session: The database session in use.

    Any exception is printed together with its traceback and re-raised.
    """
    if not parameters:
        return

    zero_if_missing = ['map_id', 'fetch_status', 'sub_map_id', 'dep_sub_map_id', 'content_relation_type']
    now_if_missing = ['created_at', 'updated_at', 'accessed_at']

    def to_sql_value(row, column):
        """Return the value to bind for `column`, applying fallbacks and conversions."""
        value = row.get(column.name, None)

        if value is not None:
            # Convert Python-side types to what the textual statement can bind.
            if isinstance(value, Enum):
                return value.value
            if isinstance(value, datetime.datetime):
                return value.isoformat()
            if isinstance(value, dict):
                return json_dumps(value)
            return value

        # Missing value: pick a fallback.
        if column.name in zero_if_missing:
            return 0
        if column.name in now_if_missing:
            return datetime.datetime.utcnow().isoformat()
        if column.default is None or column.primary_key:
            return None

        fallback = column.default.arg
        if callable(fallback):
            try:
                return fallback()
            except TypeError:
                # The default callable could not be called without arguments.
                return None
        if isinstance(fallback, expression.ClauseElement):
            return None
        return fallback

    try:
        schema = model.metadata.schema
        schema_prefix = f"{schema}." if schema else ""
        sequence_name = f'"{schema}"."CONTENT_ID_SEQ"' if schema else '"CONTENT_ID_SEQ"'
        table_name = f"{schema_prefix}{model.__tablename__}"

        is_contents_table = model.__tablename__.lower() == 'contents'
        exclude_columns = ['content_id'] if is_contents_table else []

        columns = [col for col in model.__mapper__.columns if col.name not in exclude_columns]
        column_names = [col.name for col in columns]

        column_key_sql = ", ".join(column_names)
        column_value_sql = ", ".join([f":{col_name}" for col_name in column_names])

        updated_parameters = []
        for row in parameters:
            updated_parameters.append({col.name: to_sql_value(row, col) for col in columns})

        if is_contents_table:
            sql = f"""
                INSERT INTO {table_name} (content_id, {column_key_sql})
                VALUES (nextval('{sequence_name}'), {column_value_sql})
                ON CONFLICT DO NOTHING
            """
        else:
            sql = f"""
                INSERT INTO {table_name} ({column_key_sql})
                VALUES ({column_value_sql})
                ON CONFLICT DO NOTHING
            """

        session.execute(sqlalchemy.text(sql), updated_parameters)

    except Exception as ex:
        print(f"custom_bulk_insert_mappings Exception: {ex}")
        print(traceback.format_exc())
        raise
0771
0772
@transactional_session
def custom_bulk_insert_mappings(model, parameters, batch_size=1000, session=None):
    """
    Insert rows in bulk, batch by batch.

    On a 'postgresql' dialect each batch goes through
    custom_bulk_insert_mappings_real; on any other dialect it goes through
    session.bulk_insert_mappings.

    :param model: the mapped model class.
    :param parameters: list of dictionaries, one per row (nothing is done when empty).
    :param batch_size: number of rows per batch.
    :param session: The database session in use.
    """
    if not parameters:
        return

    on_postgresql = session.bind.dialect.name == 'postgresql'

    for start in range(0, len(parameters), batch_size):
        chunk = parameters[start: start + batch_size]
        if on_postgresql:
            custom_bulk_insert_mappings_real(model=model, parameters=chunk, session=session)
        else:
            session.bulk_insert_mappings(model, chunk)
0789
0790
0791
0792
0793
@transactional_session
def custom_bulk_update_mappings_real(model, parameters, batch_size=1000, session=None):
    """
    Update rows in bulk through a textual UPDATE statement.

    The statement is derived from the keys of the FIRST row: the keys among
    content_id / request_id / transform_id form the WHERE clause, all other
    keys form the SET clause. All rows are bound to that same statement.

    :param model: the mapped model class whose table is updated.
    :param parameters: list of dictionaries, one per row (nothing is done when empty).
    :param batch_size: number of rows executed per statement call.
    :param session: The database session in use.

    :raises ValueError: If the first row has no column to set, no selection
                        key, or no 'content_id'.
    """
    if not parameters:
        return

    select_keys = ['content_id', 'request_id', 'transform_id']

    template_row = parameters[0]
    where_parts = []
    set_parts = []
    for key in template_row:
        assignment = f"{key} = :{key}"
        if key in select_keys:
            where_parts.append(assignment)
        else:
            set_parts.append(assignment)

    if not set_parts or not where_parts or 'content_id' not in template_row.keys():
        raise ValueError("No updatable columns found.")

    def to_sql_value(value):
        # Convert Python-side types to what the textual statement can bind.
        if isinstance(value, Enum):
            return value.value
        if isinstance(value, datetime.datetime):
            return value.isoformat()
        if isinstance(value, dict):
            return json_dumps(value)
        return value

    # The statement is the same for every batch, so it is built once.
    schema_prefix = f"{model.metadata.schema}." if model.metadata.schema else ""
    sql = f"""
        UPDATE {schema_prefix}{model.__tablename__}
        SET {", ".join(set_parts)}
        WHERE {" AND ".join(where_parts)}
    """
    stmt = sqlalchemy.text(sql)

    for start in range(0, len(parameters), batch_size):
        chunk = parameters[start: start + batch_size]
        bound_rows = [
            {key: to_sql_value(value) for key, value in row.items()}
            for row in chunk
        ]
        session.execute(stmt, bound_rows)
0838
0839
0840
0841
0842
@transactional_session
def custom_bulk_update_mappings(model, parameters, batch_size=1000, session=None):
    """
    Update rows in bulk.

    Rows are first grouped by their set of keys, so that every group handed
    to custom_bulk_update_mappings_real shares the same columns.

    :param model: the mapped model class whose table is updated.
    :param parameters: list of dictionaries, one per row (nothing is done when empty).
    :param batch_size: number of rows per batch, passed through unchanged.
    :param session: The database session in use.
    """
    if not parameters:
        return

    rows_by_signature = defaultdict(list)
    for row in parameters:
        signature = ','.join(sorted(row.keys()))
        rows_by_signature[signature].append(row)

    for same_shape_rows in rows_by_signature.values():
        custom_bulk_update_mappings_real(model, same_shape_rows, batch_size=batch_size, session=session)
0859
0860
@transactional_session
def update_contents(parameters, use_bulk_update_mappings=True, grouping=True, request_id=None, transform_id=None, session=None):
    """
    Update several contents.

    Three strategies, tried in this order:
    - use_bulk_update_mappings: every row gets 'updated_at' set (in place)
      and the rows go through custom_bulk_update_mappings; request_id and
      transform_id are not used on this path.
    - grouping: rows are grouped with group_list(key='content_id') and one
      UPDATE ... WHERE content_id IN (...) is issued per group, optionally
      restricted by request_id / transform_id.
    - otherwise: session.bulk_update_mappings is called with the rows as given.

    :param parameters: list of dictionaries of column values.
    :param use_bulk_update_mappings: select the first strategy.
    :param grouping: select the second strategy (when the first is off).
    :param request_id: optional request id filter for the grouping strategy.
    :param transform_id: optional transform id filter for the grouping strategy.
    :param session: The database session in use.

    :raises NoObject: If an update raises NoResultFound.
    """
    content = models.Content

    try:
        if use_bulk_update_mappings:
            for row in parameters:
                row['updated_at'] = datetime.datetime.utcnow()
            custom_bulk_update_mappings(content, parameters, session=session)
            return

        if not grouping:
            session.bulk_update_mappings(content, parameters)
            return

        grouped = group_list(parameters, key='content_id')
        for group_key in grouped:
            bucket = grouped[group_key]
            content_ids = bucket['keys']
            new_values = bucket['items']
            new_values['updated_at'] = datetime.datetime.utcnow()

            query = session.query(content)
            if request_id:
                query = query.filter(content.request_id == request_id)
            if transform_id:
                query = query.filter(content.transform_id == transform_id)
            query.filter(content.content_id.in_(content_ids))\
                 .update(new_values, synchronize_session=False)
    except sqlalchemy.orm.exc.NoResultFound as error:
        raise exceptions.NoObject('Content cannot be found: %s' % (error))
0898
0899
@transactional_session
def update_dep_contents(request_id, content_dep_ids, status, bulk_size=10000, session=None):
    """
    Set the substatus of the listed contents, ``bulk_size`` ids per UPDATE.

    :param request_id: The request id the contents belong to.
    :param content_dep_ids: list of ids; they are matched against the
                            ``content_id`` column.
    :param status: Content status written to ``substatus``.
    :param bulk_size: maximum number of ids per UPDATE statement.
    :param session: The database session in use.

    :raises NoObject: If no content is found.
    :raises DatabaseException: If there is a database error.
    """
    try:
        new_values = {'substatus': status}
        for offset in range(0, len(content_dep_ids), bulk_size):
            id_slice = content_dep_ids[offset:offset + bulk_size]
            session.query(models.Content).filter(
                models.Content.request_id == request_id
            ).filter(
                models.Content.content_id.in_(id_slice)
            ).update(new_values, synchronize_session=False)
    except sqlalchemy.orm.exc.NoResultFound as error:
        raise exceptions.NoObject('Content cannot be found: %s' % (error))
0923
0924
@transactional_session
def delete_content(content_id=None, session=None):
    """
    Delete the content row with the given id.

    :param content_id: The id of the content.
    :param session: The database session in use.

    :raises NoObject: If no content is found.
    :raises DatabaseException: If there is a database error.
    """
    try:
        matching = session.query(models.Content).filter(models.Content.content_id == content_id)
        matching.delete()
    except sqlalchemy.orm.exc.NoResultFound as error:
        raise exceptions.NoObject('Content %s cannot be found: %s' % (content_id, error))
0940
0941
@transactional_session
def update_contents_to_others_by_dep_id(request_id=None, transform_id=None, session=None):
    """
    Update contents to others by content_dep_id.

    Calls the ``update_contents_to_others`` stored procedure, qualified with
    the session's schema when one is configured.

    :param request_id: The Request id.
    :param transform_id: The transform id.
    :param session: The database session in use.
    """
    # Only qualify the procedure when a schema is set; otherwise the statement
    # would read "CALL None.update_contents_to_others(...)".
    schema = getattr(session, 'schema', None)
    proc_name = f"{schema}.update_contents_to_others" if schema else "update_contents_to_others"
    idds_proc = sqlalchemy.text("CALL %s(:request_id, :transform_id)" % proc_name)
    session.execute(idds_proc, {"request_id": request_id, "transform_id": transform_id})
0955
0956
@transactional_session
def update_contents_from_others_by_dep_id(request_id=None, transform_id=None, session=None):
    """
    Update contents from others by content_dep_id.

    Calls the ``update_contents_from_others`` stored procedure, qualified with
    the session's schema when one is configured.

    :param request_id: The Request id.
    :param transform_id: The transform id.
    :param session: The database session in use.
    """
    # Only qualify the procedure when a schema is set; otherwise the statement
    # would read "CALL None.update_contents_from_others(...)".
    schema = getattr(session, 'schema', None)
    proc_name = f"{schema}.update_contents_from_others" if schema else "update_contents_from_others"
    idds_proc = sqlalchemy.text("CALL %s(:request_id, :transform_id)" % proc_name)
    session.execute(idds_proc, {"request_id": request_id, "transform_id": transform_id})
0970
0971
@transactional_session
def update_contents_from_others_by_dep_id_pages_old(request_id=None, transform_id=None, page_size=1000, batch_size=500,
                                                    status_not_to_check=None, logger=None, log_prefix=None, session=None):
    """
    Update contents from others by content_dep_id, with pages.

    Walks the InputDependency contents in ``content_id`` order, one page at a
    time. For every dependency whose ``substatus`` differs from the Output
    content it points to (via ``content_dep_id``), both ``status`` and
    ``substatus`` are overwritten with the Output's ``substatus``.

    :param request_id: The Request id.
    :param transform_id: The transform id (applied to the dependency side only).
    :param page_size: number of dependency rows examined per page.
    :param batch_size: number of rows per bulk update call.
    :param status_not_to_check: dependency substatus values to skip.
    :param logger: optional logger for per-page progress.
    :param log_prefix: prefix for log messages.
    :param session: The database session in use.
    """
    if log_prefix is None:
        log_prefix = ""

    # Source side: Output contents that are no longer New.
    main_subquery = session.query(
        models.Content.content_id,
        models.Content.substatus
    )
    if request_id:
        main_subquery = main_subquery.filter(models.Content.request_id == request_id)
    main_subquery = main_subquery.filter(
        models.Content.content_relation_type == ContentRelationType.Output,
        models.Content.substatus != ContentStatus.New
    ).subquery()

    # Dependency side: kept as a Query so each page can add its own cursor filter.
    dep_query = session.query(
        models.Content.content_id.label("content_id"),
        models.Content.content_dep_id.label("content_dep_id"),
        models.Content.substatus.label("substatus")
    )
    if request_id:
        dep_query = dep_query.filter(models.Content.request_id == request_id)
    if transform_id:
        dep_query = dep_query.filter(models.Content.transform_id == transform_id)
    if status_not_to_check:
        dep_query = dep_query.filter(~models.Content.substatus.in_(status_not_to_check))
    dep_query = dep_query.filter(models.Content.content_relation_type == ContentRelationType.InputDependency)

    last_id = None
    while True:
        page_query = dep_query.order_by(models.Content.content_id)
        if last_id is not None:
            page_query = page_query.filter(models.Content.content_id > last_id)
        page = page_query.limit(page_size).subquery()

        # The cursor must follow the page itself, not the mismatches found in
        # it: a page whose rows are all in sync must not end the scan while
        # later pages still hold stale dependencies.
        page_max_id = session.query(func.max(page.c.content_id)).scalar()
        if page_max_id is None:
            break

        mismatches = session.query(
            page.c.content_id.label('dep_content_id'),
            main_subquery.c.substatus,
        ).join(
            page,
            and_(
                page.c.content_dep_id == main_subquery.c.content_id,
                page.c.substatus != main_subquery.c.substatus
            )
        ).all()

        update_data = [
            {"content_id": row[0], "substatus": row[1], "status": row[1]}
            for row in mismatches
        ]
        for i in range(0, len(update_data), batch_size):
            custom_bulk_update_mappings(models.Content, update_data[i:i + batch_size], session=session)

        last_id = page_max_id

        if logger:
            logger.debug(f"{log_prefix}update_contents_from_others_by_dep_id_pages: last_id: {last_id}")
1071
1072
@transactional_session
def update_contents_from_others_by_dep_id_pages_with_coll_id(request_id, transform_id, coll_id, page_size=5000, batch_size=2000,
                                                             status_not_to_check=None, logger=None, log_prefix=None, session=None):
    """
    Update contents from others by content_dep_id for one collection.

    Issues a single UPDATE that copies the ``substatus`` of non-New Output
    contents of ``coll_id`` into ``status`` and ``substatus`` of the
    InputDependency contents of the same collection that reference them
    through ``content_dep_id`` and currently hold a different ``substatus``.
    The session is committed right after the statement.

    :param request_id: The Request id.
    :param transform_id: The transform id (applied to the dependency side).
    :param coll_id: The collection id.
    :param page_size: accepted for signature compatibility; not used here.
    :param batch_size: accepted for signature compatibility; not used here.
    :param status_not_to_check: accepted for signature compatibility; not used here.
    :param logger: optional logger.
    :param log_prefix: prefix for log messages.
    :param session: The database session in use.

    :returns: number of rows reported as updated by the statement.
    """
    if log_prefix is None:
        log_prefix = ""

    start = time.time()

    main_subquery = (
        session.query(
            models.Content.content_id,
            models.Content.substatus.label("substatus")
        )
        .filter(models.Content.request_id == request_id)
        .filter(models.Content.coll_id == coll_id)
        .filter(models.Content.content_relation_type == ContentRelationType.Output)
        .filter(models.Content.substatus != ContentStatus.New)
    ).subquery()

    to_update = (
        update(models.Content)
        .values(status=main_subquery.c.substatus, substatus=main_subquery.c.substatus)
        .where(models.Content.request_id == request_id)
        .where(models.Content.transform_id == transform_id)
        .where(models.Content.coll_id == coll_id)
        .where(models.Content.content_relation_type == ContentRelationType.InputDependency)
        .where(models.Content.content_dep_id == main_subquery.c.content_id)
        .where(models.Content.substatus != main_subquery.c.substatus)
    )

    result = session.execute(to_update)
    session.commit()

    time_used = time.time() - start
    if logger:
        # No row-locking hint is applied to the statement, so the messages do
        # not claim one; log_prefix is applied like in the sibling functions.
        logger.debug(f"{log_prefix}Updated {result.rowcount} rows")
        logger.info(f"{log_prefix}Update request_id {request_id} transform_id {transform_id} coll_id {coll_id} rows {result.rowcount} with {time_used} seconds")
    return result.rowcount
1124
1125
@transactional_session
def update_contents_from_others_by_dep_id_pages(request_id=None, transform_id=None, page_size=5000, batch_size=5000, status_not_to_check=None,
                                                logger=None, log_prefix=None, session=None):
    """
    Update contents from others by content_dep_id, collection by collection.

    Finds the distinct (request_id, transform_id, coll_id) triples that have
    InputDependency contents, keeps those whose collection also has matching
    Output contents, and runs
    ``update_contents_from_others_by_dep_id_pages_with_coll_id`` for each.

    :param request_id: The Request id; when empty, all requests are scanned.
    :param transform_id: The transform id; when empty, all transforms are scanned.
    :param page_size: passed through to the per-collection update.
    :param batch_size: passed through to the per-collection update.
    :param status_not_to_check: dependency substatus values to skip.
    :param logger: optional logger, passed through.
    :param log_prefix: prefix for log messages, passed through.
    :param session: The database session in use.
    """
    if log_prefix is None:
        log_prefix = ""

    # Dependency side: which (request, transform, collection) triples exist.
    coll_query = session.query(
        models.Content.request_id,
        models.Content.transform_id,
        models.Content.coll_id
    )
    if request_id:
        coll_query = coll_query.filter(models.Content.request_id == request_id)
    if transform_id:
        coll_query = coll_query.filter(models.Content.transform_id == transform_id)
    if status_not_to_check:
        coll_query = coll_query.filter(~models.Content.substatus.in_(status_not_to_check))
    coll_query = coll_query.filter(models.Content.content_relation_type == ContentRelationType.InputDependency)
    coll_query = coll_query.group_by(models.Content.request_id, models.Content.transform_id, models.Content.coll_id)
    coll_rows = coll_query.all()
    if not coll_rows:
        # Nothing to update; also avoids an empty IN () below.
        return

    coll_req_tf_coll = [(row.request_id, row.transform_id, row.coll_id) for row in coll_rows]
    coll_ids = [row.coll_id for row in coll_rows]

    # Source side: collections among those that also hold Output contents.
    src_coll_query = session.query(
        models.Content.coll_id
    )
    if request_id:
        src_coll_query = src_coll_query.filter(models.Content.request_id == request_id)
    src_coll_query = src_coll_query.filter(models.Content.coll_id.in_(coll_ids))
    src_coll_query = src_coll_query.filter(models.Content.content_relation_type == ContentRelationType.Output)
    if status_not_to_check:
        # NOTE(review): this side uses in_(), the dependency side uses ~in_();
        # presumably intentional (sources already in those states) - confirm.
        src_coll_query = src_coll_query.filter(models.Content.substatus.in_(status_not_to_check))
    src_coll_ids = {row.coll_id for row in src_coll_query.distinct()}

    for req_id, tf_id, coll_id in coll_req_tf_coll:
        if coll_id in src_coll_ids:
            # Pass the ids of the row being processed: the function arguments
            # may be None, which would make the per-collection UPDATE match
            # nothing.
            update_contents_from_others_by_dep_id_pages_with_coll_id(request_id=req_id,
                                                                     transform_id=tf_id,
                                                                     coll_id=coll_id,
                                                                     page_size=page_size,
                                                                     batch_size=batch_size,
                                                                     status_not_to_check=status_not_to_check,
                                                                     logger=logger,
                                                                     log_prefix=log_prefix,
                                                                     session=session)
1187
1188
@transactional_session
def update_input_contents_by_dependency_pages(request_id=None, transform_id=None, page_size=2000, batch_size=2000, logger=None,
                                              log_prefix=None, terminated=False, status_not_to_check=None, session=None):
    """
    Update input contents by their dependencies, with pages.

    Selects contents with relation type 0 whose (request_id, transform_id,
    map_id, sub_map_id) group has no relation-type-3 content still in ``New``
    substatus, pages through them by ``content_id``, and derives a new
    ``substatus`` for each from the substatus values of the type-3 contents of
    the same group:

    * no dependencies at all, or all dependencies available -> ``Available``
    * all dependencies in a final terminated state -> ``Missing``
    * ``terminated`` is set and all dependencies terminated -> ``Missing``
    * otherwise -> ``New``

    NOTE(review): 0 and 3 are presumably ContentRelationType.Input and
    ContentRelationType.InputDependency - confirm against the constants.

    :param request_id: The Request id.
    :param transform_id: The transform id.
    :param page_size: number of input contents examined per page.
    :param batch_size: number of rows per bulk update call.
    :param logger: optional logger.
    :param log_prefix: prefix for log messages.
    :param terminated: also accept non-final ``Failed`` as terminated.
    :param status_not_to_check: input substatus values to skip.
    :param session: The database session in use.
    """
    if log_prefix is None:
        log_prefix = ""

    # Groups that still have at least one dependency in New substatus.
    query_ex = session.query(
        models.Content.request_id.label("request_id"),
        models.Content.transform_id.label("transform_id"),
        models.Content.map_id.label("map_id"),
        models.Content.sub_map_id.label("sub_map_id"),
    )
    if request_id:
        query_ex = query_ex.filter(models.Content.request_id == request_id)
    if transform_id:
        query_ex = query_ex.filter(models.Content.transform_id == transform_id)
    query_ex = query_ex.filter(
        and_(
            models.Content.content_relation_type == 3,
            models.Content.substatus == ContentStatus.New
        )
    ).distinct().subquery()

    # All dependency rows, joined to each page of inputs below.
    query_deps = session.query(
        models.Content.request_id.label("request_id"),
        models.Content.transform_id.label("transform_id"),
        models.Content.map_id.label("map_id"),
        models.Content.sub_map_id.label("sub_map_id"),
        models.Content.content_id.label("content_id"),
        models.Content.substatus.label("substatus")
    )
    if request_id:
        query_deps = query_deps.filter(models.Content.request_id == request_id)
    if transform_id:
        query_deps = query_deps.filter(models.Content.transform_id == transform_id)
    query_deps = query_deps.filter(
        models.Content.content_relation_type == 3
    )
    query_deps = query_deps.subquery()

    # Inputs whose group has no New dependency left.
    main_query = session.query(
        models.Content.content_id,
        models.Content.request_id,
        models.Content.transform_id,
        models.Content.map_id,
        models.Content.sub_map_id
    )
    if request_id:
        main_query = main_query.filter(models.Content.request_id == request_id)
    if transform_id:
        main_query = main_query.filter(models.Content.transform_id == transform_id)
    if status_not_to_check:
        main_query = main_query.filter(~(models.Content.substatus.in_(status_not_to_check)))
    main_query = main_query.filter(models.Content.content_relation_type == 0)
    main_query = main_query.filter(
        ~exists(
            select(
                models.Content.request_id,
                models.Content.transform_id,
                models.Content.map_id,
                models.Content.sub_map_id
            ).where(
                models.Content.request_id == query_ex.c.request_id,
                models.Content.transform_id == query_ex.c.transform_id,
                models.Content.map_id == query_ex.c.map_id,
                models.Content.sub_map_id == query_ex.c.sub_map_id
            )
        )
    )

    # Status classes used by the aggregation; built once, not per group.
    available_status = (ContentStatus.Available, ContentStatus.FakeAvailable)
    final_terminated_status = (ContentStatus.Available, ContentStatus.FakeAvailable,
                               ContentStatus.FinalFailed, ContentStatus.Missing,
                               ContentStatus.FinalSubAvailable)
    terminated_status = (ContentStatus.Available, ContentStatus.FakeAvailable,
                         ContentStatus.Failed, ContentStatus.FinalFailed,
                         ContentStatus.Missing, ContentStatus.FinalSubAvailable)

    def custom_aggregation(key, values, terminated=False):
        """Derive the input's new substatus from its dependencies' substatus values."""
        dep_request_id = key[1]
        if dep_request_id is None:
            # The outer join produced no dependency row for this input.
            if logger:
                logger.debug(f"{log_prefix}custom_aggregation, no dependencies")
            return ContentStatus.Available

        if all(v in available_status for v in values):
            return ContentStatus.Available
        elif all(v in final_terminated_status for v in values):
            return ContentStatus.Missing
        elif terminated and all(v in terminated_status for v in values):
            return ContentStatus.Missing
        return ContentStatus.New

    last_id = None
    while True:
        paginated_query = main_query.order_by(models.Content.content_id)
        if last_id is not None:
            paginated_query = paginated_query.filter(models.Content.content_id > last_id)
        paginated_query = paginated_query.limit(page_size)
        paginated_query = paginated_query.subquery()

        paginated_query_deps = session.query(
            paginated_query.c.content_id.label('input_content_id'),
            paginated_query.c.request_id.label('input_request_id'),
            query_deps.c.request_id,
            query_deps.c.transform_id,
            query_deps.c.map_id,
            query_deps.c.sub_map_id,
            query_deps.c.content_id,
            query_deps.c.substatus,
        ).outerjoin(
            query_deps,
            and_(
                paginated_query.c.request_id == query_deps.c.request_id,
                paginated_query.c.transform_id == query_deps.c.transform_id,
                paginated_query.c.map_id == query_deps.c.map_id,
                paginated_query.c.sub_map_id == query_deps.c.sub_map_id
            )
        ).order_by(
            query_deps.c.request_id, query_deps.c.transform_id,
            query_deps.c.map_id, query_deps.c.sub_map_id
        ).all()

        if not paginated_query_deps:
            break

        # One entry per input content, collecting its dependencies' substatus.
        # Distinct local names keep the function's own request_id/transform_id
        # arguments from being rebound by the row unpacking.
        grouped_data = defaultdict(list)
        for row in paginated_query_deps:
            (input_content_id, input_request_id, dep_request_id, dep_transform_id,
             dep_map_id, dep_sub_map_id, _dep_content_id, dep_substatus) = row
            group_key = (input_request_id, dep_request_id, dep_transform_id,
                         dep_map_id, dep_sub_map_id, input_content_id)
            grouped_data[group_key].append(dep_substatus)

            if last_id is None or input_content_id > last_id:
                last_id = input_content_id

        update_data = [
            {
                "content_id": key[5],
                "request_id": key[0],
                "substatus": custom_aggregation(key, values, terminated=terminated)
            }
            for key, values in grouped_data.items()
        ]

        for i in range(0, len(update_data), batch_size):
            custom_bulk_update_mappings(models.Content, update_data[i:i + batch_size], session=session)
            # NOTE(review): the source's indentation was lost; the commit is
            # assumed to follow every batch - confirm.
            session.commit()

        if logger:
            logger.debug(f"{log_prefix}update_input_contents_by_dependency_pages: last_id {last_id}")
1386
1387
@read_session
def get_update_contents_from_others_by_dep_id(request_id=None, transform_id=None, session=None):
    """
    Get contents to update from others by content_dep_id.

    Returns the relation-type-3 contents whose ``substatus`` differs from the
    non-New relation-type-1 content they reference through ``content_dep_id``,
    together with the substatus of that referenced content.

    NOTE(review): 1 and 3 are presumably ContentRelationType.Output and
    ContentRelationType.InputDependency - confirm against the constants.

    :param request_id: The Request id.
    :param transform_id: The transform id (applied to the dependent side only).
    :param session: The database session in use.

    :returns: list of dictionaries with keys ``content_id`` and ``substatus``.
    """
    subquery = session.query(models.Content.content_id,
                             models.Content.substatus)
    if request_id:
        subquery = subquery.filter(models.Content.request_id == request_id)
    subquery = subquery.filter(models.Content.content_relation_type == 1)\
                       .filter(models.Content.substatus != ContentStatus.New)
    subquery = subquery.subquery()

    columns = [models.Content.content_id, subquery.c.substatus]
    column_names = [column.name for column in columns]

    query = session.query(*columns)
    if request_id:
        query = query.filter(models.Content.request_id == request_id)
    if transform_id:
        query = query.filter(models.Content.transform_id == transform_id)
    query = query.filter(models.Content.content_relation_type == 3)
    query = query.join(subquery, and_(models.Content.content_dep_id == subquery.c.content_id,
                                      models.Content.substatus != subquery.c.substatus))

    return [dict(zip(column_names, row)) for row in query.distinct()]
1426
1427
@read_session
def get_updated_transforms_by_content_status(request_id=None, transform_id=None, check_substatus=False, session=None):
    """
    Get updated transform ids by content status.

    Returns the distinct (request_id, transform_id, workload_id, coll_id) of
    relation-type-3 contents that reference, through ``content_dep_id``, a
    relation-type-1 content; with ``check_substatus`` only those whose
    ``substatus`` differs from the referenced content are kept.

    :param request_id: The Request id.
    :param transform_id: The transform id; it narrows the referenced
                         (relation-type-1) side only.
    :param check_substatus: require a substatus mismatch between both sides.
    :param session: The database session in use.

    :returns: list of dictionaries keyed by the four column names.
    """
    subquery = session.query(models.Content.content_id,
                             models.Content.substatus)
    if request_id:
        subquery = subquery.filter(models.Content.request_id == request_id)
    if transform_id:
        subquery = subquery.filter(models.Content.transform_id == transform_id)
    subquery = subquery.filter(models.Content.content_relation_type == 1)
    subquery = subquery.subquery()

    columns = [models.Content.request_id,
               models.Content.transform_id,
               models.Content.workload_id,
               models.Content.coll_id]
    column_names = [column.name for column in columns]

    query = session.query(*columns)
    if request_id:
        query = query.filter(models.Content.request_id == request_id)
    query = query.filter(models.Content.content_relation_type == 3)
    if check_substatus:
        query = query.join(subquery, and_(models.Content.content_dep_id == subquery.c.content_id,
                                          models.Content.substatus != subquery.c.substatus))
    else:
        query = query.join(subquery, and_(models.Content.content_dep_id == subquery.c.content_id))

    return [dict(zip(column_names, row)) for row in query.distinct()]
1474
1475
def get_contents_ext_items():
    """
    Return a fresh dictionary of the extended content attributes, all None.

    NOTE(review): 'pandaID', 'maxRamUnit' and 'vo' are spelled 'PandaID',
    'minRamUnit' and 'VO' in get_contents_ext_maps - confirm which is intended.

    :returns: dict mapping every attribute name to None.
    """
    attribute_names = (
        'pandaID', 'jobDefinitionID', 'schedulerID',
        'pilotID', 'creationTime', 'modificationTime',
        'startTime', 'endTime', 'prodSourceLabel',
        'prodUserID', 'assignedPriority', 'currentPriority',
        'attemptNr', 'maxAttempt', 'maxCpuCount',
        'maxCpuUnit', 'maxDiskCount', 'maxDiskUnit',
        'minRamCount', 'maxRamUnit', 'cpuConsumptionTime',
        'cpuConsumptionUnit', 'jobStatus', 'jobName',
        'transExitCode', 'pilotErrorCode', 'pilotErrorDiag',
        'exeErrorCode', 'exeErrorDiag', 'supErrorCode',
        'supErrorDiag', 'ddmErrorCode', 'ddmErrorDiag',
        'brokerageErrorCode', 'brokerageErrorDiag',
        'jobDispatcherErrorCode', 'jobDispatcherErrorDiag',
        'taskBufferErrorCode', 'taskBufferErrorDiag',
        'computingSite', 'computingElement',
        'grid', 'cloud', 'cpuConversion', 'taskID',
        'vo', 'pilotTiming', 'workingGroup',
        'processingType', 'prodUserName', 'coreCount',
        'nInputFiles', 'reqID', 'jediTaskID',
        'actualCoreCount', 'maxRSS', 'maxVMEM',
        'maxSWAP', 'maxPSS', 'avgRSS', 'avgVMEM',
        'avgSWAP', 'avgPSS', 'maxWalltime', 'diskIO',
        'failedAttempt', 'hs06', 'hs06sec',
        'memory_leak', 'memory_leak_x2', 'job_label',
    )
    return dict.fromkeys(attribute_names)
1502
1503
def get_contents_ext_maps():
    """
    Return a fresh dictionary mapping snake_case names to job attribute names.

    :returns: dict of snake_case key -> attribute name.
    """
    return dict(
        panda_id='PandaID',
        job_definition_id='jobDefinitionID',
        scheduler_id='schedulerID',
        pilot_id='pilotID',
        creation_time='creationTime',
        modification_time='modificationTime',
        start_time='startTime',
        end_time='endTime',
        prod_source_label='prodSourceLabel',
        prod_user_id='prodUserID',
        assigned_priority='assignedPriority',
        current_priority='currentPriority',
        attempt_nr='attemptNr',
        max_attempt='maxAttempt',
        max_cpu_count='maxCpuCount',
        max_cpu_unit='maxCpuUnit',
        max_disk_count='maxDiskCount',
        max_disk_unit='maxDiskUnit',
        min_ram_count='minRamCount',
        min_ram_unit='minRamUnit',
        cpu_consumption_time='cpuConsumptionTime',
        cpu_consumption_unit='cpuConsumptionUnit',
        job_status='jobStatus',
        job_name='jobName',
        trans_exit_code='transExitCode',
        pilot_error_code='pilotErrorCode',
        pilot_error_diag='pilotErrorDiag',
        exe_error_code='exeErrorCode',
        exe_error_diag='exeErrorDiag',
        sup_error_code='supErrorCode',
        sup_error_diag='supErrorDiag',
        ddm_error_code='ddmErrorCode',
        ddm_error_diag='ddmErrorDiag',
        brokerage_error_code='brokerageErrorCode',
        brokerage_error_diag='brokerageErrorDiag',
        job_dispatcher_error_code='jobDispatcherErrorCode',
        job_dispatcher_error_diag='jobDispatcherErrorDiag',
        task_buffer_error_code='taskBufferErrorCode',
        task_buffer_error_diag='taskBufferErrorDiag',
        computing_site='computingSite',
        computing_element='computingElement',
        grid='grid',
        cloud='cloud',
        cpu_conversion='cpuConversion',
        task_id='taskID',
        vo='VO',
        pilot_timing='pilotTiming',
        working_group='workingGroup',
        processing_type='processingType',
        prod_user_name='prodUserName',
        core_count='coreCount',
        n_input_files='nInputFiles',
        req_id='reqID',
        jedi_task_id='jediTaskID',
        actual_core_count='actualCoreCount',
        max_rss='maxRSS',
        max_vmem='maxVMEM',
        max_swap='maxSWAP',
        max_pss='maxPSS',
        avg_rss='avgRSS',
        avg_vmem='avgVMEM',
        avg_swap='avgSWAP',
        avg_pss='avgPSS',
        max_walltime='maxWalltime',
        disk_io='diskIO',
        failed_attempt='failedAttempt',
        hs06='hs06',
        hs06sec='hs06sec',
        memory_leak='memory_leak',
        memory_leak_x2='memory_leak_x2',
        job_label='job_label',
    )
1530
1531
@transactional_session
def add_contents_update(contents, bulk_size=10000, session=None):
    """
    Insert content-update rows, ``bulk_size`` rows per bulk insert.

    :param contents: list of dictionaries, one per row to insert.
    :param bulk_size: maximum number of rows per bulk insert call.
    :param session: session.

    :raises DuplicatedObject: If the insert violates an integrity constraint.
    :raises DatabaseException: If there is a database error.

    :returns: a list of ``None`` with one entry per input row.
    """
    try:
        for offset in range(0, len(contents), bulk_size):
            custom_bulk_insert_mappings(models.Content_update, contents[offset:offset + bulk_size], session=session)
        return [None] * len(contents)
    except IntegrityError as error:
        raise exceptions.DuplicatedObject('Duplicated objects: %s' % (error))
    except DatabaseError as error:
        raise exceptions.DatabaseException(error)
1557
1558
@transactional_session
def set_fetching_contents_update(request_id=None, transform_id=None, fetch=True, session=None):
    """
    Mark content-update rows as being fetched.

    Sets ``fetch_status`` to ``ContentFetchStatus.Fetching`` on the rows
    matching the given request/transform id. Does nothing when ``fetch`` is
    false.

    :param request_id: The request id; when empty, not filtered on.
    :param transform_id: The transform id; when empty, not filtered on.
    :param fetch: whether to mark the rows at all.
    :param session: session.

    :raises NoObject: If no record is found.
    """
    if not fetch:
        return

    try:
        query = session.query(models.Content_update)
        if request_id:
            query = query.filter(models.Content_update.request_id == request_id)
        if transform_id:
            query = query.filter(models.Content_update.transform_id == transform_id)
        query.update({'fetch_status': ContentFetchStatus.Fetching})
    except sqlalchemy.orm.exc.NoResultFound as error:
        raise exceptions.NoObject('No record can be found with (transform_id=%s): %s' %
                                  (transform_id, error))
1579
1580
@read_session
def get_contents_update(request_id=None, transform_id=None, fetch=False, session=None):
    """
    Get contents update.

    :param request_id: The request id; when empty, not filtered on.
    :param transform_id: The transform id; when empty, not filtered on.
    :param fetch: when true, only rows whose ``fetch_status`` is
                  ``ContentFetchStatus.Fetching`` are returned.
    :param session: session.

    :raises NoObject: If no record is found.

    :returns: list of dictionaries, one per row (via ``to_dict()``).
    """
    try:
        # Both modes share the same base query; fetch only adds one filter.
        query = session.query(models.Content_update)
        if request_id:
            query = query.filter(models.Content_update.request_id == request_id)
        if transform_id:
            query = query.filter(models.Content_update.transform_id == transform_id)
        if fetch:
            query = query.filter(models.Content_update.fetch_status == ContentFetchStatus.Fetching)

        return [row.to_dict() for row in query.all()]
    except sqlalchemy.orm.exc.NoResultFound as error:
        raise exceptions.NoObject('No record can be found with (transform_id=%s): %s' %
                                  (transform_id, error))
1614
1615
@transactional_session
def delete_contents_update(request_id=None, transform_id=None, contents=None, bulk_size=1000, fetch=False, session=None):
    """
    Delete content-update rows.

    * ``fetch``: delete the rows whose ``fetch_status`` is
      ``ContentFetchStatus.Fetching``.
    * otherwise, with ``contents``: delete the rows whose ``content_id`` is in
      ``contents``, ``bulk_size`` ids per DELETE.
    * otherwise: delete every row matching the request/transform id.

    :param request_id: The request id; when empty, not filtered on.
    :param transform_id: The transform id; when empty, not filtered on.
    :param contents: optional list of content ids to delete.
    :param bulk_size: maximum number of ids per DELETE statement.
    :param fetch: delete the rows currently marked as fetching.
    :param session: The database session in use.

    :raises NoObject: If the deletion fails for any reason.
    """
    def scoped_query():
        """Build the Content_update query narrowed by request/transform id."""
        query = session.query(models.Content_update)
        if request_id:
            query = query.filter(models.Content_update.request_id == request_id)
        if transform_id:
            query = query.filter(models.Content_update.transform_id == transform_id)
        return query

    try:
        # The previous with_for_update(...) calls were dropped: Query methods
        # return a new query and the result was discarded, so they never had
        # any effect on the DELETE statements.
        if fetch:
            scoped_query().filter(models.Content_update.fetch_status == ContentFetchStatus.Fetching).delete()
        elif contents:
            for offset in range(0, len(contents), bulk_size):
                id_slice = contents[offset:offset + bulk_size]
                scoped_query().filter(models.Content_update.content_id.in_(id_slice)).delete()
        else:
            scoped_query().delete()
    except Exception as error:
        raise exceptions.NoObject('Content_update deletion error: %s' % (error)) from error
1659
1660
@transactional_session
def add_contents_ext(contents, bulk_size=10000, session=None):
    """
    Insert extended-content rows, ``bulk_size`` rows per bulk insert.

    Every row is first completed in place: keys from
    ``get_contents_ext_items()`` plus ``status`` (default ``ContentStatus.New``)
    that are missing from a row are added with their default value.

    :param contents: list of dictionaries, one per row to insert.
    :param bulk_size: maximum number of rows per bulk insert call.
    :param session: session.

    :raises DuplicatedObject: If the insert violates an integrity constraint.
    :raises DatabaseException: If there is a database error.

    :returns: a list of ``None`` with one entry per input row.
    """
    fallbacks = get_contents_ext_items()
    fallbacks['status'] = ContentStatus.New

    for content in contents:
        for field, fallback in fallbacks.items():
            content.setdefault(field, fallback)

    try:
        for offset in range(0, len(contents), bulk_size):
            custom_bulk_insert_mappings(models.Content_ext, contents[offset:offset + bulk_size], session=session)
        return [None] * len(contents)
    except IntegrityError as error:
        raise exceptions.DuplicatedObject('Duplicated objects: %s' % (error))
    except DatabaseError as error:
        raise exceptions.DatabaseException(error)
1694
1695
@transactional_session
def update_contents_ext(parameters, use_bulk_update_mappings=True, request_id=None, transform_id=None, session=None):
    """
    Update contents ext.

    With ``use_bulk_update_mappings`` the rows go to
    ``custom_bulk_update_mappings``; otherwise they are grouped with
    ``group_list`` by ``content_id`` and one UPDATE is issued per group,
    optionally narrowed by request/transform id.

    :param parameters: list of dictionary of parameters.
    :param use_bulk_update_mappings: whether to use the custom bulk update.
    :param request_id: The request id (only used when bulk update is off).
    :param transform_id: The transform id (only used when bulk update is off).
    :param session: The database session in use.

    :raises NoObject: If no content is found.
    :raises DatabaseException: If there is a database error.
    """
    try:
        if use_bulk_update_mappings:
            custom_bulk_update_mappings(models.Content_ext, parameters, session=session)
        else:
            groups = group_list(parameters, key='content_id')
            for group_key in groups:
                group = groups[group_key]
                keys = group['keys']
                items = group['items']
                # Filter on Content_ext's own columns: the UPDATE targets
                # Content_ext, so conditions on models.Content would refer to
                # a different table.
                query = session.query(models.Content_ext)
                if request_id:
                    query = query.filter(models.Content_ext.request_id == request_id)
                if transform_id:
                    query = query.filter(models.Content_ext.transform_id == transform_id)
                query.filter(models.Content_ext.content_id.in_(keys))\
                     .update(items, synchronize_session=False)
    except sqlalchemy.orm.exc.NoResultFound as error:
        raise exceptions.NoObject('Content cannot be found: %s' % (error))
1727
1728
@read_session
def get_contents_ext(request_id=None, transform_id=None, workload_id=None, coll_id=None, status=None, session=None):
    """
    Get extended contents, ordered by request id, transform id and map id.

    :param request_id: request id; when empty, not filtered on.
    :param transform_id: transform id; when empty, not filtered on.
    :param workload_id: workload id; when empty, not filtered on.
    :param coll_id: collection id; when empty, not filtered on.
    :param status: a status or a list/tuple of statuses to keep.
    :param session: The database session in use.

    :raises NoObject: If no content is found.

    :returns: list of contents as dictionaries (via ``to_dict()``).
    """
    try:
        if status is not None and not isinstance(status, (tuple, list)):
            status = [status]

        query = session.query(models.Content_ext)
        if request_id:
            query = query.filter(models.Content_ext.request_id == request_id)
        if transform_id:
            query = query.filter(models.Content_ext.transform_id == transform_id)
        if workload_id:
            query = query.filter(models.Content_ext.workload_id == workload_id)
        if coll_id:
            query = query.filter(models.Content_ext.coll_id == coll_id)
        if status is not None:
            query = query.filter(models.Content_ext.status.in_(status))
        query = query.order_by(asc(models.Content_ext.request_id), asc(models.Content_ext.transform_id), asc(models.Content_ext.map_id))

        return [row.to_dict() for row in query.all()]
    except sqlalchemy.orm.exc.NoResultFound as error:
        raise exceptions.NoObject('No record can be found with (transform_id=%s): %s' %
                                  (transform_id, error))
1774
1775
@read_session
def get_contents_ext_ids(request_id=None, transform_id=None, workload_id=None, coll_id=None, status=None, session=None):
    """
    Fetch the identifying columns of Content_ext records matching the given filters.

    Only request_id, transform_id, workload_id, coll_id, content_id, panda_id and
    status are selected.

    :param request_id: request id; not filtered on when falsy.
    :param transform_id: transform id; not filtered on when falsy.
    :param workload_id: workload id; not filtered on when falsy.
    :param coll_id: collection id; not filtered on when falsy.
    :param status: a single status or a list/tuple of statuses; None disables the status filter.
    :param session: The database session in use.

    :raises NoObject: If the query layer raises NoResultFound.

    :returns: list of dictionaries keyed by the selected columns' names, ordered by
              request_id, transform_id and map_id.
    """
    try:
        ext = models.Content_ext

        if status is not None and not isinstance(status, (tuple, list)):
            status = [status]

        selected = [
            ext.request_id,
            ext.transform_id,
            ext.workload_id,
            ext.coll_id,
            ext.content_id,
            ext.panda_id,
            ext.status,
        ]
        keys = [col.name for col in selected]

        query = session.query(*selected)
        for column, wanted in ((ext.request_id, request_id),
                               (ext.transform_id, transform_id),
                               (ext.workload_id, workload_id),
                               (ext.coll_id, coll_id)):
            if wanted:
                query = query.filter(column == wanted)
        if status is not None:
            query = query.filter(ext.status.in_(status))
        query = query.order_by(asc(ext.request_id), asc(ext.transform_id), asc(ext.map_id))

        # Each result row is a tuple in the same order as `selected`.
        return [dict(zip(keys, row)) for row in query.all()]
    except sqlalchemy.orm.exc.NoResultFound as error:
        raise exceptions.NoObject('No record can be found with (transform_id=%s): %s' %
                                  (transform_id, error))
1830
1831
def combine_contents_ext(contents, contents_ext, with_status_name=False):
    """
    Merge every content with its ext record, matched on 'content_id'.

    :param contents: list of content dictionaries; each is read for 'content_id',
                     'status', 'scope' and 'name'.
    :param contents_ext: list of content ext dictionaries, each with a 'content_id'.
    :param with_status_name: If True, store ``content['status'].name`` under
                             'status' instead of the status value itself.

    :returns: list with one merged dictionary per content, in the order of ``contents``.

    A matching ext dictionary is updated in place with the content's values and
    returned as is. A content without a match starts from the mapping returned by
    get_contents_ext_maps(), with every value reset to None before the content is
    applied.
    """
    ext_by_id = {ext['content_id']: ext for ext in contents_ext}

    combined = []
    for content in contents:
        merged = ext_by_id.get(content['content_id'])
        if merged is None:
            # No ext record: begin with the ext field set, all values blanked.
            merged = get_contents_ext_maps()
            for field in merged:
                merged[field] = None
        # Values from the content win over whatever the ext side carried.
        merged.update(content)

        merged['status'] = content['status'].name if with_status_name else content['status']
        merged['scope'] = content['scope']
        merged['name'] = content['name']
        combined.append(merged)
    return combined