File indexing completed on 2026-04-09 07:58:33
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011 import copy
0012 import datetime
0013 import logging
0014 import os
0015 import stat
0016 import uuid
0017 import traceback
0018
0019 from idds.common import exceptions
0020 from idds.common.constants import (WorkStatus, ProcessingStatus,
0021 CollectionStatus, CollectionType)
0022 from idds.common.constants import get_work_status_from_transform_processing_status
0023 from idds.common.utils import setup_logging
0024 from idds.common.utils import str_to_date
0025
0026
0027 from .base import Base
0028
0029
setup_logging(__name__)  # initialize idds logging for this module (idds.common.utils helper)
0031
0032
class Parameter(object):
    """A named-parameter bag whose values may be lazily computed callables."""

    def __init__(self, params):
        # A plain dict is required (dict subclasses were rejected by the
        # original `type(...) in [dict]` check as well).
        assert type(params) is dict
        self.params = params

    def add(self, name, value):
        """Register (or overwrite) the parameter *name* with *value*."""
        self.params[name] = value

    def get_param_names(self):
        """Return a view over all registered parameter names."""
        return self.params.keys()

    def get_param_value(self, name):
        """Look up *name*; a truthy callable value is invoked and its result returned."""
        result = self.params.get(name, None)
        if result and callable(result):
            result = result()
        return result
0049
0050
class Collection(Base):
    """In-memory view of a collection record.

    Scalar fields (ids, status, type) are persisted through the metadata
    store provided by Base; the raw database record is attached via the
    ``collection`` property, which also refreshes the scalar fields.
    """

    def __init__(self, scope=None, name=None, coll_type=CollectionType.Dataset, coll_metadata=None):
        """
        Init a collection.

        :param scope: the collection scope.
        :param name: the collection name.
        :param coll_type: the CollectionType (defaults to CollectionType.Dataset).
        :param coll_metadata: optional dict of extra metadata. BUGFIX: the
            former ``={}`` default was a shared mutable object across all
            instances; a fresh dict is now created per instance.
        """
        super(Collection, self).__init__()
        self.scope = scope
        self.name = name
        self.coll_metadata = coll_metadata if coll_metadata is not None else {}

        # Must be set before status/substatus below: their setters check it.
        self.collection = None

        self.internal_id = str(uuid.uuid4())[:8]
        self.coll_id = None
        self.coll_type = coll_type
        self.status = CollectionStatus.New
        self.substatus = CollectionStatus.New

        # File accounting counters (plain attributes, not metadata-backed).
        self.total_files = 0
        self.processed_files = 0
        self.processing_files = 0
        self.activated_files = 0
        self.preprocessing_files = 0
        self.bytes = 0
        self.new_files = 0
        self.failed_files = 0
        self.missing_files = 0

        self.ext_files = 0
        self.processed_ext_files = 0
        self.failed_ext_files = 0
        self.missing_ext_files = 0

    @property
    def internal_id(self):
        # Stable short id for this collection inside its work (persisted).
        return self.get_metadata_item('internal_id')

    @internal_id.setter
    def internal_id(self, value):
        self.add_metadata_item('internal_id', value)

    @property
    def coll_id(self):
        return self.get_metadata_item('coll_id', None)

    @coll_id.setter
    def coll_id(self, value):
        self.add_metadata_item('coll_id', value)

    @property
    def status(self):
        # Stored as the raw enum value; coerce ints back to CollectionStatus.
        st = self.get_metadata_item('status', CollectionStatus.New)
        if type(st) in [int]:
            st = CollectionStatus(st)
        return st

    @status.setter
    def status(self, value):
        self.add_metadata_item('status', value.value if value else value)
        if self.collection:
            # Keep the attached raw record in sync.
            self.collection['status'] = value

    @property
    def coll_type(self):
        st = self.get_metadata_item('coll_type', CollectionType.Dataset)
        if type(st) in [int]:
            st = CollectionType(st)
        return st

    @coll_type.setter
    def coll_type(self, value):
        self.add_metadata_item('coll_type', value.value if value else value)
        if self.collection:
            self.collection['coll_type'] = value

    @property
    def substatus(self):
        st = self.get_metadata_item('substatus', CollectionStatus.New)
        if type(st) in [int]:
            st = CollectionStatus(st)
        return st

    @substatus.setter
    def substatus(self, value):
        self.add_metadata_item('substatus', value.value if value else value)
        if self.collection:
            self.collection['substatus'] = value

    @property
    def collection(self):
        """The attached raw collection record (dict-like), or None."""
        return self._collection

    @collection.setter
    def collection(self, value):
        # Attaching a record refreshes all scalar fields from it.
        self._collection = value
        if self._collection:
            self.scope = self._collection['scope']
            self.name = self._collection['name']
            self.coll_metadata = self._collection['coll_metadata']
            self.coll_id = self._collection['coll_id']
            self.coll_type = self._collection['coll_type']
            self.status = self._collection['status']
            self.substatus = self._collection['substatus']

            self.total_files = self._collection['total_files']
            self.processed_files = self._collection['processed_files']
            self.processing_files = self._collection['processing_files']
            # Older records may lack these counters; default to 0.
            self.activated_files = self._collection.get('activated_files', 0)
            self.preprocessing_files = self._collection.get('preprocessing_files', 0)
            self.bytes = self._collection['bytes']

    def to_origin_dict(self):
        """Return the minimal identifying dict {scope, name}."""
        return {'scope': self.scope, 'name': self.name}
0162
0163
class Processing(Base):
    """In-memory view of a processing record.

    Scalar fields (ids, status, flags, timestamps) are persisted through the
    metadata store provided by Base; the raw database record is attached via
    the ``processing`` property, which also refreshes the scalar fields.
    """

    def __init__(self, processing_metadata=None):
        """
        Init a processing.

        :param processing_metadata: optional dict of extra metadata; may carry
            a 'work' entry. BUGFIX: the former ``={}`` default was a shared
            mutable object across all instances; a fresh dict is now created
            per instance.
        """
        super(Processing, self).__init__()

        self.processing_metadata = processing_metadata if processing_metadata is not None else {}
        if self.processing_metadata and 'work' in self.processing_metadata:
            self.work = self.processing_metadata['work']
        else:
            self.work = None

        # Must be set before status/substatus below: their setters check it.
        self.processing = None

        self.internal_id = str(uuid.uuid4())[:8]
        self.task_name = None
        self.processing_id = None
        self.workload_id = None
        self.status = ProcessingStatus.New
        self.substatus = ProcessingStatus.New
        self.last_updated_at = datetime.datetime.utcnow()
        self.polling_retries = 0
        # Operation request flags; setting one bumps operation_time.
        self.tocancel = False
        self.tosuspend = False
        self.toresume = False
        self.toexpire = False
        self.tofinish = False
        self.toforcefinish = False
        self.operation_time = datetime.datetime.utcnow()
        self.submitted_at = None

        self.username = None

        self.external_id = None
        self.errors = None

        self.output_data = None

        self.retries = 0

    @property
    def internal_id(self):
        # Stable short id for this processing inside its work (persisted).
        return self.get_metadata_item('internal_id')

    @internal_id.setter
    def internal_id(self, value):
        self.add_metadata_item('internal_id', value)

    @property
    def processing_id(self):
        return self.get_metadata_item('processing_id', None)

    @processing_id.setter
    def processing_id(self, value):
        self.add_metadata_item('processing_id', value)

    @property
    def workload_id(self):
        return self.get_metadata_item('workload_id', None)

    @workload_id.setter
    def workload_id(self, value):
        self.add_metadata_item('workload_id', value)

    def get_workload_id(self):
        """Return the workload id (plain-method accessor for callers)."""
        return self.workload_id

    @property
    def status(self):
        # Stored as the raw enum value; coerce ints back to ProcessingStatus.
        st = self.get_metadata_item('status', ProcessingStatus.New)
        if type(st) in [int]:
            st = ProcessingStatus(st)
        return st

    @status.setter
    def status(self, value):
        self.add_metadata_item('status', value.value if value else value)
        if self.processing:
            # Keep the attached raw record in sync.
            self.processing['status'] = value

    @property
    def substatus(self):
        st = self.get_metadata_item('substatus', ProcessingStatus.New)
        if type(st) in [int]:
            st = ProcessingStatus(st)
        return st

    @substatus.setter
    def substatus(self, value):
        self.add_metadata_item('substatus', value.value if value else value)
        if self.processing:
            self.processing['substatus'] = value

    @property
    def retries(self):
        return self.get_metadata_item('retries', 0)

    @retries.setter
    def retries(self, value):
        self.add_metadata_item('retries', value)

    @property
    def last_updated_at(self):
        # Dates may round-trip through JSON as strings; parse them back.
        last_updated_at = self.get_metadata_item('last_updated_at', None)
        if last_updated_at and type(last_updated_at) in [str]:
            last_updated_at = str_to_date(last_updated_at)
        return last_updated_at

    @last_updated_at.setter
    def last_updated_at(self, value):
        self.add_metadata_item('last_updated_at', value)

    @property
    def polling_retries(self):
        return self.get_metadata_item('polling_retries', 0)

    @polling_retries.setter
    def polling_retries(self, value):
        self.add_metadata_item('polling_retries', value)

    @property
    def tocancel(self):
        return self.get_metadata_item('tocancel', False)

    @tocancel.setter
    def tocancel(self, value):
        # Record when the flag actually changes, for in_operation_time().
        old_value = self.get_metadata_item('tocancel', False)
        if old_value != value:
            self.operation_time = datetime.datetime.utcnow()
        self.add_metadata_item('tocancel', value)

    @property
    def tosuspend(self):
        return self.get_metadata_item('tosuspend', False)

    @tosuspend.setter
    def tosuspend(self, value):
        old_value = self.get_metadata_item('tosuspend', False)
        if old_value != value:
            self.operation_time = datetime.datetime.utcnow()
        self.add_metadata_item('tosuspend', value)

    @property
    def toresume(self):
        return self.get_metadata_item('toresume', False)

    @toresume.setter
    def toresume(self, value):
        old_value = self.get_metadata_item('toresume', False)
        if old_value != value:
            self.operation_time = datetime.datetime.utcnow()
        self.add_metadata_item('toresume', value)

    @property
    def toexpire(self):
        return self.get_metadata_item('toexpire', False)

    @toexpire.setter
    def toexpire(self, value):
        old_value = self.get_metadata_item('toexpire', False)
        if old_value != value:
            self.operation_time = datetime.datetime.utcnow()
        self.add_metadata_item('toexpire', value)

    @property
    def tofinish(self):
        return self.get_metadata_item('tofinish', False)

    @tofinish.setter
    def tofinish(self, value):
        old_value = self.get_metadata_item('tofinish', False)
        if old_value != value:
            self.operation_time = datetime.datetime.utcnow()
        self.add_metadata_item('tofinish', value)

    @property
    def toforcefinish(self):
        return self.get_metadata_item('toforcefinish', False)

    @toforcefinish.setter
    def toforcefinish(self, value):
        old_value = self.get_metadata_item('toforcefinish', False)
        if old_value != value:
            self.operation_time = datetime.datetime.utcnow()
        self.add_metadata_item('toforcefinish', value)

    @property
    def operation_time(self):
        opt_time = self.get_metadata_item('operation_time', None)
        if opt_time and type(opt_time) in [str]:
            opt_time = str_to_date(opt_time)
        return opt_time

    @operation_time.setter
    def operation_time(self, value):
        self.add_metadata_item('operation_time', value)

    def in_operation_time(self):
        """Return True within 120 seconds of the last operation-flag change."""
        if self.operation_time:
            time_diff = datetime.datetime.utcnow() - self.operation_time
            time_diff = time_diff.total_seconds()
            if time_diff < 120:
                return True
        return False

    @property
    def submitted_at(self):
        opt_time = self.get_metadata_item('submitted_at', None)
        if opt_time and type(opt_time) in [str]:
            opt_time = str_to_date(opt_time)
        return opt_time

    @submitted_at.setter
    def submitted_at(self, value):
        self.add_metadata_item('submitted_at', value)

    @property
    def errors(self):
        return self.get_metadata_item('errors', None)

    @errors.setter
    def errors(self, value):
        self.add_metadata_item('errors', value)

    @property
    def external_id(self):
        return self.get_metadata_item('external_id', None)

    @external_id.setter
    def external_id(self, value):
        self.add_metadata_item('external_id', value)

    @property
    def old_external_id(self):
        return self.get_metadata_item('old_external_id', [])

    @old_external_id.setter
    def old_external_id(self, value):
        self.add_metadata_item('old_external_id', value)

    @property
    def task_name(self):
        return self.get_metadata_item('task_name', None)

    @task_name.setter
    def task_name(self, value):
        self.add_metadata_item('task_name', value)

    @property
    def processing(self):
        """The attached raw processing record (dict-like), or None."""
        return self._processing

    @processing.setter
    def processing(self, value):
        # Attaching a record refreshes scalar fields from it; a nested
        # 'processing' entry in processing_metadata supplies work/external_id.
        self._processing = value
        if self._processing:
            self.processing_id = self._processing.get('processing_id', None)
            self.workload_id = self._processing.get('workload_id', None)
            self.status = self._processing.get('status', None)
            self.substatus = self._processing.get('substatus', None)
            self.processing_metadata = self._processing.get('processing_metadata', None)
            self.submitted_at = self._processing.get('submitted_at', None)
            if self.processing_metadata and 'processing' in self.processing_metadata:
                proc = self.processing_metadata['processing']
                self.work = proc.work
                self.external_id = proc.external_id
                self.errors = proc.errors
                if not self.submitted_at:
                    self.submitted_at = proc.submitted_at

            self.output_data = self._processing.get('output_metadata', None)

    def has_new_updates(self):
        """Mark the processing as freshly updated (touches last_updated_at)."""
        self.last_updated_at = datetime.datetime.utcnow()
