Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-09 07:58:20

0001 #!/usr/bin/env python
0002 #
0003 # Licensed under the Apache License, Version 2.0 (the "License");
0004 # you may not use this file except in compliance with the License.
0005 # You may obtain a copy of the License at
0006 # http://www.apache.org/licenses/LICENSE-2.0
0007 #
0008 # Authors:
0009 # - Wen Guan, <wen.guan@cern.ch>, 2019 - 2025
0010 
0011 
0012 """
0013 operations related to Processings.
0014 """
0015 
0016 import datetime
0017 
0018 import sqlalchemy
0019 from sqlalchemy import func, select, not_
0020 from sqlalchemy.exc import DatabaseError, IntegrityError
0021 from sqlalchemy.sql.expression import asc
0022 
0023 from idds.common import exceptions
0024 from idds.common.constants import CommandType, ProcessingType, ProcessingStatus, ProcessingLocking, GranularityType
0025 from idds.common.utils import get_process_thread_info
0026 from idds.orm.base.session import read_session, transactional_session, safe_bulk_update_mappings
0027 from idds.orm.base import models
0028 
0029 
def create_processing(request_id, workload_id, transform_id, status=ProcessingStatus.New, locking=ProcessingLocking.Idle, submitter=None,
                      granularity=None, granularity_type=GranularityType.File, expired_at=None, processing_metadata=None,
                      new_poll_period=1, update_poll_period=10, processing_type=ProcessingType.Workflow,
                      new_retries=0, update_retries=0, max_new_retries=3, max_update_retries=0,
                      command=CommandType.NoneCommand, site=None,
                      internal_id=None, parent_internal_id=None, loop_index=None,
                      substatus=ProcessingStatus.New, output_metadata=None):
    """
    Build a Processing ORM object without saving it.

    :param request_id: The request id.
    :param workload_id: The workload id.
    :param transform_id: Transform id.
    :param status: processing status.
    :param locking: processing locking.
    :param submitter: submitter name.
    :param granularity: Granularity size.
    :param granularity_type: Granularity type.
    :param expired_at: The datetime when it expires.
    :param processing_metadata: The metadata as json.
    :param new_poll_period: poll period in seconds; a falsy value leaves the attribute unset.
    :param update_poll_period: update poll period in seconds; a falsy value leaves the attribute unset.
    :param processing_type: processing type.
    :param new_retries: number of retries already done in the new state.
    :param update_retries: number of retries already done in the update state.
    :param max_new_retries: maximum retries in the new state.
    :param max_update_retries: maximum retries in the update state.
    :param command: command attached to the processing.
    :param site: site name.
    :param internal_id: internal id.
    :param parent_internal_id: parent internal id.
    :param loop_index: loop index.
    :param substatus: processing substatus.
    :param output_metadata: output metadata.

    :returns: an unsaved models.Processing instance.
    """
    # Keyword order is kept identical to the historic constructor call.
    columns = {
        'request_id': request_id,
        'workload_id': workload_id,
        'transform_id': transform_id,
        'status': status,
        'substatus': substatus,
        'locking': locking,
        'submitter': submitter,
        'granularity': granularity,
        'granularity_type': granularity_type,
        'expired_at': expired_at,
        'processing_metadata': processing_metadata,
        'new_retries': new_retries,
        'update_retries': update_retries,
        'processing_type': processing_type,
        'command': command,
        'internal_id': internal_id,
        'parent_internal_id': parent_internal_id,
        'loop_index': loop_index,
        'site': site,
        'max_new_retries': max_new_retries,
        'max_update_retries': max_update_retries,
        'output_metadata': output_metadata,
    }
    processing = models.Processing(**columns)

    # Poll periods are passed in seconds and stored as timedeltas.
    for attr_name, seconds in (('new_poll_period', new_poll_period),
                               ('update_poll_period', update_poll_period)):
        if seconds:
            setattr(processing, attr_name, datetime.timedelta(seconds=seconds))
    return processing
