Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-09 07:58:33

0001 #!/usr/bin/env python
0002 #
0003 # Licensed under the Apache License, Version 2.0 (the "License");
0004 # You may not use this file except in compliance with the License.
0005 # You may obtain a copy of the License at
0006 # http://www.apache.org/licenses/LICENSE-2.0
0007 #
0008 # Authors:
0009 # - Wen Guan, <wen.guan@cern.ch>, 2020 - 2025
0010 
0011 import copy
0012 import datetime
0013 import logging
0014 import os
0015 import stat
0016 import uuid
0017 import traceback
0018 
0019 from idds.common import exceptions
0020 from idds.common.constants import (WorkStatus, ProcessingStatus,
0021                                    CollectionStatus, CollectionType)
0022 from idds.common.constants import get_work_status_from_transform_processing_status
0023 from idds.common.utils import setup_logging
0024 from idds.common.utils import str_to_date
0025 # from idds.common.utils import json_dumps
0026 
0027 from .base import Base
0028 
0029 
# Configure module-level logging through the shared idds helper.
setup_logging(__name__)
0031 
0032 
class Parameter(object):
    """A simple named-parameter container.

    Values may be plain objects or zero-argument callables; callables are
    evaluated lazily when the value is requested.
    """

    def __init__(self, params):
        """
        :param params: dict mapping parameter names to values (or callables).
        :raises TypeError: if params is not a dict.
        """
        # Explicit check instead of `assert (type(params) in [dict])`:
        # asserts are stripped under `python -O`, so input validation must
        # not rely on them; isinstance also accepts dict subclasses.
        if not isinstance(params, dict):
            raise TypeError("params must be a dict, got %s" % type(params).__name__)
        self.params = params

    def add(self, name, value):
        """Add or replace the parameter *name*."""
        self.params[name] = value

    def get_param_names(self):
        """Return a view of all parameter names."""
        return self.params.keys()

    def get_param_value(self, name):
        """Return the value of *name* (None if unknown).

        If the stored value is callable (and truthy) it is invoked and the
        result is returned instead.
        """
        value = self.params.get(name, None)
        if value and callable(value):
            value = value()
        return value
0049 
0050 
class Collection(Base):
    """Model of a data collection (input/output/log dataset) attached to a work.

    Identity, status and type are persisted through Base metadata items
    (``add_metadata_item``/``get_metadata_item``) so they survive
    serialization; the ``collection`` property mirrors the corresponding
    database record and keeps the local attributes in sync.
    """

    def __init__(self, scope=None, name=None, coll_type=CollectionType.Dataset, coll_metadata=None):
        """
        :param scope: the collection scope.
        :param name: the collection name.
        :param coll_type: a CollectionType value (default: Dataset).
        :param coll_metadata: optional dict of extra metadata.
        """
        super(Collection, self).__init__()
        self.scope = scope
        self.name = name
        # Fix: the old signature used a mutable default argument (={}),
        # which would be shared by every instance created without an
        # explicit coll_metadata. Create a fresh dict per instance instead.
        self.coll_metadata = coll_metadata if coll_metadata is not None else {}

        self.collection = None

        self.internal_id = str(uuid.uuid4())[:8]
        self.coll_id = None
        self.coll_type = coll_type
        self.status = CollectionStatus.New
        self.substatus = CollectionStatus.New

        # File accounting counters (mirrored from the database record).
        self.total_files = 0
        self.processed_files = 0
        self.processing_files = 0
        self.activated_files = 0
        self.preprocessing_files = 0
        self.bytes = 0
        self.new_files = 0
        self.failed_files = 0
        self.missing_files = 0

        # Counters for externally-tracked ("ext") files.
        self.ext_files = 0
        self.processed_ext_files = 0
        self.failed_ext_files = 0
        self.missing_ext_files = 0

    @property
    def internal_id(self):
        # Stable short identifier, persisted via Base metadata.
        return self.get_metadata_item('internal_id')

    @internal_id.setter
    def internal_id(self, value):
        self.add_metadata_item('internal_id', value)

    @property
    def coll_id(self):
        return self.get_metadata_item('coll_id', None)

    @coll_id.setter
    def coll_id(self, value):
        self.add_metadata_item('coll_id', value)

    @property
    def status(self):
        # Stored as the enum's integer value; re-wrap on read.
        st = self.get_metadata_item('status', CollectionStatus.New)
        if type(st) in [int]:
            st = CollectionStatus(st)
        return st

    @status.setter
    def status(self, value):
        self.add_metadata_item('status', value.value if value else value)
        if self.collection:
            # Keep the attached database record in sync.
            self.collection['status'] = value

    @property
    def coll_type(self):
        st = self.get_metadata_item('coll_type', CollectionType.Dataset)
        if type(st) in [int]:
            st = CollectionType(st)
        return st

    @coll_type.setter
    def coll_type(self, value):
        self.add_metadata_item('coll_type', value.value if value else value)
        if self.collection:
            self.collection['coll_type'] = value

    @property
    def substatus(self):
        st = self.get_metadata_item('substatus', CollectionStatus.New)
        if type(st) in [int]:
            st = CollectionStatus(st)
        return st

    @substatus.setter
    def substatus(self, value):
        self.add_metadata_item('substatus', value.value if value else value)
        if self.collection:
            self.collection['substatus'] = value

    @property
    def collection(self):
        # The attached database record (a dict-like row), or None.
        return self._collection

    @collection.setter
    def collection(self, value):
        # Attaching a record copies its fields onto this object.
        self._collection = value
        if self._collection:
            self.scope = self._collection['scope']
            self.name = self._collection['name']
            self.coll_metadata = self._collection['coll_metadata']
            self.coll_id = self._collection['coll_id']
            self.coll_type = self._collection['coll_type']
            self.status = self._collection['status']
            self.substatus = self._collection['substatus']

            self.total_files = self._collection['total_files']
            self.processed_files = self._collection['processed_files']
            self.processing_files = self._collection['processing_files']
            self.activated_files = self._collection.get('activated_files', 0)
            self.preprocessing_files = self._collection.get('preprocessing_files', 0)
            self.bytes = self._collection['bytes']

    def to_origin_dict(self):
        """Return the minimal {'scope', 'name'} identification dict."""
        return {'scope': self.scope, 'name': self.name}
0162 
0163 
class Processing(Base):
    """Model of one processing (a submitted task instance) of a work.

    All simple attributes are persisted through Base metadata items
    (``add_metadata_item``/``get_metadata_item``) so they survive
    serialization; the ``processing`` property mirrors the corresponding
    database record and keeps the local attributes in sync.
    """

    def __init__(self, processing_metadata=None):
        """
        :param processing_metadata: optional dict of extra metadata; may
            carry the originating work object under the 'work' key.
        """
        super(Processing, self).__init__()

        # Fix: the old signature used a mutable default argument (={}),
        # which would be shared by every instance created without an
        # explicit processing_metadata. Create a fresh dict per instance.
        self.processing_metadata = processing_metadata if processing_metadata is not None else {}
        if self.processing_metadata and 'work' in self.processing_metadata:
            self.work = self.processing_metadata['work']
        else:
            self.work = None

        self.processing = None

        self.internal_id = str(uuid.uuid4())[:8]
        self.task_name = None
        self.processing_id = None
        self.workload_id = None
        self.status = ProcessingStatus.New
        self.substatus = ProcessingStatus.New
        self.last_updated_at = datetime.datetime.utcnow()
        self.polling_retries = 0
        # Operation flags requested by users/agents.
        self.tocancel = False
        self.tosuspend = False
        self.toresume = False
        self.toexpire = False
        self.tofinish = False
        self.toforcefinish = False
        self.operation_time = datetime.datetime.utcnow()
        self.submitted_at = None

        self.username = None

        self.external_id = None
        self.errors = None

        self.output_data = None

        self.retries = 0

    @property
    def internal_id(self):
        return self.get_metadata_item('internal_id')

    @internal_id.setter
    def internal_id(self, value):
        self.add_metadata_item('internal_id', value)

    @property
    def processing_id(self):
        return self.get_metadata_item('processing_id', None)

    @processing_id.setter
    def processing_id(self, value):
        self.add_metadata_item('processing_id', value)

    @property
    def workload_id(self):
        return self.get_metadata_item('workload_id', None)

    @workload_id.setter
    def workload_id(self, value):
        self.add_metadata_item('workload_id', value)

    def get_workload_id(self):
        """Return the workload id of this processing."""
        return self.workload_id

    @property
    def status(self):
        # Stored as the enum's integer value; re-wrap on read.
        st = self.get_metadata_item('status', ProcessingStatus.New)
        if type(st) in [int]:
            st = ProcessingStatus(st)
        return st

    @status.setter
    def status(self, value):
        self.add_metadata_item('status', value.value if value else value)
        if self.processing:
            # Keep the attached database record in sync.
            self.processing['status'] = value

    @property
    def substatus(self):
        st = self.get_metadata_item('substatus', ProcessingStatus.New)
        if type(st) in [int]:
            st = ProcessingStatus(st)
        return st

    @substatus.setter
    def substatus(self, value):
        self.add_metadata_item('substatus', value.value if value else value)
        if self.processing:
            self.processing['substatus'] = value

    @property
    def retries(self):
        return self.get_metadata_item('retries', 0)

    @retries.setter
    def retries(self, value):
        self.add_metadata_item('retries', value)

    @property
    def last_updated_at(self):
        # May come back as an ISO string after (de)serialization.
        last_updated_at = self.get_metadata_item('last_updated_at', None)
        if last_updated_at and type(last_updated_at) in [str]:
            last_updated_at = str_to_date(last_updated_at)
        return last_updated_at

    @last_updated_at.setter
    def last_updated_at(self, value):
        self.add_metadata_item('last_updated_at', value)

    @property
    def polling_retries(self):
        return self.get_metadata_item('polling_retries', 0)

    @polling_retries.setter
    def polling_retries(self, value):
        self.add_metadata_item('polling_retries', value)

    def _set_operation_flag(self, key, value):
        # Shared implementation for the to* operation-flag setters below:
        # refresh operation_time only when the flag actually changes.
        old_value = self.get_metadata_item(key, False)
        if old_value != value:
            self.operation_time = datetime.datetime.utcnow()
        self.add_metadata_item(key, value)

    @property
    def tocancel(self):
        return self.get_metadata_item('tocancel', False)

    @tocancel.setter
    def tocancel(self, value):
        self._set_operation_flag('tocancel', value)

    @property
    def tosuspend(self):
        return self.get_metadata_item('tosuspend', False)

    @tosuspend.setter
    def tosuspend(self, value):
        self._set_operation_flag('tosuspend', value)

    @property
    def toresume(self):
        return self.get_metadata_item('toresume', False)

    @toresume.setter
    def toresume(self, value):
        self._set_operation_flag('toresume', value)

    @property
    def toexpire(self):
        return self.get_metadata_item('toexpire', False)

    @toexpire.setter
    def toexpire(self, value):
        self._set_operation_flag('toexpire', value)

    @property
    def tofinish(self):
        return self.get_metadata_item('tofinish', False)

    @tofinish.setter
    def tofinish(self, value):
        self._set_operation_flag('tofinish', value)

    @property
    def toforcefinish(self):
        return self.get_metadata_item('toforcefinish', False)

    @toforcefinish.setter
    def toforcefinish(self, value):
        self._set_operation_flag('toforcefinish', value)

    @property
    def operation_time(self):
        # Timestamp of the last operation-flag change; may deserialize as str.
        opt_time = self.get_metadata_item('operation_time', None)
        if opt_time and type(opt_time) in [str]:
            opt_time = str_to_date(opt_time)
        return opt_time

    @operation_time.setter
    def operation_time(self, value):
        self.add_metadata_item('operation_time', value)

    def in_operation_time(self):
        """Return True while within 120 seconds of the last flag change."""
        if self.operation_time:
            time_diff = datetime.datetime.utcnow() - self.operation_time
            time_diff = time_diff.total_seconds()
            if time_diff < 120:
                return True
        return False

    @property
    def submitted_at(self):
        opt_time = self.get_metadata_item('submitted_at', None)
        if opt_time and type(opt_time) in [str]:
            opt_time = str_to_date(opt_time)
        return opt_time

    @submitted_at.setter
    def submitted_at(self, value):
        self.add_metadata_item('submitted_at', value)

    @property
    def errors(self):
        return self.get_metadata_item('errors', None)

    @errors.setter
    def errors(self, value):
        self.add_metadata_item('errors', value)

    @property
    def external_id(self):
        return self.get_metadata_item('external_id', None)

    @external_id.setter
    def external_id(self, value):
        self.add_metadata_item('external_id', value)

    @property
    def old_external_id(self):
        return self.get_metadata_item('old_external_id', [])

    @old_external_id.setter
    def old_external_id(self, value):
        self.add_metadata_item('old_external_id', value)

    @property
    def task_name(self):
        return self.get_metadata_item('task_name', None)

    @task_name.setter
    def task_name(self, value):
        self.add_metadata_item('task_name', value)

    @property
    def processing(self):
        # The attached database record (a dict-like row), or None.
        return self._processing

    @processing.setter
    def processing(self, value):
        # Attaching a record copies its fields onto this object.
        self._processing = value
        if self._processing:
            self.processing_id = self._processing.get('processing_id', None)
            self.workload_id = self._processing.get('workload_id', None)
            self.status = self._processing.get('status', None)
            self.substatus = self._processing.get('substatus', None)
            self.processing_metadata = self._processing.get('processing_metadata', None)
            self.submitted_at = self._processing.get('submitted_at', None)
            if self.processing_metadata and 'processing' in self.processing_metadata:
                # A nested Processing object carries the richer runtime state.
                proc = self.processing_metadata['processing']
                self.work = proc.work
                self.external_id = proc.external_id
                self.errors = proc.errors
                if not self.submitted_at:
                    self.submitted_at = proc.submitted_at

            self.output_data = self._processing.get('output_metadata', None)

    def has_new_updates(self):
        """Mark this processing as freshly updated."""
        self.last_updated_at = datetime.datetime.utcnow()
