File indexing completed on 2026-04-09 07:58:19
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012 """
0013 operations related to Catalog(Collections and Contents).
0014 """
0015
0016
0017 from idds.common import exceptions
0018 from idds.common.constants import (CollectionType, CollectionStatus, CollectionLocking,
0019 CollectionRelationType, ContentType, ContentStatus,
0020 ContentLocking, ContentRelationType)
0021 from idds.orm.base.session import read_session, transactional_session
0022 from idds.orm import (transforms as orm_transforms,
0023 collections as orm_collections,
0024 contents as orm_contents,
0025 messages as orm_messages)
0026
0027
@transactional_session
def get_collections_by_status(status, relation_type=CollectionRelationType.Input, time_period=None,
                              locking=False, bulk_size=None, to_json=False, session=None):
    """
    Fetch collections by status, relation type and time period, optionally marking them as locked.

    :param status: The collection status.
    :param relation_type: The relation_type of the collection to the transform.
    :param time_period: time period in seconds since last update.
    :param locking: When true, every collection returned by the ORM query is additionally
                    updated with ``CollectionLocking.Locking`` in the same session.
    :param bulk_size: forwarded unchanged to the ORM query.
    :param to_json: return json format.
    :param session: The database session in use.

    :raises NoObject: may propagate from the ORM layer when no collections are found.

    :returns: list of Collections.
    """
    found = orm_collections.get_collections_by_status(status=status, relation_type=relation_type,
                                                      bulk_size=bulk_size, time_period=time_period,
                                                      locking=locking, to_json=to_json, session=session)
    if not locking:
        return found

    # Mark each selected collection as locked before handing the list back.
    lock_update = {'locking': CollectionLocking.Locking}
    for item in found:
        orm_collections.update_collection(coll_id=item['coll_id'], parameters=lock_update, session=session)
    return found
0055
0056
@read_session
def get_collections(scope=None, name=None, request_id=None, workload_id=None, transform_id=None,
                    relation_type=None, to_json=False, session=None):
    """
    Look up collections through the ORM layer using the given filters.

    Every argument is forwarded unchanged to ``orm_collections.get_collections``.

    :param scope: scope of the collection.
    :param name: name of the collection.
    :param request_id: the request id.
    :param workload_id: The workload_id of the request.
    :param transform_id: The transform id related to this collection.
    :param relation_type: The relation between this collection and its transform,
                          such as Input, Output, Log and so on.
    :param to_json: return json format.
    :param session: The database session in use.

    :returns: the collections reported by the ORM layer for these filters.
    """
    return orm_collections.get_collections(scope=scope, name=name, request_id=request_id,
                                           workload_id=workload_id, transform_id=transform_id,
                                           relation_type=relation_type, to_json=to_json,
                                           session=session)
0080
0081
@read_session
def get_collections_by_request_ids(request_ids, session=None):
    """
    Get collections by a list of request ids.

    :param request_ids: list of request ids.
    :param session: The database session in use.

    :return collections: list of collections.
    """
    # Forward the session so the ORM call runs inside the session supplied to this
    # function instead of silently ignoring it.
    return orm_collections.get_collections_by_request_ids(request_ids, session=session)
0092
0093
@transactional_session
def add_collection(request_id, workload_id, scope, name, coll_type=CollectionType.Dataset, transform_id=None,
                   relation_type=CollectionRelationType.Input, bytes=0, status=CollectionStatus.New,
                   total_files=0, new_files=0, processing_files=0, processed_files=0, retries=0,
                   expired_at=None, coll_metadata=None, session=None):
    """
    Add a collection.

    :param request_id: The request id.
    :param workload_id: The workload id.
    :param scope: The scope of the request data.
    :param name: The name of the request data.
    :param coll_type: The type of dataset as dataset or container.
    :param transform_id: The transform id related to this collection.
    :param relation_type: The relation between this collection and its transform,
                          such as Input, Output, Log and so on.
    :param bytes: The size of the collection.
    :param status: The status.
    :param total_files: Number of total files.
    :param new_files: Number of new files.
    :param processing_files: Number of files being processed.
    :param processed_files: Number of processed files.
    :param retries: Number of retries.
    :param expired_at: The datetime when it expires.
    :param coll_metadata: The metadata as json.
    :param session: The database session in use.

    :raises DuplicatedObject: If a collection with the same name exists (propagated from the ORM layer).
    :raises DatabaseException: If there is a database error (propagated from the ORM layer).

    :returns: collection id, as returned by ``orm_collections.add_collection``.
    """
    # The result used to be discarded, so callers always received None even though
    # the documented contract is to return the new collection id.
    return orm_collections.add_collection(request_id=request_id, workload_id=workload_id,
                                          scope=scope, name=name, coll_type=coll_type,
                                          transform_id=transform_id, relation_type=relation_type,
                                          bytes=bytes, status=status, total_files=total_files,
                                          new_files=new_files, processing_files=processing_files,
                                          processed_files=processed_files, retries=retries,
                                          expired_at=expired_at, coll_metadata=coll_metadata,
                                          session=session)
