Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-09 07:58:19

0001 #!/usr/bin/env python
0002 #
0003 # Licensed under the Apache License, Version 2.0 (the "License");
0004 # You may not use this file except in compliance with the License.
0005 # You may obtain a copy of the License at
0006 # http://www.apache.org/licenses/LICENSE-2.0
0007 #
0008 # Authors:
0009 # - Wen Guan, <wen.guan@cern.ch>, 2019 - 2024
0010 
0011 
0012 """
0013 operations related to Collections.
0014 """
0015 
0016 import datetime
0017 
0018 import sqlalchemy
0019 from sqlalchemy.exc import DatabaseError, IntegrityError
0020 from sqlalchemy.sql.expression import asc
0021 
0022 from idds.common import exceptions
0023 from idds.common.constants import CollectionType, CollectionStatus, CollectionLocking, CollectionRelationType
0024 from idds.orm.base.session import read_session, transactional_session
0025 from idds.orm.base import models
0026 
0027 
def create_collection(request_id, workload_id, scope, name, coll_type=CollectionType.Dataset, transform_id=None,
                      relation_type=CollectionRelationType.Input, bytes=0, status=CollectionStatus.New,
                      locking=CollectionLocking.Idle, total_files=0, new_files=0, processing_files=0,
                      processed_files=0, retries=0, expired_at=None,
                      coll_metadata=None):
    """
    Build a Collection model object from the given attributes.

    The object is only constructed here; no database session is used in this function.

    :param request_id: The request id.
    :param workload_id: The workload id.
    :param scope: The scope of the request data.
    :param name: The name of the request data.
    :param coll_type: The type of the collection, such as dataset or container.
    :param transform_id: The transform id related to this collection.
    :param relation_type: The relation between this collection and its transform,
                          such as Input, Output, Log and so on.
    :param bytes: The size of the collection.
    :param status: The status.
    :param locking: The locking.
    :param total_files: Number of total files.
    :param new_files: Number of new files.
    :param processing_files: Number of files being processed.
    :param processed_files: Number of processed files.
    :param retries: Number of retries.
    :param expired_at: The datetime when it expires.
    :param coll_metadata: The metadata as json.

    :returns: collection.
    """
    attributes = {
        'request_id': request_id,
        'workload_id': workload_id,
        'scope': scope,
        'name': name,
        'coll_type': coll_type,
        'transform_id': transform_id,
        'relation_type': relation_type,
        'bytes': bytes,
        'status': status,
        'locking': locking,
        'total_files': total_files,
        'new_files': new_files,
        'processing_files': processing_files,
        'processed_files': processed_files,
        'retries': retries,
        'expired_at': expired_at,
        'coll_metadata': coll_metadata,
    }
    return models.Collection(**attributes)
0061 
0062 
@transactional_session
def add_collection(request_id, workload_id, scope, name, coll_type=CollectionType.Dataset, transform_id=None,
                   relation_type=CollectionRelationType.Input, bytes=0, status=CollectionStatus.New,
                   locking=CollectionLocking.Idle, total_files=0, new_files=0, processing_files=0,
                   processed_files=0, retries=0, expired_at=None,
                   coll_metadata=None, session=None):
    """
    Create a collection and save it through the given session.

    :param request_id: The request id.
    :param workload_id: The workload id.
    :param scope: The scope of the request data.
    :param name: The name of the request data.
    :param coll_type: The type of the collection, such as dataset or container.
    :param transform_id: The transform id related to this collection.
    :param relation_type: The relation between this collection and its transform,
                          such as Input, Output, Log and so on.
    :param bytes: The size of the collection.
    :param status: The status.
    :param locking: The locking.
    :param total_files: Number of total files.
    :param new_files: Number of new files.
    :param processing_files: Number of files being processed.
    :param processed_files: Number of processed files.
    :param retries: Number of retries.
    :param expired_at: The datetime when it expires.
    :param coll_metadata: The metadata as json.
    :param session: The database session in use.

    :raises DuplicatedObject: If saving hits an IntegrityError.
    :raises DatabaseException: If there is any other database error.

    :returns: collection id.
    """
    try:
        collection = create_collection(request_id, workload_id, scope, name,
                                       coll_type=coll_type,
                                       transform_id=transform_id,
                                       relation_type=relation_type,
                                       bytes=bytes,
                                       status=status,
                                       locking=locking,
                                       total_files=total_files,
                                       new_files=new_files,
                                       processing_files=processing_files,
                                       processed_files=processed_files,
                                       retries=retries,
                                       expired_at=expired_at,
                                       coll_metadata=coll_metadata)
        collection.save(session=session)
        return collection.coll_id
    except IntegrityError as error:
        # IntegrityError must be handled before the more general DatabaseError.
        message = 'Collection scope:name(%s:%s) with transform_id(%s) already exists!: %s' % (
            scope, name, transform_id, error)
        raise exceptions.DuplicatedObject(message)
    except DatabaseError as error:
        raise exceptions.DatabaseException(error)