0437 
0438 
0439 class Work(Base):
0440 
    def __init__(self, executable=None, arguments=None, parameters=None, setup=None, work_type=None,
                 work_tag=None, exec_type='local', sandbox=None, request_id=None, work_id=None, work_name=None,
                 primary_input_collection=None, other_input_collections=None, input_collections=None,
                 primary_output_collection=None, other_output_collections=None, output_collections=None,
                 log_collections=None, release_inputs_after_submitting=False, username=None,
                 agent_attributes=None, is_template=False, loading=False,
                 logger=None):
        """
        Init a work/task/transformation.

        :param setup: A string to setup the executable environment, it can be None.
        :param executable: The executable.
        :param arguments: The arguments.
        :param parameters: A dict with arguments needed to be replaced.
        :param work_type: The work type like data carousel, hyperparameter optimization and so on.
        :param exec_type: The exec type like 'local', 'remote'(with remote_package set), 'docker' and so on.
        :param sandbox: The sandbox.
        :param work_id: The work/task id.
        :param primary_input_collection: The primary input collection.
        :param other_input_collections: List of the input collections.
        :param output_collections: List of the output collections.
        # :param workflow: The workflow the current work belongs to.
        """
        # NOTE(review): these private attributes are deliberately assigned
        # before super().__init__ — presumably because Base's init (with
        # loading=True) may go through the property setters below; keep order.
        self._collections = {}
        self._primary_input_collection = None
        self._primary_output_collection = None
        self._other_input_collections = []
        self._other_output_collections = []

        self._processings = {}

        super(Work, self).__init__(loading=loading)

        self.internal_id = str(uuid.uuid4())[:8]
        self.template_work_id = self.internal_id
        self.is_template = is_template
        self.class_name = self.__class__.__name__.lower()
        self.initialized = False
        self.sequence_id = 0

        self.logger = logger
        if self.logger is None:
            self.setup_logger()

        self.setup = setup
        self.executable = executable
        self.arguments = arguments
        self.parameters = parameters

        self.username = username
        self.work_type = work_type
        self.work_tag = work_tag
        self.exec_type = exec_type
        self.sandbox = sandbox
        self.request_id = request_id
        self.work_id = work_id
        self.work_name = work_name
        if not self.work_name:
            # Fall back to the template id so every work has a usable name.
            self.work_name = self.template_work_id
        # self.workflow = workflow
        self.transforming = False
        self.workdir = None

        self.log_collections = []
        # Collections may be passed either as one combined list
        # (input_collections/output_collections) or split into a primary
        # plus "others" — but never both forms at once.
        if input_collections and (primary_input_collection or other_input_collections):
            raise Exception("input_collections and (primary_input_collection, other_input_collections) cannot be used at the same time.")
        if output_collections and (primary_output_collection or other_output_collections):
            raise Exception("output_collections and (primary_output_collection, other_output_collections) cannot be used at the same time.")

        # Normalize single collections to one-element lists.
        if input_collections and type(input_collections) not in [list, tuple]:
            input_collections = [input_collections]
        if output_collections and type(output_collections) not in [list, tuple]:
            output_collections = [output_collections]

        # Split the combined lists into primary (first) + others (rest).
        if input_collections:
            primary_input_collection = input_collections[0]
            if len(input_collections) > 1:
                other_input_collections = input_collections[1:]
        if output_collections:
            primary_output_collection = output_collections[0]
            if len(output_collections) > 1:
                other_output_collections = output_collections[1:]

        # self.primary_input_collection = primary_input_collection
        self.set_primary_input_collection(primary_input_collection)
        self.set_primary_output_collection(primary_output_collection)

        # self.other_input_collections = other_input_collections
        if other_input_collections and type(other_input_collections) not in [list, tuple]:
            other_input_collections = [other_input_collections]
        self.add_other_input_collections(other_input_collections)
        if other_output_collections and type(other_output_collections) not in [list, tuple]:
            other_output_collections = [other_output_collections]
        self.add_other_output_collections(other_output_collections)

        # if input_collections and type(input_collections) not in [list, tuple]:
        #     input_collections = [input_collections]
        # self.add_input_collections(input_collections)
        # if output_collections and type(output_collections) not in [list, tuple]:
        #     output_collections = [output_collections]
        # self.add_output_collections(output_collections)

        if log_collections and type(log_collections) not in [list, tuple]:
            log_collections = [log_collections]
        self.add_log_collections(log_collections)

        self.release_inputs_after_submitting = release_inputs_after_submitting
        self.has_new_inputs = True

        self.started = False
        self.status = WorkStatus.New
        self.substatus = WorkStatus.New
        self.polling_retries = 0
        self.errors = []
        self.next_works = []

        self.work_name_to_coll_map = []

        # Processing bookkeeping: the dict of Processing objects plus id
        # lists partitioned by lifecycle state.
        self.processings = {}
        self.active_processings = []
        self.cancelled_processings = []
        self.suspended_processings = []
        self.old_processings = []
        self.terminated_msg = ""
        self.output_data = {}
        self.parameters_for_next_task = None

        self.status_statistics = {}

        self.agent_attributes = agent_attributes

        self.proxy = None
        self.original_proxy = None

        # Operation flags requested by users/agents.
        self.tocancel = False
        self.tosuspend = False
        self.toresume = False
        self.toexpire = False
        self.tofinish = False
        self.toforcefinish = False

        self.last_updated_at = datetime.datetime.utcnow()

        self.to_update_processings = {}

        self.backup_to_release_inputs = {'0': [], '1': [], '2': []}

        self.num_run = 0

        self.or_custom_conditions = {}
        self.and_custom_conditions = {}

        self.sliced_global_parameters = None

        self.is_build_work = False

        self.func_site_to_cloud = None

        self.dispatch_ext_content = False

        """
        self._running_data_names = []
        for name in ['internal_id', 'template_work_id', 'initialized', 'sequence_id', 'parameters', 'work_id', 'transforming', 'workdir',
                     # 'collections', 'primary_input_collection', 'other_input_collections', 'output_collections', 'log_collections',
                     'collections',
                     # 'output_data',
                     '_has_new_inputs', 'status', 'substatus', 'polling_retries', 'errors', 'next_works',
                     'processings', 'active_processings', 'cancelled_processings', 'suspended_processings', 'old_processings',
                     # 'terminated_msg', 'output_data', 'parameters_for_next_task', 'status_statistics',
                     'tocancel', 'tosuspend', 'toresume']:
            self._running_data_names.append(name)
        """
0613 
    def get_logger(self):
        """Return the work's logger, creating it lazily if needed."""
        # NOTE(review): __init__ calls self.setup_logger() without using the
        # return value, while here the result is assigned — confirm that
        # Base.setup_logger() both sets self.logger and returns it.
        if self.logger is None:
            self.logger = self.setup_logger()
        return self.logger
0618 
0619     def get_class_name(self):
0620         return self.__class__.__name__
0621 
    def get_internal_id(self):
        """Return the unique internal id of this work."""
        return self.internal_id

    def get_template_work_id(self):
        """Return the internal id of the template this work derives from."""
        return self.template_work_id

    def get_sequence_id(self):
        """Return the sequence id of this work."""
        return self.sequence_id

    def get_site(self):
        """Site placement hint; the base implementation has none."""
        return None

    def get_cloud(self):
        """Cloud placement hint; the base implementation has none."""
        return None

    def get_queue(self):
        """Queue placement hint; the base implementation has none."""
        return None

    def is_data_work(self):
        """Whether this is a data(-movement) work; base works are not."""
        return False
