File indexing completed on 2026-04-09 07:58:19
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012 """
operations related to Collections.
0014 """
0015
0016 import datetime
0017
0018 import sqlalchemy
0019 from sqlalchemy.exc import DatabaseError, IntegrityError
0020 from sqlalchemy.sql.expression import asc
0021
0022 from idds.common import exceptions
0023 from idds.common.constants import CollectionType, CollectionStatus, CollectionLocking, CollectionRelationType
0024 from idds.orm.base.session import read_session, transactional_session
0025 from idds.orm.base import models
0026
0027
def create_collection(request_id, workload_id, scope, name, coll_type=CollectionType.Dataset, transform_id=None,
                      relation_type=CollectionRelationType.Input, bytes=0, status=CollectionStatus.New,
                      locking=CollectionLocking.Idle, total_files=0, new_files=0, processing_files=0,
                      processed_files=0, retries=0, expired_at=None,
                      coll_metadata=None):
    """
    Build a Collection ORM object without persisting it.

    The returned object is not added to any session; callers (e.g. add_collection)
    are responsible for saving it.

    :param request_id: The request id.
    :param workload_id: The workload id.
    :param scope: The scope of the collection.
    :param name: The name of the collection.
    :param coll_type: The type of the collection, such as dataset or container.
    :param transform_id: The transform id related to this collection.
    :param relation_type: The relation between this collection and its transform,
                          such as Input, Output, Log and so on.
    :param bytes: The size of the collection, stored in the model's ``bytes`` field
                  (the name shadows the builtin but is kept for interface compatibility).
    :param status: The status.
    :param locking: The locking state.
    :param total_files: Number of total files.
    :param new_files: Number of new files.
    :param processing_files: Number of processing files.
    :param processed_files: Number of processed files.
    :param retries: Number of retries.
    :param expired_at: The datetime when it expires.
    :param coll_metadata: The metadata as json.

    :returns: the new, unsaved models.Collection instance.
    """
    new_coll = models.Collection(request_id=request_id, workload_id=workload_id, scope=scope, name=name,
                                 coll_type=coll_type, transform_id=transform_id,
                                 relation_type=relation_type, bytes=bytes, status=status, locking=locking,
                                 total_files=total_files, new_files=new_files, processing_files=processing_files,
                                 processed_files=processed_files, retries=retries,
                                 expired_at=expired_at, coll_metadata=coll_metadata)
    return new_coll
0061
0062
@transactional_session
def add_collection(request_id, workload_id, scope, name, coll_type=CollectionType.Dataset, transform_id=None,
                   relation_type=CollectionRelationType.Input, bytes=0, status=CollectionStatus.New,
                   locking=CollectionLocking.Idle, total_files=0, new_files=0, processing_files=0,
                   processed_files=0, retries=0, expired_at=None,
                   coll_metadata=None, session=None):
    """
    Add a collection.

    :param request_id: The request id.
    :param workload_id: The workload id.
    :param scope: The scope of the collection.
    :param name: The name of the collection.
    :param coll_type: The type of the collection, such as dataset or container.
    :param transform_id: The transform id related to this collection.
    :param relation_type: The relation between this collection and its transform,
                          such as Input, Output, Log and so on.
    :param bytes: The size of the collection, stored in the model's ``bytes`` field.
    :param status: The status.
    :param locking: The locking state.
    :param total_files: Number of total files.
    :param new_files: Number of new files.
    :param processing_files: Number of processing files.
    :param processed_files: Number of processed files.
    :param retries: Number of retries.
    :param expired_at: The datetime when it expires.
    :param coll_metadata: The metadata as json.
    :param session: The database session in use.

    :raises DuplicatedObject: If saving violates an integrity constraint (e.g. the collection already exists).
    :raises DatabaseException: If there is any other database error.

    :returns: collection id.
    """
    try:
        new_coll = create_collection(request_id=request_id, workload_id=workload_id, scope=scope, name=name,
                                     coll_type=coll_type, transform_id=transform_id,
                                     relation_type=relation_type, bytes=bytes, status=status, locking=locking,
                                     total_files=total_files, new_files=new_files, retries=retries,
                                     processing_files=processing_files, processed_files=processed_files,
                                     expired_at=expired_at, coll_metadata=coll_metadata)
        new_coll.save(session=session)
        # coll_id is read only after save(); presumably save() flushes so the generated id is set.
        return new_coll.coll_id
    # IntegrityError subclasses DatabaseError, so it must be handled first.
    except IntegrityError as error:
        raise exceptions.DuplicatedObject('Collection scope:name(%s:%s) with transform_id(%s) already exists!: %s' %
                                          (scope, name, transform_id, error)) from error
    except DatabaseError as error:
        raise exceptions.DatabaseException(error) from error