0071 
0072 
@transactional_session
def add_processing(request_id, workload_id, transform_id, status=ProcessingStatus.New,
                   locking=ProcessingLocking.Idle, submitter=None, substatus=ProcessingStatus.New,
                   granularity=None, granularity_type=GranularityType.File, expired_at=None,
                   processing_metadata=None, new_poll_period=1, update_poll_period=10,
                   processing_type=ProcessingType.Workflow, site=None,
                   command=CommandType.NoneCommand,
                   internal_id=None, parent_internal_id=None, loop_index=None,
                   new_retries=0, update_retries=0, max_new_retries=3, max_update_retries=0,
                   output_metadata=None, session=None):
    """
    Create a processing and save it in the database.

    All processing attributes are forwarded unchanged to ``create_processing``.

    :param request_id: The request id.
    :param workload_id: The workload id.
    :param transform_id: Transform id.
    :param status: processing status.
    :param locking: processing locking.
    :param submitter: submitter name.
    :param granularity: Granularity size.
    :param granularity_type: Granularity type.
    :param expired_at: The datetime when it expires.
    :param processing_metadata: The metadata as json.
    :param session: The database session in use.

    :raises DuplicatedObject: If saving hits an IntegrityError (e.g. the processing already exists).
    :raises DatabaseException: If there is any other database error.

    :returns: processing id.
    """
    attributes = dict(request_id=request_id, workload_id=workload_id, transform_id=transform_id,
                      status=status, substatus=substatus, locking=locking, submitter=submitter,
                      granularity=granularity, granularity_type=granularity_type,
                      expired_at=expired_at, new_poll_period=new_poll_period,
                      update_poll_period=update_poll_period, processing_type=processing_type,
                      new_retries=new_retries, update_retries=update_retries,
                      command=command, site=site,
                      internal_id=internal_id, parent_internal_id=parent_internal_id,
                      loop_index=loop_index,
                      max_new_retries=max_new_retries, max_update_retries=max_update_retries,
                      processing_metadata=processing_metadata, output_metadata=output_metadata)
    try:
        processing = create_processing(**attributes)
        processing.save(session=session)
        return processing.processing_id
    # IntegrityError subclasses DatabaseError, so it must be handled first.
    except IntegrityError as error:
        raise exceptions.DuplicatedObject('Processing already exists!: %s' % (error))
    except DatabaseError as error:
        raise exceptions.DatabaseException(error)
0121 
0122 
@read_session
def get_processing(processing_id, request_id=None, transform_id=None, to_json=False, session=None):
    """
    Get one processing by id, optionally also matching request and transform ids.

    :param processing_id: Processing id.
    :param request_id: if not None, the processing must also have this request id.
    :param transform_id: if not None, the processing must also have this transform id.
    :param to_json: return json format (``to_dict_json``) instead of ``to_dict``.
    :param session: The database session in use.

    :raises NoObject: If the query raises NoResultFound (``first()`` normally returns None instead).

    :returns: the processing as a dict, or None when nothing matches.
    """
    criteria = {'processing_id': processing_id}
    if request_id is not None:
        criteria['request_id'] = request_id
    if transform_id is not None:
        criteria['transform_id'] = transform_id

    try:
        processing = session.query(models.Processing).filter_by(**criteria).first()
        if not processing:
            return None
        return processing.to_dict_json() if to_json else processing.to_dict()
    except sqlalchemy.orm.exc.NoResultFound as error:
        raise exceptions.NoObject('Processing(processing_id: %s) cannot be found: %s' %
                                  (processing_id, error))