0129
0130
@transactional_session
def update_collection(coll_id, parameters, msg=None, session=None):
    """
    Update a collection and, when a message is supplied, record that message too.

    :param coll_id: the collection id.
    :param parameters: A dictionary of parameters.
    :param msg: optional dict-like message; when truthy it must provide the keys
                'msg_type', 'status', 'source', 'transform_id', 'num_contents' and 'msg_content'.
    :param session: The database session in use.

    :raises NoObject: may propagate from the ORM layer if the collection is not found.
    :raises DatabaseException: may propagate from the ORM layer on a database error.
    """
    orm_collections.update_collection(coll_id=coll_id, parameters=parameters, session=session)
    if not msg:
        return

    # Copy exactly the fields the message table expects, in a fixed order.
    message_fields = ('msg_type', 'status', 'source', 'transform_id', 'num_contents', 'msg_content')
    message_kwargs = {field: msg[field] for field in message_fields}
    orm_messages.add_message(session=session, **message_kwargs)
0155
0156
@read_session
def get_collection(coll_id=None, transform_id=None, relation_type=None, to_json=False, session=None):
    """
    Fetch a single collection through the ORM layer.

    :param coll_id: The id of the collection.
    :param transform_id: The transform id related to this collection.
    :param relation_type: The relation between this collection and its transform,
                          such as Input, Output, Log and so on.
    :param to_json: return json format.
    :param session: The database session in use.

    :raises NoObject: may propagate from the ORM layer if no collection is found.

    :returns: Collection.
    """
    lookup = dict(coll_id=coll_id, transform_id=transform_id,
                  relation_type=relation_type, to_json=to_json)
    return orm_collections.get_collection(session=session, **lookup)
0176
0177
@transactional_session
def add_content(request_id, workload_id, transform_id, coll_id, map_id, scope, name, min_id=0, max_id=0,
                content_type=ContentType.File, status=ContentStatus.New, content_relation_type=ContentRelationType.Input,
                bytes=0, md5=None, adler32=None, processing_id=None, storage_id=None, retries=0,
                locking=ContentLocking.Idle, path=None, expired_at=None, content_metadata=None, session=None):
    """
    Register one content by delegating to ``orm_contents.add_content``.

    :param request_id: The request id.
    :param workload_id: The workload id.
    :param transform_id: transform id.
    :param coll_id: collection id.
    :param map_id: The id to map inputs to outputs.
    :param scope: The scope of the request data.
    :param name: The name of the request data.
    :param min_id: The minimal id of the content.
    :param max_id: The maximal id of the content.
    :param content_type: The type of the content.
    :param status: content status.
    :param content_relation_type: The relation type of the content.
    :param bytes: The size of the content.
    :param md5: md5 checksum.
    :param adler32: adler32 checksum.
    :param processing_id: The processing id.
    :param storage_id: The storage id.
    :param retries: The number of retries.
    :param locking: The locking state of the content.
    :param path: The content path.
    :param expired_at: The datetime when it expires.
    :param content_metadata: The metadata as json.
    :param session: The database session in use.

    :raises DuplicatedObject: may propagate from the ORM layer for a duplicated entry.
    :raises DatabaseException: may propagate from the ORM layer on a database error.

    :returns: content id.
    """
    content_fields = dict(
        request_id=request_id, workload_id=workload_id, transform_id=transform_id,
        coll_id=coll_id, map_id=map_id, scope=scope, name=name,
        min_id=min_id, max_id=max_id, content_type=content_type, status=status,
        content_relation_type=content_relation_type, bytes=bytes, md5=md5, adler32=adler32,
        processing_id=processing_id, storage_id=storage_id, retries=retries,
        locking=locking, path=path, expired_at=expired_at, content_metadata=content_metadata,
    )
    return orm_contents.add_content(session=session, **content_fields)
0219
0220
@transactional_session
def add_contents(contents, bulk_size=1000, session=None):
    """
    Insert several contents by delegating to ``orm_contents.add_contents``.

    :param contents: the contents to add.
    :param bulk_size: bulk per insert to db.
    :param session: The database session in use.

    :raises DuplicatedObject: may propagate from the ORM layer for a duplicated entry.
    :raises DatabaseException: may propagate from the ORM layer on a database error.

    :returns: the value returned by ``orm_contents.add_contents``.
    """
    result = orm_contents.add_contents(contents=contents, bulk_size=bulk_size, session=session)
    return result
0237
0238
@transactional_session
def update_input_collection_with_contents(coll, parameters, contents, bulk_size=1000, session=None):
    """
    Add the contents that are not yet registered for a collection and update the collection.

    Contents are identified by the tuple (scope, name, min_id, max_id). Contents already
    present in the collection are left untouched; only unknown ones are inserted.

    The file counters are computed from the contents that existed BEFORE the new ones
    are inserted.

    :param coll: dict-like collection; 'coll_id' is always read and 'total_files' is read
                 when ``parameters`` does not provide it.
    :param parameters: A dictionary of parameters for the collection update. It is modified
                       in place: 'processed_files' and 'new_files' are always set, and 'status'
                       is set to ``CollectionStatus.Closed`` when processed_files equals total_files.
    :param contents: iterable of dict-like contents with 'scope', 'name', 'min_id' and 'max_id'.
    :param bulk_size: bulk per insert to db.
    :param session: The database session in use.

    :raises NoObject: may propagate from the ORM layer.
    :raises DatabaseException: may propagate from the ORM layer on a database error.

    :returns: list of the contents that were newly added.
    """
    def _content_key(item):
        # Identity of a content inside one collection.
        return '%s:%s:%s:%s' % (item['scope'], item['name'], item['min_id'], item['max_id'])

    new_files = 0
    processed_files = 0
    known_keys = set()
    for existing in orm_contents.get_contents(coll_id=coll['coll_id'], session=session):
        known_keys.add(_content_key(existing))
        # Status may be stored either as the enum member or as its raw value.
        if existing['status'] in [ContentStatus.Mapped, ContentStatus.Mapped.value]:
            processed_files += 1
        if existing['status'] in [ContentStatus.New, ContentStatus.New.value]:
            new_files += 1

    # Already-registered contents need no update; only unknown ones are inserted.
    to_addes = [content for content in contents if _content_key(content) not in known_keys]
    if to_addes:
        add_contents(to_addes, bulk_size=bulk_size, session=session)

    total_files = parameters['total_files'] if 'total_files' in parameters else coll['total_files']
    parameters['processed_files'] = processed_files
    parameters['new_files'] = new_files
    if processed_files == total_files:
        parameters['status'] = CollectionStatus.Closed

    update_collection(coll['coll_id'], parameters, session=session)
    return to_addes