0108
0109
@read_session
def get_collection_id(transform_id, relation_type, session=None):
    """
    Get the id of a collection attached to a transform with the given relation type.

    :param transform_id: The transform id related to this collection.
    :param relation_type: The relation between this collection and its transform,
                          such as Input, Output, Log and so on.
    :param session: The database session in use.

    :raises NoObject: Only if the query raises NoResultFound; ``Query.first()`` returns
                      None for an empty result, so this is not expected in practice.

    :returns: The collection id, or None when no collection matches. No ordering is
              applied, so if several rows match, which id is returned is unspecified.
    """
    try:
        query = session.query(models.Collection.coll_id)
        query = query.filter_by(transform_id=transform_id)
        query = query.filter(models.Collection.relation_type == relation_type)
        ret = query.first()
    except sqlalchemy.orm.exc.NoResultFound as error:
        raise exceptions.NoObject('collection(transform_id: %s, relation_type: %s) cannot be found: %s' %
                                  (transform_id, relation_type, error))
    # first() yields a one-column row (coll_id,) or None.
    return ret[0] if ret else None
0139
0140
@read_session
def get_collection(coll_id=None, transform_id=None, relation_type=None, to_json=False, session=None):
    """
    Get a collection by its id, or by its transform id and relation type.

    :param coll_id: The id of the collection. When truthy it takes precedence and
                    transform_id/relation_type are ignored.
    :param transform_id: The transform id related to this collection (used when coll_id is falsy).
    :param relation_type: The relation between this collection and its transform,
                          such as Input, Output, Log and so on (used when coll_id is falsy).
    :param to_json: Serialize with ``to_dict_json()`` instead of ``to_dict()``.
    :param session: The database session in use.

    :raises NoObject: Only if the query raises NoResultFound; ``Query.first()`` returns
                      None for an empty result, so this is not expected in practice.

    :returns: The collection as a dict, or None when no collection matches.
    """
    try:
        if coll_id:
            query = session.query(models.Collection).filter_by(coll_id=coll_id)
        else:
            query = session.query(models.Collection).filter_by(transform_id=transform_id)\
                .filter(models.Collection.relation_type == relation_type)

        ret = query.first()
        if not ret:
            return None
        return ret.to_dict_json() if to_json else ret.to_dict()
    except sqlalchemy.orm.exc.NoResultFound as error:
        raise exceptions.NoObject('collection(coll_id: %s, transform_id: %s, relation_type: %s) cannot be found: %s' %
                                  (coll_id, transform_id, relation_type, error))
0178
0179
@read_session
def get_collection_ids_by_transform_id(transform_id=None, session=None):
    """
    List the ids of all collections attached to a transform.

    :param transform_id: The transform id the collections belong to.
    :param session: The database session in use.

    :raises NoObject: Only if the query raises NoResultFound; ``Query.all()`` returns
                      an empty list instead, so this is not expected in practice.

    :returns: list of collection ids; empty when the transform has no collections.
    """
    try:
        rows = session.query(models.Collection.coll_id)\
                      .filter_by(transform_id=transform_id)\
                      .all()
        # Each row is a one-column tuple (coll_id,).
        return [row[0] for row in rows]
    except sqlalchemy.orm.exc.NoResultFound as error:
        raise exceptions.NoObject('No collections with transform_id(%s): %s' %
                                  (transform_id, error))