0437
0438
0439 class Work(Base):
0440
    def __init__(self, executable=None, arguments=None, parameters=None, setup=None, work_type=None,
                 work_tag=None, exec_type='local', sandbox=None, request_id=None, work_id=None, work_name=None,
                 primary_input_collection=None, other_input_collections=None, input_collections=None,
                 primary_output_collection=None, other_output_collections=None, output_collections=None,
                 log_collections=None, release_inputs_after_submitting=False, username=None,
                 agent_attributes=None, is_template=False, loading=False,
                 logger=None):
        """
        Init a work/task/transformation.

        :param setup: A string to setup the executable environment, it can be None.
        :param executable: The executable.
        :param arguments: The arguments.
        :param parameters: A dict with arguments needed to be replaced.
        :param work_type: The work type like data carousel, hyperparameteroptimization and so on.
        :param exec_type: The exec type like 'local', 'remote'(with remote_package set), 'docker' and so on.
        :param sandbox: The sandbox.
        :param work_id: The work/task id.
        :param primary_input_collection: The primary input collection.
        :param other_input_collections: List of the input collections.
        :param input_collections: Alternative to (primary_input_collection, other_input_collections):
            the first entry becomes primary, the rest become "other". Mutually exclusive with them.
        :param output_collections: Same convention as input_collections, for outputs.
        :param loading: Passed to Base; presumably True when deserializing — TODO confirm in Base.
        # :param workflow: The workflow the current work belongs to.
        """
        # Private containers must exist before Base/__init__ metadata setters run.
        self._collections = {}
        self._primary_input_collection = None
        self._primary_output_collection = None
        self._other_input_collections = []
        self._other_output_collections = []

        self._processings = {}

        super(Work, self).__init__(loading=loading)

        # Most assignments below go through metadata-backed properties.
        self.internal_id = str(uuid.uuid4())[:8]
        self.template_work_id = self.internal_id
        self.is_template = is_template
        self.class_name = self.__class__.__name__.lower()
        self.initialized = False
        self.sequence_id = 0

        self.logger = logger
        if self.logger is None:
            # NOTE(review): setup_logger() result is not assigned here
            # (unlike get_logger()); presumably it sets self.logger itself.
            self.setup_logger()

        self.setup = setup
        self.executable = executable
        self.arguments = arguments
        self.parameters = parameters

        self.username = username
        self.work_type = work_type
        self.work_tag = work_tag
        self.exec_type = exec_type
        self.sandbox = sandbox
        self.request_id = request_id
        self.work_id = work_id
        self.work_name = work_name
        if not self.work_name:
            # Fall back to the template work id as a unique name.
            self.work_name = self.template_work_id

        self.transforming = False
        self.workdir = None

        self.log_collections = []
        # The list-style and primary/other-style collection arguments are exclusive.
        if input_collections and (primary_input_collection or other_input_collections):
            raise Exception("input_collections and (primary_input_collection, other_input_collections) cannot be used at the same time.")
        if output_collections and (primary_output_collection or other_output_collections):
            raise Exception("output_collections and (primary_output_collection, other_output_collections) cannot be used at the same time.")

        # Normalize single objects to lists.
        if input_collections and type(input_collections) not in [list, tuple]:
            input_collections = [input_collections]
        if output_collections and type(output_collections) not in [list, tuple]:
            output_collections = [output_collections]

        # Split list-style arguments into primary (first) + other (rest).
        if input_collections:
            primary_input_collection = input_collections[0]
            if len(input_collections) > 1:
                other_input_collections = input_collections[1:]
        if output_collections:
            primary_output_collection = output_collections[0]
            if len(output_collections) > 1:
                other_output_collections = output_collections[1:]

        # Registration helpers are defined later in the class (outside this view).
        self.set_primary_input_collection(primary_input_collection)
        self.set_primary_output_collection(primary_output_collection)

        if other_input_collections and type(other_input_collections) not in [list, tuple]:
            other_input_collections = [other_input_collections]
        self.add_other_input_collections(other_input_collections)
        if other_output_collections and type(other_output_collections) not in [list, tuple]:
            other_output_collections = [other_output_collections]
        self.add_other_output_collections(other_output_collections)

        if log_collections and type(log_collections) not in [list, tuple]:
            log_collections = [log_collections]
        self.add_log_collections(log_collections)

        self.release_inputs_after_submitting = release_inputs_after_submitting
        self.has_new_inputs = True

        self.started = False
        self.status = WorkStatus.New
        self.substatus = WorkStatus.New
        self.polling_retries = 0
        self.errors = []
        self.next_works = []

        self.work_name_to_coll_map = []

        # Processing bookkeeping (ids of processings in each lifecycle bucket).
        self.processings = {}
        self.active_processings = []
        self.cancelled_processings = []
        self.suspended_processings = []
        self.old_processings = []
        self.terminated_msg = ""
        self.output_data = {}
        self.parameters_for_next_task = None

        self.status_statistics = {}

        self.agent_attributes = agent_attributes

        self.proxy = None
        self.original_proxy = None

        # Operation request flags.
        self.tocancel = False
        self.tosuspend = False
        self.toresume = False
        self.toexpire = False
        self.tofinish = False
        self.toforcefinish = False

        self.last_updated_at = datetime.datetime.utcnow()

        self.to_update_processings = {}

        self.backup_to_release_inputs = {'0': [], '1': [], '2': []}

        self.num_run = 0

        self.or_custom_conditions = {}
        self.and_custom_conditions = {}

        self.sliced_global_parameters = None

        self.is_build_work = False

        self.func_site_to_cloud = None

        self.dispatch_ext_content = False

        """
        self._running_data_names = []
        for name in ['internal_id', 'template_work_id', 'initialized', 'sequence_id', 'parameters', 'work_id', 'transforming', 'workdir',
                     # 'collections', 'primary_input_collection', 'other_input_collections', 'output_collections', 'log_collections',
                     'collections',
                     # 'output_data',
                     '_has_new_inputs', 'status', 'substatus', 'polling_retries', 'errors', 'next_works',
                     'processings', 'active_processings', 'cancelled_processings', 'suspended_processings', 'old_processings',
                     # 'terminated_msg', 'output_data', 'parameters_for_next_task', 'status_statistics',
                     'tocancel', 'tosuspend', 'toresume']:
            self._running_data_names.append(name)
        """
0613
    def get_logger(self):
        """Return the work's logger, lazily creating it via setup_logger()."""
        if self.logger is None:
            self.logger = self.setup_logger()
        return self.logger
