File indexing completed on 2026-04-09 07:58:20
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012 """
0013 operations related to Processings.
0014 """
0015
0016 import datetime
0017
0018 import sqlalchemy
0019 from sqlalchemy import func, select, not_
0020 from sqlalchemy.exc import DatabaseError, IntegrityError
0021 from sqlalchemy.sql.expression import asc
0022
0023 from idds.common import exceptions
0024 from idds.common.constants import CommandType, ProcessingType, ProcessingStatus, ProcessingLocking, GranularityType
0025 from idds.common.utils import get_process_thread_info
0026 from idds.orm.base.session import read_session, transactional_session, safe_bulk_update_mappings
0027 from idds.orm.base import models
0028
0029
def create_processing(request_id, workload_id, transform_id, status=ProcessingStatus.New, locking=ProcessingLocking.Idle, submitter=None,
                      granularity=None, granularity_type=GranularityType.File, expired_at=None, processing_metadata=None,
                      new_poll_period=1, update_poll_period=10, processing_type=ProcessingType.Workflow,
                      new_retries=0, update_retries=0, max_new_retries=3, max_update_retries=0,
                      command=CommandType.NoneCommand, site=None,
                      internal_id=None, parent_internal_id=None, loop_index=None,
                      substatus=ProcessingStatus.New, output_metadata=None):
    """
    Build a processing model object (it is not saved here).

    :param request_id: The request id.
    :param workload_id: The workload id.
    :param transform_id: Transform id.
    :param status: processing status.
    :param locking: processing locking.
    :param submitter: submitter name.
    :param granularity: Granularity size.
    :param granularity_type: Granularity type.
    :param expired_at: The datetime when it expires.
    :param processing_metadata: The metadata as json.
    :param new_poll_period: Value for ``new_poll_period``, in seconds or as a
                            ``datetime.timedelta``. A falsy value leaves the
                            attribute unassigned on the object.
    :param update_poll_period: Value for ``update_poll_period``, in seconds or as a
                               ``datetime.timedelta``. A falsy value leaves the
                               attribute unassigned on the object.
    :param processing_type: Processing type.
    :param new_retries: Stored as ``new_retries``.
    :param update_retries: Stored as ``update_retries``.
    :param max_new_retries: Stored as ``max_new_retries``.
    :param max_update_retries: Stored as ``max_update_retries``.
    :param command: Command attached to the processing.
    :param site: Site name.
    :param internal_id: Internal id.
    :param parent_internal_id: Parent internal id.
    :param loop_index: Loop index.
    :param substatus: processing substatus.
    :param output_metadata: The output metadata.

    :returns: A ``models.Processing`` instance; call ``save(session=...)`` to persist it.
    """
    new_processing = models.Processing(request_id=request_id, workload_id=workload_id, transform_id=transform_id,
                                       status=status, substatus=substatus, locking=locking,
                                       submitter=submitter, granularity=granularity, granularity_type=granularity_type,
                                       expired_at=expired_at, processing_metadata=processing_metadata,
                                       new_retries=new_retries, update_retries=update_retries,
                                       processing_type=processing_type, command=command,
                                       internal_id=internal_id, parent_internal_id=parent_internal_id,
                                       loop_index=loop_index, site=site,
                                       max_new_retries=max_new_retries, max_update_retries=max_update_retries,
                                       output_metadata=output_metadata)

    # Poll periods are stored as time intervals. Accept plain seconds or an
    # already-built timedelta (update_processing accepts both forms as well);
    # wrapping a timedelta in timedelta(seconds=...) would raise TypeError.
    if new_poll_period:
        if not isinstance(new_poll_period, datetime.timedelta):
            new_poll_period = datetime.timedelta(seconds=new_poll_period)
        new_processing.new_poll_period = new_poll_period
    if update_poll_period:
        if not isinstance(update_poll_period, datetime.timedelta):
            update_poll_period = datetime.timedelta(seconds=update_poll_period)
        new_processing.update_poll_period = update_poll_period
    return new_processing