0208
0209
@read_session
def get_collections_by_status(status, relation_type=CollectionRelationType.Input, time_period=None,
                              locking=False, bulk_size=None, to_json=False, session=None):
    """
    List collections in the given status(es) whose next_poll_at is already in the past.

    Results are ordered by updated_at, oldest first.

    :param status: A collection status, or a list/tuple of statuses.
    :param relation_type: Only return collections with this relation to their transform;
                          None disables this filter.
    :param time_period: If set, only return collections whose updated_at is older than
                        this many seconds.
    :param locking: Whether to retrieve only unlocked (Idle) collections.
    :param bulk_size: If set, return at most this many collections.
    :param to_json: Serialize with ``to_dict_json()`` instead of ``to_dict()``.
    :param session: The database session in use.

    :raises NoObject: Only if the query raises NoResultFound; ``Query.all()`` returns
                      an empty list instead, so this is not expected in practice.

    :returns: list of collections as dicts; empty when nothing matches.
    """
    try:
        status = status if isinstance(status, (list, tuple)) else [status]
        # NOTE(review): a single status is sent twice, presumably so the IN clause always
        # carries more than one value for some backend - confirm before simplifying.
        status = [status[0]] * 2 if len(status) == 1 else status

        criteria = [models.Collection.status.in_(status),
                    models.Collection.next_poll_at < datetime.datetime.utcnow()]
        if relation_type is not None:
            criteria.append(models.Collection.relation_type == relation_type)
        if time_period:
            cutoff = datetime.datetime.utcnow() - datetime.timedelta(seconds=time_period)
            criteria.append(models.Collection.updated_at < cutoff)
        if locking:
            criteria.append(models.Collection.locking == CollectionLocking.Idle)

        query = session.query(models.Collection)\
                       .filter(*criteria)\
                       .order_by(asc(models.Collection.updated_at))
        if bulk_size:
            query = query.limit(bulk_size)

        records = query.all()
        if to_json:
            return [record.to_dict_json() for record in records]
        return [record.to_dict() for record in records]
    except sqlalchemy.orm.exc.NoResultFound as error:
        raise exceptions.NoObject('No collections with status(%s), relation_type(%s), time_period(%s): %s' %
                                  (status, relation_type, time_period, error))
0262
0263
@read_session
def get_collections(scope=None, name=None, request_id=None, workload_id=None, transform_id=None,
                    relation_type=None, to_json=False, session=None):
    """
    Get collections matching the given filters.

    Every filter is optional. A filter is applied only when its argument is truthy
    (relation_type is applied whenever it is not None).

    :param scope: collection scope.
    :param name: collection name; '*' is treated as a wildcard (translated to SQL '%').
    :param request_id: The request id.
    :param workload_id: The workload id.
    :param transform_id: A transform id, or a list/tuple of transform ids.
    :param relation_type: The relation type between this collection and the transform: Input, Output and Log.
    :param to_json: Serialize with ``to_dict_json()`` instead of ``to_dict()``.
    :param session: The database session in use.

    :raises NoObject: Only if the query raises NoResultFound; ``Query.all()`` returns
                      an empty list instead, so this is not expected in practice.

    :returns: list of collections as dicts, ordered by updated_at ascending; empty when nothing matches.
    """
    try:
        if transform_id and not isinstance(transform_id, (list, tuple)):
            transform_id = [transform_id]

        query = session.query(models.Collection)
        if request_id:
            query = query.filter(models.Collection.request_id == request_id)
        if workload_id:
            query = query.filter(models.Collection.workload_id == workload_id)
        if transform_id:
            query = query.filter(models.Collection.transform_id.in_(transform_id))
        if relation_type is not None:
            query = query.filter(models.Collection.relation_type == relation_type)

        if scope:
            query = query.filter(models.Collection.scope == scope)
        if name:
            query = query.filter(models.Collection.name.like(name.replace('*', '%')))

        query = query.order_by(asc(models.Collection.updated_at))

        rets = query.all()
        if to_json:
            return [ret.to_dict_json() for ret in rets]
        return [ret.to_dict() for ret in rets]
    except sqlalchemy.orm.exc.NoResultFound as error:
        # Each placeholder now lines up with its label; previously relation_type and
        # the error text were swapped into the wrong slots.
        raise exceptions.NoObject('No collection with scope(%s), name(%s), transform_id(%s), relation_type(%s): %s' %
                                  (scope, name, transform_id, relation_type, error))
0318
0319
@read_session
def get_collections_by_request_ids(request_ids, session=None):
    """
    Get lightweight collection records for one or more request ids.

    :param request_ids: A request id, or a list/tuple of request ids. When empty or None,
                        no request_id filter is applied and all collections are returned.
    :param session: The database session in use.

    :returns: list of dicts keyed by the column names of coll_id, request_id,
              transform_id and workload_id.
    """
    if request_ids and not isinstance(request_ids, (list, tuple)):
        request_ids = [request_ids]

    columns = [models.Collection.coll_id,
               models.Collection.request_id,
               models.Collection.transform_id,
               models.Collection.workload_id]
    column_names = [column.name for column in columns]
    query = session.query(*columns)

    if request_ids:
        query = query.filter(models.Collection.request_id.in_(request_ids))

    return [dict(zip(column_names, row)) for row in query.all()]