0159 
0160 
@transactional_session
def get_processing_by_id_status(processing_id, status=None, exclude_status=None, locking=False, to_lock=False, session=None):
    """
    Get a processing by id, optionally filtered by status and optionally locked.

    When ``locking`` is True, only an Idle processing is selected, using
    ``SELECT ... FOR UPDATE SKIP LOCKED``. The selected row is then marked as
    Locking together with the current host/process/thread information. Because
    that path writes to the row, this function uses a transactional session, as
    ``get_processings_by_status`` does, instead of a read session. Without that,
    the lock written here could be discarded when no outer session is supplied.
    NOTE(review): this assumes ``transactional_session`` commits a session it
    creates itself and reuses a caller-supplied session unchanged. Confirm this
    against idds.orm.base.session.

    :param processing_id: The id of the processing.
    :param status: processing status, or list of statuses, the processing must have.
    :param exclude_status: processing status, or list of statuses, the processing must not have.
    :param locking: if True, only select an Idle processing and mark it as Locking.
    :param to_lock: unused; kept for backward compatibility.
    :param session: The database session in use.

    :raises NoObject: If the query raises NoResultFound (``fetchone`` returns None instead).

    :returns: the processing as a dict, or None if nothing matches.
    """
    try:
        query = select(models.Processing).filter(models.Processing.processing_id == processing_id)

        # Single values are duplicated into two-element lists before in_(),
        # following the convention used throughout this module.
        if status:
            if not isinstance(status, (list, tuple)):
                status = [status]
            if len(status) == 1:
                status = [status[0], status[0]]
            query = query.where(models.Processing.status.in_(status))

        if exclude_status:
            if not isinstance(exclude_status, (list, tuple)):
                exclude_status = [exclude_status]
            if len(exclude_status) == 1:
                exclude_status = [exclude_status[0], exclude_status[0]]
            query = query.where(not_(models.Processing.status.in_(exclude_status)))

        if locking:
            # Skip rows another transaction already holds instead of waiting on them.
            query = query.where(models.Processing.locking == ProcessingLocking.Idle)
            query = query.with_for_update(skip_locked=True)

        ret = session.execute(query).fetchone()
        if not ret:
            return None

        processing = ret[0]
        if locking:
            processing.updated_at = datetime.datetime.utcnow()
            processing.locking = ProcessingLocking.Locking
            hostname, pid, thread_id, thread_name = get_process_thread_info()
            processing.locking_hostname = hostname
            processing.locking_pid = pid
            processing.locking_thread_id = thread_id
            processing.locking_thread_name = thread_name

        return processing.to_dict()
    except sqlalchemy.orm.exc.NoResultFound as error:
        raise exceptions.NoObject('processing processing_id: %s cannot be found: %s' % (processing_id, error))
0214 
0215 
@read_session
def get_processings(request_id=None, workload_id=None, transform_id=None, loop_index=None, internal_ids=None,
                    site=None, parent_internal_ids=None, to_json=False, session=None):
    """
    Get all processings matching the given filters.

    A filter whose value is falsy is not applied. The exception is
    ``loop_index``, which is applied whenever it is not None, so 0 is honored.

    :param request_id: The request id.
    :param workload_id: The workload id.
    :param transform_id: The transform id.
    :param loop_index: The loop index.
    :param internal_ids: An internal id, or a list/tuple of internal ids.
    :param site: The site name.
    :param parent_internal_ids: A parent internal id, or a list/tuple of them.
    :param to_json: return json format.
    :param session: The database session in use.

    :raises NoObject: If the query raises NoResultFound (an empty match returns []).

    :returns: list of processing dicts, possibly empty.
    """
    def as_in_list(values):
        # Wrap a scalar; duplicate a single value (module-wide in_() convention).
        seq = values if isinstance(values, (list, tuple)) else [values]
        return [seq[0], seq[0]] if len(seq) == 1 else seq

    try:
        query = session.query(models.Processing)

        equality_filters = (
            (models.Processing.request_id, request_id),
            (models.Processing.workload_id, workload_id),
            (models.Processing.transform_id, transform_id),
        )
        for column, value in equality_filters:
            if value:
                query = query.filter(column == value)
        if loop_index is not None:
            query = query.filter(models.Processing.loop_index == loop_index)
        if site:
            query = query.filter(models.Processing.site == site)
        if internal_ids:
            query = query.filter(models.Processing.internal_id.in_(as_in_list(internal_ids)))
        if parent_internal_ids:
            query = query.filter(models.Processing.parent_internal_id.in_(as_in_list(parent_internal_ids)))

        return [p.to_dict_json() if to_json else p.to_dict() for p in query.all()]
    except sqlalchemy.orm.exc.NoResultFound as error:
        raise exceptions.NoObject('Processing(request_id: %s, workload_id: %s, transform_id: %s) cannot be found: %s' %
                                  (request_id, workload_id, transform_id, error))