0618
    def get_class_name(self):
        """Return the concrete class name (used e.g. for identification)."""
        return self.__class__.__name__

    def get_internal_id(self):
        return self.internal_id

    def get_template_work_id(self):
        return self.template_work_id

    def get_sequence_id(self):
        return self.sequence_id

    def get_site(self):
        # Base implementation: no site; likely overridden by subclasses.
        return None

    def get_cloud(self):
        # Base implementation: no cloud; likely overridden by subclasses.
        return None

    def get_queue(self):
        # Base implementation: no queue; likely overridden by subclasses.
        return None

    def is_data_work(self):
        # Default: not a data work; subclasses presumably override.
        return False
0642
    @property
    def internal_id(self):
        # Stable short id of this work within its workflow (persisted in metadata).
        return self.get_metadata_item('internal_id')

    @internal_id.setter
    def internal_id(self, value):
        self.add_metadata_item('internal_id', value)

    @property
    def parent_internal_id(self):
        # Internal id of the parent work, if any.
        return self.get_metadata_item('parent_internal_id')

    @parent_internal_id.setter
    def parent_internal_id(self, value):
        self.add_metadata_item('parent_internal_id', value)

    @property
    def parent_internal_ids(self):
        # Internal ids of all parent works, if any.
        return self.get_metadata_item('parent_internal_ids', None)

    @parent_internal_ids.setter
    def parent_internal_ids(self, value):
        self.add_metadata_item('parent_internal_ids', value)
0666
    @property
    def template_work_id(self):
        # Id of the template this work was instantiated from (see __init__).
        return self.get_metadata_item('template_work_id')

    @template_work_id.setter
    def template_work_id(self, value):
        self.add_metadata_item('template_work_id', value)

    @property
    def workload_id(self):
        return self.get_metadata_item('workload_id', None)

    @workload_id.setter
    def workload_id(self, value):
        self.add_metadata_item('workload_id', value)

    @property
    def external_id(self):
        return self.get_metadata_item('external_id', None)

    @external_id.setter
    def external_id(self, value):
        self.add_metadata_item('external_id', value)

    def get_workload_id(self):
        """Return the workload id (plain-method accessor for callers)."""
        return self.workload_id
0693
    @property
    def initialized(self):
        return self.get_metadata_item('initialized', False)

    @initialized.setter
    def initialized(self, value):
        self.add_metadata_item('initialized', value)

    @property
    def sequence_id(self):
        return self.get_metadata_item('sequence_id', 0)

    @sequence_id.setter
    def sequence_id(self, value):
        self.add_metadata_item('sequence_id', value)

    @property
    def num_inputs(self):
        num = self.get_metadata_item('num_inputs', None)
        return num

    @num_inputs.setter
    def num_inputs(self, value):
        self.add_metadata_item('num_inputs', value)

    @property
    def has_unmapped_jobs(self):
        # Defaults to True: assume unmapped jobs until told otherwise.
        value = self.get_metadata_item('has_unmapped_jobs', True)
        return value

    @has_unmapped_jobs.setter
    def has_unmapped_jobs(self, value):
        self.add_metadata_item('has_unmapped_jobs', value)
0727
    @property
    def parameters(self):
        # Dict of arguments to be replaced (see __init__), persisted in metadata.
        return self.get_metadata_item('parameters', None)

    @parameters.setter
    def parameters(self, value):
        self.add_metadata_item('parameters', value)
0735
0736 @property
0737 def output_data(self):
0738 return self.get_metadata_item('output_data', {})
0739
0740 @output_data.setter
0741 def output_data(self, value):
0742 self.add_metadata_item('output_data', value)
0743 if value and type(value) in [dict]:
0744 for key in value:
0745 new_key = "user_" + str(key)
0746 setattr(self, new_key, value[key])
0747
    @property
    def work_id(self):
        return self.get_metadata_item('work_id', None)

    @work_id.setter
    def work_id(self, value):
        self.add_metadata_item('work_id', value)

    @property
    def parent_workload_id(self):
        return self.get_metadata_item('parent_workload_id', None)

    @parent_workload_id.setter
    def parent_workload_id(self, value):
        self.add_metadata_item('parent_workload_id', value)

    @property
    def transforming(self):
        # True once the work has entered an active/terminal state (see status setter).
        return self.get_metadata_item('transforming', False)

    @transforming.setter
    def transforming(self, value):
        self.add_metadata_item('transforming', value)

    @property
    def submitted(self):
        return self.get_metadata_item('submitted', False)

    @submitted.setter
    def submitted(self, value):
        self.add_metadata_item('submitted', value)
0779
    @property
    def workdir(self):
        return self.get_metadata_item('workdir', None)

    @workdir.setter
    def workdir(self, value):
        self.add_metadata_item('workdir', value)

    @property
    def work_name(self):
        # NOTE(review): default is 0, not None — looks like a copy/paste
        # artifact; __init__ always sets a name, so the default rarely fires.
        return self.get_metadata_item('work_name', 0)

    @work_name.setter
    def work_name(self, value):
        self.add_metadata_item('work_name', value)

    @property
    def has_new_inputs(self):
        # NOTE(review): default 0 (falsy) rather than False — same artifact as work_name.
        return self.get_metadata_item('has_new_inputs', 0)

    @has_new_inputs.setter
    def has_new_inputs(self, value):
        self.add_metadata_item('has_new_inputs', value)

    @property
    def started(self):
        return self.get_metadata_item('started', False)

    @started.setter
    def started(self, value):
        self.add_metadata_item('started', value)
0811
0812 @property
0813 def status(self):
0814 st = self.get_metadata_item('status', WorkStatus.New)
0815 if type(st) in [int]:
0816 st = WorkStatus(st)
0817 return st
0818
0819 @status.setter
0820 def status(self, value):
0821 if not self.transforming:
0822 if value and value in [WorkStatus.Transforming,
0823 WorkStatus.Finished,
0824 WorkStatus.SubFinished,
0825 WorkStatus.Failed,
0826 WorkStatus.Running]:
0827 self.transforming = True
0828 self.add_metadata_item('status', value.value if value else value)
0829
    @property
    def substatus(self):
        # Stored as the raw enum value; coerce ints back to WorkStatus.
        st = self.get_metadata_item('substatus', WorkStatus.New)
        if type(st) in [int]:
            st = WorkStatus(st)
        return st

    @substatus.setter
    def substatus(self, value):
        self.add_metadata_item('substatus', value.value if value else value)

    @property
    def is_build_work(self):
        st = self.get_metadata_item('is_build_work', False)
        return st

    @is_build_work.setter
    def is_build_work(self, value):
        self.add_metadata_item('is_build_work', value)

    def set_build_work(self):
        """Flag this work as a build work."""
        self.is_build_work = True
0852
    @property
    def signature(self):
        # Random signature assigned by sign(); None until signed.
        st = self.get_metadata_item('signature', None)
        return st

    @signature.setter
    def signature(self, value):
        self.add_metadata_item('signature', value)

    def sign(self):
        """Assign a fresh random signature (full uuid4 string)."""
        self.signature = str(uuid.uuid4())

    def get_signature(self):
        return self.signature

    @property
    def polling_retries(self):
        return self.get_metadata_item('polling_retries', 0)

    @polling_retries.setter
    def polling_retries(self, value):
        self.add_metadata_item('polling_retries', value)

    @property
    def errors(self):
        return self.get_metadata_item('errors', None)

    @errors.setter
    def errors(self, value):
        self.add_metadata_item('errors', value)

    @property
    def next_works(self):
        # Works scheduled to run after this one, if any.
        return self.get_metadata_item('next_works', [])

    @next_works.setter
    def next_works(self, value):
        self.add_metadata_item('next_works', value)
0891
0892 @property
0893 def collections(self):
0894 return self._collections
0895
0896 @collections.setter
0897 def collections(self, value):
0898 self._collections = value
0899 coll_metadata = {}
0900 if self._collections:
0901 for k in self._collections:
0902 coll = self._collections[k]
0903 if type(coll) in [Collection]:
0904 coll_metadata[k] = {'coll_id': coll.coll_id}
0905 self.add_metadata_item('collections', coll_metadata)
0906
0907 def with_sub_map_id(self):
0908 return False
0909
0910 @property
0911 def processings(self):
0912 return self._processings
0913
0914 @processings.setter
0915 def processings(self, value):
0916 self._processings = value
0917 proc_metadata = {}
0918 if self._processings:
0919 for k in self._processings:
0920 proc = self._processings[k]
0921 if type(proc) in [Processing]:
0922 proc_metadata[k] = {'processing_id': proc.processing_id,
0923 'workload_id': proc.workload_id,
0924 'external_id': proc.external_id}
0925 self.add_metadata_item('processings', proc_metadata)
0926
0927 def generating_new_inputs(self):
0928 return False
0929
0930 def refresh_work(self):
0931 coll_metadata = {}
0932 if self._collections:
0933 for k in self._collections:
0934 coll = self._collections[k]
0935 if type(coll) in [Collection]:
0936 coll_metadata[k] = {'coll_id': coll.coll_id}
0937 self.add_metadata_item('collections', coll_metadata)
0938
0939 proc_metadata = {}
0940 if self._processings:
0941 for k in self._processings:
0942 proc = self._processings[k]
0943 if type(proc) in [Processing]:
0944 proc_metadata[k] = {'processing_id': proc.processing_id,
0945 'workload_id': proc.workload_id,
0946 'external_id': proc.external_id}
0947 self.add_metadata_item('processings', proc_metadata)
0948
    def load_work(self):
        """Restore collection/processing ids from persisted metadata.

        First pushes saved ids into already-registered Collection/Processing
        objects, then recreates Processing placeholders for any persisted
        entries that have no matching in-memory object.
        """
        coll_metadata = self.get_metadata_item('collections', {})
        for k in self._collections:
            if k in coll_metadata:
                coll_id = coll_metadata[k]['coll_id']
                self._collections[k].coll_id = coll_id

        proc_metadata = self.get_metadata_item('processings', {})
        for k in self._processings:
            if k in proc_metadata:
                proc_id = proc_metadata[k]['processing_id']
                self._processings[k].processing_id = proc_id
                if 'workload_id' in proc_metadata[k] and proc_metadata[k]['workload_id']:
                    self._processings[k].workload_id = proc_metadata[k]['workload_id']
                    # The work-level ids track the last processing seen.
                    self.workload_id = proc_metadata[k]['workload_id']
                if 'external_id' in proc_metadata[k] and proc_metadata[k]['external_id']:
                    self._processings[k].external_id = proc_metadata[k]['external_id']
                    self.external_id = proc_metadata[k]['external_id']
        for k in proc_metadata:
            if k not in self._processings:
                # Recreate a placeholder for a processing persisted earlier.
                self._processings[k] = Processing(processing_metadata={})
                proc_id = proc_metadata[k]['processing_id']
                self._processings[k].processing_id = proc_id
                self._processings[k].internal_id = k
                if 'workload_id' in proc_metadata[k] and proc_metadata[k]['workload_id']:
                    self._processings[k].workload_id = proc_metadata[k]['workload_id']
                    self.workload_id = proc_metadata[k]['workload_id']
                if 'external_id' in proc_metadata[k] and proc_metadata[k]['external_id']:
                    self._processings[k].external_id = proc_metadata[k]['external_id']
                    self.external_id = proc_metadata[k]['external_id']

    def load_metadata(self):
        """Restore state from persisted metadata; delegates to load_work()."""
        self.load_work()