0642 
    # Identity-related attributes, all persisted via Base metadata items so
    # they survive (de)serialization.
    @property
    def internal_id(self):
        return self.get_metadata_item('internal_id')

    @internal_id.setter
    def internal_id(self, value):
        self.add_metadata_item('internal_id', value)

    @property
    def parent_internal_id(self):
        return self.get_metadata_item('parent_internal_id')

    @parent_internal_id.setter
    def parent_internal_id(self, value):
        self.add_metadata_item('parent_internal_id', value)

    @property
    def parent_internal_ids(self):
        return self.get_metadata_item('parent_internal_ids', None)

    @parent_internal_ids.setter
    def parent_internal_ids(self, value):
        self.add_metadata_item('parent_internal_ids', value)

    @property
    def template_work_id(self):
        return self.get_metadata_item('template_work_id')

    @template_work_id.setter
    def template_work_id(self, value):
        self.add_metadata_item('template_work_id', value)

    @property
    def workload_id(self):
        return self.get_metadata_item('workload_id', None)

    @workload_id.setter
    def workload_id(self, value):
        self.add_metadata_item('workload_id', value)

    @property
    def external_id(self):
        return self.get_metadata_item('external_id', None)

    @external_id.setter
    def external_id(self, value):
        self.add_metadata_item('external_id', value)

    def get_workload_id(self):
        """Return the workload id of this work."""
        return self.workload_id
0693 
    # Runtime-state attributes, persisted via Base metadata items.
    @property
    def initialized(self):
        return self.get_metadata_item('initialized', False)

    @initialized.setter
    def initialized(self, value):
        self.add_metadata_item('initialized', value)

    @property
    def sequence_id(self):
        return self.get_metadata_item('sequence_id', 0)

    @sequence_id.setter
    def sequence_id(self, value):
        self.add_metadata_item('sequence_id', value)

    @property
    def num_inputs(self):
        num = self.get_metadata_item('num_inputs', None)
        return num

    @num_inputs.setter
    def num_inputs(self, value):
        self.add_metadata_item('num_inputs', value)

    @property
    def has_unmapped_jobs(self):
        value = self.get_metadata_item('has_unmapped_jobs', True)
        return value

    @has_unmapped_jobs.setter
    def has_unmapped_jobs(self, value):
        self.add_metadata_item('has_unmapped_jobs', value)

    @property
    def parameters(self):
        return self.get_metadata_item('parameters', None)

    @parameters.setter
    def parameters(self, value):
        self.add_metadata_item('parameters', value)

    @property
    def output_data(self):
        return self.get_metadata_item('output_data', {})

    @output_data.setter
    def output_data(self, value):
        self.add_metadata_item('output_data', value)
        if value and type(value) in [dict]:
            # Also expose each output datum as a `user_<key>` attribute.
            for key in value:
                new_key = "user_" + str(key)
                setattr(self, new_key, value[key])
0747 
    @property
    def work_id(self):
        return self.get_metadata_item('work_id', None)

    @work_id.setter
    def work_id(self, value):
        self.add_metadata_item('work_id', value)

    @property
    def parent_workload_id(self):
        return self.get_metadata_item('parent_workload_id', None)

    @parent_workload_id.setter
    def parent_workload_id(self, value):
        self.add_metadata_item('parent_workload_id', value)

    @property
    def transforming(self):
        return self.get_metadata_item('transforming', False)

    @transforming.setter
    def transforming(self, value):
        self.add_metadata_item('transforming', value)

    @property
    def submitted(self):
        return self.get_metadata_item('submitted', False)

    @submitted.setter
    def submitted(self, value):
        self.add_metadata_item('submitted', value)

    @property
    def workdir(self):
        return self.get_metadata_item('workdir', None)

    @workdir.setter
    def workdir(self, value):
        self.add_metadata_item('workdir', value)

    @property
    def work_name(self):
        # NOTE(review): default 0 looks copy-pasted from a numeric property;
        # None would be the natural default for a name — confirm before changing.
        return self.get_metadata_item('work_name', 0)

    @work_name.setter
    def work_name(self, value):
        self.add_metadata_item('work_name', value)

    @property
    def has_new_inputs(self):
        # NOTE(review): default 0 is used here as a falsy stand-in for False.
        return self.get_metadata_item('has_new_inputs', 0)

    @has_new_inputs.setter
    def has_new_inputs(self, value):
        self.add_metadata_item('has_new_inputs', value)

    @property
    def started(self):
        return self.get_metadata_item('started', False)

    @started.setter
    def started(self, value):
        self.add_metadata_item('started', value)
0811 
    @property
    def status(self):
        # Stored as the enum's integer value; re-wrap on read.
        st = self.get_metadata_item('status', WorkStatus.New)
        if type(st) in [int]:
            st = WorkStatus(st)
        return st

    @status.setter
    def status(self, value):
        # Entering any active/terminal state implies the work has started
        # transforming; latch the flag the first time that happens.
        if not self.transforming:
            if value and value in [WorkStatus.Transforming,
                                   WorkStatus.Finished,
                                   WorkStatus.SubFinished,
                                   WorkStatus.Failed,
                                   WorkStatus.Running]:
                self.transforming = True
        self.add_metadata_item('status', value.value if value else value)

    @property
    def substatus(self):
        st = self.get_metadata_item('substatus', WorkStatus.New)
        if type(st) in [int]:
            st = WorkStatus(st)
        return st

    @substatus.setter
    def substatus(self, value):
        self.add_metadata_item('substatus', value.value if value else value)
0840 
    @property
    def is_build_work(self):
        st = self.get_metadata_item('is_build_work', False)
        return st

    @is_build_work.setter
    def is_build_work(self, value):
        self.add_metadata_item('is_build_work', value)

    def set_build_work(self):
        """Mark this work as a build work."""
        self.is_build_work = True

    @property
    def signature(self):
        st = self.get_metadata_item('signature', None)
        return st

    @signature.setter
    def signature(self, value):
        self.add_metadata_item('signature', value)

    def sign(self):
        """Assign a fresh random signature to this work."""
        self.signature = str(uuid.uuid4())

    def get_signature(self):
        """Return the current signature (None if never signed)."""
        return self.signature
0867 
    @property
    def polling_retries(self):
        # Number of polls performed; reset on resume (see resume_work).
        return self.get_metadata_item('polling_retries', 0)

    @polling_retries.setter
    def polling_retries(self, value):
        self.add_metadata_item('polling_retries', value)

    @property
    def errors(self):
        # Recorded error messages; None until the first error is stored.
        return self.get_metadata_item('errors', None)

    @errors.setter
    def errors(self, value):
        self.add_metadata_item('errors', value)

    @property
    def next_works(self):
        # Works scheduled to run after this one (default empty list).
        return self.get_metadata_item('next_works', [])

    @next_works.setter
    def next_works(self, value):
        self.add_metadata_item('next_works', value)
0891 
    @property
    def collections(self):
        # Live Collection objects keyed by internal id.
        return self._collections

    @collections.setter
    def collections(self, value):
        self._collections = value
        # Persist only the coll_ids; live objects are rebuilt via load_work().
        coll_metadata = {}
        if self._collections:
            for k in self._collections:
                coll = self._collections[k]
                if type(coll) in [Collection]:
                    coll_metadata[k] = {'coll_id': coll.coll_id}
        self.add_metadata_item('collections', coll_metadata)

    def with_sub_map_id(self):
        # Subclasses override to enable sub-map-id based input mapping.
        return False
0909 
    @property
    def processings(self):
        # Live Processing objects keyed by internal id.
        return self._processings

    @processings.setter
    def processings(self, value):
        self._processings = value
        # Persist only the ids; live objects are rebuilt via load_work().
        proc_metadata = {}
        if self._processings:
            for k in self._processings:
                proc = self._processings[k]
                if type(proc) in [Processing]:
                    proc_metadata[k] = {'processing_id': proc.processing_id,
                                        'workload_id': proc.workload_id,
                                        'external_id': proc.external_id}
        self.add_metadata_item('processings', proc_metadata)

    def generating_new_inputs(self):
        # Subclasses override when they keep producing new inputs after start.
        return False
0929 
0930     def refresh_work(self):
0931         coll_metadata = {}
0932         if self._collections:
0933             for k in self._collections:
0934                 coll = self._collections[k]
0935                 if type(coll) in [Collection]:
0936                     coll_metadata[k] = {'coll_id': coll.coll_id}
0937         self.add_metadata_item('collections', coll_metadata)
0938 
0939         proc_metadata = {}
0940         if self._processings:
0941             for k in self._processings:
0942                 proc = self._processings[k]
0943                 if type(proc) in [Processing]:
0944                     proc_metadata[k] = {'processing_id': proc.processing_id,
0945                                         'workload_id': proc.workload_id,
0946                                         'external_id': proc.external_id}
0947         self.add_metadata_item('processings', proc_metadata)
0948 
    def load_work(self):
        """Restore collection/processing ids from persisted metadata into the live objects."""
        coll_metadata = self.get_metadata_item('collections', {})
        for k in self._collections:
            if k in coll_metadata:
                coll_id = coll_metadata[k]['coll_id']
                self._collections[k].coll_id = coll_id

        proc_metadata = self.get_metadata_item('processings', {})
        for k in self._processings:
            if k in proc_metadata:
                proc_id = proc_metadata[k]['processing_id']
                self._processings[k].processing_id = proc_id
                if 'workload_id' in proc_metadata[k] and proc_metadata[k]['workload_id']:
                    self._processings[k].workload_id = proc_metadata[k]['workload_id']
                    self.workload_id = proc_metadata[k]['workload_id']
                if 'external_id' in proc_metadata[k] and proc_metadata[k]['external_id']:
                    self._processings[k].external_id = proc_metadata[k]['external_id']
                    self.external_id = proc_metadata[k]['external_id']
        # Re-create processings that exist only in metadata (e.g. after a reload
        # where the live objects were not carried over).
        for k in proc_metadata:
            if k not in self._processings:
                self._processings[k] = Processing(processing_metadata={})
                proc_id = proc_metadata[k]['processing_id']
                self._processings[k].processing_id = proc_id
                self._processings[k].internal_id = k
                if 'workload_id' in proc_metadata[k] and proc_metadata[k]['workload_id']:
                    self._processings[k].workload_id = proc_metadata[k]['workload_id']
                    self.workload_id = proc_metadata[k]['workload_id']
                if 'external_id' in proc_metadata[k] and proc_metadata[k]['external_id']:
                    self._processings[k].external_id = proc_metadata[k]['external_id']
                    self.external_id = proc_metadata[k]['external_id']

    def load_metadata(self):
        # Hook called after deserialization; rebuilds live objects from metadata.
        self.load_work()