0272 
0273 
@read_session
def get_processings_by_transform_id(transform_id=None, to_json=False, session=None):
    """
    Get all processings of a transform, ordered by ascending processing id.

    :param transform_id: Transform id.
    :param to_json: return json format.
    :param session: The database session in use.

    :raises NoObject: If the query raises NoResultFound (an empty match returns []).

    :returns: list of processing dicts, possibly empty.
    """
    try:
        rows = (session.query(models.Processing)
                       .filter_by(transform_id=transform_id)
                       .order_by(asc(models.Processing.processing_id))
                       .all())
        if to_json:
            return [row.to_dict_json() for row in rows]
        return [row.to_dict() for row in rows]
    except sqlalchemy.orm.exc.NoResultFound as error:
        raise exceptions.NoObject('Processings(transform_id: %s) cannot be found: %s' %
                                  (transform_id, error))
0308 
0309 
@transactional_session
def get_processings_by_status(status, period=None, processing_ids=None, locking=False, locking_for_update=False,
                              bulk_size=None, submitter=None, to_json=False, by_substatus=False, only_return_id=False,
                              not_lock=False, min_request_id=None, new_poll=False, update_poll=False, for_poller=False, session=None):
    """
    Get processings by status, optionally locking them for the caller.

    :param status: Processing status or list of processing statuses. It is matched
                   against ``substatus`` instead of ``status`` when ``by_substatus`` is True.
    :param period: currently ignored; kept for backward compatibility.
    :param processing_ids: optional list of processing ids to restrict to.
    :param locking: Whether to retrieve only unlocked (Idle) items and mark them as Locking.
    :param locking_for_update: use ``SELECT ... FOR UPDATE SKIP LOCKED``; no ordering is applied in this mode.
    :param bulk_size: bulk size limitation.
    :param submitter: The submitter name.
    :param to_json: return json format.
    :param by_substatus: filter on substatus instead of status.
    :param only_return_id: return processing ids instead of processing dicts.
    :param not_lock: currently ignored; kept for backward compatibility.
    :param min_request_id: only return processings whose request_id is >= this value.
    :param new_poll: only return processings whose updated_at + new_poll_period is in the past.
    :param update_poll: only return processings whose updated_at + update_poll_period is in the past.
    :param for_poller: currently ignored; kept for backward compatibility.
    :param session: The database session in use.

    :raises NoObject: If the query raises NoResultFound (an empty match returns []).

    :returns: list of processing dicts, or of processing ids when ``only_return_id`` is True.
    """
    try:
        # Single values are duplicated into two-element lists before in_(),
        # following the convention used throughout this module.
        if status:
            if not isinstance(status, (list, tuple)):
                status = [status]
            if len(status) == 1:
                status = [status[0], status[0]]

        # Locking assigns attributes on the results, which requires full ORM
        # instances. An id-only query returns immutable rows that cannot be
        # assigned to, so only use it when nothing has to be locked.
        if only_return_id and not locking:
            query = session.query(models.Processing.processing_id)
        else:
            query = session.query(models.Processing)

        if status:
            if by_substatus:
                query = query.filter(models.Processing.substatus.in_(status))
            else:
                query = query.filter(models.Processing.status.in_(status))
        if new_poll:
            query = query.filter(models.Processing.updated_at + models.Processing.new_poll_period <= datetime.datetime.utcnow())
        if update_poll:
            query = query.filter(models.Processing.updated_at + models.Processing.update_poll_period <= datetime.datetime.utcnow())

        if processing_ids:
            query = query.filter(models.Processing.processing_id.in_(processing_ids))
        if min_request_id:
            query = query.filter(models.Processing.request_id >= min_request_id)
        if locking:
            query = query.filter(models.Processing.locking == ProcessingLocking.Idle)
        if submitter:
            query = query.filter(models.Processing.submitter == submitter)

        if locking_for_update:
            query = query.with_for_update(skip_locked=True)
        else:
            query = query.order_by(asc(models.Processing.updated_at))

        if bulk_size:
            query = query.limit(bulk_size)

        rows = query.all()
        if not rows:
            return []

        if locking:
            # The same timestamp and thread identity are stamped on every row of this call.
            locked_at = datetime.datetime.utcnow()
            hostname, pid, thread_id, thread_name = get_process_thread_info()

        rets = []
        for row in rows:
            if locking:
                row.updated_at = locked_at
                row.locking = ProcessingLocking.Locking
                row.locking_hostname = hostname
                row.locking_pid = pid
                row.locking_thread_id = thread_id
                row.locking_thread_name = thread_name

            if only_return_id:
                # Works for both ORM instances and id-only result rows.
                rets.append(row.processing_id)
            elif to_json:
                rets.append(row.to_dict_json())
            else:
                rets.append(row.to_dict())
        return rets
    except sqlalchemy.orm.exc.NoResultFound as error:
        raise exceptions.NoObject('No processing attached with status (%s): %s' % (status, error))