0982
    # Lifecycle buckets holding processing ids (persisted in metadata).

    @property
    def active_processings(self):
        return self.get_metadata_item('active_processings', [])

    @active_processings.setter
    def active_processings(self, value):
        self.add_metadata_item('active_processings', value)

    @property
    def cancelled_processings(self):
        return self.get_metadata_item('cancelled_processings', [])

    @cancelled_processings.setter
    def cancelled_processings(self, value):
        self.add_metadata_item('cancelled_processings', value)

    @property
    def suspended_processings(self):
        return self.get_metadata_item('suspended_processings', [])

    @suspended_processings.setter
    def suspended_processings(self, value):
        self.add_metadata_item('suspended_processings', value)

    @property
    def old_processings(self):
        return self.get_metadata_item('old_processings', [])

    @old_processings.setter
    def old_processings(self, value):
        self.add_metadata_item('old_processings', value)
1014
    # Operation request flags. Unlike Processing's equivalents, these setters
    # do not track an operation_time.

    @property
    def tocancel(self):
        return self.get_metadata_item('tocancel', False)

    @tocancel.setter
    def tocancel(self, value):
        self.add_metadata_item('tocancel', value)

    @property
    def tosuspend(self):
        return self.get_metadata_item('tosuspend', False)

    @tosuspend.setter
    def tosuspend(self, value):
        self.add_metadata_item('tosuspend', value)

    @property
    def toresume(self):
        return self.get_metadata_item('toresume', False)

    @toresume.setter
    def toresume(self, value):
        self.add_metadata_item('toresume', value)

    @property
    def toexpire(self):
        return self.get_metadata_item('toexpire', False)

    @toexpire.setter
    def toexpire(self, value):
        self.add_metadata_item('toexpire', value)

    @property
    def tofinish(self):
        return self.get_metadata_item('tofinish', False)

    @tofinish.setter
    def tofinish(self, value):
        self.add_metadata_item('tofinish', value)

    @property
    def toforcefinish(self):
        return self.get_metadata_item('toforcefinish', False)

    @toforcefinish.setter
    def toforcefinish(self, value):
        self.add_metadata_item('toforcefinish', value)
1062
    @property
    def last_updated_at(self):
        """Datetime of the last update; ISO strings stored in metadata are parsed back to datetime."""
        last_updated_at = self.get_metadata_item('last_updated_at', None)
        if last_updated_at and type(last_updated_at) in [str]:
            last_updated_at = str_to_date(last_updated_at)
        return last_updated_at

    @last_updated_at.setter
    def last_updated_at(self, value):
        self.add_metadata_item('last_updated_at', value)

    def has_new_updates(self):
        # Touch the work: record "now" (UTC) as the last update time.
        self.last_updated_at = datetime.datetime.utcnow()

    @property
    def to_update_processings(self):
        # Pending processing updates (dict); presumably keyed by processing
        # id — TODO confirm against callers.
        return self.get_metadata_item('to_update_processings', {})

    @to_update_processings.setter
    def to_update_processings(self, value):
        self.add_metadata_item('to_update_processings', value)
1084
    @property
    def num_run(self):
        # Number of the current (re)run; 0 when never set.
        return self.get_metadata_item('num_run', 0)

    @num_run.setter
    def num_run(self, value):
        self.add_metadata_item('num_run', value)
        if value is not None:
            # Suffix the run number onto output collection names so each run
            # writes to distinct collections. NOTE(review): setting num_run
            # repeatedly appends another suffix each time — assumed to be set
            # only once per run; TODO confirm.
            for coll in self.output_collections:
                if type(coll) in [Collection]:
                    if "___idds___" not in coll.name:
                        coll.name = coll.name + "." + str(value)

    def get_loop_index(self):
        # For loop workflows the loop iteration equals the run number.
        return self.num_run
1101
    @property
    def primary_input_collection(self):
        # Resolve the stored internal id to the Collection object (None when unset).
        if self._primary_input_collection:
            return self.collections[self._primary_input_collection]
        return None

    @primary_input_collection.setter
    def primary_input_collection(self, value):
        self.set_primary_input_collection(value)

    @property
    def primary_output_collection(self):
        # Resolve the stored internal id to the Collection object (None when unset).
        if self._primary_output_collection:
            return self.collections[self._primary_output_collection]
        return None

    @primary_output_collection.setter
    def primary_output_collection(self, value):
        self.set_primary_output_collection(value)
1121
    @property
    def input_collections(self):
        """All input collections: the primary one first, then the others."""
        if self._primary_input_collection:
            keys = [self._primary_input_collection] + self._other_input_collections
        else:
            keys = self._other_input_collections
        return [self.collections[k] for k in keys]

    @input_collections.setter
    def input_collections(self, value):
        # Accept a single collection or a list; the first element becomes
        # the primary input collection, the rest the "other" inputs.
        if value and type(value) not in [list, tuple]:
            value = [value]

        if value:
            primary_collection = value[0]
            other_collections = []
            if len(value) > 1:
                other_collections = value[1:]

            self.set_primary_input_collection(primary_collection)

            if other_collections and type(other_collections) not in [list, tuple]:
                other_collections = [other_collections]
            self.add_other_input_collections(other_collections)

    @property
    def output_collections(self):
        """All output collections: the primary one first, then the others."""
        if self._primary_output_collection:
            keys = [self._primary_output_collection] + self._other_output_collections
        else:
            keys = self._other_output_collections
        return [self.collections[k] for k in keys]

    @output_collections.setter
    def output_collections(self, value):
        # Accept a single collection or a list; the first element becomes
        # the primary output collection, the rest the "other" outputs.
        if value and type(value) not in [list, tuple]:
            value = [value]

        if value:
            primary_collection = value[0]
            other_collections = []
            if len(value) > 1:
                other_collections = value[1:]

            self.set_primary_output_collection(primary_collection)

            if other_collections and type(other_collections) not in [list, tuple]:
                other_collections = [other_collections]
            self.add_other_output_collections(other_collections)
1171
    def set_work_name(self, work_name):
        # Human-readable name of this work inside its workflow.
        self.work_name = work_name

    def get_work_name(self):
        return self.work_name

    def set_func_site_to_cloud(self, func):
        # Callback used to map a site to its cloud.
        self.func_site_to_cloud = func

    def get_func_site_to_cloud(self):
        return self.func_site_to_cloud
1183
1184 def get_is_template(self):
1185 self.is_template
1186
    def sync_global_parameters(self, global_parameters, sliced_global_parameters=None):
        """Copy workflow-level global parameters onto this work as attributes.

        :param global_parameters: dict of name -> value.
        :param sliced_global_parameters: optional dict of
            name -> {'index': int, 'name': str}. When a global value is a
            list/tuple and 'index' is within range, only that element is
            assigned, optionally under the alternative attribute 'name'.
        """
        if sliced_global_parameters:
            self.sliced_global_parameters = sliced_global_parameters

        if global_parameters:
            for key in global_parameters:
                sliced_index = None
                sliced_name = None
                if self.sliced_global_parameters and key in self.sliced_global_parameters:
                    sliced_index = self.sliced_global_parameters[key]['index']
                    sliced_name = self.sliced_global_parameters[key]['name']
                    if type(global_parameters[key]) in [list, tuple] and sliced_index < len(global_parameters[key]):
                        pass
                    else:
                        # Not sliceable (wrong type or index out of range):
                        # fall back to assigning the whole value.
                        sliced_index = None
                if not sliced_name:
                    sliced_name = key

                if sliced_index is None:
                    setattr(self, sliced_name, global_parameters[key])
                else:
                    setattr(self, sliced_name, global_parameters[key][sliced_index])
1209
1210 def get_global_parameter_from_output_data(self, key):
1211 self.logger.debug("get_global_parameter_from_output_data, key: %s, output_data: %s" % (key, str(self.output_data)))
1212 gp_output_data = {}
1213 if self.output_data and type(self.output_data) in [dict]:
1214 for key in self.output_data:
1215 new_key = "user_" + str(key)
1216 gp_output_data[new_key] = self.output_data[key]
1217 if key in gp_output_data:
1218 return True, gp_output_data[key]
1219 else:
1220 return False, None
1221
    def renew_parameters_from_attributes(self):
        # Hook for subclasses: refresh parameters from instance attributes.
        pass