0982 
    # Processing bookkeeping lists and control flags below are all persisted
    # in the work metadata so they survive serialization.

    @property
    def active_processings(self):
        # Internal ids of processings currently being worked on.
        return self.get_metadata_item('active_processings', [])

    @active_processings.setter
    def active_processings(self, value):
        self.add_metadata_item('active_processings', value)

    @property
    def cancelled_processings(self):
        # Internal ids of cancelled processings.
        return self.get_metadata_item('cancelled_processings', [])

    @cancelled_processings.setter
    def cancelled_processings(self, value):
        self.add_metadata_item('cancelled_processings', value)

    @property
    def suspended_processings(self):
        # Internal ids of suspended processings.
        return self.get_metadata_item('suspended_processings', [])

    @suspended_processings.setter
    def suspended_processings(self, value):
        self.add_metadata_item('suspended_processings', value)

    @property
    def old_processings(self):
        # Internal ids of superseded processings (e.g. after a resume).
        return self.get_metadata_item('old_processings', [])

    @old_processings.setter
    def old_processings(self, value):
        self.add_metadata_item('old_processings', value)

    @property
    def tocancel(self):
        # Flag: a cancel has been requested for this work.
        return self.get_metadata_item('tocancel', False)

    @tocancel.setter
    def tocancel(self, value):
        self.add_metadata_item('tocancel', value)

    @property
    def tosuspend(self):
        # Flag: a suspend has been requested for this work.
        return self.get_metadata_item('tosuspend', False)

    @tosuspend.setter
    def tosuspend(self, value):
        self.add_metadata_item('tosuspend', value)

    @property
    def toresume(self):
        # Flag: a resume has been requested for this work.
        return self.get_metadata_item('toresume', False)

    @toresume.setter
    def toresume(self, value):
        self.add_metadata_item('toresume', value)

    @property
    def toexpire(self):
        # Flag: an expire has been requested for this work.
        return self.get_metadata_item('toexpire', False)

    @toexpire.setter
    def toexpire(self, value):
        self.add_metadata_item('toexpire', value)

    @property
    def tofinish(self):
        # Flag: a (graceful) finish has been requested for this work.
        return self.get_metadata_item('tofinish', False)

    @tofinish.setter
    def tofinish(self, value):
        self.add_metadata_item('tofinish', value)

    @property
    def toforcefinish(self):
        # Flag: a forced finish has been requested for this work.
        return self.get_metadata_item('toforcefinish', False)

    @toforcefinish.setter
    def toforcefinish(self, value):
        self.add_metadata_item('toforcefinish', value)
1062 
    @property
    def last_updated_at(self):
        # May be stored as an ISO string in metadata; convert back to datetime.
        last_updated_at = self.get_metadata_item('last_updated_at', None)
        if last_updated_at and type(last_updated_at) in [str]:
            last_updated_at = str_to_date(last_updated_at)
        return last_updated_at

    @last_updated_at.setter
    def last_updated_at(self, value):
        self.add_metadata_item('last_updated_at', value)

    def has_new_updates(self):
        # Touch the timestamp so monitors/expiry checks see fresh activity.
        self.last_updated_at = datetime.datetime.utcnow()
1076 
    @property
    def to_update_processings(self):
        # Pending processing updates keyed by processing id.
        return self.get_metadata_item('to_update_processings', {})

    @to_update_processings.setter
    def to_update_processings(self, value):
        self.add_metadata_item('to_update_processings', value)

    @property
    def num_run(self):
        # Loop iteration number for works inside a workflow loop (default 0).
        return self.get_metadata_item('num_run', 0)

    @num_run.setter
    def num_run(self, value):
        self.add_metadata_item('num_run', value)
        # Suffix output collection names with the run number so each loop
        # iteration writes to distinct collections; names containing
        # "___idds___" are left untouched. NOTE(review): setting num_run more
        # than once appends multiple suffixes — presumably set exactly once
        # per run; confirm against callers.
        if value is not None:
            # for k in self._collections:
            for coll in self.output_collections:
                if type(coll) in [Collection]:
                    if "___idds___" not in coll.name:
                        coll.name = coll.name + "." + str(value)

    def get_loop_index(self):
        # Alias for num_run used by loop-workflow bookkeeping.
        return self.num_run
1101 
    @property
    def primary_input_collection(self):
        # Resolve the stored internal id to the live Collection (or None).
        if self._primary_input_collection:
            return self.collections[self._primary_input_collection]
        return None

    @primary_input_collection.setter
    def primary_input_collection(self, value):
        self.set_primary_input_collection(value)

    @property
    def primary_output_collection(self):
        # Resolve the stored internal id to the live Collection (or None).
        if self._primary_output_collection:
            return self.collections[self._primary_output_collection]
        return None

    @primary_output_collection.setter
    def primary_output_collection(self, value):
        self.set_primary_output_collection(value)
1121 
1122     @property
1123     def input_collections(self):
1124         if self._primary_input_collection:
1125             keys = [self._primary_input_collection] + self._other_input_collections
1126         else:
1127             keys = self._other_input_collections
1128         return [self.collections[k] for k in keys]
1129 
1130     @input_collections.setter
1131     def input_collections(self, value):
1132         if value and type(value) not in [list, tuple]:
1133             value = [value]
1134 
1135         if value:
1136             primary_collection = value[0]
1137             other_collections = []
1138             if len(value) > 1:
1139                 other_collections = value[1:]
1140 
1141             self.set_primary_input_collection(primary_collection)
1142 
1143             if other_collections and type(other_collections) not in [list, tuple]:
1144                 other_collections = [other_collections]
1145             self.add_other_input_collections(other_collections)
1146 
1147     @property
1148     def output_collections(self):
1149         if self._primary_output_collection:
1150             keys = [self._primary_output_collection] + self._other_output_collections
1151         else:
1152             keys = self._other_output_collections
1153         return [self.collections[k] for k in keys]
1154 
1155     @output_collections.setter
1156     def output_collections(self, value):
1157         if value and type(value) not in [list, tuple]:
1158             value = [value]
1159 
1160         if value:
1161             primary_collection = value[0]
1162             other_collections = []
1163             if len(value) > 1:
1164                 other_collections = value[1:]
1165 
1166             self.set_primary_output_collection(primary_collection)
1167 
1168             if other_collections and type(other_collections) not in [list, tuple]:
1169                 other_collections = [other_collections]
1170             self.add_other_output_collections(other_collections)
1171 
    def set_work_name(self, work_name):
        # Human-readable name of this work within the workflow.
        self.work_name = work_name

    def get_work_name(self):
        return self.work_name

    def set_func_site_to_cloud(self, func):
        # Callable mapping a site name to its cloud; injected by the caller.
        self.func_site_to_cloud = func

    def get_func_site_to_cloud(self):
        return self.func_site_to_cloud
1183 
1184     def get_is_template(self):
1185         self.is_template
1186 
    def sync_global_parameters(self, global_parameters, sliced_global_parameters=None):
        """Copy workflow-level global parameters onto this work as attributes.

        :param global_parameters: dict of parameter name -> value.
        :param sliced_global_parameters: optional dict of
            name -> {'index': i, 'name': attr_name} selecting one element of a
            list/tuple-valued parameter and/or renaming the target attribute.
        """
        if sliced_global_parameters:
            self.sliced_global_parameters = sliced_global_parameters

        if global_parameters:
            for key in global_parameters:
                sliced_index = None
                sliced_name = None
                if self.sliced_global_parameters and key in self.sliced_global_parameters:
                    sliced_index = self.sliced_global_parameters[key]['index']
                    sliced_name = self.sliced_global_parameters[key]['name']
                    # Slicing only applies to list/tuple values with a valid index;
                    # otherwise fall back to assigning the whole value.
                    if type(global_parameters[key]) in [list, tuple] and sliced_index < len(global_parameters[key]):
                        pass
                    else:
                        sliced_index = None
                if not sliced_name:
                    sliced_name = key

                if sliced_index is None:
                    setattr(self, sliced_name, global_parameters[key])
                else:
                    setattr(self, sliced_name, global_parameters[key][sliced_index])
1209 
1210     def get_global_parameter_from_output_data(self, key):
1211         self.logger.debug("get_global_parameter_from_output_data, key: %s, output_data: %s" % (key, str(self.output_data)))
1212         gp_output_data = {}
1213         if self.output_data and type(self.output_data) in [dict]:
1214             for key in self.output_data:
1215                 new_key = "user_" + str(key)
1216                 gp_output_data[new_key] = self.output_data[key]
1217         if key in gp_output_data:
1218             return True, gp_output_data[key]
1219         else:
1220             return False, None
1221 
    def renew_parameters_from_attributes(self):
        # Hook for subclasses to refresh parameters from attributes; no-op here.
        pass
1224 
1225     def add_custom_condition(self, key, value, op='and'):
1226         # op in ['and', 'or']
1227         if op and op == 'or':
1228             op = 'or'
1229         else:
1230             op = 'and'
1231         if op == 'and':
1232             self.and_custom_conditions[key] = value
1233         else:
1234             self.or_custom_conditions[key] = value
1235 
1236     def get_custom_condition_status_value_bool(self, key):
1237         user_key = "user_" + key
1238         if hasattr(self, user_key):
1239             key = user_key
1240 
1241         if hasattr(self, key) and getattr(self, key):
1242             value = getattr(self, key)
1243             if type(value) in [str]:
1244                 value = value.lower()
1245                 if value == 'true':
1246                     return True
1247                 else:
1248                     return False
1249             elif type(value) in [bool]:
1250                 return value
1251             elif type(value) in [int]:
1252                 if value > 0:
1253                     return True
1254                 else:
1255                     return False
1256             else:
1257                 return value
1258         else:
1259             return False
1260 
1261     def get_custom_condition_status_value(self, key):
1262         if self.output_data and key in self.output_data:
1263             return self.output_data[key]
1264 
1265         user_key = "user_" + key
1266         if hasattr(self, user_key):
1267             key = user_key
1268 
1269         if hasattr(self, key) and getattr(self, key):
1270             return getattr(self, key)
1271         else:
1272             return None
1273 
1274     def get_custom_condition_status_real(self):
1275         if self.or_custom_conditions:
1276             for key in self.or_custom_conditions:
1277                 value = self.get_custom_condition_status_value(key)
1278                 if value == self.or_custom_conditions[key]:
1279                     return True
1280 
1281         if self.and_custom_conditions:
1282             for key in self.and_custom_conditions:
1283                 value = self.get_custom_condition_status_value(key)
1284                 if not (value == self.and_custom_conditions[key]):
1285                     return False
1286             return True
1287 
1288         return False
1289 
    def get_custom_condition_status(self):
        # self.logger.debug("get_custom_condition_status, or_custom_conditions: %s" % str(self.or_custom_conditions))
        # self.logger.debug("get_custom_condition_status, and_custom_conditions: %s" % str(self.and_custom_conditions))
        # self.logger.debug("get_custom_condition_status, work: %s" % (json_dumps(self, sort_keys=True, indent=4)))

        # Evaluate the user-defined conditions; used by workflow condition edges.
        status = self.get_custom_condition_status_real()
        self.logger.debug("get_custom_condition_status, status: %s" % (status))
        return status

    def get_not_custom_condition_status(self):
        # Negated form, for the 'else' branch of workflow conditions.
        return not self.get_custom_condition_status()