0305
0306
@read_session
def get_input_output_map_count(request_id, transform_id, session=None):
    """
    Return the number of distinct map_ids (jobs) for the given transform.

    :param request_id: the request id.
    :param transform_id: the transform id.
    :param session: The database session in use.

    :returns: the count reported by ``orm_contents.get_input_output_map_count``.
    """
    map_count = orm_contents.get_input_output_map_count(request_id=request_id,
                                                        transform_id=transform_id,
                                                        session=session)
    return map_count
0311
0312
@read_session
def get_content_name_to_id_map(request_id, transform_id, es=False, session=None):
    """
    Return a lightweight {name: [content_id, ...]} map for Input and Output contents.

    :param request_id: the request id.
    :param transform_id: the transform id.
    :param es: forwarded unchanged to the ORM layer.
    :param session: The database session in use.

    :returns: the mapping produced by ``orm_contents.get_content_name_to_id_map``.
    """
    name_map = orm_contents.get_content_name_to_id_map(request_id=request_id,
                                                       transform_id=transform_id,
                                                       es=es, session=session)
    return name_map
0317
0318
@read_session
def has_input_contents_without_external_id(request_id, transform_id, session=None):
    """
    Ask the ORM layer about Input contents of a transform that lack an external_content_id.

    The result of ``orm_contents.has_input_contents_without_external_id`` is returned as is.

    NOTE(review): the function name suggests that a true result means some Input content
    is missing its external_content_id, whereas the earlier documentation stated that True
    is returned when all Input contents have it set. Confirm the polarity against the ORM
    implementation before relying on it.

    :param request_id: the request id; per the earlier documentation it is included because
                       the database uses it for virtual table partitioning.
    :param transform_id: the transform id.
    :param session: The database session in use.
    """
    outcome = orm_contents.has_input_contents_without_external_id(request_id=request_id,
                                                                  transform_id=transform_id,
                                                                  session=session)
    return outcome
0328
0329
@transactional_session
def update_contents(parameters, request_id=None, transform_id=None, use_bulk_update_mappings=True, session=None):
    """
    Update several contents by delegating to ``orm_contents.update_contents``.

    :param parameters: list of dictionary of parameters.
    :param request_id: the request id, forwarded to the ORM layer.
    :param transform_id: the transform id, forwarded to the ORM layer.
    :param use_bulk_update_mappings: forwarded unchanged to the ORM layer.
    :param session: The database session in use.

    :raises NoObject: may propagate from the ORM layer if no content is found.
    :raises DatabaseException: may propagate from the ORM layer on a database error.

    :returns: the value returned by ``orm_contents.update_contents``.
    """
    options = dict(request_id=request_id, transform_id=transform_id,
                   use_bulk_update_mappings=use_bulk_update_mappings)
    return orm_contents.update_contents(parameters, session=session, **options)
0344
0345
@transactional_session
def update_content(content_id, parameters, session=None):
    """
    Update one content by delegating to ``orm_contents.update_content``.

    :param content_id: the content id.
    :param parameters: A dictionary of parameters.
    :param session: The database session in use.

    :raises NoObject: may propagate from the ORM layer if no content is found.
    :raises DatabaseException: may propagate from the ORM layer on a database error.

    :returns: the value returned by ``orm_contents.update_content``.
    """
    outcome = orm_contents.update_content(content_id, parameters, session=session)
    return outcome
0360
0361
@read_session
def get_contents(coll_scope=None, coll_name=None, coll_id=None, request_id=None, workload_id=None, transform_id=None,
                 relation_type=None, content_relation_type=None, status=None, to_json=False, session=None):
    """
    Get contents with collection scope, collection name, request id, workload id and relation type.

    When ``coll_id`` is empty, the collections are first resolved with ``get_collections``
    using the other filters, and their ids are used for the content query.

    :param coll_scope: scope of the collection.
    :param coll_name: name of the collection.
    :param coll_id: collection id(s) to query directly; when falsy (the default) the
                    collections are looked up from the other filters instead.
    :param request_id: the request id.
    :param workload_id: The workload_id of the request.
    :param transform_id: The transform id related to this collection.
    :param relation_type: The relation type between the collection and transform: input, output, logs and etc.
    :param content_relation_type: The relation type of the contents. When None it is derived
                                  from ``relation_type`` for Input, Output and Log; otherwise it stays None.
    :param status: content status filter forwarded to the ORM layer.
    :param to_json: return json format.
    :param session: The database session in use.

    :returns: list of contents; an empty list when no collection matches.
    """
    if coll_id:
        coll_ids = coll_id
    else:
        collections = get_collections(scope=coll_scope, name=coll_name, request_id=request_id,
                                      workload_id=workload_id, transform_id=transform_id,
                                      relation_type=relation_type, to_json=to_json, session=session)
        coll_ids = [coll['coll_id'] for coll in collections]

    if not coll_ids:
        return []

    # Derive the content relation from the collection relation only when the caller
    # did not specify one explicitly.
    if content_relation_type is None and relation_type is not None:
        if relation_type == CollectionRelationType.Output:
            content_relation_type = ContentRelationType.Output
        elif relation_type == CollectionRelationType.Input:
            content_relation_type = ContentRelationType.Input
        elif relation_type == CollectionRelationType.Log:
            content_relation_type = ContentRelationType.Log

    return orm_contents.get_contents(request_id=request_id, transform_id=transform_id, coll_id=coll_ids,
                                     status=status, to_json=to_json, relation_type=content_relation_type,
                                     session=session)