1224
1225 def add_custom_condition(self, key, value, op='and'):
1226
1227 if op and op == 'or':
1228 op = 'or'
1229 else:
1230 op = 'and'
1231 if op == 'and':
1232 self.and_custom_conditions[key] = value
1233 else:
1234 self.or_custom_conditions[key] = value
1235
1236 def get_custom_condition_status_value_bool(self, key):
1237 user_key = "user_" + key
1238 if hasattr(self, user_key):
1239 key = user_key
1240
1241 if hasattr(self, key) and getattr(self, key):
1242 value = getattr(self, key)
1243 if type(value) in [str]:
1244 value = value.lower()
1245 if value == 'true':
1246 return True
1247 else:
1248 return False
1249 elif type(value) in [bool]:
1250 return value
1251 elif type(value) in [int]:
1252 if value > 0:
1253 return True
1254 else:
1255 return False
1256 else:
1257 return value
1258 else:
1259 return False
1260
1261 def get_custom_condition_status_value(self, key):
1262 if self.output_data and key in self.output_data:
1263 return self.output_data[key]
1264
1265 user_key = "user_" + key
1266 if hasattr(self, user_key):
1267 key = user_key
1268
1269 if hasattr(self, key) and getattr(self, key):
1270 return getattr(self, key)
1271 else:
1272 return None
1273
1274 def get_custom_condition_status_real(self):
1275 if self.or_custom_conditions:
1276 for key in self.or_custom_conditions:
1277 value = self.get_custom_condition_status_value(key)
1278 if value == self.or_custom_conditions[key]:
1279 return True
1280
1281 if self.and_custom_conditions:
1282 for key in self.and_custom_conditions:
1283 value = self.get_custom_condition_status_value(key)
1284 if not (value == self.and_custom_conditions[key]):
1285 return False
1286 return True
1287
1288 return False
1289
    def get_custom_condition_status(self):
        # Evaluate the user-defined trigger conditions (see
        # get_custom_condition_status_real) and log the outcome.
        status = self.get_custom_condition_status_real()
        self.logger.debug("get_custom_condition_status, status: %s" % (status))
        return status

    def get_not_custom_condition_status(self):
        # Negated form, usable as an "else" branch condition.
        return not self.get_custom_condition_status()
1301
    def setup_logger(self):
        """
        Setup logger named after this work's class.
        """
        self.logger = logging.getLogger(self.get_class_name())
        return self.logger

    def add_errors(self, error):
        # Append an error message to the accumulated error list.
        self.errors.append(error)

    def get_errors(self):
        return self.errors
1314
    def set_work_id(self, work_id, transforming=True):
        """
        *** Function called by Marshaller and clerk agent.
        *** It's the transform_id set by core_workprogresses
        """
        self.work_id = work_id
        self.transforming = transforming

    def get_work_id(self):
        """
        *** Function called by Marshaller and clerk agent.
        """
        return self.work_id

    def set_request_id(self, request_id):
        """
        *** Function called by Marshaller and clerk agent.
        *** It's the transform_id set by core_workprogresses
        """
        # NOTE(review): the docstring mentions transform_id but this stores
        # the request id — looks like a copy/paste docstring; confirm.
        self.request_id = request_id

    def get_request_id(self):
        """
        *** Function called by Marshaller and clerk agent.
        """
        return self.request_id
1341
1342
1343
1344
    def clean_work(self):
        """Reset processing bookkeeping and per-run outputs; touch last_updated_at."""
        self.processings = {}
        self.active_processings = []
        self.cancelled_processings = []
        self.suspended_processings = []
        self.old_processings = []
        self.terminated_msg = ""
        self.output_data = {}
        self.parameters_for_next_task = None
        self.last_updated_at = datetime.datetime.utcnow()
1355
    def set_agent_attributes(self, attrs, req_attributes=None):
        # Pick up the agent-level configuration section matching this class.
        # NOTE(review): req_attributes is currently unused here.
        if self.agent_attributes is None:
            self.agent_attributes = {}
        if attrs and self.class_name in attrs:
            for key, value in attrs[self.class_name].items():
                self.agent_attributes[key] = value
        self.logger.info("agent_attributes: %s" % self.agent_attributes)

    def get_agent_attributes(self):
        return self.agent_attributes

    def set_workdir(self, workdir):
        self.workdir = workdir

    def get_workdir(self):
        return self.workdir
1372
    def set_status(self, status):
        """
        *** Function called by Marshaller agent.
        """
        # NOTE: assert is stripped under -O; the type check is debug-only.
        assert (isinstance(status, WorkStatus))
        self.status = status

    def get_status(self):
        return self.status

    def set_terminated_msg(self, msg):
        """
        *** Function called by Marshaller agent.
        """
        # The terminal message is also recorded in the error list.
        self.terminated_msg = msg
        self.add_errors(msg)

    def get_terminated_msg(self):
        return self.terminated_msg

    def set_output_data(self, data):
        self.output_data = data

    def get_output_data(self):
        return self.output_data

    def set_parameters_for_next_task(self, params):
        # Parameters handed over to the next work in the workflow.
        self.parameters_for_next_task = params

    def get_parameters_for_next_task(self):
        return self.parameters_for_next_task
1406
1407 def __eq__(self, obj):
1408 if self.work_id == obj.work_id:
1409 return True
1410 return False
1411
    def __hash__(self):
        # NOTE(review): hashes the raw work_id; a work whose work_id is
        # still None is effectively unhashable (TypeError) — confirm
        # callers only hash persisted works.
        return self.work_id

    def __str__(self):
        return str(self.to_dict())
1417
    def get_work_type(self):
        """
        *** Function called by Marshaller agent.
        """
        return self.work_type

    def get_work_tag(self):
        """
        *** Function called by Marshaller agent.
        """
        return self.work_tag
1429
    def set_parameters(self, parameters):
        # Store the parameters and mirror every non-None entry onto an
        # existing attribute of the same name.
        self.parameters = parameters
        for p in self.parameters:
            if self.parameters[p] is not None and hasattr(self, p):
                setattr(self, p, self.parameters[p])

    def get_parameters(self):
        return self.parameters

    def set_arguments(self, arguments):
        # Argument template; placeholders are filled in parse_arguments().
        self.arguments = arguments

    def get_arguments(self):
        return self.arguments
1446
    def convert_data_to_additional_data_storage(self, storage, storage_name=None, replace_storage_name=False):
        # Hook for subclasses: move/copy data to an additional storage.
        pass

    def get_ancestry_works(self):
        # Overridden by subclasses that track ancestry; base has none.
        return []

    def get_processing_job_ids(self, processing, log_prefix=''):
        # Overridden by subclasses; base knows no job ids.
        return []

    def get_processing_job_name_to_ids(self, processing, job_ids, log_prefix=''):
        # Overridden by subclasses; base has no name -> id mapping.
        return {}
1458
1459 def has_to_release_inputs(self):
1460 if self.backup_to_release_inputs['0'] or self.backup_to_release_inputs['1'] or self.backup_to_release_inputs['2']:
1461 return True
1462 return False
1463
1464 def add_backup_to_release_inputs(self, to_release_inputs):
1465 if to_release_inputs:
1466 self.backup_to_release_inputs['0'] = self.backup_to_release_inputs['0'] + to_release_inputs
1467
1468 def get_backup_to_release_inputs(self):
1469 to_release_inputs = self.backup_to_release_inputs['0'] + self.backup_to_release_inputs['1'] + self.backup_to_release_inputs['2']
1470 self.backup_to_release_inputs['2'] = self.backup_to_release_inputs['1']
1471 self.backup_to_release_inputs['1'] = self.backup_to_release_inputs['0']
1472 self.backup_to_release_inputs['0'] = []
1473 return to_release_inputs
1474
    def is_to_expire(self, expired_at=None, pending_time=None, request_id=None):
        """Decide whether the work should be expired.

        Expire when the absolute deadline *expired_at* has passed, or when
        *pending_time* (in days) has elapsed since last_updated_at.
        """
        if expired_at:
            if type(expired_at) in [str]:
                expired_at = str_to_date(expired_at)
            if expired_at < datetime.datetime.utcnow():
                self.logger.info("Request(%s) expired_at(%s) is smaller than utc now(%s), expiring" % (request_id,
                                                                                                       expired_at,
                                                                                                       datetime.datetime.utcnow()))
                return True

        if pending_time:
            # pending_time is expressed in days; convert to seconds.
            act_pending_time = float(pending_time)
            act_pending_seconds = int(86400 * act_pending_time)
            if self.last_updated_at + datetime.timedelta(seconds=act_pending_seconds) < datetime.datetime.utcnow():
                log_str = "Request(%s) last updated at(%s) + pending seconds(%s)" % (request_id,
                                                                                    self.last_updated_at,
                                                                                    act_pending_seconds)
                log_str += " is smaller than utc now(%s), expiring" % (datetime.datetime.utcnow())
                self.logger.info(log_str)
                return True
        return False
1496
    def is_starting(self):
        # True once the work has been handed over for transforming.
        return self.transforming

    def is_started(self):
        return self.started or self.submitted

    def is_running(self):
        if self.status in [WorkStatus.Running, WorkStatus.Transforming]:
            return True
        return False
1507
    # The following predicates report the work's terminal state. Each one
    # additionally requires that no new operation (ToCancel/ToSuspend/
    # ToResume) is pending in substatus — an operation in flight keeps the
    # work "not terminated".

    def is_terminated(self, synchronize=True):
        """
        *** Function called by Transformer agent.
        """
        if (self.status in [WorkStatus.Finished, WorkStatus.SubFinished, WorkStatus.Failed, WorkStatus.Cancelled, WorkStatus.Suspended, WorkStatus.Expired]
            and self.substatus not in [WorkStatus.ToCancel, WorkStatus.ToSuspend, WorkStatus.ToResume]):
            return True
        return False

    def is_finished(self, synchronize=True):
        """
        *** Function called by Transformer agent.
        """
        if self.status in [WorkStatus.Finished] and self.substatus not in [WorkStatus.ToCancel, WorkStatus.ToSuspend, WorkStatus.ToResume]:
            return True
        return False

    def is_subfinished(self, synchronize=True):
        """
        *** Function called by Transformer agent.
        """
        if self.status in [WorkStatus.SubFinished] and self.substatus not in [WorkStatus.ToCancel, WorkStatus.ToSuspend, WorkStatus.ToResume]:
            return True
        return False

    def is_processed(self, synchronize=True):
        """
        *** Function called by Transformer agent.
        """
        if self.status in [WorkStatus.Finished, WorkStatus.SubFinished] and self.substatus not in [WorkStatus.ToCancel, WorkStatus.ToSuspend, WorkStatus.ToResume]:
            return True
        return False

    def is_failed(self, synchronize=True):
        """
        *** Function called by Transformer agent.
        """
        if self.status in [WorkStatus.Failed] and self.substatus not in [WorkStatus.ToCancel, WorkStatus.ToSuspend, WorkStatus.ToResume]:
            return True
        return False

    def is_expired(self, synchronize=True):
        """
        *** Function called by Transformer agent.
        """
        if self.status in [WorkStatus.Expired] and self.substatus not in [WorkStatus.ToCancel, WorkStatus.ToSuspend, WorkStatus.ToResume]:
            return True
        return False

    def is_cancelled(self, synchronize=True):
        """
        *** Function called by Transformer agent.
        """
        if self.status in [WorkStatus.Cancelled] and self.substatus not in [WorkStatus.ToCancel, WorkStatus.ToSuspend, WorkStatus.ToResume]:
            return True
        return False

    def is_suspended(self, synchronize=True):
        """
        *** Function called by Transformer agent.
        """
        if self.status in [WorkStatus.Suspended] and self.substatus not in [WorkStatus.ToCancel, WorkStatus.ToSuspend, WorkStatus.ToResume]:
            return True
        return False