0400 
0401 
@transactional_session
def update_processing(processing_id, parameters, locking=False, session=None):
    """
    Update a processing.

    ``parameters`` is copied before it is normalized, so the caller's dict and
    any nested ``running_metadata`` dict are left unmodified. Before the values
    are written:
      * ``new_poll_period`` / ``update_poll_period`` values that are not timedeltas
        are treated as seconds and converted to timedeltas;
      * ``updated_at`` is set to the current UTC time, and so is ``finished_at``
        when the new status is Finished, Failed or Lost;
      * ``processing_metadata`` is not written. If it contains a non-None
        ``processing`` object, that object's ``metadata`` is stored under
        ``running_metadata['processing_data']``;
      * ``running_metadata`` is written to the ``_running_metadata`` attribute.

    :param processing_id: the processing id.
    :param parameters: A dictionary of attribute names to new values.
    :param locking: if True, only update the processing when it is Idle and not
                    row-locked by another transaction (``FOR UPDATE SKIP LOCKED``).
    :param session: The database session in use.

    :raises NoObject: If the query raises NoResultFound.
    :raises DatabaseException: If there is a database error (presumably raised by the
                               session decorator; not raised directly here).

    :returns: 1 if the processing was updated, 0 if it was not found (or was locked).
    """
    # Work on a copy so the caller's dict is not mutated by the normalization below.
    parameters = dict(parameters) if parameters else {}

    # Poll periods may be given in seconds; they are stored as timedeltas.
    for period_key in ('new_poll_period', 'update_poll_period'):
        if period_key in parameters and not isinstance(parameters[period_key], datetime.timedelta):
            parameters[period_key] = datetime.timedelta(seconds=parameters[period_key])

    now = datetime.datetime.utcnow()
    parameters['updated_at'] = now
    if parameters.get('status') in (ProcessingStatus.Finished, ProcessingStatus.Failed, ProcessingStatus.Lost):
        parameters['finished_at'] = now

    # processing_metadata is never written as-is; only the metadata of an
    # attached processing object is carried over into running_metadata.
    processing_metadata = parameters.pop('processing_metadata', None)
    if processing_metadata and 'processing' in processing_metadata:
        proc = processing_metadata['processing']
        if proc is not None:
            # Copy, so the caller's nested running_metadata dict is not modified.
            running_metadata = dict(parameters.get('running_metadata') or {})
            running_metadata['processing_data'] = proc.metadata
            parameters['running_metadata'] = running_metadata
    if 'running_metadata' in parameters:
        parameters['_running_metadata'] = parameters.pop('running_metadata')

    try:
        query = session.query(models.Processing).filter_by(processing_id=processing_id)
        if locking:
            query = query.filter(models.Processing.locking == ProcessingLocking.Idle)
            query = query.with_for_update(skip_locked=True)
        row = query.one_or_none()
    except sqlalchemy.orm.exc.NoResultFound as error:
        raise exceptions.NoObject('Processing %s cannot be found: %s' % (processing_id, error))

    if not row:
        return 0

    for key, value in parameters.items():
        setattr(row, key, value)
    return 1