0071
0072
@transactional_session
def add_processing(request_id, workload_id, transform_id, status=ProcessingStatus.New,
                   locking=ProcessingLocking.Idle, submitter=None, substatus=ProcessingStatus.New,
                   granularity=None, granularity_type=GranularityType.File, expired_at=None,
                   processing_metadata=None, new_poll_period=1, update_poll_period=10,
                   processing_type=ProcessingType.Workflow, site=None,
                   command=CommandType.NoneCommand,
                   internal_id=None, parent_internal_id=None, loop_index=None,
                   new_retries=0, update_retries=0, max_new_retries=3, max_update_retries=0,
                   output_metadata=None, session=None):
    """
    Create a processing via ``create_processing`` and save it in ``session``.

    All keyword arguments except ``session`` are forwarded unchanged to
    ``create_processing``; see that function for their meaning.

    :param session: The database session in use.

    :raises DuplicatedObject: If saving raises an IntegrityError (e.g. a
                              processing with the same key already exists).
    :raises DatabaseException: If saving raises any other DatabaseError.

    :returns: The id of the new processing.
    """
    fields = dict(request_id=request_id, workload_id=workload_id, transform_id=transform_id,
                  status=status, substatus=substatus, locking=locking, submitter=submitter,
                  granularity=granularity, granularity_type=granularity_type,
                  expired_at=expired_at,
                  processing_metadata=processing_metadata, output_metadata=output_metadata,
                  new_poll_period=new_poll_period, update_poll_period=update_poll_period,
                  processing_type=processing_type, command=command, site=site,
                  internal_id=internal_id, parent_internal_id=parent_internal_id, loop_index=loop_index,
                  new_retries=new_retries, update_retries=update_retries,
                  max_new_retries=max_new_retries, max_update_retries=max_update_retries)
    try:
        processing = create_processing(**fields)
        processing.save(session=session)
        return processing.processing_id
    # IntegrityError is a subclass of DatabaseError, so it must be handled first.
    except IntegrityError as error:
        raise exceptions.DuplicatedObject('Processing already exists!: %s' % (error))
    except DatabaseError as error:
        raise exceptions.DatabaseException(error)
0121
0122
@read_session
def get_processing(processing_id, request_id=None, transform_id=None, to_json=False, session=None):
    """
    Look up one processing by its id.

    :param processing_id: Processing id.
    :param request_id: If not None, the processing must also belong to this request.
    :param transform_id: If not None, the processing must also belong to this transform.
    :param to_json: Return ``to_dict_json()`` instead of ``to_dict()``.
    :param session: The database session in use.

    :raises NoObject: If the query raises NoResultFound.

    :returns: The processing as a dict, or None when no row matches.
    """
    criteria = {'processing_id': processing_id}
    if request_id is not None:
        criteria['request_id'] = request_id
    if transform_id is not None:
        criteria['transform_id'] = transform_id

    try:
        row = session.query(models.Processing).filter_by(**criteria).first()
        if not row:
            return None
        return row.to_dict_json() if to_json else row.to_dict()
    except sqlalchemy.orm.exc.NoResultFound as error:
        raise exceptions.NoObject('Processing(processing_id: %s) cannot be found: %s' %
                                  (processing_id, error))
0159
0160
@transactional_session
def get_processing_by_id_status(processing_id, status=None, exclude_status=None, locking=False, to_lock=False, session=None):
    """
    Get a processing by id, optionally filtered by status and optionally locked.

    Decorated as a transactional session, like the other functions in this module
    that modify rows: the ``locking`` branch selects the row FOR UPDATE and writes
    the locking fields onto it.

    :param processing_id: The id of the processing.
    :param status: A processing status, or list/tuple of statuses, the row must have.
    :param exclude_status: A processing status, or list/tuple of statuses, the row must not have.
    :param locking: If True, only match an Idle row, select it with
                    ``FOR UPDATE SKIP LOCKED`` and mark it as Locking by the
                    current host/process/thread.
    :param to_lock: Not used by this function; kept for interface compatibility.
    :param session: The database session in use.

    :raises NoObject: If the query raises NoResultFound.

    :returns: The processing as a dict, or None if no row matches (with ``locking``,
              a row locked by another transaction is skipped and also yields None).
    """
    try:
        query = select(models.Processing).filter(models.Processing.processing_id == processing_id)

        if status:
            if not isinstance(status, (list, tuple)):
                status = [status]
            if len(status) == 1:
                status = [status[0], status[0]]
            query = query.where(models.Processing.status.in_(status))

        if exclude_status:
            if not isinstance(exclude_status, (list, tuple)):
                exclude_status = [exclude_status]
            if len(exclude_status) == 1:
                exclude_status = [exclude_status[0], exclude_status[0]]
            query = query.where(not_(models.Processing.status.in_(exclude_status)))

        if locking:
            query = query.where(models.Processing.locking == ProcessingLocking.Idle)
            query = query.with_for_update(skip_locked=True)

        ret = session.execute(query).fetchone()
        if not ret:
            return None

        processing = ret[0]
        if locking:
            processing.updated_at = datetime.datetime.utcnow()
            processing.locking = ProcessingLocking.Locking
            # Record which worker holds the lock.
            hostname, pid, thread_id, thread_name = get_process_thread_info()
            processing.locking_hostname = hostname
            processing.locking_pid = pid
            processing.locking_thread_id = thread_id
            processing.locking_thread_name = thread_name

        return processing.to_dict()
    except sqlalchemy.orm.exc.NoResultFound as error:
        raise exceptions.NoObject('processing processing_id: %s cannot be found: %s' % (processing_id, error))