1301 
    def setup_logger(self):
        """
        Setup logger
        """
        # Named after the concrete class so log lines identify the work type.
        self.logger = logging.getLogger(self.get_class_name())
        return self.logger
1308 
1309     def add_errors(self, error):
1310         self.errors.append(error)
1311 
1312     def get_errors(self):
1313         return self.errors
1314 
    def set_work_id(self, work_id, transforming=True):
        """
        *** Function called by Marshaller and clerk agent.
        *** It's the transform_id set by core_workprogresses
        """
        # Assigning a work id also marks the work as transforming by default.
        self.work_id = work_id
        self.transforming = transforming

    def get_work_id(self):
        """
        *** Function called by Marshaller and clerk agent.
        """
        return self.work_id

    def set_request_id(self, request_id):
        """
        *** Function called by Marshaller and clerk agent.
        *** It's the transform_id set by core_workprogresses
        """
        self.request_id = request_id

    def get_request_id(self):
        """
        *** Function called by Marshaller and clerk agent.
        """
        return self.request_id
1341 
1342     # def set_workflow(self, workflow):
1343     #     self.workflow = workflow
1344 
    def clean_work(self):
        """Reset run-time state so the work can be re-executed from scratch."""
        self.processings = {}
        self.active_processings = []
        self.cancelled_processings = []
        self.suspended_processings = []
        self.old_processings = []
        self.terminated_msg = ""
        self.output_data = {}
        self.parameters_for_next_task = None
        # Touch the timestamp so the reset is not mistaken for inactivity.
        self.last_updated_at = datetime.datetime.utcnow()
1355 
1356     def set_agent_attributes(self, attrs, req_attributes=None):
1357         if self.agent_attributes is None:
1358             self.agent_attributes = {}
1359         if attrs and self.class_name in attrs:
1360             for key, value in attrs[self.class_name].items():
1361                 self.agent_attributes[key] = value
1362         self.logger.info("agent_attributes: %s" % self.agent_attributes)
1363 
1364     def get_agent_attributes(self):
1365         return self.agent_attributes
1366 
    def set_workdir(self, workdir):
        # Working directory for files generated by this work.
        self.workdir = workdir

    def get_workdir(self):
        return self.workdir
1372 
    def set_status(self, status):
        """
        *** Function called by Marshaller agent.
        """
        # NOTE(review): assert is stripped under python -O, so this validation
        # only runs in debug mode.
        assert (isinstance(status, WorkStatus))
        self.status = status
        # if self.workflow:
        #     self.workflow.work_status_update_trigger(self, status)

    def get_status(self):
        return self.status
1384 
    def set_terminated_msg(self, msg):
        """
        *** Function called by Marshaller agent.
        """
        # The termination message is also recorded in the persistent error list.
        self.terminated_msg = msg
        self.add_errors(msg)

    def get_terminated_msg(self):
        return self.terminated_msg
1394 
    def set_output_data(self, data):
        # Output data produced by this work (consumed by conditions/next works).
        self.output_data = data

    def get_output_data(self):
        return self.output_data

    def set_parameters_for_next_task(self, params):
        # Parameters handed over to the next work in the chain.
        self.parameters_for_next_task = params

    def get_parameters_for_next_task(self):
        return self.parameters_for_next_task
1406 
1407     def __eq__(self, obj):
1408         if self.work_id == obj.work_id:
1409             return True
1410         return False
1411 
1412     def __hash__(self):
1413         return self.work_id
1414 
    def __str__(self):
        # Serialized dict form of the whole work; can be large.
        return str(self.to_dict())

    def get_work_type(self):
        """
        *** Function called by Marshaller agent.
        """
        return self.work_type

    def get_work_tag(self):
        """
        *** Function called by Marshaller agent.
        """
        return self.work_tag
1429 
1430     def set_parameters(self, parameters):
1431         self.parameters = parameters
1432         for p in self.parameters:
1433             if self.parameters[p] is not None and hasattr(self, p):
1434                 # fp = getattr(self, p)
1435                 # fp = self.parameters[p]  # noqa F841
1436                 setattr(self, p, self.parameters[p])
1437 
1438     def get_parameters(self):
1439         return self.parameters
1440 
    def set_arguments(self, arguments):
        # Raw argument template; formatted later by parse_arguments().
        self.arguments = arguments

    def get_arguments(self):
        return self.arguments

    def convert_data_to_additional_data_storage(self, storage, storage_name=None, replace_storage_name=False):
        # Hook for subclasses mirroring data to an additional storage; no-op here.
        pass

    def get_ancestry_works(self):
        # Subclasses return identifiers of works this one depends on.
        return []

    def get_processing_job_ids(self, processing, log_prefix=''):
        # Subclasses return backend job ids for the given processing.
        return []

    def get_processing_job_name_to_ids(self, processing, job_ids, log_prefix=''):
        # Subclasses return a mapping of job name -> job id.
        return {}
1458 
1459     def has_to_release_inputs(self):
1460         if self.backup_to_release_inputs['0'] or self.backup_to_release_inputs['1'] or self.backup_to_release_inputs['2']:
1461             return True
1462         return False
1463 
1464     def add_backup_to_release_inputs(self, to_release_inputs):
1465         if to_release_inputs:
1466             self.backup_to_release_inputs['0'] = self.backup_to_release_inputs['0'] + to_release_inputs
1467 
1468     def get_backup_to_release_inputs(self):
1469         to_release_inputs = self.backup_to_release_inputs['0'] + self.backup_to_release_inputs['1'] + self.backup_to_release_inputs['2']
1470         self.backup_to_release_inputs['2'] = self.backup_to_release_inputs['1']
1471         self.backup_to_release_inputs['1'] = self.backup_to_release_inputs['0']
1472         self.backup_to_release_inputs['0'] = []
1473         return to_release_inputs
1474 
1475     def is_to_expire(self, expired_at=None, pending_time=None, request_id=None):
1476         if expired_at:
1477             if type(expired_at) in [str]:
1478                 expired_at = str_to_date(expired_at)
1479             if expired_at < datetime.datetime.utcnow():
1480                 self.logger.info("Request(%s) expired_at(%s) is smaller than utc now(%s), expiring" % (request_id,
1481                                                                                                        expired_at,
1482                                                                                                        datetime.datetime.utcnow()))
1483                 return True
1484 
1485         if pending_time:
1486             act_pending_time = float(pending_time)
1487             act_pending_seconds = int(86400 * act_pending_time)
1488             if self.last_updated_at + datetime.timedelta(seconds=act_pending_seconds) < datetime.datetime.utcnow():
1489                 log_str = "Request(%s) last updated at(%s) + pending seconds(%s)" % (request_id,
1490                                                                                      self.last_updated_at,
1491                                                                                      act_pending_seconds)
1492                 log_str += " is smaller than utc now(%s), expiring" % (datetime.datetime.utcnow())
1493                 self.logger.info(log_str)
1494                 return True
1495         return False
1496 
    def is_starting(self):
        # A work is 'starting' once it has been handed to a transform.
        return self.transforming

    def is_started(self):
        # Either explicitly marked started or already submitted.
        return self.started or self.submitted

    def is_running(self):
        # Running covers both active statuses used by the agents.
        if self.status in [WorkStatus.Running, WorkStatus.Transforming]:
            return True
        return False
1507 
1508     def is_terminated(self, synchronize=True):
1509         """
1510         *** Function called by Transformer agent.
1511         """
1512         if (self.status in [WorkStatus.Finished, WorkStatus.SubFinished, WorkStatus.Failed, WorkStatus.Cancelled, WorkStatus.Suspended, WorkStatus.Expired]
1513             and self.substatus not in [WorkStatus.ToCancel, WorkStatus.ToSuspend, WorkStatus.ToResume]):   # noqa W503
1514             return True
1515         return False
1516 
1517     def is_finished(self, synchronize=True):
1518         """
1519         *** Function called by Transformer agent.
1520         """
1521         if self.status in [WorkStatus.Finished] and self.substatus not in [WorkStatus.ToCancel, WorkStatus.ToSuspend, WorkStatus.ToResume]:
1522             return True
1523         return False
1524 
1525     def is_subfinished(self, synchronize=True):
1526         """
1527         *** Function called by Transformer agent.
1528         """
1529         if self.status in [WorkStatus.SubFinished] and self.substatus not in [WorkStatus.ToCancel, WorkStatus.ToSuspend, WorkStatus.ToResume]:
1530             return True
1531         return False
1532 
1533     def is_processed(self, synchronize=True):
1534         """
1535         *** Function called by Transformer agent.
1536         """
1537         if self.status in [WorkStatus.Finished, WorkStatus.SubFinished] and self.substatus not in [WorkStatus.ToCancel, WorkStatus.ToSuspend, WorkStatus.ToResume]:
1538             return True
1539         return False
1540 
1541     def is_failed(self, synchronize=True):
1542         """
1543         *** Function called by Transformer agent.
1544         """
1545         if self.status in [WorkStatus.Failed] and self.substatus not in [WorkStatus.ToCancel, WorkStatus.ToSuspend, WorkStatus.ToResume]:
1546             return True
1547         return False
1548 
1549     def is_expired(self, synchronize=True):
1550         """
1551         *** Function called by Transformer agent.
1552         """
1553         if self.status in [WorkStatus.Expired] and self.substatus not in [WorkStatus.ToCancel, WorkStatus.ToSuspend, WorkStatus.ToResume]:
1554             return True
1555         return False
1556 
1557     def is_cancelled(self, synchronize=True):
1558         """
1559         *** Function called by Transformer agent.
1560         """
1561         if self.status in [WorkStatus.Cancelled] and self.substatus not in [WorkStatus.ToCancel, WorkStatus.ToSuspend, WorkStatus.ToResume]:
1562             return True
1563         return False
1564 
1565     def is_suspended(self, synchronize=True):
1566         """
1567         *** Function called by Transformer agent.
1568         """
1569         if self.status in [WorkStatus.Suspended] and self.substatus not in [WorkStatus.ToCancel, WorkStatus.ToSuspend, WorkStatus.ToResume]:
1570             return True
1571         return False
1572 
    def add_next_work(self, work):
        # Read-modify-write: `next_works` is backed by metadata, so the list
        # must be re-assigned for the change to persist.
        next_works = self.next_works
        next_works.append(work)
        self.next_works = next_works