0454 
0455 
@transactional_session
def abort_resume_processings(transform_id=None, request_id=None, processing_id=None, abort=False, resume=False, session=None):
    """
    Attach an abort or resume command to the matching processings.

    Processings that already carry the target command are skipped. If both
    ``abort`` and ``resume`` are True, resume takes precedence.

    :param transform_id: The id of the transform.
    :param request_id: The request id.
    :param processing_id: The processing id.
    :param abort: set the AbortProcessing command.
    :param resume: set the ResumeProcessing command and the ToResume status.
    :param session: The database session in use.

    :raises NoObject: If the update raises NoResultFound.
    :raises DatabaseException: If there is a database error.

    :returns: number of updated rows, or None when neither abort nor resume was
              requested or no id filter was given.
    """
    if not (abort or resume):
        return
    if not (transform_id or request_id or processing_id):
        return

    if resume:
        command = CommandType.ResumeProcessing
        parameters = {'status': ProcessingStatus.ToResume, 'command': CommandType.ResumeProcessing}
    else:
        command = CommandType.AbortProcessing
        parameters = {'command': CommandType.AbortProcessing}

    id_filters = {}
    if processing_id:
        id_filters['processing_id'] = processing_id
    if transform_id:
        id_filters['transform_id'] = transform_id
    if request_id:
        id_filters['request_id'] = request_id

    try:
        query = session.query(models.Processing).filter_by(**id_filters)
        query = query.filter(models.Processing.command != command)
        return query.update(parameters, synchronize_session=False)
    except sqlalchemy.orm.exc.NoResultFound as error:
        raise exceptions.NoObject('Transfrom %s cannot be found: %s' % (transform_id, error))
0497 
0498 
@transactional_session
def delete_processing(processing_id=None, session=None):
    """
    Delete the processing with the given id.

    :param processing_id: The id of the processing.
    :param session: The database session in use.

    :raises NoObject: If the delete raises NoResultFound.
    :raises DatabaseException: If there is a database error.
    """
    matching = session.query(models.Processing).filter_by(processing_id=processing_id)
    try:
        matching.delete()
    except sqlalchemy.orm.exc.NoResultFound as error:
        raise exceptions.NoObject('Processing %s cannot be found: %s' % (processing_id, error))