0214
0215
@read_session
def get_processings(request_id=None, workload_id=None, transform_id=None, loop_index=None, internal_ids=None,
                    site=None, parent_internal_ids=None, to_json=False, session=None):
    """
    List processings matching every filter that is given.

    Falsy filters are ignored, except ``loop_index``, which is applied whenever it
    is not None (so 0 is a valid value).

    :param request_id: Request id.
    :param workload_id: Workload id.
    :param transform_id: Transform id.
    :param loop_index: Loop index.
    :param internal_ids: An internal id, or list/tuple of internal ids.
    :param site: Site name.
    :param parent_internal_ids: A parent internal id, or list/tuple of them.
    :param to_json: Return ``to_dict_json()`` instead of ``to_dict()``.
    :param session: The database session in use.

    :raises NoObject: If the query raises NoResultFound.

    :returns: List of processing dicts (empty when nothing matches).
    """
    def _as_in_values(values):
        # Normalize a scalar to a list; a single value is duplicated, following the
        # convention used for IN-list filters throughout this module.
        if not isinstance(values, (list, tuple)):
            values = [values]
        if len(values) == 1:
            values = [values[0], values[0]]
        return values

    try:
        conditions = []
        if request_id:
            conditions.append(models.Processing.request_id == request_id)
        if workload_id:
            conditions.append(models.Processing.workload_id == workload_id)
        if transform_id:
            conditions.append(models.Processing.transform_id == transform_id)
        if loop_index is not None:
            conditions.append(models.Processing.loop_index == loop_index)
        if site:
            conditions.append(models.Processing.site == site)
        if internal_ids:
            conditions.append(models.Processing.internal_id.in_(_as_in_values(internal_ids)))
        if parent_internal_ids:
            conditions.append(models.Processing.parent_internal_id.in_(_as_in_values(parent_internal_ids)))

        query = session.query(models.Processing)
        if conditions:
            query = query.filter(*conditions)

        return [row.to_dict_json() if to_json else row.to_dict() for row in query.all()]
    except sqlalchemy.orm.exc.NoResultFound as error:
        raise exceptions.NoObject('Processing(request_id: %s, workload_id: %s, transform_id: %s) cannot be found: %s' %
                                  (request_id, workload_id, transform_id, error))
0272
0273
@read_session
def get_processings_by_transform_id(transform_id=None, to_json=False, session=None):
    """
    List the processings of one transform, ordered by ascending processing id.

    :param transform_id: Transform id.
    :param to_json: Return ``to_dict_json()`` instead of ``to_dict()``.
    :param session: The database session in use.

    :raises NoObject: If the query raises NoResultFound.

    :returns: List of processing dicts; empty if the transform has none.
    """
    try:
        rows = (session.query(models.Processing)
                .filter_by(transform_id=transform_id)
                .order_by(asc(models.Processing.processing_id))
                .all())
        return [row.to_dict_json() if to_json else row.to_dict() for row in rows]
    except sqlalchemy.orm.exc.NoResultFound as error:
        raise exceptions.NoObject('Processings(transform_id: %s) cannot be found: %s' %
                                  (transform_id, error))