0404
0405
@read_session
def get_contents_by_request_transform(request_id=None, workload_id=None, transform_id=None, status=None, map_id=None, status_updated=False, by_map=False, match_content_ext=False, session=None):
    """
    Fetch contents by request id, workload id and transform id.

    All arguments are forwarded unchanged to ``orm_contents.get_contents_by_request_transform``.

    :param request_id: the request id.
    :param workload_id: The workload_id of the request.
    :param transform_id: The transform id related to this collection.
    :param status: content status filter.
    :param map_id: map id filter.
    :param status_updated: forwarded unchanged to the ORM layer.
    :param by_map: forwarded unchanged to the ORM layer.
    :param match_content_ext: forwarded unchanged to the ORM layer.
    :param session: The database session in use.

    :returns: list of contents
    """
    filters = dict(request_id=request_id, transform_id=transform_id, workload_id=workload_id,
                   status=status, map_id=map_id, status_updated=status_updated,
                   by_map=by_map, match_content_ext=match_content_ext)
    return orm_contents.get_contents_by_request_transform(session=session, **filters)
0423
0424
@read_session
def get_contents_by_content_ids(content_ids, request_id=None, session=None):
    """
    Fetch contents by their content ids through the ORM layer.

    :param content_ids: list of content id.
    :param request_id: request id.
    :param session: The database session in use.

    :raises NoObject: may propagate from the ORM layer if no content is found.

    :returns: list of contents.
    """
    return orm_contents.get_contents_by_content_ids(content_ids=content_ids,
                                                    request_id=request_id,
                                                    session=session)
0442
0443
@read_session
def get_contents_by_coll_id_status(coll_id, status=None, to_json=False, session=None):
    """
    Fetch the contents of a collection, optionally filtered by status.

    :param coll_id: Collection id.
    :param status: Content status or list of content status.
    :param to_json: return json format.
    :param session: The database session in use.

    :raises NoObject: may propagate from the ORM layer if no content is found.

    :returns: list of contents.
    """
    matched = orm_contents.get_contents(coll_id=coll_id, status=status,
                                        to_json=to_json, session=session)
    return matched
0459
0460
@transactional_session
def register_output_contents(coll_scope, coll_name, contents, request_id=None, workload_id=None,
                             relation_type=CollectionRelationType.Output, session=None):
    """
    Register contents with collection scope, collection name, request id, workload id and contents.

    Exactly one collection must match the given filters. Every entry of ``contents`` must
    already exist in that collection; each entry is then rewritten IN PLACE: its 'scope',
    'name', 'min_id' and 'max_id' keys are removed and 'content_id' of the existing
    content is added, after which all entries are passed to ``orm_contents.update_contents``.

    :param coll_scope: scope of the collection.
    :param coll_name: name of the collection.
    :param contents: list of contents [{'scope': <scope>, 'name': <name>, 'min_id': min_id, 'max_id': max_id,
                                       'status': <status>, 'path': <path>}].
    :param request_id: the request id.
    :param workload_id: The workload_id of the request.
    :param relation_type: The relation between the collection and its transform.
    :param session: The database session in use.

    :raises WrongParameterException: If not exactly one collection matches, or if a content
                                     has no existing counterpart in the collection.
    """
    transform_ids = orm_transforms.get_transform_ids(request_id=request_id,
                                                     workload_id=workload_id,
                                                     session=session)

    if transform_ids:
        collections = orm_collections.get_collections(scope=coll_scope, name=coll_name, transform_id=transform_ids,
                                                      relation_type=relation_type, session=session)
    else:
        collections = []

    coll_def = "request_id=%s, workload_id=%s, coll_scope=%s" % (request_id, workload_id, coll_scope)
    coll_def += ", coll_name=%s, relation_type: %s" % (coll_name, relation_type)

    if len(collections) != 1:
        # Separate the sentence from the collection description so the message stays readable.
        msg = "There should be only one collection matched. However there are %s collections: %s" % (len(collections),
                                                                                                     coll_def)
        raise exceptions.WrongParameterException(msg)

    coll_id = collections[0]['coll_id']

    identity_keys = ['scope', 'name', 'min_id', 'max_id']
    for content in contents:
        ex_content = orm_contents.get_content(coll_id=coll_id, scope=content['scope'],
                                              name=content['name'], min_id=content['min_id'],
                                              max_id=content['max_id'], session=session)

        if not ex_content:
            content_def = "scope: %s, name: %s, min_id: %s, max_id: %s" % (content['scope'],
                                                                           content['name'],
                                                                           content['min_id'],
                                                                           content['max_id'])
            msg = "No matched content in collection(%s) with content(%s)" % (coll_def, content_def)
            raise exceptions.WrongParameterException(msg)

        # Replace the identifying fields with the id of the existing content so the
        # remaining keys can be applied as an update.
        for key in identity_keys:
            content.pop(key, None)
        content['content_id'] = ex_content['content_id']

    orm_contents.update_contents(contents, session=session)