1577 
1578     def parse_arguments(self):
1579         try:
1580             arguments = self.get_arguments()
1581             parameters = self.get_parameters()
1582             arguments = arguments.format(**parameters)
1583             return arguments
1584         except Exception as ex:
1585             self.add_errors(str(ex))
1586 
    def set_initialized(self):
        # Marks that initialize_work() has completed for this work.
        self.initialized = True

    def unset_initialized(self):
        self.initialized = False

    def is_initialized(self):
        return self.initialized
1595 
    def initialize_work(self):
        """Prepare the work before (re)submission: sign it and mark it initialized."""
        if self.parameters and self.arguments:
            # for key in self.parameters.get_param_names():
            #    self.arguments = re.sub(key, str(self.parameters.get_param_value(key)), self.arguments)
            # self.arguments = self.arguments.format(**self.parameters)
            pass
        self.sign()
        if not self.is_initialized():
            self.set_initialized()
1605 
    def copy(self):
        """Return a deep copy of this work."""
        new_work = copy.deepcopy(self)
        return new_work

    def __deepcopy__(self, memo):
        # Logger objects are not safely deep-copyable; detach the logger,
        # copy everything else, then share the same logger on both objects.
        logger = self.logger
        self.logger = None

        cls = self.__class__
        result = cls.__new__(cls)

        memo[id(self)] = result

        # Deep copy all other attributes
        for k, v in self.__dict__.items():
            setattr(result, k, copy.deepcopy(v, memo))

        self.logger = logger
        result.logger = logger
        return result

    def depend_on(self, work):
        # Subclasses return True when this work depends on *work*.
        return False

    def generate_work_from_template(self):
        """Clone this (template) work; the clone gets a fresh internal id."""
        logger = self.logger
        self.logger = None
        new_work = copy.deepcopy(self)
        self.logger = logger
        new_work.logger = logger
        # new_work.template_work_id = self.get_internal_id()
        if self.is_template:
            new_work.internal_id = str(uuid.uuid4())[:8]
        return new_work

    def get_template_id(self):
        # Internal id of the template this work was generated from.
        return self.template_work_id
1643 
1644     def resume_work(self):
1645         if self.status in [WorkStatus.New, WorkStatus.Ready]:
1646             pass
1647         else:
1648             self.status = WorkStatus.Transforming
1649         self.polling_retries = 0
1650 
1651     def add_collection_to_collections(self, coll):
1652         assert (isinstance(coll, dict))
1653         assert ('scope' in coll)
1654         assert ('name' in coll)
1655 
1656         coll_metadata = copy.copy(coll)
1657         del coll_metadata['scope']
1658         del coll_metadata['name']
1659         if 'type' in coll_metadata:
1660             coll_type = coll_metadata['type']
1661             del coll_metadata['type']
1662         else:
1663             coll_type = CollectionType.Dataset
1664 
1665         collection = Collection(scope=coll['scope'], name=coll['name'], coll_type=coll_type, coll_metadata=coll_metadata)
1666         self.collections[collection.internal_id] = collection
1667         return collection
1668 
    def set_primary_input_collection(self, coll):
        # Register *coll* (a dict) and remember its internal id as the primary input.
        if coll:
            collection = self.add_collection_to_collections(coll)
            self._primary_input_collection = collection.internal_id

    def get_primary_input_collection(self):
        """
        *** Function called by Marshaller agent.
        """
        if self._primary_input_collection:
            return self.collections[self._primary_input_collection]
        return None

    def set_primary_output_collection(self, coll):
        # Register *coll* (a dict) and remember its internal id as the primary output.
        if coll:
            collection = self.add_collection_to_collections(coll)
            self._primary_output_collection = collection.internal_id

    def get_primary_output_collection(self):
        """
        *** Function called by Marshaller agent.
        """
        if self._primary_output_collection:
            return self.collections[self._primary_output_collection]
        return None
1694 
1695     def add_other_input_collections(self, colls):
1696         if not colls:
1697             return
1698         if type(colls) not in [list, tuple]:
1699             colls = [colls]
1700 
1701         for coll in colls:
1702             collection = self.add_collection_to_collections(coll)
1703             self._other_input_collections.append(collection.internal_id)
1704 
1705     def get_other_input_collections(self):
1706         return [self.collections[k] for k in self._other_input_collections]
1707 
1708     def add_other_output_collections(self, colls):
1709         if not colls:
1710             return
1711         if type(colls) not in [list, tuple]:
1712             colls = [colls]
1713 
1714         for coll in colls:
1715             collection = self.add_collection_to_collections(coll)
1716             self._other_output_collections.append(collection.internal_id)
1717 
1718     def get_other_output_collections(self):
1719         return [self.collections[k] for k in self._other_output_collections]
1720 
1721     def get_input_collections(self, poll_externel=False):
1722         """
1723         *** Function called by Transformer agent.
1724         """
1725         if self._primary_input_collection:
1726             keys = [self._primary_input_collection] + self._other_input_collections
1727         else:
1728             keys = self._other_input_collections
1729         return [self.collections[k] for k in keys]
1730 
1731     def get_output_collections(self):
1732         """
1733         *** Function called by Transformer agent.
1734         """
1735         if self._primary_output_collection:
1736             keys = [self._primary_output_collection] + self._other_output_collections
1737         else:
1738             keys = self._other_output_collections
1739         return [self.collections[k] for k in keys]
1740 
1741     def get_collections(self):
1742         return [self.collections[k] for k in self.collections.keys()]
1743 
1744     def is_input_collections_closed(self):
1745         colls = self.get_input_collections()
1746         for coll in colls:
1747             if coll.status not in [CollectionStatus.Closed]:
1748                 return False
1749         return True
1750 
1751     def is_internal_collection(self, coll):
1752         if (coll.coll_metadata and 'source' in coll.coll_metadata and coll.coll_metadata['source']  # noqa W503
1753             and type(coll.coll_metadata['source']) == str and coll.coll_metadata['source'].lower() == 'idds'):  # noqa W503
1754             return True
1755         return False
1756 
1757     def get_internal_collections(self, coll):
1758         if coll.coll_metadata and 'request_id' in coll.coll_metadata:
1759             # relation_type = coll['coll_metadata']['relation_type'] if 'relation_type' in coll['coll_metadata'] else CollectionRelationType.Output
1760             # colls = core_catalog.get_collections(scope=coll['scope'],
1761             #                                 name=coll['name'],
1762             #                                 request_id=coll['coll_metadata']['request_id'],
1763             #                                 relation_type=relation_type)
1764             return []
1765         return []
1766 
1767     def poll_external_collection(self, coll):
1768         return coll
1769 
    def poll_internal_collection(self, coll):
        """Refresh status/metadata of an iDDS-internal collection.

        A Closed collection is returned untouched.  Otherwise the metadata
        counters are reset, the internal sub-collections are aggregated, and
        the collection is marked Closed only when every sub-collection is
        Closed.  Any failure is logged and re-raised as IDDSException.
        """
        try:
            if coll.status in [CollectionStatus.Closed]:
                # Terminal: nothing left to poll.
                return coll
            else:
                if coll.coll_metadata is None:
                    coll.coll_metadata = {}
                # Reset the aggregate counters before re-summing below.
                coll.coll_metadata['bytes'] = 0
                coll.coll_metadata['availability'] = 0
                coll.coll_metadata['events'] = 0
                coll.coll_metadata['is_open'] = True
                coll.coll_metadata['run_number'] = 1
                coll.coll_metadata['did_type'] = 'DATASET'
                coll.coll_metadata['list_all_files'] = False
                # NOTE(review): key is spelled 'interal_colls' (sic); kept as-is
                # since other code may read this exact key — confirm before fixing.
                coll.coll_metadata['interal_colls'] = []

                is_open = False
                internal_colls = self.get_internal_collections(coll)
                for i_coll in internal_colls:
                    # One still-open sub-collection keeps the parent open.
                    if i_coll.status not in [CollectionStatus.Closed]:
                        is_open = True
                    # NOTE(review): subscript access here vs attribute access above —
                    # assumes i_coll supports both; currently unreachable because
                    # get_internal_collections always returns [].
                    coll.coll_metadata['bytes'] += i_coll['bytes']

                if not is_open:
                    coll_status = CollectionStatus.Closed
                else:
                    coll_status = CollectionStatus.Open
                coll.status = coll_status
                # More than one sub-collection means this is a container.
                if len(internal_colls) > 1:
                    coll.coll_metadata['coll_type'] = CollectionType.Container
                else:
                    coll.coll_metadata['coll_type'] = CollectionType.Dataset

                return coll
        except Exception as ex:
            self.logger.error(ex)
            self.logger.error(traceback.format_exc())
            raise exceptions.IDDSException('%s: %s' % (str(ex), traceback.format_exc()))
1808 
1809     def get_internal_input_contents(self, coll):
1810         """
1811         Get all input contents from iDDS collections.
1812         """
1813         coll = self.collections[self._primary_input_collection]
1814         internal_colls = self.get_internal_collection(coll)
1815         internal_coll_ids = [coll.coll_id for coll in internal_colls]
1816         if internal_coll_ids:
1817             # contents = catalog.get_contents_by_coll_id_status(coll_id=coll_ids)
1818             contents = []
1819         else:
1820             contents = []
1821         return contents
1822 
    def get_input_contents(self):
        """
        Get all input contents from DDM.

        Base-class hook: subclasses override this to query the data
        management system; this implementation returns None.
        """
        pass
1828 
1829     def add_output_collections(self, colls):
1830         """
1831         """
1832         if not colls:
1833             return
1834         if type(colls) not in [list, tuple]:
1835             colls = [colls]
1836 
1837         value = colls
1838         if value:
1839             primary_collection = value[0]
1840             other_collections = []
1841             if len(value) > 1:
1842                 other_collections = value[1:]
1843 
1844             self.set_primary_output_collection(primary_collection)
1845 
1846             if other_collections and type(other_collections) not in [list, tuple]:
1847                 other_collections = [other_collections]
1848             self.add_other_output_collections(other_collections)
1849 
    def get_output_contents(self):
        # Base-class hook: subclasses override to return output contents;
        # this implementation returns None.
        pass