0308
0309
@transactional_session
def get_processings_by_status(status, period=None, processing_ids=None, locking=False, locking_for_update=False,
                              bulk_size=None, submitter=None, to_json=False, by_substatus=False, only_return_id=False,
                              not_lock=False, min_request_id=None, new_poll=False, update_poll=False, for_poller=False, session=None):
    """
    Get processings by status, optionally locking the returned ones.

    :param status: A processing status, or list/tuple of statuses. Falsy means no status filter.
    :param period: Not used by this function; kept for interface compatibility.
    :param processing_ids: If given, restrict to these processing ids.
    :param locking: Only select Idle processings, and mark every returned one as
                    Locking by the current host/process/thread.
    :param locking_for_update: Select with ``FOR UPDATE SKIP LOCKED`` (no ordering);
                               otherwise rows are ordered by ascending ``updated_at``.
    :param bulk_size: Maximum number of rows to return.
    :param submitter: If given, restrict to this submitter.
    :param to_json: Return ``to_dict_json()`` instead of ``to_dict()``.
    :param by_substatus: Match ``status`` against ``substatus`` instead of ``status``.
    :param only_return_id: Return processing ids instead of dicts.
    :param not_lock: Not used by this function; kept for interface compatibility.
    :param min_request_id: If given, only processings with request_id >= this value.
    :param new_poll: Together with a status filter, only processings for which
                     ``updated_at + new_poll_period`` is in the past.
    :param update_poll: Together with a status filter, only processings for which
                        ``updated_at + update_poll_period`` is in the past.
    :param for_poller: Not used by this function; kept for interface compatibility.
    :param session: The database session in use.

    :raises NoObject: If the query raises NoResultFound.

    :returns: List of processing ids, or list of processing dicts.
    """
    try:
        if status:
            if not isinstance(status, (list, tuple)):
                status = [status]
            if len(status) == 1:
                status = [status[0], status[0]]

        if only_return_id:
            query = session.query(models.Processing.processing_id)
        else:
            query = session.query(models.Processing)

        if status:
            if by_substatus:
                query = query.filter(models.Processing.substatus.in_(status))
            else:
                query = query.filter(models.Processing.status.in_(status))
            if new_poll:
                query = query.filter(models.Processing.updated_at + models.Processing.new_poll_period <= datetime.datetime.utcnow())
            if update_poll:
                query = query.filter(models.Processing.updated_at + models.Processing.update_poll_period <= datetime.datetime.utcnow())

        if processing_ids:
            query = query.filter(models.Processing.processing_id.in_(processing_ids))
        if min_request_id:
            query = query.filter(models.Processing.request_id >= min_request_id)

        if locking:
            query = query.filter(models.Processing.locking == ProcessingLocking.Idle)
        if submitter:
            query = query.filter(models.Processing.submitter == submitter)

        if locking_for_update:
            query = query.with_for_update(skip_locked=True)
        else:
            query = query.order_by(asc(models.Processing.updated_at))

        if bulk_size:
            query = query.limit(bulk_size)

        rows = query.all()

        if rows and locking:
            # The locking worker is the same for every row, so look it up once.
            hostname, pid, thread_id, thread_name = get_process_thread_info()
            lock_values = {'updated_at': datetime.datetime.utcnow(),
                           'locking': ProcessingLocking.Locking,
                           'locking_hostname': hostname,
                           'locking_pid': pid,
                           'locking_thread_id': thread_id,
                           'locking_thread_name': thread_name}
            if only_return_id:
                # Id-only results are read-only row tuples, not ORM objects, so their
                # attributes cannot be assigned; lock the selected processings in bulk.
                safe_bulk_update_mappings(session, models.Processing,
                                          [dict(lock_values, processing_id=row[0]) for row in rows])
            else:
                for row in rows:
                    for key, value in lock_values.items():
                        setattr(row, key, value)

        rets = []
        for row in rows:
            if only_return_id:
                rets.append(row[0])
            elif to_json:
                rets.append(row.to_dict_json())
            else:
                rets.append(row.to_dict())
        return rets
    except sqlalchemy.orm.exc.NoResultFound as error:
        raise exceptions.NoObject('No processing attached with status (%s): %s' % (status, error))