0516
0517
@read_session
def get_match_contents(coll_scope, coll_name, scope, name, min_id=None, max_id=None,
                       request_id=None, workload_id=None, relation_type=None,
                       only_return_best_match=False, to_json=False, session=None):
    """
    Get matched contents with collection scope, collection name, scope, name, min_id, max_id,
    request id, workload id and only_return_best_match.

    Exactly one collection must match the collection filters.

    :param coll_scope: scope of the collection.
    :param coll_name: name of the collection.
    :param scope: scope of the content.
    :param name: name of the content.
    :param min_id: min_id of the content.
    :param max_id: max_id of the content.
    :param request_id: the request id.
    :param workload_id: The workload_id of the request.
    :param relation_type: The relation between the collection and its transform.
    :param only_return_best_match: only return best matched content if it's true. The best
                                   match is the content with the smallest ``max_id - min_id``
                                   range; the first one wins on ties.
    :param to_json: return json format.
    :param session: The database session in use.

    :raises WrongParameterException: If not exactly one collection matches.

    :returns: list of contents
    """
    transform_ids = orm_transforms.get_transform_ids(request_id=request_id,
                                                     workload_id=workload_id,
                                                     session=session)

    if transform_ids:
        collections = orm_collections.get_collections(scope=coll_scope, name=coll_name, transform_id=transform_ids,
                                                      relation_type=relation_type, session=session)
    else:
        collections = []

    if len(collections) != 1:
        coll_def = "request_id=%s, workload_id=%s, coll_scope=%s" % (request_id, workload_id, coll_scope)
        coll_def += ", coll_name=%s, relation_type: %s" % (coll_name, relation_type)
        # Separate the sentence from the collection description so the message stays readable.
        msg = "There should be only one collection matched. However there are %s collections: %s" % (len(collections),
                                                                                                     coll_def)
        raise exceptions.WrongParameterException(msg)

    coll_id = collections[0]['coll_id']

    contents = orm_contents.get_match_contents(coll_id=coll_id, scope=scope, name=name, min_id=min_id, max_id=max_id,
                                               to_json=to_json, session=session)

    # With zero or one match there is nothing to rank. Returning early for the empty
    # case avoids handing back [None] as a "best match".
    if not only_return_best_match or len(contents) <= 1:
        return contents

    best = min(contents, key=lambda row: row['max_id'] - row['min_id'])
    return [best]
0573
0574
@read_session
def get_content_status_statistics(coll_id=None, transform_ids=None, session=None):
    """
    Fetch content statistics grouped by status from the ORM layer.

    :param coll_id: Collection id.
    :param transform_ids: transform ids, forwarded unchanged to the ORM layer.
    :param session: The database session in use.

    :returns: statistics group by status, as a dict.
    """
    statistics = orm_contents.get_content_status_statistics(coll_id=coll_id,
                                                            transform_ids=transform_ids,
                                                            session=session)
    return statistics
0586
0587
@read_session
def get_content_status_statistics_by_relation_type(transform_ids, bulk_size=500, session=None):
    """
    Fetch content status statistics by relation type for the given transforms.

    A single (non list/tuple) id is wrapped in a list, and a one-element sequence is
    expanded to two copies of the same id before querying (presumably to keep the ORM
    query from receiving a single-element sequence; confirm against the ORM layer).
    When there are more ids than ``bulk_size``, they are queried in slices of at most
    ``bulk_size`` ids and the per-slice results are concatenated.

    :param transform_ids: a transform id, or a list/tuple of transform ids.
    :param bulk_size: maximum number of ids per ORM query.
    :param session: The database session in use.

    :returns: the ORM result; a list of concatenated per-slice results when chunked.
    """
    ids = transform_ids
    if ids:
        if not isinstance(ids, (list, tuple)):
            ids = [ids]
        if len(ids) == 1:
            ids = [ids[0], ids[0]]

    if not ids or len(ids) <= bulk_size:
        return orm_contents.get_content_status_statistics_by_relation_type(ids, session=session)

    merged = []
    for start in range(0, len(ids), bulk_size):
        piece = ids[start:start + bulk_size]
        merged.extend(orm_contents.get_content_status_statistics_by_relation_type(piece, session=session))
    return merged
0612
0613
@read_session
def get_content_status_statistics_by_coll(request_id=None, transform_id=None, with_deps=True, session=None):
    """
    Fetch content statistics grouped by (coll_id, status) with sum of bytes.

    :param request_id: request id.
    :param transform_id: transform id.
    :param with_deps: forwarded unchanged to the ORM layer.
    :param session: The database session in use.

    :returns: dict {coll_id: {status: {'count': N, 'bytes': B}, 'has_unsynced': bool}}
    """
    query = dict(request_id=request_id, transform_id=transform_id, with_deps=with_deps)
    return orm_contents.get_content_status_statistics_by_coll(session=session, **query)
0626
0627
@read_session
def get_content_ext_status_statistics_by_coll(request_id=None, transform_id=None, session=None):
    """
    Fetch contents_ext statistics grouped by (coll_id, status).

    :param request_id: request id.
    :param transform_id: transform id.
    :param session: The database session in use.

    :returns: dict {coll_id: {status: count}}
    """
    statistics = orm_contents.get_content_ext_status_statistics_by_coll(request_id=request_id,
                                                                        transform_id=transform_id,
                                                                        session=session)
    return statistics