0108 
0109 
@read_session
def get_collection_id(transform_id, relation_type, session=None):
    """
    Get the id of the first collection matching a transform id and relation type.

    :param transform_id: The transform id related to this collection.
    :param relation_type: The relation between this collection and its transform,
                          such as Input, Output, Log and so on.
    :param session: The database session in use.

    :returns: Collection id, or None if no collection matches.
    """
    query = session.query(models.Collection.coll_id)
    query = query.filter_by(transform_id=transform_id)
    query = query.filter(models.Collection.relation_type == relation_type)

    # first() returns None for an empty result instead of raising
    # NoResultFound, so no exception translation is needed here.
    row = query.first()
    return row[0] if row else None
0139 
0140 
@read_session
def get_collection(coll_id=None, transform_id=None, relation_type=None, to_json=False, session=None):
    """
    Get a single collection as a dictionary.

    The collection is looked up by coll_id when it is truthy; otherwise by
    transform_id together with relation_type.

    :param coll_id: The id of the collection.
    :param transform_id: The transform id related to this collection.
    :param relation_type: The relation between this collection and its transform,
                          such as Input, Output, Log and so on.
    :param to_json: return json format.
    :param session: The database session in use.

    :returns: Collection as a dictionary (from to_dict_json() when to_json is set,
              otherwise from to_dict()), or None if no collection matches.
    """
    query = session.query(models.Collection)
    if coll_id:
        query = query.filter_by(coll_id=coll_id)
    else:
        query = query.filter_by(transform_id=transform_id)\
                     .filter(models.Collection.relation_type == relation_type)

    # first() returns None for an empty result instead of raising
    # NoResultFound, so no exception translation is needed here.
    collection = query.first()
    if not collection:
        return None
    return collection.to_dict_json() if to_json else collection.to_dict()
0178 
0179 
@read_session
def get_collection_ids_by_transform_id(transform_id=None, session=None):
    """
    Get the ids of all collections attached to a transform.

    :param transform_id: The transform id related to this collection.
    :param session: The database session in use.

    :returns: list of Collection ids; empty if no collection matches.
    """
    query = session.query(models.Collection.coll_id)\
                   .filter_by(transform_id=transform_id)
    # all() returns an empty list when nothing matches (it never raises
    # NoResultFound); each row holds only the selected coll_id column.
    return [row[0] for row in query.all()]
0208 
0209 
@read_session
def get_collections_by_status(status, relation_type=CollectionRelationType.Input, time_period=None,
                              locking=False, bulk_size=None, to_json=False, session=None):
    """
    Get collections by status whose next_poll_at is in the past, oldest update first.

    :param status: The collection status, or a list/tuple of statuses.
    :param relation_type: The relation_type of the collection to the transform;
                          no relation filter is applied when None.
    :param time_period: time period in seconds since last update; when set, only
                        collections not updated within this period are returned.
    :param locking: Whether to retrieve only collections whose locking is Idle.
    :param bulk_size: Maximum number of collections to return; no limit when falsy.
    :param to_json: return json format.
    :param session: The database session in use.

    :returns: list of Collections as dictionaries; empty if none match.
    """
    if not isinstance(status, (list, tuple)):
        status = [status]
    if len(status) == 1:
        # NOTE(review): a single status is deliberately duplicated before being
        # passed to in_(); presumably a backend workaround, confirm before removing.
        status = [status[0], status[0]]

    # One timestamp keeps the next_poll_at and updated_at filters consistent.
    now = datetime.datetime.utcnow()

    query = session.query(models.Collection)\
                   .filter(models.Collection.status.in_(status))\
                   .filter(models.Collection.next_poll_at < now)

    if relation_type is not None:
        query = query.filter(models.Collection.relation_type == relation_type)
    if time_period:
        query = query.filter(models.Collection.updated_at < now - datetime.timedelta(seconds=time_period))
    if locking:
        query = query.filter(models.Collection.locking == CollectionLocking.Idle)

    query = query.order_by(asc(models.Collection.updated_at))
    if bulk_size:
        query = query.limit(bulk_size)

    # all() returns an empty list when nothing matches; it never raises NoResultFound.
    return [coll.to_dict_json() if to_json else coll.to_dict() for coll in query.all()]