0353
0354
@transactional_session
def update_collection(coll_id, parameters, session=None):
    """
    Update a collection.

    ``updated_at`` is set to the current UTC time on a copy of ``parameters``, so the
    caller's dict is not modified.

    Updating a coll_id that does not exist affects no rows; ``Query.update()`` returns
    a row count rather than raising.

    :param coll_id: the collection id.
    :param parameters: A dictionary mapping collection attributes to new values.
    :param session: The database session in use.

    :raises NoObject: Only if the update raises NoResultFound.
    """
    values = dict(parameters)
    values['updated_at'] = datetime.datetime.utcnow()
    try:
        session.query(models.Collection).filter_by(coll_id=coll_id)\
            .update(values, synchronize_session=False)
    except sqlalchemy.orm.exc.NoResultFound as error:
        raise exceptions.NoObject('Collection %s cannot be found: %s' % (coll_id, error))
0374
0375
@transactional_session
def update_collections(parameters, session=None):
    """
    Update several collections in one bulk operation.

    Every mapping is stamped with the same current UTC ``updated_at``. The stamping is
    done on copies, so the caller's dicts are not modified.

    :param parameters: iterable of dicts. Each must contain the collection's primary key
                       (presumably ``coll_id``) plus the attributes to change.
    :param session: The database session in use.

    :raises NoObject: Only if the bulk update raises NoResultFound.
    """
    now = datetime.datetime.utcnow()
    # Building a list of copies avoids mutating caller dicts. It also means a one-shot
    # iterator (e.g. a generator) is not exhausted before bulk_update_mappings sees it.
    mappings = [dict(parameter, updated_at=now) for parameter in parameters]
    try:
        session.bulk_update_mappings(models.Collection, mappings)
    except sqlalchemy.orm.exc.NoResultFound as error:
        raise exceptions.NoObject('Collection cannot be found: %s' % (error))
0395
0396
@transactional_session
def delete_collection(coll_id=None, session=None):
    """
    Delete the collection with the given id.

    :param coll_id: The id of the collection to delete.
    :param session: The database session in use.

    :raises NoObject: Only if the delete raises NoResultFound.
    """
    target = session.query(models.Collection).filter_by(coll_id=coll_id)
    try:
        target.delete()
    except sqlalchemy.orm.exc.NoResultFound as error:
        raise exceptions.NoObject('Collection %s cannot be found: %s' % (coll_id, error))
0412
0413
@transactional_session
def clean_locking(time_period=3600, session=None):
    """
    Clear stale locks.

    Resets ``locking`` on collections that are still marked Locking and whose
    ``updated_at`` is older than ``time_period`` seconds.

    :param time_period: age threshold in seconds (default 3600).
    :param session: The database session in use.
    """
    # NOTE(review): 0 is written directly; presumably the value of CollectionLocking.Idle - confirm.
    released = {'locking': 0}
    cutoff = datetime.datetime.utcnow() - datetime.timedelta(seconds=time_period)
    stale = session.query(models.Collection)\
                   .filter(models.Collection.locking == CollectionLocking.Locking)\
                   .filter(models.Collection.updated_at < cutoff)
    stale.update(released, synchronize_session=False)
0425
0426
@transactional_session
def clean_next_poll_at(status, session=None):
    """
    Clear next_poll_at.

    Sets ``next_poll_at`` to the current UTC time for every collection in the given
    status(es), which makes them due for polling.

    :param status: A collection status, or a list/tuple of statuses.
    :param session: The database session in use.
    """
    statuses = status if isinstance(status, (list, tuple)) else [status]
    # A single status is duplicated, matching the IN-clause handling in get_collections_by_status.
    statuses = [statuses[0]] * 2 if len(statuses) == 1 else statuses

    reset_to = {'next_poll_at': datetime.datetime.utcnow()}
    session.query(models.Collection)\
           .filter(models.Collection.status.in_(statuses))\
           .update(reset_to, synchronize_session=False)