1852 
1853     def add_log_collections(self, colls):
1854         if not colls:
1855             return
1856         if type(colls) not in [list, tuple]:
1857             colls = [colls]
1858 
1859         for coll in colls:
1860             collection = self.add_collection_to_collections(coll)
1861             self.log_collections.append(collection.internal_id)
1862 
1863     def get_log_collections(self):
1864         return [self.collections[k] for k in self.log_collections]
1865 
1866     def set_has_new_inputs(self, yes=True):
1867         self.has_new_inputs = yes
1868 
1869     def has_dependency(self):
1870         return False
1871 
1872     def get_parent_work_names(self):
1873         return []
1874 
1875     def get_parent_workload_ids(self):
1876         return []
1877 
    def get_new_input_output_maps(self, mapped_input_output_maps={}):
        """
        *** Function called by Transformer agent.
        New inputs which are not yet mapped to outputs.

        :param mapped_input_output_maps: Inputs that are already mapped,
            keyed by integer map_id.  NOTE(review): mutable default argument —
            safe only because it is never mutated here; callers should still
            pass their own dict.
        :returns: dict {map_id: {'inputs': [...], 'outputs': [...],
                  'inputs_dependency': [], 'logs': []}} for inputs not yet mapped.
        """
        inputs = self.get_input_contents()
        # mapped_inputs = mapped_input_output_maps.keys()

        # Collect the scope:name of every input already mapped for the
        # primary input collection.
        mapped_inputs = []
        for map_id in mapped_input_output_maps:
            # NOTE(review): each value is iterated directly — presumably a list
            # of content dicts ('inputs' is not indexed here); confirm against caller.
            map_id_inputs = mapped_input_output_maps[map_id]
            for ip in map_id_inputs:
                if ip['coll_id'] == self.primary_input_collection['coll_id']:
                    mapped_inputs.append(ip['scope'] + ':' + ip['name'])

        # Anything returned by get_input_contents() that is not in the
        # mapped set is new.
        new_inputs = []
        for ip in inputs:
            if ip in mapped_inputs:
                pass
            else:
                new_inputs.append(ip)

        # Continue map_id numbering after the highest existing key.
        new_input_output_maps = {}
        mapped_keys = mapped_input_output_maps.keys()
        if mapped_keys:
            next_key = max(mapped_keys) + 1
        else:
            next_key = 1
        for ip in new_inputs:
            self.num_mapped_inputs += 1
            # The output record mirrors the input but points at the primary
            # output collection.
            out_ip = copy.deepcopy(ip)
            out_ip['coll_id'] = self.collections[self._primary_output_collection]['coll_id']
            new_input_output_maps[next_key] = {'inputs': [ip],
                                               'outputs': [out_ip],
                                               'inputs_dependency': [],
                                               'logs': []}
            next_key += 1

        return new_input_output_maps
1919 
1920     def set_collection_id(self, collection, coll_id):
1921         # print(collection)
1922         # print(coll_id)
1923         self.collections[collection.internal_id]['coll_id'] = coll_id
1924 
    def should_release_inputs(self, processing=None, poll_operation_time_period=120):
        """Decide whether input contents may be released.

        When `release_inputs_after_submitting` is set, inputs are only
        released once the processing was submitted at least
        *poll_operation_time_period* seconds ago AND has moved past the New
        status; otherwise they are released immediately.
        """
        if self.release_inputs_after_submitting:
            if not poll_operation_time_period:
                poll_operation_time_period = 120

            # The underlying DB model carries the submission timestamp.
            processing_model = processing.processing
            if (processing_model and processing_model['submitted_at']                                                 # noqa: W503
               and processing_model['submitted_at'] + datetime.timedelta(seconds=int(poll_operation_time_period))    # noqa: W503
               < datetime.datetime.utcnow()):                                                                        # noqa: W503

                # Grace period elapsed; release only if the processing left New.
                if (processing and processing.status
                   and processing.status not in [ProcessingStatus.New, ProcessingStatus.New.value]):                   # noqa: W503
                    # and processing.status not in [ProcessingStatus.New, ProcessingStatus.New.value,                   # noqa: W503
                    #                               ProcessingStatus.Submitting, ProcessingStatus.Submitting.value]):   # noqa: W503
                    return True
            return False
        return True
1942 
1943     def use_dependency_to_release_jobs(self):
1944         """
1945         *** Function called by Transformer agent.
1946         """
1947         return False
1948 
1949     def require_ext_contents(self):
1950         return False
1951 
1952     def has_external_content_id(self):
1953         return False
1954 
1955     def set_work_name_to_coll_map(self, work_name_to_coll_map):
1956         self.work_name_to_coll_map = work_name_to_coll_map
1957 
1958     def get_work_name_to_coll_map(self):
1959         return self.work_name_to_coll_map
1960 
1961     def add_processing_to_processings(self, processing):
1962         # assert(isinstance(processing, dict))
1963         # assert('processing_metadata' in processing)
1964         # if 'processing_metadata' not in processing:
1965         #     processing['processing_metadata'] = {}
1966 
1967         # if 'internal_id' not in processing['processing_metadata']:
1968         #     processing['processing_metadata']['internal_id'] = str(uuid.uuid1())
1969         self.processings[processing.internal_id] = processing
1970 
1971     def get_processing_ids(self):
1972         ids = []
1973         for p_id in self.active_processings:
1974             p = self.processings[p_id]
1975             if p.processing_id:
1976                 ids.append(p.processing_id)
1977         return ids
1978 
1979     # def set_processing(self, processing):
1980     #     self.processing = processing
1981 
1982     def set_processing_id(self, processing, processing_id):
1983         """
1984         *** Function called by Transformer agent.
1985         """
1986         self.processings[processing.internal_id].processing_id = processing_id
1987 
1988     def set_processing_status(self, processing, status, substatus):
1989         """
1990         *** Function called by Transformer agent.
1991         """
1992         self.processings[processing.internal_id].status = status
1993         self.processings[processing.internal_id].substatus = substatus
1994         # if status not in [ProcessingStatus.New, ProcessingStatus.Submitting,
1995         #                   ProcessingStatus.Submitted, ProcessingStatus.Running]:
1996         #     if processing['processing_metadata']['internal_id'] in self.active_processings:
1997         #         del self.active_processings[processing['processing_metadata']['internal_id']]
1998 
1999     def set_processing_output_metadata(self, processing, output_metadata):
2000         """
2001         *** Function called by Transformer agent.
2002         """
2003         processing = self.processings[processing.internal_id]
2004         # processing['output_metadata'] = output_metadata
2005         self.set_output_data(output_metadata)
2006 
2007     def is_processing_substatus_new_operationing(self, processing):
2008         if processing.substatus in [ProcessingStatus.ToCancel,
2009                                     ProcessingStatus.ToSuspend,
2010                                     ProcessingStatus.ToResume,
2011                                     ProcessingStatus.ToFinish,
2012                                     ProcessingStatus.ToForceFinish]:
2013             return True
2014         return False
2015 
    def is_processing_terminated(self, processing):
        """Return True when *processing* reached a terminal state.

        A processing with a pending To* operation substatus is never
        considered terminated; otherwise it is terminated when its status
        is outside the active/transitional whitelist below.
        """
        self.logger.debug("is_processing_terminated: status: %s, substatus: %s" % (processing.status, processing.substatus))
        if self.is_processing_substatus_new_operationing(processing):
            return False

        # Any status not in this active/transitional list counts as terminal
        # (Finished, Failed, Cancelled, Suspended, Expired, ...).
        if processing.status not in [ProcessingStatus.New,
                                     ProcessingStatus.Submitting,
                                     ProcessingStatus.Submitted,
                                     ProcessingStatus.Running,
                                     ProcessingStatus.ToCancel,
                                     ProcessingStatus.Cancelling,
                                     ProcessingStatus.ToSuspend,
                                     ProcessingStatus.Suspending,
                                     ProcessingStatus.ToResume,
                                     ProcessingStatus.Resuming,
                                     ProcessingStatus.ToFinish,
                                     ProcessingStatus.ToForceFinish]:
            return True
        return False
2035 
2036     def reap_processing(self, processing):
2037         if self.is_processing_terminated(processing):
2038             self.active_processings.remove(processing.internal_id)
2039         else:
2040             self.logger.error("Cannot reap an unterminated processing: %s" % processing)
2041 
2042     def is_processings_started(self):
2043         """
2044         *** Function called by Transformer agent.
2045         """
2046         # for p_id in self.active_processings:
2047         for p_id in self.processings:
2048             p = self.processings[p_id]
2049             if p.submitted_at:
2050                 return True
2051         return False
2052 
2053     def is_processings_running(self):
2054         """
2055         *** Function called by Transformer agent.
2056         """
2057         for p_id in self.active_processings:
2058             p = self.processings[p_id]
2059             if p.status in [ProcessingStatus.Running]:
2060                 return True
2061         return False
2062 
2063     def is_processings_terminated(self):
2064         """
2065         *** Function called by Transformer agent.
2066         """
2067         for p_id in self.active_processings:
2068             p = self.processings[p_id]
2069             if self.is_processing_terminated(p):
2070                 pass
2071             else:
2072                 return False
2073         return True
2074 
2075     def is_processings_finished(self):
2076         """
2077         *** Function called by Transformer agent.
2078         """
2079         for p_id in self.active_processings:
2080             p = self.processings[p_id]
2081             if not self.is_processing_terminated(p) or p.status not in [ProcessingStatus.Finished]:
2082                 return False
2083         return True
2084 
2085     def is_processings_subfinished(self):
2086         """
2087         *** Function called by Transformer agent.
2088         """
2089         has_finished = False
2090         has_failed = False
2091         for p_id in self.active_processings:
2092             p = self.processings[p_id]
2093             if not self.is_processing_terminated(p):
2094                 return False
2095             else:
2096                 if p.status in [ProcessingStatus.Finished]:
2097                     has_finished = True
2098                 if p.status in [ProcessingStatus.Failed]:
2099                     has_failed = True
2100         if has_finished and has_failed:
2101             return True
2102         return False
2103 
2104     def is_processings_failed(self):
2105         """
2106         *** Function called by Transformer agent.
2107         """
2108         for p_id in self.active_processings:
2109             p = self.processings[p_id]
2110             if not self.is_processing_terminated(p) or p.status not in [ProcessingStatus.Failed]:
2111                 return False
2112         return True
2113 
2114     def is_processings_expired(self):
2115         """
2116         *** Function called by Transformer agent.
2117         """
2118         has_expired = False
2119         for p_id in self.active_processings:
2120             p = self.processings[p_id]
2121             if not self.is_processing_terminated(p):
2122                 return False
2123             elif p.status in [ProcessingStatus.Expired]:
2124                 has_expired = True
2125         if has_expired:
2126             return True
2127         return False
2128 
2129     def is_processings_cancelled(self):
2130         """
2131         *** Function called by Transformer agent.
2132         """
2133         has_cancelled = False
2134         for p_id in self.active_processings:
2135             p = self.processings[p_id]
2136             if not self.is_processing_terminated(p):
2137                 return False
2138             elif p.status in [ProcessingStatus.Cancelled]:
2139                 has_cancelled = True
2140         if has_cancelled:
2141             return True
2142         return False
2143 
2144     def is_processings_suspended(self):
2145         """
2146         *** Function called by Transformer agent.
2147         """
2148         has_suspended = False
2149         for p_id in self.active_processings:
2150             p = self.processings[p_id]
2151             if not self.is_processing_terminated(p):
2152                 return False
2153             elif p.status in [ProcessingStatus.Suspended]:
2154                 has_suspended = True
2155         if has_suspended:
2156             return True
2157         return False
2158 
2159     def create_processing(self, input_output_maps=[]):
2160         """
2161         *** Function called by Transformer agent.
2162         """
2163         # proc = {'processing_metadata': {'internal_id': str(uuid.uuid1())}}
2164         proc = Processing()
2165         self.add_processing_to_processings(proc)
2166         self.active_processings.append(proc.internal_id)
2167         return proc
2168 
2169     def get_processing(self, input_output_maps, without_creating=False):
2170         """
2171         *** Function called by Transformer agent.
2172         """
2173         if self.active_processings:
2174             return self.processings[self.active_processings[0]]
2175         else:
2176             if not without_creating:
2177                 # return None
2178                 return self.create_processing(input_output_maps)
2179                 # self.process = process
2180                 # return process
2181         return None
2182 
2183     def sync_processing(self, processing, processing_model):
2184         processing.processing = processing_model
2185         self.set_processing_status(processing, processing_model['status'], processing_model['substatus'])
2186 
    def submit_processing(self, processing):
        """
        *** Function called by Carrier agent.

        Abstract in the base class: subclasses must submit the processing
        to their workload manager.

        :raises NotImplementedException: always, in this base implementation.
        """
        raise exceptions.NotImplementedException