1572
    def add_next_work(self, work):
        # Read-modify-write through self.next_works; presumably a
        # metadata-backed property, so re-assignment persists the change —
        # TODO confirm.
        next_works = self.next_works
        next_works.append(work)
        self.next_works = next_works
1577
1578 def parse_arguments(self):
1579 try:
1580 arguments = self.get_arguments()
1581 parameters = self.get_parameters()
1582 arguments = arguments.format(**parameters)
1583 return arguments
1584 except Exception as ex:
1585 self.add_errors(str(ex))
1586
    # Initialization flag accessors (see initialize_work).

    def set_initialized(self):
        self.initialized = True

    def unset_initialized(self):
        self.initialized = False

    def is_initialized(self):
        return self.initialized
1595
    def initialize_work(self):
        # One-time initialization: sign the work and mark it initialized.
        if self.parameters and self.arguments:
            # TODO: apply parameters to the arguments (currently a no-op).
            pass
        self.sign()
        if not self.is_initialized():
            self.set_initialized()
1605
    def copy(self):
        """Return a deep copy of this work."""
        new_work = copy.deepcopy(self)
        return new_work

    def __deepcopy__(self, memo):
        # The logger is detached during the copy (it may hold unpicklable
        # handles) and re-attached — shared, not copied — on both the
        # original and the clone.
        logger = self.logger
        self.logger = None

        cls = self.__class__
        result = cls.__new__(cls)

        memo[id(self)] = result

        for k, v in self.__dict__.items():
            setattr(result, k, copy.deepcopy(v, memo))

        self.logger = logger
        result.logger = logger
        return result
1626
    def depend_on(self, work):
        # Overridden by subclasses: whether this work depends on *work*.
        return False
1629
    def generate_work_from_template(self):
        # Deep-copy this work while keeping the (uncopyable) logger shared;
        # a clone made from a template gets its own internal_id.
        logger = self.logger
        self.logger = None
        new_work = copy.deepcopy(self)
        self.logger = logger
        new_work.logger = logger

        if self.is_template:
            new_work.internal_id = str(uuid.uuid4())[:8]
        return new_work

    def get_template_id(self):
        # Id of the template work this one was generated from.
        return self.template_work_id
1643
    def resume_work(self):
        # Resuming a work that never started (New/Ready) is a no-op;
        # otherwise put it back into Transforming and restart polling.
        if self.status in [WorkStatus.New, WorkStatus.Ready]:
            pass
        else:
            self.status = WorkStatus.Transforming
            self.polling_retries = 0
1650
    def add_collection_to_collections(self, coll):
        """Wrap a plain collection dict in a Collection object and register it.

        :param coll: dict with mandatory 'scope' and 'name'; an optional
            'type' becomes the CollectionType (default Dataset); all
            remaining keys are kept as coll_metadata.
        :returns: the new Collection (also stored under its internal_id).
        """
        assert (isinstance(coll, dict))
        assert ('scope' in coll)
        assert ('name' in coll)

        coll_metadata = copy.copy(coll)
        del coll_metadata['scope']
        del coll_metadata['name']
        if 'type' in coll_metadata:
            coll_type = coll_metadata['type']
            del coll_metadata['type']
        else:
            coll_type = CollectionType.Dataset

        collection = Collection(scope=coll['scope'], name=coll['name'], coll_type=coll_type, coll_metadata=coll_metadata)
        self.collections[collection.internal_id] = collection
        return collection
1668
    def set_primary_input_collection(self, coll):
        # Register coll (a dict) and remember its internal id as the primary input.
        if coll:
            collection = self.add_collection_to_collections(coll)
            self._primary_input_collection = collection.internal_id

    def get_primary_input_collection(self):
        """
        *** Function called by Marshaller agent.
        """
        if self._primary_input_collection:
            return self.collections[self._primary_input_collection]
        return None

    def set_primary_output_collection(self, coll):
        # Register coll (a dict) and remember its internal id as the primary output.
        if coll:
            collection = self.add_collection_to_collections(coll)
            self._primary_output_collection = collection.internal_id

    def get_primary_output_collection(self):
        """
        *** Function called by Marshaller agent.
        """
        if self._primary_output_collection:
            return self.collections[self._primary_output_collection]
        return None
1694
    def add_other_input_collections(self, colls):
        # Register additional (non-primary) input collections.
        if not colls:
            return
        if type(colls) not in [list, tuple]:
            colls = [colls]

        for coll in colls:
            collection = self.add_collection_to_collections(coll)
            self._other_input_collections.append(collection.internal_id)

    def get_other_input_collections(self):
        return [self.collections[k] for k in self._other_input_collections]

    def add_other_output_collections(self, colls):
        # Register additional (non-primary) output collections.
        if not colls:
            return
        if type(colls) not in [list, tuple]:
            colls = [colls]

        for coll in colls:
            collection = self.add_collection_to_collections(coll)
            self._other_output_collections.append(collection.internal_id)

    def get_other_output_collections(self):
        return [self.collections[k] for k in self._other_output_collections]
1720
    def get_input_collections(self, poll_externel=False):
        """
        *** Function called by Transformer agent.
        """
        # NOTE(review): parameter name 'poll_externel' (sic) is part of the
        # public interface and unused in this base implementation.
        if self._primary_input_collection:
            keys = [self._primary_input_collection] + self._other_input_collections
        else:
            keys = self._other_input_collections
        return [self.collections[k] for k in keys]

    def get_output_collections(self):
        """
        *** Function called by Transformer agent.
        """
        if self._primary_output_collection:
            keys = [self._primary_output_collection] + self._other_output_collections
        else:
            keys = self._other_output_collections
        return [self.collections[k] for k in keys]

    def get_collections(self):
        # Every registered collection (inputs, outputs and logs).
        return [self.collections[k] for k in self.collections.keys()]
1743
    def is_input_collections_closed(self):
        # True only when every input collection has reached Closed status.
        colls = self.get_input_collections()
        for coll in colls:
            if coll.status not in [CollectionStatus.Closed]:
                return False
        return True

    def is_internal_collection(self, coll):
        # A collection is "internal" when its metadata marks iDDS itself
        # as the source.
        if (coll.coll_metadata and 'source' in coll.coll_metadata and coll.coll_metadata['source']
            and type(coll.coll_metadata['source']) == str and coll.coll_metadata['source'].lower() == 'idds'):
            return True
        return False
1756
    def get_internal_collections(self, coll):
        # TODO: when the collection carries a request_id, look up the
        # associated iDDS collections; currently always returns [].
        if coll.coll_metadata and 'request_id' in coll.coll_metadata:
            return []
        return []

    def poll_external_collection(self, coll):
        # Hook for subclasses: refresh collection info from an external
        # catalog. Base implementation is a no-op.
        return coll
1769
    def poll_internal_collection(self, coll):
        """Refresh an iDDS-internal collection's status and metadata.

        A closed collection is returned untouched; otherwise default
        metadata is (re)initialized and the collection is closed as soon as
        all of its internal member collections are closed.
        """
        try:
            if coll.status in [CollectionStatus.Closed]:
                return coll
            else:
                if coll.coll_metadata is None:
                    coll.coll_metadata = {}
                coll.coll_metadata['bytes'] = 0
                coll.coll_metadata['availability'] = 0
                coll.coll_metadata['events'] = 0
                coll.coll_metadata['is_open'] = True
                coll.coll_metadata['run_number'] = 1
                coll.coll_metadata['did_type'] = 'DATASET'
                coll.coll_metadata['list_all_files'] = False
                coll.coll_metadata['interal_colls'] = []

                is_open = False
                internal_colls = self.get_internal_collections(coll)
                for i_coll in internal_colls:
                    if i_coll.status not in [CollectionStatus.Closed]:
                        is_open = True
                    # NOTE(review): i_coll['bytes'] subscripts the member
                    # collection; get_internal_collections currently returns
                    # [], so this loop never runs — confirm the access before
                    # enabling internal collections.
                    coll.coll_metadata['bytes'] += i_coll['bytes']

                if not is_open:
                    coll_status = CollectionStatus.Closed
                else:
                    coll_status = CollectionStatus.Open
                coll.status = coll_status
                if len(internal_colls) > 1:
                    coll.coll_metadata['coll_type'] = CollectionType.Container
                else:
                    coll.coll_metadata['coll_type'] = CollectionType.Dataset

                return coll
        except Exception as ex:
            self.logger.error(ex)
            self.logger.error(traceback.format_exc())
            raise exceptions.IDDSException('%s: %s' % (str(ex), traceback.format_exc()))
1808
1809 def get_internal_input_contents(self, coll):
1810 """
1811 Get all input contents from iDDS collections.
1812 """
1813 coll = self.collections[self._primary_input_collection]
1814 internal_colls = self.get_internal_collection(coll)
1815 internal_coll_ids = [coll.coll_id for coll in internal_colls]
1816 if internal_coll_ids:
1817
1818 contents = []
1819 else:
1820 contents = []
1821 return contents
1822
    def get_input_contents(self):
        """
        Get all input contents from DDM.

        Base implementation is a stub; subclasses override it.
        """
        pass