0640
0641
@transactional_session
def clean_locking(time_period=3600, session=None):
    """
    Clean locking which is older than the time period.

    Delegates to ``orm_collections.clean_locking``; nothing is returned.

    :param time_period: time period in seconds.
    :param session: The database session in use.
    """
    cleanup_args = {'time_period': time_period, 'session': session}
    orm_collections.clean_locking(**cleanup_args)
0650
0651
@transactional_session
def clean_next_poll_at(status, session=None):
    """
    Clean next_poll_at.

    Delegates to ``orm_collections.clean_next_poll_at``; nothing is returned.

    :param status: status of the collection.
    :param session: The database session in use.
    """
    cleanup_args = {'status': status, 'session': session}
    orm_collections.clean_next_poll_at(**cleanup_args)
0660
0661
@read_session
def get_output_content_by_request_id_content_name(request_id, content_scope, content_name, transform_id=None,
                                                  content_type=None, min_id=None, max_id=None, to_json=False,
                                                  session=None):
    """
    Get output content by request_id and content name.

    The transform is resolved from the request: when the request has exactly one
    transform, that one is used; when it has several, ``transform_id`` selects among them.
    The content is then looked up in the Output collection of that transform.

    :param request_id: request id.
    :param content_scope: The scope of the content.
    :param content_name: The name of the content.
    :param transform_id: transform id, required when the request has more than one transform.
    :param content_type: The type of the content.
    :param min_id: The minimal id of the content.
    :param max_id: The maximal id of the content.
    :param to_json: return json format.
    :param session: The database session in use.

    :raises WrongParameterException: If the request has more than one transform and
                                     ``transform_id`` is not provided.

    :returns: content of the output collection, or None when no transform or no Output
              collection is found.
    """
    transform_ids = orm_transforms.get_transform_ids(request_id, session=session)

    found_transform_id = None
    if transform_ids:
        if len(transform_ids) == 1:
            found_transform_id = transform_ids[0]
        elif transform_id is None:
            # Raising a plain string is a TypeError in Python 3; raise a real exception.
            msg = "Number of the transforms(%s) is bigger than 1 and transform id is not provided" % len(transform_ids)
            raise exceptions.WrongParameterException(msg)
        else:
            found_transform_id = next((tf_id for tf_id in transform_ids if tf_id == transform_id), None)

    if not found_transform_id:
        return None

    coll_id = orm_collections.get_collection_id(transform_id=found_transform_id,
                                                relation_type=CollectionRelationType.Output,
                                                session=session)
    if not coll_id:
        return None

    return orm_contents.get_content(coll_id=coll_id, scope=content_scope, name=content_name,
                                    content_type=content_type, min_id=min_id, max_id=max_id,
                                    to_json=to_json, session=session)
0700
0701
@read_session
def get_output_contents_by_request_id_status(request_id, name, content_status, limit, transform_id=None, to_json=False, session=None):
    """
    Get output contents by request_id and content status.

    The transform is resolved from the request: when the request has exactly one
    transform, that one is used; when it has several, ``transform_id`` selects among them.
    Contents of that transform's Output collection are fetched by status, optionally
    filtered by name (compared as strings) and truncated to ``limit`` entries.

    :param request_id: request id.
    :param name: the content name; when falsy no name filtering is applied.
    :param content_status: The content status.
    :param limit: limit number of contents; when falsy no truncation is applied.
    :param transform_id: transform id, required when the request has more than one transform.
    :param to_json: return json format.
    :param session: The database session in use.

    :raises WrongParameterException: If the request has more than one transform and
                                     ``transform_id`` is not provided.

    :returns: list of contents of the output collection; empty when no transform or no
              Output collection is found.
    """
    transform_ids = orm_transforms.get_transform_ids(request_id, session=session)

    found_transform_id = None
    if transform_ids:
        if len(transform_ids) == 1:
            found_transform_id = transform_ids[0]
        elif transform_id is None:
            # Raising a plain string is a TypeError in Python 3; raise a real exception.
            msg = "Number of the transforms(%s) is bigger than 1 and transform id is not provided" % len(transform_ids)
            raise exceptions.WrongParameterException(msg)
        else:
            found_transform_id = next((tf_id for tf_id in transform_ids if tf_id == transform_id), None)

    coll_id = None
    if found_transform_id:
        coll_id = orm_collections.get_collection_id(transform_id=found_transform_id,
                                                    relation_type=CollectionRelationType.Output,
                                                    session=session)
    if not coll_id:
        return []

    contents = orm_contents.get_contents(coll_id=coll_id, status=content_status, to_json=to_json, session=session)

    if name:
        wanted = str(name)
        contents = [content for content in contents if str(content['name']) == wanted]

    if contents and limit and len(contents) > limit:
        contents = contents[:limit]
    return contents
0750
0751
@read_session
def get_updated_transforms_by_content_status(request_id=None, transform_id=None, session=None):
    """
    Fetch updated transform ids by content status.

    :param request_id: The Request id.
    :param transform_id: The transform id.
    :param session: The database session in use.

    :returns: list, as produced by ``orm_contents.get_updated_transforms_by_content_status``.
    """
    updated = orm_contents.get_updated_transforms_by_content_status(request_id=request_id,
                                                                    transform_id=transform_id,
                                                                    session=session)
    return updated