0514 
0515 
@transactional_session
def clean_locking(time_period=3600, min_request_id=None, health_items=None, force=False, hostname=None, pid=None, session=None):
    """
    Clean processing locks that look stale.

    A processing in the Locking state has its ``locking`` reset to 0 when any of
    the following holds:
      * its locking host/pid does not appear in ``health_items``;
      * ``force`` is set and it was locked by the given ``hostname`` and ``pid``;
      * it was last updated more than ``time_period`` seconds ago.

    :param time_period: age in seconds after which a lock is considered stale.
    :param min_request_id: if given, only consider processings with request_id >= this value.
    :param health_items: iterable of dicts with at least 'hostname' and 'pid' keys, one per live agent.
    :param force: also release the locks held by ``hostname``/``pid``.
    :param hostname: hostname whose locks are released when ``force`` is set.
    :param pid: pid whose locks are released when ``force`` is set.
    :param session: The database session in use.
    """
    # Map each hostname to the set of pids reported alive. The loop must not
    # reuse the names ``hostname``/``pid``: shadowing those parameters would make
    # the ``force`` check below compare against the last health item instead.
    live_pids = {}
    for item in health_items or ():
        live_pids.setdefault(item['hostname'], set()).add(item['pid'])

    query = session.query(models.Processing.processing_id,
                          models.Processing.locking_hostname,
                          models.Processing.locking_pid,
                          models.Processing.updated_at)
    query = query.filter(models.Processing.locking == ProcessingLocking.Locking)
    if min_request_id:
        query = query.filter(models.Processing.request_id >= min_request_id)

    expire_before = datetime.datetime.utcnow() - datetime.timedelta(seconds=time_period)
    lost_processing_ids = []
    for pr_id, locking_hostname, locking_pid, updated_at in query.all():
        if (
            locking_pid not in live_pids.get(locking_hostname, ())
            or (force and hostname == locking_hostname and pid == locking_pid)        # noqa W503
            or updated_at < expire_before                                             # noqa W503
        ):
            lost_processing_ids.append({"processing_id": pr_id, 'locking': 0})

    # A plain session.bulk_update_mappings here can cause deadlocks.
    safe_bulk_update_mappings(session, models.Processing, lost_processing_ids)
0559 
0560 
@transactional_session
def clean_next_poll_at(status, session=None):
    """
    Set next_poll_at to the current UTC time for every processing in the given status(es).

    :param status: a processing status, or a list/tuple of statuses.
    :param session: The database session in use.
    """
    statuses = list(status) if isinstance(status, (list, tuple)) else [status]
    # Duplicate a single value, following the in_() convention used throughout this module.
    if len(statuses) == 1:
        statuses = statuses * 2

    now = datetime.datetime.utcnow()
    matching = session.query(models.Processing).filter(models.Processing.status.in_(statuses))
    matching.update({'next_poll_at': now}, synchronize_session=False)
0576 
0577 
@read_session
def get_num_active_processings(active_status=None, session=None):
    """
    Count processings grouped by status and site.

    :param active_status: optional status, or list/tuple of statuses, to restrict the
                          count to; all processings are counted when it is falsy.
    :param session: The database session in use.

    :returns: list of (status, site, count) rows.
    """
    query = session.query(models.Processing.status,
                          models.Processing.site,
                          func.count(models.Processing.processing_id))
    if active_status:
        statuses = active_status if isinstance(active_status, (list, tuple)) else [active_status]
        # Duplicate a single value, following the in_() convention used throughout this module.
        if len(statuses) == 1:
            statuses = [statuses[0], statuses[0]]
        query = query.filter(models.Processing.status.in_(statuses))
    return query.group_by(models.Processing.status, models.Processing.site).all()
0594 
0595 
@read_session
def get_active_processings(active_status=None, session=None):
    """
    List basic identifiers of processings, optionally restricted by status.

    :param active_status: optional status, or list/tuple of statuses, to restrict to;
                          all processings are returned when it is falsy.
    :param session: The database session in use.

    :returns: list of (request_id, transform_id, processing_id, site, status) rows.
    """
    selected_columns = (models.Processing.request_id,
                        models.Processing.transform_id,
                        models.Processing.processing_id,
                        models.Processing.site,
                        models.Processing.status)
    query = session.query(*selected_columns)
    if active_status:
        statuses = active_status if isinstance(active_status, (list, tuple)) else [active_status]
        # Duplicate a single value, following the in_() convention used throughout this module.
        if len(statuses) == 1:
            statuses = [statuses[0], statuses[0]]
        query = query.filter(models.Processing.status.in_(statuses))
    return query.all()