0400
0401
@transactional_session
def update_processing(processing_id, parameters, locking=False, session=None):
    """
    Update the columns of one processing.

    The caller's ``parameters`` dict (and any ``running_metadata`` dict inside it)
    is not modified; the function works on copies.

    Normalization applied before writing:
      * ``new_poll_period`` / ``update_poll_period`` given in seconds are converted
        to ``datetime.timedelta``.
      * ``updated_at`` is set to the current UTC time; ``finished_at`` too when
        ``status`` is Finished, Failed or Lost.
      * ``processing_metadata`` is never written. If it holds a non-None
        ``'processing'`` object, that object's ``metadata`` is stored under
        ``running_metadata['processing_data']``.
      * ``running_metadata`` is written through the ``_running_metadata`` attribute.

    :param processing_id: The processing id.
    :param parameters: A dictionary of column name -> new value.
    :param locking: If True, only update the row when it is Idle, selecting it with
                    ``FOR UPDATE SKIP LOCKED``.
    :param session: The database session in use.

    :raises NoObject: If the query raises NoResultFound.

    :returns: 1 if the row was found and updated, 0 otherwise.
    """
    try:
        # Copy so the normalization below never leaks into the caller's dict.
        parameters = dict(parameters)

        for period_key in ('new_poll_period', 'update_poll_period'):
            if period_key in parameters and not isinstance(parameters[period_key], datetime.timedelta):
                parameters[period_key] = datetime.timedelta(seconds=parameters[period_key])

        now = datetime.datetime.utcnow()
        parameters['updated_at'] = now
        if parameters.get('status') in (ProcessingStatus.Finished, ProcessingStatus.Failed, ProcessingStatus.Lost):
            parameters['finished_at'] = now

        processing_metadata = parameters.pop('processing_metadata', None)
        if processing_metadata and 'processing' in processing_metadata:
            proc = processing_metadata['processing']
            if proc is not None:
                # Copy the nested dict as well, so the caller's running_metadata is untouched.
                running_metadata = dict(parameters.get('running_metadata') or {})
                running_metadata['processing_data'] = proc.metadata
                parameters['running_metadata'] = running_metadata

        if 'running_metadata' in parameters:
            # Presumably the model maps running_metadata through a private
            # '_running_metadata' attribute; confirm against models.Processing.
            parameters['_running_metadata'] = parameters.pop('running_metadata')

        query = session.query(models.Processing).filter_by(processing_id=processing_id)
        if locking:
            query = query.filter(models.Processing.locking == ProcessingLocking.Idle)
            query = query.with_for_update(skip_locked=True)
        row = query.one_or_none()
        if not row:
            return 0

        for key, value in parameters.items():
            setattr(row, key, value)
        return 1
    except sqlalchemy.orm.exc.NoResultFound as error:
        raise exceptions.NoObject('Processing %s cannot be found: %s' % (processing_id, error))
0454
0455
@transactional_session
def abort_resume_processings(transform_id=None, request_id=None, processing_id=None, abort=False, resume=False, session=None):
    """
    Flag matching processings to be aborted or resumed.

    Nothing is done (None is returned) unless at least one of ``abort``/``resume``
    and at least one of the id filters is given. If both flags are set, resume wins.
    Rows whose ``command`` already equals the target command are left alone.

    :param transform_id: If given, restrict to this transform.
    :param request_id: If given, restrict to this request.
    :param processing_id: If given, restrict to this processing.
    :param abort: Set ``command`` to AbortProcessing.
    :param resume: Set ``command`` to ResumeProcessing and ``status`` to ToResume.
    :param session: The database session in use.

    :raises NoObject: If the update raises NoResultFound.

    :returns: Number of updated rows, or None when there was nothing to do.
    """
    if not (abort or resume):
        return
    if not (transform_id or request_id or processing_id):
        return

    try:
        if resume:
            command = CommandType.ResumeProcessing
            parameters = {'status': ProcessingStatus.ToResume, 'command': CommandType.ResumeProcessing}
        else:
            command = CommandType.AbortProcessing
            parameters = {'command': CommandType.AbortProcessing}

        criteria = {}
        if processing_id:
            criteria['processing_id'] = processing_id
        if transform_id:
            criteria['transform_id'] = transform_id
        if request_id:
            criteria['request_id'] = request_id

        query = session.query(models.Processing).filter_by(**criteria)
        query = query.filter(models.Processing.command != command)
        return query.update(parameters, synchronize_session=False)
    except sqlalchemy.orm.exc.NoResultFound as error:
        raise exceptions.NoObject('Transfrom %s cannot be found: %s' % (transform_id, error))
0497
0498
@transactional_session
def delete_processing(processing_id=None, session=None):
    """
    Delete the processing with the given id.

    :param processing_id: The id of the processing.
    :param session: The database session in use.

    :raises NoObject: If the delete raises NoResultFound.
    """
    target = session.query(models.Processing).filter_by(processing_id=processing_id)
    try:
        target.delete()
    except sqlalchemy.orm.exc.NoResultFound as error:
        raise exceptions.NoObject('Processing %s cannot be found: %s' % (processing_id, error))