0763
0764
@transactional_session
def update_contents_to_others_by_dep_id(request_id=None, transform_id=None, session=None):
    """
    Update contents to others by content_dep_id.

    :param request_id: The Request id.
    :param transform_id: The transform id.
    :param session: The database session in use.

    :returns: the value returned by ``orm_contents.update_contents_to_others_by_dep_id``.
    """
    outcome = orm_contents.update_contents_to_others_by_dep_id(request_id=request_id,
                                                               transform_id=transform_id,
                                                               session=session)
    return outcome
0774
0775
@transactional_session
def update_contents_from_others_by_dep_id(request_id=None, transform_id=None, session=None):
    """
    Update contents from others by content_dep_id.

    :param request_id: The Request id.
    :param transform_id: The transform id.
    :param session: The database session in use.

    :returns: the value returned by ``orm_contents.update_contents_from_others_by_dep_id``.
    """
    outcome = orm_contents.update_contents_from_others_by_dep_id(request_id=request_id,
                                                                 transform_id=transform_id,
                                                                 session=session)
    return outcome
0785
0786
@transactional_session
def update_contents_from_others_by_dep_id_pages(request_id=None, transform_id=None, page_size=2000, status_not_to_check=None,
                                                logger=None, log_prefix=None, session=None):
    """
    Update contents from others by content_dep_id, page by page.

    All arguments are forwarded unchanged to
    ``orm_contents.update_contents_from_others_by_dep_id_pages``.

    :param request_id: The Request id.
    :param transform_id: The transform id.
    :param page_size: page size forwarded to the ORM layer.
    :param status_not_to_check: forwarded unchanged to the ORM layer.
    :param logger: logger forwarded to the ORM layer.
    :param log_prefix: log prefix forwarded to the ORM layer.
    :param session: The database session in use.

    :returns: the value returned by the ORM layer.
    """
    options = dict(request_id=request_id, transform_id=transform_id, page_size=page_size,
                   logger=logger, log_prefix=log_prefix, status_not_to_check=status_not_to_check)
    return orm_contents.update_contents_from_others_by_dep_id_pages(session=session, **options)
0799
0800
@transactional_session
def update_input_contents_by_dependency_pages(request_id=None, transform_id=None, page_size=2000, batch_size=2000, logger=None, log_prefix=None,
                                              terminated=False, status_not_to_check=None, session=None):
    """
    Update input contents by dependencies, page by page.

    All arguments are forwarded unchanged to
    ``orm_contents.update_input_contents_by_dependency_pages``.

    :param request_id: The Request id.
    :param transform_id: The transform id.
    :param page_size: page size forwarded to the ORM layer.
    :param batch_size: batch size forwarded to the ORM layer.
    :param logger: logger forwarded to the ORM layer.
    :param log_prefix: log prefix forwarded to the ORM layer.
    :param terminated: forwarded unchanged to the ORM layer.
    :param status_not_to_check: forwarded unchanged to the ORM layer.
    :param session: The database session in use.

    :returns: the value returned by the ORM layer.
    """
    options = dict(request_id=request_id, transform_id=transform_id, page_size=page_size,
                   batch_size=batch_size, terminated=terminated, logger=logger,
                   log_prefix=log_prefix, status_not_to_check=status_not_to_check)
    return orm_contents.update_input_contents_by_dependency_pages(session=session, **options)
0813
0814
@read_session
def get_update_contents_from_others_by_dep_id(request_id=None, transform_id=None, session=None):
    """
    Return the result of ``orm_contents.get_update_contents_from_others_by_dep_id``.

    NOTE(review): presumably this returns the content updates derived from other contents
    via content_dep_id without applying them; confirm against the ORM layer.

    :param request_id: The Request id.
    :param transform_id: The transform id.
    :param session: The database session in use.
    """
    pending = orm_contents.get_update_contents_from_others_by_dep_id(request_id=request_id,
                                                                     transform_id=transform_id,
                                                                     session=session)
    return pending
0824
0825
@transactional_session
def add_contents_update(contents, bulk_size=10000, session=None):
    """
    Add contents update entries by delegating to ``orm_contents.add_contents_update``.

    :param contents: the contents to add.
    :param bulk_size: bulk size forwarded to the ORM layer.
    :param session: The database session in use.

    :raises DuplicatedObject: may propagate from the ORM layer for a duplicated entry.
    :raises DatabaseException: may propagate from the ORM layer on a database error.

    :returns: content ids.
    """
    added = orm_contents.add_contents_update(contents, bulk_size=bulk_size, session=session)
    return added
0840
0841
@transactional_session
def set_fetching_contents_update(request_id=None, transform_id=None, fetch=False, session=None):
    """
    Set fetching contents update.

    Thin wrapper around orm_contents.set_fetching_contents_update.

    :param request_id: The request id.
    :param transform_id: The transform id.
    :param fetch: Flag handed to the ORM layer as-is.
    :param session: The database session in use.

    :returns: The value returned by the ORM function.
    """
    orm_kwargs = {
        'request_id': request_id,
        'transform_id': transform_id,
        'fetch': fetch,
        'session': session,
    }
    return orm_contents.set_fetching_contents_update(**orm_kwargs)