1828
    def add_output_collections(self, colls):
        """Register output collections: the first becomes the primary
        output collection, any remaining become the other outputs.
        """
        if not colls:
            return
        if type(colls) not in [list, tuple]:
            colls = [colls]

        value = colls
        if value:
            primary_collection = value[0]
            other_collections = []
            if len(value) > 1:
                other_collections = value[1:]

            self.set_primary_output_collection(primary_collection)

            if other_collections and type(other_collections) not in [list, tuple]:
                other_collections = [other_collections]
            self.add_other_output_collections(other_collections)
1849
    def get_output_contents(self):
        # Hook for subclasses; base implementation returns nothing.
        pass

    def add_log_collections(self, colls):
        # Register log collections (accepts a single item or a list).
        if not colls:
            return
        if type(colls) not in [list, tuple]:
            colls = [colls]

        for coll in colls:
            collection = self.add_collection_to_collections(coll)
            self.log_collections.append(collection.internal_id)

    def get_log_collections(self):
        return [self.collections[k] for k in self.log_collections]
1865
    def set_has_new_inputs(self, yes=True):
        # Flag that new inputs are available for mapping.
        self.has_new_inputs = yes

    def has_dependency(self):
        # Overridden by works whose jobs depend on other works.
        return False

    def get_parent_work_names(self):
        return []

    def get_parent_workload_ids(self):
        return []
1877
    def get_new_input_output_maps(self, mapped_input_output_maps={}):
        """
        *** Function called by Transformer agent.
        New inputs which are not yet mapped to outputs.

        :param mapped_input_output_maps: Inputs that are already mapped.
        """
        inputs = self.get_input_contents()

        # Names (scope:name) already present in the existing maps for the
        # primary input collection.
        mapped_inputs = []
        for map_id in mapped_input_output_maps:
            map_id_inputs = mapped_input_output_maps[map_id]
            for ip in map_id_inputs:
                if ip['coll_id'] == self.primary_input_collection['coll_id']:
                    mapped_inputs.append(ip['scope'] + ':' + ip['name'])

        new_inputs = []
        for ip in inputs:
            if ip in mapped_inputs:
                pass
            else:
                new_inputs.append(ip)

        # Assign each new input the next free map id; the output mirrors
        # the input but points at the primary output collection.
        new_input_output_maps = {}
        mapped_keys = mapped_input_output_maps.keys()
        if mapped_keys:
            next_key = max(mapped_keys) + 1
        else:
            next_key = 1
        for ip in new_inputs:
            self.num_mapped_inputs += 1
            out_ip = copy.deepcopy(ip)
            out_ip['coll_id'] = self.collections[self._primary_output_collection]['coll_id']
            new_input_output_maps[next_key] = {'inputs': [ip],
                                               'outputs': [out_ip],
                                               'inputs_dependency': [],
                                               'logs': []}
            next_key += 1

        return new_input_output_maps
1919
    def set_collection_id(self, collection, coll_id):
        # Record the externally assigned collection id on the registered
        # collection.
        self.collections[collection.internal_id]['coll_id'] = coll_id
1924
    def should_release_inputs(self, processing=None, poll_operation_time_period=120):
        """Decide whether inputs can be released.

        When release_inputs_after_submitting is set, release only after the
        processing has been submitted for at least poll_operation_time_period
        seconds and has left the New state; otherwise release immediately.
        """
        if self.release_inputs_after_submitting:
            if not poll_operation_time_period:
                poll_operation_time_period = 120

            processing_model = processing.processing
            if (processing_model and processing_model['submitted_at']
                and processing_model['submitted_at'] + datetime.timedelta(seconds=int(poll_operation_time_period))
                < datetime.datetime.utcnow()):

                if (processing and processing.status
                    and processing.status not in [ProcessingStatus.New, ProcessingStatus.New.value]):
                    return True
            return False
        return True
1942
    def use_dependency_to_release_jobs(self):
        """
        *** Function called by Transformer agent.
        """
        return False

    def require_ext_contents(self):
        # Whether external contents are required; overridden by subclasses.
        return False

    def has_external_content_id(self):
        return False

    def set_work_name_to_coll_map(self, work_name_to_coll_map):
        # Mapping from work names to their collections.
        self.work_name_to_coll_map = work_name_to_coll_map

    def get_work_name_to_coll_map(self):
        return self.work_name_to_coll_map
1960
    def add_processing_to_processings(self, processing):
        # Register the processing under its internal id.
        self.processings[processing.internal_id] = processing

    def get_processing_ids(self):
        # External ids of the active processings that have already been
        # assigned one.
        ids = []
        for p_id in self.active_processings:
            p = self.processings[p_id]
            if p.processing_id:
                ids.append(p.processing_id)
        return ids
1978
1979
1980
1981
    def set_processing_id(self, processing, processing_id):
        """
        *** Function called by Transformer agent.
        Record the external processing id on the registered processing.
        """
        self.processings[processing.internal_id].processing_id = processing_id

    def set_processing_status(self, processing, status, substatus):
        """
        *** Function called by Transformer agent.
        Mirror status/substatus onto the registered processing.
        """
        self.processings[processing.internal_id].status = status
        self.processings[processing.internal_id].substatus = substatus
1994
1995
1996
1997
1998
    def set_processing_output_metadata(self, processing, output_metadata):
        """
        *** Function called by Transformer agent.
        """
        # NOTE(review): the looked-up processing is not used further; the
        # metadata is stored as this work's output data.
        processing = self.processings[processing.internal_id]

        self.set_output_data(output_metadata)

    def is_processing_substatus_new_operationing(self, processing):
        # True when a new operation request (cancel/suspend/resume/finish)
        # is pending in the processing's substatus.
        if processing.substatus in [ProcessingStatus.ToCancel,
                                    ProcessingStatus.ToSuspend,
                                    ProcessingStatus.ToResume,
                                    ProcessingStatus.ToFinish,
                                    ProcessingStatus.ToForceFinish]:
            return True
        return False
2015
    def is_processing_terminated(self, processing):
        # A processing is terminated when no new operation is pending and
        # its status is outside every non-final (new/submitting/running/
        # operating) state.
        self.logger.debug("is_processing_terminated: status: %s, substatus: %s" % (processing.status, processing.substatus))
        if self.is_processing_substatus_new_operationing(processing):
            return False

        if processing.status not in [ProcessingStatus.New,
                                     ProcessingStatus.Submitting,
                                     ProcessingStatus.Submitted,
                                     ProcessingStatus.Running,
                                     ProcessingStatus.ToCancel,
                                     ProcessingStatus.Cancelling,
                                     ProcessingStatus.ToSuspend,
                                     ProcessingStatus.Suspending,
                                     ProcessingStatus.ToResume,
                                     ProcessingStatus.Resuming,
                                     ProcessingStatus.ToFinish,
                                     ProcessingStatus.ToForceFinish]:
            return True
        return False

    def reap_processing(self, processing):
        # Drop a terminated processing from the active list; refuse and log
        # an error otherwise.
        if self.is_processing_terminated(processing):
            self.active_processings.remove(processing.internal_id)
        else:
            self.logger.error("Cannot reap an unterminated processing: %s" % processing)
2041
    def is_processings_started(self):
        """
        *** Function called by Transformer agent.
        True when at least one processing carries a submitted_at timestamp.
        """
        for p_id in self.processings:
            p = self.processings[p_id]
            if p.submitted_at:
                return True
        return False

    def is_processings_running(self):
        """
        *** Function called by Transformer agent.
        True when any active processing is in Running state.
        """
        for p_id in self.active_processings:
            p = self.processings[p_id]
            if p.status in [ProcessingStatus.Running]:
                return True
        return False
2062
    def is_processings_terminated(self):
        """
        *** Function called by Transformer agent.
        True only when every active processing is terminated.
        """
        for p_id in self.active_processings:
            p = self.processings[p_id]
            if self.is_processing_terminated(p):
                pass
            else:
                return False
        return True

    def is_processings_finished(self):
        """
        *** Function called by Transformer agent.
        True only when every active processing terminated in Finished state.
        """
        for p_id in self.active_processings:
            p = self.processings[p_id]
            if not self.is_processing_terminated(p) or p.status not in [ProcessingStatus.Finished]:
                return False
        return True
2084
    def is_processings_subfinished(self):
        """
        *** Function called by Transformer agent.
        True when all active processings terminated with a mix of both
        Finished and Failed outcomes.
        """
        has_finished = False
        has_failed = False
        for p_id in self.active_processings:
            p = self.processings[p_id]
            if not self.is_processing_terminated(p):
                return False
            else:
                if p.status in [ProcessingStatus.Finished]:
                    has_finished = True
                if p.status in [ProcessingStatus.Failed]:
                    has_failed = True
        if has_finished and has_failed:
            return True
        return False

    def is_processings_failed(self):
        """
        *** Function called by Transformer agent.
        True only when every active processing terminated in Failed state.
        """
        for p_id in self.active_processings:
            p = self.processings[p_id]
            if not self.is_processing_terminated(p) or p.status not in [ProcessingStatus.Failed]:
                return False
        return True
2113
    # Each of the following requires all active processings terminated, with
    # at least one ending in the corresponding state.

    def is_processings_expired(self):
        """
        *** Function called by Transformer agent.
        """
        has_expired = False
        for p_id in self.active_processings:
            p = self.processings[p_id]
            if not self.is_processing_terminated(p):
                return False
            elif p.status in [ProcessingStatus.Expired]:
                has_expired = True
        if has_expired:
            return True
        return False

    def is_processings_cancelled(self):
        """
        *** Function called by Transformer agent.
        """
        has_cancelled = False
        for p_id in self.active_processings:
            p = self.processings[p_id]
            if not self.is_processing_terminated(p):
                return False
            elif p.status in [ProcessingStatus.Cancelled]:
                has_cancelled = True
        if has_cancelled:
            return True
        return False

    def is_processings_suspended(self):
        """
        *** Function called by Transformer agent.
        """
        has_suspended = False
        for p_id in self.active_processings:
            p = self.processings[p_id]
            if not self.is_processing_terminated(p):
                return False
            elif p.status in [ProcessingStatus.Suspended]:
                has_suspended = True
        if has_suspended:
            return True
        return False