0262 
0263 
@read_session
def get_collections(scope=None, name=None, request_id=None, workload_id=None, transform_id=None,
                    relation_type=None, to_json=False, session=None):
    """
    Get collections matching the given filters, ordered by ascending updated_at.

    Filters whose argument is falsy (relation_type: None) are not applied.

    :param scope: collection scope.
    :param name: collection name, can be wildcard ('*' is translated to the SQL LIKE '%').
    :param request_id: The request id.
    :param workload_id: The workload id.
    :param transform_id: A transform id or a list/tuple of transform ids related to the collections.
    :param relation_type: The relation type between this collection and the transform: Input, Output and Log.
    :param to_json: return json format.
    :param session: The database session in use.

    :returns: list of Collections as dictionaries; empty if none match.
    """
    if transform_id and not isinstance(transform_id, (list, tuple)):
        transform_id = [transform_id]

    query = session.query(models.Collection)
    if request_id:
        query = query.filter(models.Collection.request_id == request_id)
    if workload_id:
        query = query.filter(models.Collection.workload_id == workload_id)
    if transform_id:
        query = query.filter(models.Collection.transform_id.in_(transform_id))
    if relation_type is not None:
        query = query.filter(models.Collection.relation_type == relation_type)

    if scope:
        query = query.filter(models.Collection.scope == scope)
    if name:
        query = query.filter(models.Collection.name.like(name.replace('*', '%')))

    query = query.order_by(asc(models.Collection.updated_at))

    # all() returns an empty list when nothing matches; it never raises NoResultFound.
    return [coll.to_dict_json() if to_json else coll.to_dict() for coll in query.all()]
0318 
0319 
@read_session
def get_collections_by_request_ids(request_ids, session=None):
    """
    Get the identifying columns of collections for a list of request ids.

    Only coll_id, request_id, transform_id and workload_id are selected. When
    request_ids is falsy no request filter is applied.

    :param request_ids: A request id or a list/tuple of request ids.
    :param session: The database session in use.

    :returns: list of dictionaries keyed by the selected column names.
    """
    if request_ids and not isinstance(request_ids, (list, tuple)):
        request_ids = [request_ids]

    columns = [models.Collection.coll_id,
               models.Collection.request_id,
               models.Collection.transform_id,
               models.Collection.workload_id]
    column_names = [column.name for column in columns]
    query = session.query(*columns)

    if request_ids:
        query = query.filter(models.Collection.request_id.in_(request_ids))

    # Rows come back in the same order as the selected columns.
    return [dict(zip(column_names, row)) for row in query.all()]
0353 
0354 
@transactional_session
def update_collection(coll_id, parameters, session=None):
    """
    Update a collection, stamping updated_at with the current UTC time.

    The update is issued as a single UPDATE filtered on coll_id. No exception
    is raised here when no row matches.

    :param coll_id: the collection id.
    :param parameters: A dictionary of parameters. It is not modified.
    :param session: The database session in use.
    """
    # Work on a copy so the caller's dictionary is not silently changed.
    values = dict(parameters)
    values['updated_at'] = datetime.datetime.utcnow()
    session.query(models.Collection).filter_by(coll_id=coll_id)\
           .update(values, synchronize_session=False)
0374 
0375 
@transactional_session
def update_collections(parameters, session=None):
    """
    Update several collections in bulk, stamping updated_at on each.

    All mappings in one call receive the same updated_at timestamp.

    :param parameters: list of dictionary of parameters, passed as mappings to
                       bulk_update_mappings. The dictionaries are not modified.
    :param session: The database session in use.
    """
    now = datetime.datetime.utcnow()
    # Build new mappings so the caller's dictionaries are not silently changed.
    mappings = [dict(parameter, updated_at=now) for parameter in parameters]

    session.bulk_update_mappings(models.Collection, mappings)
0395 
0396 
@transactional_session
def delete_collection(coll_id=None, session=None):
    """
    Delete a collection.

    The delete is issued as a single DELETE filtered on coll_id. No exception
    is raised here when no row matches.

    :param coll_id: The id of the collection.
    :param session: The database session in use.
    """
    session.query(models.Collection).filter_by(coll_id=coll_id).delete()
0412 
0413 
@transactional_session
def clean_locking(time_period=3600, session=None):
    """
    Clean locking which is older than time period.

    Collections whose locking is CollectionLocking.Locking and whose updated_at
    is older than time_period seconds get their locking set to 0.

    :param time_period: age threshold in seconds.
    :param session: The database session in use.
    """
    cutoff = datetime.datetime.utcnow() - datetime.timedelta(seconds=time_period)

    stale = session.query(models.Collection)
    stale = stale.filter(models.Collection.locking == CollectionLocking.Locking)
    stale = stale.filter(models.Collection.updated_at < cutoff)

    # NOTE(review): 0 is presumably the value of CollectionLocking.Idle - confirm.
    stale.update({'locking': 0}, synchronize_session=False)
0425 
0426 
@transactional_session
def clean_next_poll_at(status, session=None):
    """
    Clean next_poll_at by setting it to the current UTC time.

    Applies to every collection whose status is one of the given statuses.

    :param status: status of the collection, or a list/tuple of statuses.
    :param session: The database session in use.
    """
    statuses = status if isinstance(status, (list, tuple)) else [status]
    if len(statuses) == 1:
        # A lone status is duplicated before being handed to in_().
        only = statuses[0]
        statuses = [only, only]

    matching = session.query(models.Collection).filter(models.Collection.status.in_(statuses))
    matching.update({'next_poll_at': datetime.datetime.utcnow()}, synchronize_session=False)