0850
0851
@read_session
def get_contents_update(request_id=None, transform_id=None, fetch=False, session=None):
    """
    Get contents update.

    Thin wrapper around orm_contents.get_contents_update.

    :param request_id: The request id.
    :param transform_id: The transform id.
    :param fetch: Flag handed to the ORM layer as-is.
    :param session: The database session in use.

    :returns: The value returned by the ORM function.
    """
    updates = orm_contents.get_contents_update(request_id=request_id, transform_id=transform_id,
                                               fetch=fetch, session=session)
    return updates
0860
0861
@transactional_session
def delete_contents_update(request_id=None, transform_id=None, contents=None, fetch=False, session=None):
    """
    Delete contents update.

    Thin wrapper around orm_contents.delete_contents_update. No exception is
    caught here; the original documentation lists NoObject (no content found)
    and DatabaseException as possible errors from the ORM layer.

    :param request_id: The request id.
    :param transform_id: The transform id.
    :param contents: The contents handed to the ORM layer. When omitted (or None),
                     a new empty list is passed.
    :param fetch: Flag handed to the ORM layer as-is.
    :param session: The database session in use.

    :returns: The value returned by the ORM function.
    """
    # A fresh list per call replaces the former mutable default ``[]``, which was
    # a single list object shared by every call and handed to the ORM layer.
    if contents is None:
        contents = []
    return orm_contents.delete_contents_update(request_id=request_id, transform_id=transform_id,
                                               contents=contents, fetch=fetch, session=session)
0873
0874
def get_contents_ext_maps():
    """
    Get the contents ext maps.

    Thin wrapper around orm_contents.get_contents_ext_maps; takes no
    arguments and opens no session of its own.

    :returns: The value returned by the ORM function.
    """
    ext_maps = orm_contents.get_contents_ext_maps()
    return ext_maps
0877
0878
@transactional_session
def add_contents_ext(contents, bulk_size=10000, session=None):
    """
    Add contents ext.

    Thin wrapper around orm_contents.add_contents_ext. No exception is
    caught here; the original documentation lists DuplicatedObject and
    DatabaseException as possible errors from the ORM layer.

    :param contents: The contents to add, forwarded as-is.
    :param bulk_size: Bulk size handed to the ORM layer.
    :param session: The database session in use.

    :returns: The value returned by the ORM function
              (documented originally as content ids).
    """
    added = orm_contents.add_contents_ext(contents, bulk_size=bulk_size, session=session)
    return added
0893
0894
@transactional_session
def update_contents_ext(parameters, request_id=None, transform_id=None, use_bulk_update_mappings=True, session=None):
    """
    Update contents ext.

    Thin wrapper around orm_contents.update_contents_ext. No exception is
    caught here; the original documentation lists NoObject (no content found)
    and DatabaseException as possible errors from the ORM layer.

    :param parameters: list of dictionaries of parameters, forwarded positionally.
    :param request_id: The request id.
    :param transform_id: The transform id.
    :param use_bulk_update_mappings: Flag handed to the ORM layer as-is.
    :param session: The database session in use.

    :returns: The value returned by the ORM function.
    """
    orm_kwargs = {
        'request_id': request_id,
        'transform_id': transform_id,
        'use_bulk_update_mappings': use_bulk_update_mappings,
        'session': session,
    }
    return orm_contents.update_contents_ext(parameters, **orm_kwargs)
0909
0910
@read_session
def get_contents_ext(request_id=None, transform_id=None, workload_id=None, coll_id=None, status=None, session=None):
    """
    Get contents ext.

    Thin wrapper around orm_contents.get_contents_ext. No exception is caught
    here; the original documentation lists NoObject (no content found) as a
    possible error from the ORM layer.

    :param request_id: request id.
    :param transform_id: transform id.
    :param workload_id: workload id.
    :param coll_id: collection id, forwarded as-is.
    :param status: status, forwarded as-is.
    :param session: The database session in use.

    :returns: The value returned by the ORM function
              (documented originally as a list of contents).
    """
    filters = {
        'request_id': request_id,
        'transform_id': transform_id,
        'workload_id': workload_id,
        'coll_id': coll_id,
        'status': status,
    }
    return orm_contents.get_contents_ext(session=session, **filters)
0928
0929
@read_session
def get_contents_ext_ids(request_id=None, transform_id=None, workload_id=None, coll_id=None, status=None, session=None):
    """
    Get contents ext ids.

    Thin wrapper around orm_contents.get_contents_ext_ids. No exception is
    caught here; the original documentation lists NoObject (no content found)
    as a possible error from the ORM layer.

    :param request_id: request id.
    :param transform_id: transform id.
    :param workload_id: workload id.
    :param coll_id: collection id, forwarded as-is.
    :param status: status, forwarded as-is.
    :param session: The database session in use.

    :returns: The value returned by the ORM function
              (documented originally as a list of content ids).
    """
    filters = {
        'request_id': request_id,
        'transform_id': transform_id,
        'workload_id': workload_id,
        'coll_id': coll_id,
        'status': status,
    }
    return orm_contents.get_contents_ext_ids(session=session, **filters)
0947
0948
def combine_contents_ext(contents, contents_ext, with_status_name=False):
    """
    Combine contents with contents ext.

    Thin wrapper around orm_contents.combine_contents_ext; opens no session
    of its own.

    :param contents: The contents, forwarded positionally.
    :param contents_ext: The contents ext, forwarded positionally.
    :param with_status_name: Flag handed to the ORM layer as-is.

    :returns: The value returned by the ORM function.
    """
    combined = orm_contents.combine_contents_ext(contents, contents_ext,
                                                 with_status_name=with_status_name)
    return combined