0514
0515
@transactional_session
def clean_locking(time_period=3600, min_request_id=None, health_items=None, force=False, hostname=None, pid=None, session=None):
    """
    Release processing locks whose holders are considered lost.

    A processing currently in Locking state gets ``locking`` reset to 0 when any of
    these holds:
      * its locking hostname/pid is not among ``health_items``;
      * ``force`` is set and the lock is held by ``hostname``/``pid``;
      * it was last updated more than ``time_period`` seconds ago.

    :param time_period: Age in seconds after which a lock is considered stale.
    :param min_request_id: If given, only consider processings with request_id >= this value.
    :param health_items: Iterable of dicts with 'hostname', 'pid' and 'thread_id'
                         keys describing workers reported as healthy.
    :param force: Also release locks held by ``hostname``/``pid``.
    :param hostname: Hostname whose locks are released when ``force`` is set.
    :param pid: Process id whose locks are released when ``force`` is set.
    :param session: The database session in use.
    """
    # hostname -> pid -> list of thread ids reported healthy.
    health_dict = {}
    for item in health_items or []:
        # Distinct names: the hostname/pid parameters identify the caller for the
        # ``force`` check below and must not be overwritten by health entries.
        item_hostname = item['hostname']
        item_pid = item['pid']
        item_thread_id = item['thread_id']
        thread_ids = health_dict.setdefault(item_hostname, {}).setdefault(item_pid, [])
        if item_thread_id not in thread_ids:
            thread_ids.append(item_thread_id)

    query = session.query(models.Processing.processing_id,
                          models.Processing.locking_hostname,
                          models.Processing.locking_pid,
                          models.Processing.locking_thread_id,
                          models.Processing.locking_thread_name,
                          models.Processing.updated_at)
    query = query.filter(models.Processing.locking == ProcessingLocking.Locking)
    if min_request_id:
        query = query.filter(models.Processing.request_id >= min_request_id)

    expire_before = datetime.datetime.utcnow() - datetime.timedelta(seconds=time_period)
    lost_processing_ids = []
    for pr_id, locking_hostname, locking_pid, _, _, updated_at in query.all():
        holder_healthy = locking_hostname in health_dict and locking_pid in health_dict[locking_hostname]
        if (
            not holder_healthy
            or (force and hostname == locking_hostname and pid == locking_pid)
            or (updated_at is not None and updated_at < expire_before)
        ):
            lost_processing_ids.append({"processing_id": pr_id, 'locking': 0})

    safe_bulk_update_mappings(session, models.Processing, lost_processing_ids)
0559
0560
@transactional_session
def clean_next_poll_at(status, session=None):
    """
    Set ``next_poll_at`` to the current UTC time for processings in the given status(es).

    :param status: A processing status, or a list/tuple of statuses.
    :param session: The database session in use.
    """
    statuses = status if isinstance(status, (list, tuple)) else [status]
    if len(statuses) == 1:
        # Single values are duplicated, following this module's IN-list convention.
        statuses = [statuses[0]] * 2

    matching = session.query(models.Processing).filter(models.Processing.status.in_(statuses))
    matching.update({'next_poll_at': datetime.datetime.utcnow()}, synchronize_session=False)
0576
0577
@read_session
def get_num_active_processings(active_status=None, session=None):
    """
    Count processings grouped by (status, site).

    :param active_status: Optional status, or list/tuple of statuses, to restrict to.
    :param session: The database session in use.

    :returns: List of (status, site, count) rows.
    """
    if active_status:
        if not isinstance(active_status, (list, tuple)):
            active_status = [active_status]
        if len(active_status) == 1:
            active_status = [active_status[0], active_status[0]]

    query = session.query(models.Processing.status,
                          models.Processing.site,
                          func.count(models.Processing.processing_id))
    if active_status:
        query = query.filter(models.Processing.status.in_(active_status))
    return query.group_by(models.Processing.status, models.Processing.site).all()
0594
0595
@read_session
def get_active_processings(active_status=None, session=None):
    """
    List (request_id, transform_id, processing_id, site, status) rows of processings.

    :param active_status: Optional status, or list/tuple of statuses, to restrict to.
    :param session: The database session in use.

    :returns: List of result rows.
    """
    if active_status:
        if not isinstance(active_status, (list, tuple)):
            active_status = [active_status]
        if len(active_status) == 1:
            active_status = [active_status[0], active_status[0]]

    columns = (models.Processing.request_id,
               models.Processing.transform_id,
               models.Processing.processing_id,
               models.Processing.site,
               models.Processing.status)
    query = session.query(*columns)
    if active_status:
        query = query.filter(models.Processing.status.in_(active_status))
    return query.all()
0614 raise error