2192 
2193     def abort_processing_old(self, processing):
2194         """
2195         *** Function called by Carrier agent.
2196         """
2197         # raise exceptions.NotImplementedException
2198         self.tocancel = True
2199         if (processing and 'processing_metadata' in processing and processing['processing_metadata']                     # noqa W503
2200             and 'processing' in processing['processing_metadata'] and processing['processing_metadata']['processing']):  # noqa W503
2201             proc = processing['processing_metadata']['processing']
2202             proc.tocancel = True
2203 
2204     def suspend_processing(self, processing):
2205         """
2206         *** Function called by Carrier agent.
2207         """
2208         # raise exceptions.NotImplementedException
2209         self.tosuspend = True
2210         if (processing and 'processing_metadata' in processing and processing['processing_metadata']                     # noqa W503
2211             and 'processing' in processing['processing_metadata'] and processing['processing_metadata']['processing']):  # noqa W503
2212             proc = processing['processing_metadata']['processing']
2213             proc.tosuspend = True
2214 
2215     def resume_processing_old(self, processing):
2216         """
2217         *** Function called by Carrier agent.
2218         """
2219         # raise exceptions.NotImplementedException
2220         self.toresume = True
2221         if (processing and 'processing_metadata' in processing and processing['processing_metadata']                     # noqa W503
2222             and 'processing' in processing['processing_metadata'] and processing['processing_metadata']['processing']):  # noqa W503
2223             proc = processing['processing_metadata']['processing']
2224             proc.toresume = True
2225 
2226     def expire_processing(self, processing):
2227         """
2228         *** Function called by Carrier agent.
2229         """
2230         # raise exceptions.NotImplementedException
2231         self.toexpire = True
2232         if (processing and 'processing_metadata' in processing and processing['processing_metadata']                     # noqa W503
2233             and 'processing' in processing['processing_metadata'] and processing['processing_metadata']['processing']):  # noqa W503
2234             proc = processing['processing_metadata']['processing']
2235             proc.toexpire = True
2236 
2237     def finish_processing(self, processing, forcing=False):
2238         """
2239         *** Function called by Carrier agent.
2240         """
2241         # raise exceptions.NotImplementedException
2242         if forcing:
2243             self.toforcefinish = True
2244         else:
2245             self.tofinish = True
2246         if (processing and 'processing_metadata' in processing and processing['processing_metadata']                     # noqa W503
2247             and 'processing' in processing['processing_metadata'] and processing['processing_metadata']['processing']):  # noqa W503
2248             proc = processing['processing_metadata']['processing']
2249             proc.tofinish = True
2250             if forcing:
2251                 proc.toforcefinish = True
2252 
    def poll_processing_updates(self, processing, input_output_maps, contents_ext=None, log_prefix=''):
        """
        *** Function called by Carrier agent.

        Abstract in the base class: subclasses must poll the workload
        manager for updates.  The base implementation only marks the
        embedded processing as having new updates and then raises.

        :raises NotImplementedException: always, in this base implementation.
        """
        processing['processing'].has_new_updates()
        raise exceptions.NotImplementedException
2259 
2260     def is_all_outputs_flushed(self, input_output_maps):
2261         for map_id in input_output_maps:
2262             outputs = input_output_maps[map_id]['outputs']
2263 
2264             for content in outputs:
2265                 if content['status'] != content['substatus']:
2266                     return False
2267         return True
2268 
    def syn_work_status(self, input_output_maps, all_updates_flushed=True, output_statistics={}, to_release_input_contents=[]):
        """
        *** Function called by Transformer agent.

        Derive the work's status from its processings and collections.
        The work only becomes terminal when all processings are terminated,
        input collections are closed, no new/releasable inputs remain and
        all outputs are flushed; otherwise it is Running or Transforming.
        """
        # raise exceptions.NotImplementedException
        self.logger.debug("syn_work_status(%s): is_processings_terminated: %s" % (str(self.get_processing_ids()), str(self.is_processings_terminated())))
        self.logger.debug("syn_work_status(%s): is_input_collections_closed: %s" % (str(self.get_processing_ids()), str(self.is_input_collections_closed())))
        self.logger.debug("syn_work_status(%s): has_new_inputs: %s" % (str(self.get_processing_ids()), str(self.has_new_inputs)))
        self.logger.debug("syn_work_status(%s): has_to_release_inputs: %s" % (str(self.get_processing_ids()), str(self.has_to_release_inputs())))
        self.logger.debug("syn_work_status(%s): to_release_input_contents: %s" % (str(self.get_processing_ids()), str(to_release_input_contents)))
        if self.is_processings_terminated() and self.is_input_collections_closed() and not self.has_new_inputs and not self.has_to_release_inputs() and not to_release_input_contents:
            # Terminal path: hold back until every output is flushed.
            if not self.is_all_outputs_flushed(input_output_maps):
                self.logger.warn("The work processings %s is terminated. but not all outputs are flushed. Wait to flush the outputs then finish the transform" % str(self.get_processing_ids()))
                return

            # Order matters: Finished > SubFinished > Failed > Expired >
            # Cancelled > Suspended; first match wins.
            if self.is_processings_finished():
                self.status = WorkStatus.Finished
            elif self.is_processings_subfinished():
                self.status = WorkStatus.SubFinished
            elif self.is_processings_failed():
                self.status = WorkStatus.Failed
            elif self.is_processings_expired():
                self.status = WorkStatus.Expired
            elif self.is_processings_cancelled():
                self.status = WorkStatus.Cancelled
            elif self.is_processings_suspended():
                self.status = WorkStatus.Suspended
        elif self.is_processings_running():
            self.status = WorkStatus.Running
        else:
            self.status = WorkStatus.Transforming

        # Any progress signal marks the work as started.
        if self.is_processings_terminated() or self.is_processings_running() or self.is_processings_started():
            self.started = True
        self.logger.debug("syn_work_status(%s): work.status: %s" % (str(self.get_processing_ids()), str(self.status)))
2304 
    def sync_work_data(self, status, substatus, work, workload_id=None, output_data=None, processing=None):
        """Synchronize state between this work object and *work*.

        Called either by the transformer (with a processing, to sync from
        the processing) or by the clerk (without one, to sync from the
        transform).  Copies ids/statistics/output data and maps the given
        transform/processing statuses onto work statuses.
        """
        # self.status = work.status
        work.work_id = self.work_id
        work.transforming = self.transforming

        # clerk will update next_works while transformer doesn't.
        # synchronizing work metadata from transformer to clerk needs to keep it at first.
        next_works = self.next_works
        # self.metadata = work.metadata
        self.next_works = next_works

        self.status_statistics = work.status_statistics
        # self.processings = work.processings
        if output_data:
            self.output_data = output_data
        else:
            self.output_data = work.output_data

        # Map transform/processing statuses onto work statuses.
        self.status = get_work_status_from_transform_processing_status(status)
        self.substatus = get_work_status_from_transform_processing_status(substatus)
        if workload_id:
            self.workload_id = workload_id
        if processing is not None:
            # called by transformer to sync from processing
            if processing.submitted_at:
                self.submitted = True
        else:
            # called by clerk to syn from transform
            self.submitted = work.submitted

        # A submitted work is, by definition, started.
        if self.submitted:
            self.started = True

        """
        self.status = WorkStatus(status.value)
        self.substatus = WorkStatus(substatus.value)
        self.workdir = work.workdir
        self.has_new_inputs = work.has_new_inputs
        self.errors = work.errors
        self.next_works = work.next_works

        self.terminated_msg = work.terminated_msg
        self.output_data = work.output_data
        self.parameters_for_next_task = work.parameters_for_next_task

        self.status_statistics = work.status_statistics

        self.processings = work.processings
        self.active_processings = work.active_processings
        self.cancelled_processings = work.cancelled_processings
        self.suspended_processings = work.suspended_processings
        """
2357 
2358     def abort_processing(self, processing, log_prefix=''):
2359         msg = "abort processing is not implemented"
2360         self.logger.error(log_prefix + msg)
2361 
2362     def resume_processing(self, processing, log_prefix=''):
2363         msg = "resume processing is not implemented"
2364         self.logger.error(log_prefix + msg)
2365 
2366     def add_proxy(self, proxy):
2367         self.proxy = proxy
2368 
2369     def get_proxy(self):
2370         return self.proxy
2371 
    def set_user_proxy(self):
        """Install the stored proxy as the process's X509_USER_PROXY.

        Remembers any pre-existing X509_USER_PROXY so unset_user_proxy()
        can restore it, writes the proxy content to a fixed path with
        owner-only permissions, and points the env var at it.
        """
        if 'X509_USER_PROXY' in os.environ:
            self.original_proxy = os.environ['X509_USER_PROXY']
        if self.get_proxy():
            # NOTE(review): fixed path — concurrent works on one host would
            # overwrite each other's proxy file; confirm single-use per process.
            user_proxy = '/tmp/idds_user_proxy'
            with open(user_proxy, 'w') as fp:
                fp.write(self.get_proxy())
            # Owner read/write only, as proxy files require.
            os.chmod(user_proxy, stat.S_IRUSR | stat.S_IWUSR)
            os.environ['X509_USER_PROXY'] = user_proxy
2381 
    def unset_user_proxy(self):
        """Restore the X509_USER_PROXY saved by set_user_proxy().

        Falls back to deleting the variable when no original proxy was
        recorded.  NOTE(review): the delete raises KeyError if the variable
        is already absent — assumed to be called only after set_user_proxy().
        """
        if self.original_proxy:
            os.environ['X509_USER_PROXY'] = self.original_proxy
        else:
            del os.environ['X509_USER_PROXY']
2387 
2388     def get_external_content_ids(self, processing, log_prefix=''):
2389         return []