2158
2159 def create_processing(self, input_output_maps=[]):
2160 """
2161 *** Function called by Transformer agent.
2162 """
2163
2164 proc = Processing()
2165 self.add_processing_to_processings(proc)
2166 self.active_processings.append(proc.internal_id)
2167 return proc
2168
2169 def get_processing(self, input_output_maps, without_creating=False):
2170 """
2171 *** Function called by Transformer agent.
2172 """
2173 if self.active_processings:
2174 return self.processings[self.active_processings[0]]
2175 else:
2176 if not without_creating:
2177
2178 return self.create_processing(input_output_maps)
2179
2180
2181 return None
2182
2183 def sync_processing(self, processing, processing_model):
2184 processing.processing = processing_model
2185 self.set_processing_status(processing, processing_model['status'], processing_model['substatus'])
2186
    def submit_processing(self, processing):
        """
        *** Function called by Carrier agent.

        Submit the processing to the external system. Abstract in this base
        class: concrete Work subclasses must override it.

        :param processing: the processing to submit.
        :raises exceptions.NotImplementedException: always, in this base class.
        """
        raise exceptions.NotImplementedException
2192
2193 def abort_processing_old(self, processing):
2194 """
2195 *** Function called by Carrier agent.
2196 """
2197
2198 self.tocancel = True
2199 if (processing and 'processing_metadata' in processing and processing['processing_metadata']
2200 and 'processing' in processing['processing_metadata'] and processing['processing_metadata']['processing']):
2201 proc = processing['processing_metadata']['processing']
2202 proc.tocancel = True
2203
2204 def suspend_processing(self, processing):
2205 """
2206 *** Function called by Carrier agent.
2207 """
2208
2209 self.tosuspend = True
2210 if (processing and 'processing_metadata' in processing and processing['processing_metadata']
2211 and 'processing' in processing['processing_metadata'] and processing['processing_metadata']['processing']):
2212 proc = processing['processing_metadata']['processing']
2213 proc.tosuspend = True
2214
2215 def resume_processing_old(self, processing):
2216 """
2217 *** Function called by Carrier agent.
2218 """
2219
2220 self.toresume = True
2221 if (processing and 'processing_metadata' in processing and processing['processing_metadata']
2222 and 'processing' in processing['processing_metadata'] and processing['processing_metadata']['processing']):
2223 proc = processing['processing_metadata']['processing']
2224 proc.toresume = True
2225
2226 def expire_processing(self, processing):
2227 """
2228 *** Function called by Carrier agent.
2229 """
2230
2231 self.toexpire = True
2232 if (processing and 'processing_metadata' in processing and processing['processing_metadata']
2233 and 'processing' in processing['processing_metadata'] and processing['processing_metadata']['processing']):
2234 proc = processing['processing_metadata']['processing']
2235 proc.toexpire = True
2236
2237 def finish_processing(self, processing, forcing=False):
2238 """
2239 *** Function called by Carrier agent.
2240 """
2241
2242 if forcing:
2243 self.toforcefinish = True
2244 else:
2245 self.tofinish = True
2246 if (processing and 'processing_metadata' in processing and processing['processing_metadata']
2247 and 'processing' in processing['processing_metadata'] and processing['processing_metadata']['processing']):
2248 proc = processing['processing_metadata']['processing']
2249 proc.tofinish = True
2250 if forcing:
2251 proc.toforcefinish = True
2252
    def poll_processing_updates(self, processing, input_output_maps, contents_ext=None, log_prefix=''):
        """
        *** Function called by Carrier agent.

        Poll the external system for updates of this processing. Abstract in
        this base class: concrete Work subclasses must override it. The base
        implementation only marks the embedded processing object as having
        new updates before raising.

        :raises exceptions.NotImplementedException: always, in this base class.
        """
        # Flag the inner processing object so its metadata gets persisted.
        processing['processing'].has_new_updates()
        raise exceptions.NotImplementedException
2259
2260 def is_all_outputs_flushed(self, input_output_maps):
2261 for map_id in input_output_maps:
2262 outputs = input_output_maps[map_id]['outputs']
2263
2264 for content in outputs:
2265 if content['status'] != content['substatus']:
2266 return False
2267 return True
2268
    def syn_work_status(self, input_output_maps, all_updates_flushed=True, output_statistics={}, to_release_input_contents=[]):
        """
        *** Function called by Transformer agent.

        Derive this work's WorkStatus from the states of its processings and
        input collections.

        :param input_output_maps: map_id -> {'inputs': [...], 'outputs': [...]}
            content mapping used to check that outputs are flushed.
        :param all_updates_flushed: accepted but not read in this body.
        :param output_statistics: accepted but not read in this body.
        :param to_release_input_contents: pending input releases; any pending
            entry keeps the work from being finalised.
        """
        self.logger.debug("syn_work_status(%s): is_processings_terminated: %s" % (str(self.get_processing_ids()), str(self.is_processings_terminated())))
        self.logger.debug("syn_work_status(%s): is_input_collections_closed: %s" % (str(self.get_processing_ids()), str(self.is_input_collections_closed())))
        self.logger.debug("syn_work_status(%s): has_new_inputs: %s" % (str(self.get_processing_ids()), str(self.has_new_inputs)))
        self.logger.debug("syn_work_status(%s): has_to_release_inputs: %s" % (str(self.get_processing_ids()), str(self.has_to_release_inputs())))
        self.logger.debug("syn_work_status(%s): to_release_input_contents: %s" % (str(self.get_processing_ids()), str(to_release_input_contents)))
        # Only finalise when processings terminated, input collections closed,
        # and nothing is pending (new inputs or releases).
        if self.is_processings_terminated() and self.is_input_collections_closed() and not self.has_new_inputs and not self.has_to_release_inputs() and not to_release_input_contents:
            if not self.is_all_outputs_flushed(input_output_maps):
                # Defer finalisation until output statuses are flushed to substatus.
                self.logger.warn("The work processings %s is terminated. but not all outputs are flushed. Wait to flush the outputs then finish the transform" % str(self.get_processing_ids()))
                return

            # Terminal status, checked from best to worst outcome.
            if self.is_processings_finished():
                self.status = WorkStatus.Finished
            elif self.is_processings_subfinished():
                self.status = WorkStatus.SubFinished
            elif self.is_processings_failed():
                self.status = WorkStatus.Failed
            elif self.is_processings_expired():
                self.status = WorkStatus.Expired
            elif self.is_processings_cancelled():
                self.status = WorkStatus.Cancelled
            elif self.is_processings_suspended():
                self.status = WorkStatus.Suspended
        elif self.is_processings_running():
            self.status = WorkStatus.Running
        else:
            self.status = WorkStatus.Transforming

        # Mark the work as started as soon as any processing ran or terminated.
        if self.is_processings_terminated() or self.is_processings_running() or self.is_processings_started():
            self.started = True
        self.logger.debug("syn_work_status(%s): work.status: %s" % (str(self.get_processing_ids()), str(self.status)))
2304
    def sync_work_data(self, status, substatus, work, workload_id=None, output_data=None, processing=None):
        # Synchronise selected attributes between this work object and the
        # `work` instance coming from another agent (transformer <-> clerk).
        # Mapping transform/processing status into WorkStatus on the way.
        work.work_id = self.work_id
        work.transforming = self.transforming

        # Preserve this side's next_works across the sync.
        # NOTE(review): as written, saving and restoring next_works with nothing
        # in between is a no-op; presumably a metadata assignment belongs between
        # these two lines — confirm against the repository history.
        next_works = self.next_works

        self.next_works = next_works

        self.status_statistics = work.status_statistics

        # Explicit output_data wins over the peer work's output_data.
        if output_data:
            self.output_data = output_data
        else:
            self.output_data = work.output_data

        # Translate transform/processing status values into WorkStatus.
        self.status = get_work_status_from_transform_processing_status(status)
        self.substatus = get_work_status_from_transform_processing_status(substatus)
        if workload_id:
            self.workload_id = workload_id
        if processing is not None:
            # A submitted_at timestamp on the processing means it was submitted.
            if processing.submitted_at:
                self.submitted = True
        else:
            # No processing given: trust the peer work's submitted flag.
            self.submitted = work.submitted

        # Submission implies the work has started.
        if self.submitted:
            self.started = True

        """
        self.status = WorkStatus(status.value)
        self.substatus = WorkStatus(substatus.value)
        self.workdir = work.workdir
        self.has_new_inputs = work.has_new_inputs
        self.errors = work.errors
        self.next_works = work.next_works

        self.terminated_msg = work.terminated_msg
        self.output_data = work.output_data
        self.parameters_for_next_task = work.parameters_for_next_task

        self.status_statistics = work.status_statistics

        self.processings = work.processings
        self.active_processings = work.active_processings
        self.cancelled_processings = work.cancelled_processings
        self.suspended_processings = work.suspended_processings
        """
2357
2358 def abort_processing(self, processing, log_prefix=''):
2359 msg = "abort processing is not implemented"
2360 self.logger.error(log_prefix + msg)
2361
2362 def resume_processing(self, processing, log_prefix=''):
2363 msg = "resume processing is not implemented"
2364 self.logger.error(log_prefix + msg)
2365
    def add_proxy(self, proxy):
        # Store the user's X509 proxy content; read back via get_proxy() and
        # written to disk by set_user_proxy().
        self.proxy = proxy
2368
    def get_proxy(self):
        # Return the proxy content stored by add_proxy().
        return self.proxy
2371
2372 def set_user_proxy(self):
2373 if 'X509_USER_PROXY' in os.environ:
2374 self.original_proxy = os.environ['X509_USER_PROXY']
2375 if self.get_proxy():
2376 user_proxy = '/tmp/idds_user_proxy'
2377 with open(user_proxy, 'w') as fp:
2378 fp.write(self.get_proxy())
2379 os.chmod(user_proxy, stat.S_IRUSR | stat.S_IWUSR)
2380 os.environ['X509_USER_PROXY'] = user_proxy
2381
2382 def unset_user_proxy(self):
2383 if self.original_proxy:
2384 os.environ['X509_USER_PROXY'] = self.original_proxy
2385 else:
2386 del os.environ['X509_USER_PROXY']
2387
2388 def get_external_content_ids(self, processing, log_prefix=''):
2389 return []