import base64
import copy
import datetime
import functools
import json
import logging
import os
import pickle
import time
import traceback
import zlib

from idds.common import exceptions
from idds.common.constants import WorkflowType, TransformStatus, AsyncResultStatus
from idds.common.imports import get_func_name
from idds.common.utils import setup_logging, json_dumps, json_loads, encode_base64, modified_environ, is_panda_client_verbose
from .asyncresult import AsyncResult, MapResult
from .base import Base, Context
from .workflow import WorkflowCanvas

setup_logging(__name__)


class WorkContext(Context):

    def __init__(self, name=None, workflow_context=None, source_dir=None, init_env=None, container_options=None, post_script=None):
        super(WorkContext, self).__init__()
        self._workflow_context = workflow_context
        self._transform_id = None
        self._processing_id = None
        self._workflow_type = WorkflowType.iWork

        self._name = name
        self._site = None
        self._queue = None

        self._priority = 500
        self._core_count = 1
        self._total_memory = None
        self._max_walltime = 7 * 24 * 3600
        self._max_attempt = 5

        self._map_results = False

        self.init_env = init_env
        self.container_options = container_options
        self._post_script = post_script

        self._workload_id = None
        self._parent_workload_id = None
        self._no_wait_parent = None

        self._other_attributes = {}
        self.logger = logging.getLogger(self.__class__.__name__)

        self._start_time = None
        self._end_time = None

    def get_service(self):
        return self._workflow_context.service

    @property
    def distributed(self):
        return self._workflow_context.distributed

    @distributed.setter
    def distributed(self, value):
        self._workflow_context.distributed = value

    @property
    def service(self):
        return self._workflow_context.service

    @service.setter
    def service(self, value):
        self._workflow_context.service = value

    @property
    def vo(self):
        return self._workflow_context.vo

    @vo.setter
    def vo(self, value):
        self._workflow_context.vo = value

    @property
    def site(self):
        if self._site:
            return self._site
        return self._workflow_context.site

    @site.setter
    def site(self, value):
        self._site = value

    @property
    def queue(self):
        if self._queue:
            return self._queue
        return self._workflow_context.queue

    @queue.setter
    def queue(self, value):
        self._queue = value

    @property
    def cloud(self):
        return self._workflow_context.cloud

    @cloud.setter
    def cloud(self, value):
        self._workflow_context.cloud = value

    @property
    def working_group(self):
        return self._workflow_context.working_group

    @working_group.setter
    def working_group(self, value):
        self._workflow_context.working_group = value

    @property
    def task_type(self):
        return self._workflow_context.task_type

    @task_type.setter
    def task_type(self, value):
        self._workflow_context.task_type = value

    @property
    def processing_type(self):
        return self._workflow_context.processing_type

    @processing_type.setter
    def processing_type(self, value):
        self._workflow_context.processing_type = value

    @property
    def priority(self):
        if self._priority:
            return self._priority
        return self._workflow_context.priority

    @priority.setter
    def priority(self, value):
        self._priority = value

    @property
    def core_count(self):
        if self._core_count:
            return self._core_count
        return self._workflow_context.core_count

    @core_count.setter
    def core_count(self, value):
        self._core_count = value

    @property
    def total_memory(self):
        if self._total_memory:
            return self._total_memory
        return self._workflow_context.total_memory

    @total_memory.setter
    def total_memory(self, value):
        self._total_memory = value

    @property
    def max_walltime(self):
        if self._max_walltime:
            return self._max_walltime
        return self._workflow_context.max_walltime

    @max_walltime.setter
    def max_walltime(self, value):
        self._max_walltime = value

    @property
    def max_attempt(self):
        if self._max_attempt:
            return self._max_attempt
        return self._workflow_context.max_attempt

    @max_attempt.setter
    def max_attempt(self, value):
        self._max_attempt = value

    @property
    def username(self):
        return self._workflow_context.username

    @username.setter
    def username(self, value):
        self._workflow_context.username = value

    @property
    def userdn(self):
        return self._workflow_context.userdn

    @userdn.setter
    def userdn(self, value):
        self._workflow_context.userdn = value

    @property
    def workflow_type(self):
        return self._workflow_type

    @workflow_type.setter
    def workflow_type(self, value):
        self._workflow_type = value

    @property
    def lifetime(self):
        return self._workflow_context.lifetime

    @lifetime.setter
    def lifetime(self, value):
        self._workflow_context.lifetime = value

    @property
    def request_id(self):
        return self._workflow_context.request_id

    @request_id.setter
    def request_id(self, value):
        self._workflow_context.request_id = value

    @property
    def workload_id(self):
        return self._workload_id

    @workload_id.setter
    def workload_id(self, value):
        self._workload_id = value

    @property
    def parent_workload_id(self):
        return self._parent_workload_id

    @parent_workload_id.setter
    def parent_workload_id(self, value):
        self._parent_workload_id = value

    @property
    def no_wait_parent(self):
        return self._no_wait_parent

    @no_wait_parent.setter
    def no_wait_parent(self, value):
        self._no_wait_parent = value

    @property
    def other_attributes(self):
        return self._other_attributes

    @other_attributes.setter
    def other_attributes(self, value):
        self._other_attributes = value

    @property
    def transform_id(self):
        return self._transform_id

    @transform_id.setter
    def transform_id(self, value):
        self._transform_id = int(value)

    @property
    def processing_id(self):
        return self._processing_id

    @processing_id.setter
    def processing_id(self, value):
        self._processing_id = value

    @property
    def enable_separate_log(self):
        if self._workflow_context:
            return self._workflow_context.enable_separate_log

    @enable_separate_log.setter
    def enable_separate_log(self, value):
        if self._workflow_context:
            self._workflow_context.enable_separate_log = value

    @property
    def brokers(self):
        return self._workflow_context.brokers

    @brokers.setter
    def brokers(self, value):
        self._workflow_context.brokers = value

    @property
    def broker_timeout(self):
        return self._workflow_context.broker_timeout

    @broker_timeout.setter
    def broker_timeout(self, value):
        self._workflow_context.broker_timeout = value

    @property
    def broker_username(self):
        return self._workflow_context.broker_username

    @broker_username.setter
    def broker_username(self, value):
        self._workflow_context.broker_username = value

    @property
    def broker_password(self):
        return self._workflow_context.broker_password

    @broker_password.setter
    def broker_password(self, value):
        self._workflow_context.broker_password = value

    @property
    def broker_destination(self):
        return self._workflow_context.broker_destination

    @broker_destination.setter
    def broker_destination(self, value):
        self._workflow_context.broker_destination = value

    def get_source_dir(self):
        if self._workflow_context:
            return self._workflow_context.get_source_dir()
        return None

    @property
    def token(self):
        return self._workflow_context.token

    @token.setter
    def token(self, value):
        self._workflow_context.token = value

    @property
    def map_results(self):
        return self._map_results

    @map_results.setter
    def map_results(self, value):
        self._map_results = value

    @property
    def init_env(self):
        return self._init_env

    @init_env.setter
    def init_env(self, value):
        self._init_env = value
        if self._init_env:
            self._init_env = self._init_env + " "

    @property
    def container_options(self):
        if self._container_options:
            return self._container_options
        return self._workflow_context.container_options

    @container_options.setter
    def container_options(self, value):
        self._container_options = value

    @property
    def panda_env(self):
        return self._workflow_context.panda_env

    @panda_env.setter
    def panda_env(self, value):
        self._workflow_context.panda_env = value

    @property
    def idds_env(self):
        return self._workflow_context.idds_env

    @idds_env.setter
    def idds_env(self, value):
        self._workflow_context.idds_env = value

    def get_panda_idds_env(self):
        idds_env = self.idds_env
        panda_env = self.panda_env

        ret_env = {}
        env_list = ['IDDS_HOST', 'IDDS_AUTH_NO_VERIFY']
        for env in env_list:
            if env in idds_env and idds_env[env] is not None:
                ret_env[env] = idds_env[env]

        env_list = ['PANDA_URL_SSL', 'PANDA_URL', 'PANDACACHE_URL', 'PANDAMON_URL',
                    'PANDA_VERIFY_HOST', 'PANDA_BEHIND_REAL_LB']
        for env in env_list:
            if env in panda_env and panda_env[env] is not None:
                ret_env[env] = panda_env[env]

        return ret_env

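    # Illustrative sketch (hypothetical values): with IDDS_HOST set in the iDDS
    # env dict and PANDA_URL_SSL in the PanDA env dict, get_panda_idds_env()
    # would return something like
    #   {'IDDS_HOST': 'https://idds.example.org:443/idds',
    #    'PANDA_URL_SSL': 'https://panda.example.org:25443/server/panda'}
    # i.e. only the whitelisted, non-None variables are forwarded to the job.
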
    def get_idds_server(self):
        return self._workflow_context.get_idds_server()

    def init_brokers(self):
        return self._workflow_context.init_brokers()

    def initialize(self):
        return self._workflow_context.initialize()

    def setup_source_files(self):
        """
        Setup source files.
        """
        return self._workflow_context.setup_source_files()

    def setup(self):
        """
        :returns command: `str` to setup the workflow.
        """
        if not self.init_env:
            return self._workflow_context.setup()

        global_set_up = self._workflow_context.global_setup()
        init_env = self.init_env
        ret = None
        if global_set_up:
            ret = global_set_up
        if init_env:
            if ret:
                ret = ret + "; " + init_env
            else:
                ret = init_env
        return ret

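    # Composition sketch (hypothetical values): with a workflow-level global
    # setup of "source setup.sh" and init_env "export X=1", setup() would
    # return "source setup.sh; export X=1 " (init_env carries the trailing
    # space added by its setter); without an init_env it falls back to the
    # workflow context's own setup().
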
    @property
    def post_script(self):
        """
        Return the post script bash code to be appended after workflow execution.
        Override or set self._post_script in subclasses or instances as needed.
        """
        post_script = self._post_script
        if post_script:
            return post_script
        return self._workflow_context.post_script

    @post_script.setter
    def post_script(self, value):
        self._post_script = value

    def get_clean_env(self):
        return self._workflow_context.get_clean_env()

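# Note on the WorkContext property pattern above: per-work attributes (site,
# queue, priority, core_count, total_memory, max_walltime, max_attempt,
# container_options, post_script, ...) return the local value when set and
# otherwise fall back to the parent workflow context, so a work only needs to
# override what differs from its workflow's defaults.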

class Work(Base):

    def __init__(self, func=None, workflow_context=None, context=None, pre_kwargs=None, args=None, kwargs=None, multi_jobs_kwargs_list=None,
                 current_job_kwargs=None, map_results=False, source_dir=None, init_env=None, is_unique_func_name=False, name=None,
                 parent_workload_id=None, no_wait_parent=False, container_options=None, input_datasets=None, output_file_name=None,
                 output_dataset_name=None, num_events=None, num_events_per_job=None, parent_transform_id=None, parent_internal_id=None,
                 log_dataset_name=None, inputs=None, input_map=None, inputs_group=None, enable_separate_log=False, job_key=None,
                 json_load=False, post_script=None):
0475 """
0476 Init a workflow.
0477 """
        super(Work, self).__init__()
        self.prepared = False
        self.logger = logging.getLogger(self.__class__.__name__)

        self._current_job_kwargs = current_job_kwargs
        if self._current_job_kwargs:
            self._current_job_kwargs = base64.b64encode(zlib.compress(pickle.dumps(self._current_job_kwargs))).decode("utf-8")

        if name:
            self._name = name
        else:
            self._name = func.__name__ if func else None
        if self._name:
            self._name = self._name.replace('__main__:', '').replace('.py', '').replace(':', '.')
            self._name = self._name.replace("/", "_").replace(".", "_").replace(":", "_")
        if not is_unique_func_name:
            if self._name:
                self._name = self._name + "_" + datetime.datetime.utcnow().strftime("%Y_%m_%d_%H_%M_%S")

        if context:
            self._context = context
        else:
            self._context = WorkContext(name=self._name, workflow_context=workflow_context, init_env=init_env, container_options=container_options, post_script=post_script)

        self._func, self._func_name_and_args, self._multi_jobs_kwargs_list = self.get_func_name_and_args(
            func=func,
            pre_kwargs=pre_kwargs,
            args=args,
            kwargs=kwargs,
            base_dir=self._context.get_source_dir(),
            multi_jobs_kwargs_list=multi_jobs_kwargs_list
        )
        self._func = None
        if not json_load:
            self.logger.info(f"func: {self._func}, func_name_and_args: {self._func_name_and_args}, multi_jobs_kwargs_list: {self._multi_jobs_kwargs_list}")

        self._async_ret = None

        self.enable_separate_log = enable_separate_log
        self.map_results = map_results
        self._results = None
        self._async_result_initialized = False
        self._async_result_status = None

        self.parent_workload_id = parent_workload_id
        self.no_wait_parent = no_wait_parent

        self.other_attributes = {'input_datasets': input_datasets,
                                 'output_file_name': output_file_name,
                                 'output_dataset_name': output_dataset_name,
                                 'log_dataset_name': log_dataset_name,
                                 'num_events': num_events,
                                 'num_events_per_job': num_events_per_job,
                                 'parent_transform_id': parent_transform_id,
                                 'parent_internal_id': parent_internal_id,
                                 'job_key': job_key}
        self.inputs = inputs
        self.input_map = input_map
        self.inputs_group = inputs_group

        self.num_checks = 0

    @property
    def internal_id(self):
        return self._context.internal_id

    @internal_id.setter
    def internal_id(self, value):
        self._context.internal_id = value

    @property
    def service(self):
        return self._context.service

    @service.setter
    def service(self, value):
        self._context.service = value

    @property
    def name(self):
        return self._name

    @name.setter
    def name(self, value):
        self._name = value

    @property
    def request_id(self):
        return self._context.request_id

    @request_id.setter
    def request_id(self, value):
        self._context.request_id = value

    @property
    def transform_id(self):
        return self._context.transform_id

    @transform_id.setter
    def transform_id(self, value):
        self._context.transform_id = int(value)

    @property
    def processing_id(self):
        return self._context.processing_id

    @processing_id.setter
    def processing_id(self, value):
        self._context.processing_id = value

    @property
    def vo(self):
        return self._context.vo

    @vo.setter
    def vo(self, value):
        self._context.vo = value

    @property
    def queue(self):
        return self._context.queue

    @queue.setter
    def queue(self, value):
        self._context.queue = value

    @property
    def site(self):
        return self._context.site

    @site.setter
    def site(self, value):
        self._context.site = value

    def get_site(self):
        return self.site

    @property
    def cloud(self):
        return self._context.cloud

    @cloud.setter
    def cloud(self, value):
        self._context.cloud = value

    @property
    def working_group(self):
        return self._context.working_group

    @working_group.setter
    def working_group(self, value):
        self._context.working_group = value

    @property
    def priority(self):
        return self._context.priority

    @priority.setter
    def priority(self, value):
        self._context.priority = value

    @property
    def core_count(self):
        return self._context.core_count

    @core_count.setter
    def core_count(self, value):
        self._context.core_count = value

    @property
    def total_memory(self):
        return self._context.total_memory

    @total_memory.setter
    def total_memory(self, value):
        self._context.total_memory = value

    @property
    def max_walltime(self):
        return self._context.max_walltime

    @max_walltime.setter
    def max_walltime(self, value):
        self._context.max_walltime = value

    @property
    def max_attempt(self):
        return self._context.max_attempt

    @max_attempt.setter
    def max_attempt(self, value):
        self._context.max_attempt = value

    @property
    def username(self):
        return self._context.username

    @username.setter
    def username(self, value):
        self._context.username = value

    @property
    def userdn(self):
        return self._context.userdn

    @userdn.setter
    def userdn(self, value):
        self._context.userdn = value

    @property
    def workflow_type(self):
        return self._context.workflow_type

    @workflow_type.setter
    def workflow_type(self, value):
        self._context.workflow_type = value

    @property
    def task_type(self):
        return self._context.task_type

    @task_type.setter
    def task_type(self, value):
        self._context.task_type = value

    @property
    def processing_type(self):
        return self._context.processing_type

    @processing_type.setter
    def processing_type(self, value):
        self._context.processing_type = value

    @property
    def map_results(self):
        return self._context.map_results

    @map_results.setter
    def map_results(self, value):
        self._context.map_results = value

    @property
    def lifetime(self):
        return self._context.lifetime

    @lifetime.setter
    def lifetime(self, value):
        self._context.lifetime = value

    @property
    def workload_id(self):
        return self._context.workload_id

    @workload_id.setter
    def workload_id(self, value):
        self._context.workload_id = value

    def get_workload_id(self):
        return self.workload_id

    @property
    def parent_workload_id(self):
        return self._context.parent_workload_id

    @parent_workload_id.setter
    def parent_workload_id(self, value):
        self._context.parent_workload_id = value

    def get_parent_workload_id(self):
        return self.parent_workload_id

    @property
    def no_wait_parent(self):
        return self._context.no_wait_parent

    @no_wait_parent.setter
    def no_wait_parent(self, value):
        self._context.no_wait_parent = value

    @property
    def enable_separate_log(self):
        return self._context.enable_separate_log

    @enable_separate_log.setter
    def enable_separate_log(self, value):
        self._context.enable_separate_log = value

    @property
    def container_options(self):
        return self._context.container_options

    @container_options.setter
    def container_options(self, value):
        self._context.container_options = value

    @property
    def other_attributes(self):
        return self._context.other_attributes

    @other_attributes.setter
    def other_attributes(self, value):
        self._context.other_attributes = value

    @property
    def token(self):
        return self._context.token

    @token.setter
    def token(self, value):
        self._context.token = value

    @property
    def multi_jobs_kwargs_list(self):
        return self._multi_jobs_kwargs_list

    @multi_jobs_kwargs_list.setter
    def multi_jobs_kwargs_list(self, value):
        raise Exception("Not allowed to update multi_jobs_kwargs_list")

    def get_work_tag(self):
        return self._context.workflow_type.name

    def get_work_type(self):
        return self._context.workflow_type.name

    def get_work_name(self):
        return self._name

    def add_other_attributes(self, other_attributes):
        attrs = self.other_attributes or {}
        for k, v in other_attributes.items():
            attrs[k] = v
        self.other_attributes = attrs

    def get_parent_transform_id(self):
        if not self.other_attributes:
            return None
        return self.other_attributes.get("parent_transform_id", None)

    def set_other_attribute(self, name, value):
        if self.other_attributes is None:
            self.other_attributes = {}
        self.other_attributes[name] = value

    def get_other_attribute(self, name):
        if not self.other_attributes:
            return None
        return self.other_attributes.get(name, None)

    @property
    def job_key(self):
        return self.get_other_attribute('job_key')

    @job_key.setter
    def job_key(self, value):
        self.set_other_attribute('job_key', value)

    @property
    def parent_internal_id(self):
        return self.get_other_attribute('parent_internal_id')

    @parent_internal_id.setter
    def parent_internal_id(self, value):
        self.set_other_attribute('parent_internal_id', value)

    def get_parent_internal_id(self):
        return self.get_other_attribute('parent_internal_id')

    @property
    def input_datasets(self):
        return self.get_other_attribute('input_datasets')

    @input_datasets.setter
    def input_datasets(self, value):
        self.set_other_attribute('input_datasets', value)

    @property
    def output_file_name(self):
        return self.get_other_attribute('output_file_name')

    @output_file_name.setter
    def output_file_name(self, value):
        self.set_other_attribute('output_file_name', value)

    @property
    def output_dataset_name(self):
        return self.get_other_attribute('output_dataset_name')

    @output_dataset_name.setter
    def output_dataset_name(self, value):
        self.set_other_attribute('output_dataset_name', value)

    @property
    def log_dataset_name(self):
        return self.get_other_attribute('log_dataset_name')

    @log_dataset_name.setter
    def log_dataset_name(self, value):
        self.set_other_attribute('log_dataset_name', value)

    @property
    def num_events(self):
        return self.get_other_attribute('num_events')

    @num_events.setter
    def num_events(self, value):
        self.set_other_attribute('num_events', value)

    @property
    def num_events_per_job(self):
        return self.get_other_attribute('num_events_per_job')

    @num_events_per_job.setter
    def num_events_per_job(self, value):
        self.set_other_attribute('num_events_per_job', value)

    @property
    def parent_transform_id(self):
        return self.get_other_attribute('parent_transform_id')

    @parent_transform_id.setter
    def parent_transform_id(self, value):
        self.set_other_attribute('parent_transform_id', value)

    def to_dict(self):
        func = self._func
        self._func = None
        obj = super(Work, self).to_dict()
        self._func = func
        return obj

    def store(self):
        if self._context:
            content = {'type': 'work',
                       'name': self.name,
                       'context': self._context,
                       'original_args': self._func_name_and_args,
                       'multi_jobs_kwargs_list': self._multi_jobs_kwargs_list,
                       'current_job_kwargs': self._current_job_kwargs}
            content = json_dumps(content)
            source_dir = self._context.get_source_dir()
            self.save_context(source_dir, self._name, content)

    def load(self, source_dir=None):
        if not source_dir:
            source_dir = self._context.get_source_dir()
        if not source_dir:
            source_dir = os.getcwd()
        ret = self.load_context(source_dir, self._name)
        if ret:
            ret = json_loads(ret)
            self.logger.info(f"Loaded context: {ret}")
            if 'multi_jobs_kwargs_list' in ret:
                self._multi_jobs_kwargs_list = ret['multi_jobs_kwargs_list']

    def submit_to_idds_server(self):
        """
        Submit the workflow to the iDDS server.

        :returns id: The workflow id.
        :raise Exception when failing to submit the workflow.
        """
        from idds.client.clientmanager import ClientManager
        client = ClientManager(host=self._context.get_idds_server(), timeout=60)
        request_id = self._context.request_id
        transform_id = client.submit_work(request_id, self, use_dataset_name=False)
        self.logger.info("Submitted into iDDS with transform id=%s", str(transform_id))
        return transform_id

    def submit_to_panda_server(self):
        """
        Submit the workflow to the iDDS server through the PanDA service.

        :returns id: The workflow id.
        :raise Exception when failing to submit the workflow.
        """
        import idds.common.utils as idds_utils
        import pandaclient.idds_api as idds_api
        idds_server = self._context.get_idds_server()
        request_id = self._context.request_id
        client = idds_api.get_api(idds_utils.json_dumps,
                                  idds_host=idds_server,
                                  compress=True,
                                  verbose=is_panda_client_verbose(),
                                  manager=True)
        ret = client.submit_work(request_id, self, use_dataset_name=False)
        if ret[0] == 0 and ret[1][0]:
            transform_id = ret[1][1]
        else:
            transform_id = None
            self.logger.error("Failed to submit work to PanDA-iDDS with error: %s" % str(ret))

        self.logger.info("Submitted work into PanDA-iDDS with transform id=%s", str(transform_id))
        return transform_id

    def submit(self):
        """
        Submit the workflow to the iDDS server.

        :returns id: The workflow id.
        :raise Exception when failing to submit the workflow.
        """
        tf_id = None
        try:
            if self._context.get_service() == 'panda':
                tf_id = self.submit_to_panda_server()
            else:
                tf_id = self.submit_to_idds_server()
        except Exception as ex:
            self.logger.error("Failed to submit work: %s" % str(ex))

        try:
            self._context.transform_id = int(tf_id)
            return tf_id
        except Exception as ex:
            self.logger.error("Transform id (%s) is not integer, there should be some submission errors: %s" % (tf_id, str(ex)))

        return None

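    # Usage sketch (function, context and ids are hypothetical):
    #   w = Work(func=my_func, workflow_context=wf_ctx)
    #   tf_id = w.submit()            # e.g. 1234 on success, None on failure
    #   if tf_id:
    #       status = w.get_status()   # a TransformStatus value (or None)
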
    def get_status_from_panda_server(self):
        import idds.common.utils as idds_utils
        import pandaclient.idds_api as idds_api

        idds_server = self._context.get_idds_server()
        client = idds_api.get_api(idds_utils.json_dumps,
                                  idds_host=idds_server,
                                  compress=True,
                                  verbose=is_panda_client_verbose(),
                                  manager=True)

        request_id = self._context.request_id
        transform_id = self._context.transform_id
        if not transform_id:
            log_msg = f"No transform id defined (request_id: {request_id}, transform_id: {transform_id}, internal_id: {self.internal_id})"
            self.logger.error(log_msg)
            return exceptions.IDDSException(log_msg)

        ret = client.get_transform(request_id=request_id, transform_id=transform_id)
        if ret[0] == 0 and ret[1][0]:
            tf = ret[1][1]
            if type(tf) in [dict]:
                tf = json_loads(json.dumps(tf))
            elif type(tf) in [str]:
                try:
                    tf = json_loads(tf)
                except Exception as ex:
                    self.logger.warn(f"Failed to json loads transform({tf}): {ex}")
        else:
            tf = None
            self.logger.error(f"Failed to get transform (request_id: {request_id}, transform_id: {transform_id}, internal_id: {self.internal_id}) status from PanDA-iDDS: {ret}")
            return TransformStatus.Transforming

        if not tf:
            self.logger.info(f"Get transform (request_id: {request_id}, transform_id: {transform_id}, internal_id: {self.internal_id}) from PanDA-iDDS: {tf}")
            return None

        if self.num_checks % 60 == 0:
            if type(tf) in [dict] and "status" in tf:
                self.logger.info(f"Get transform status (request_id: {request_id}, transform_id: {transform_id}, internal_id: {self.internal_id}) from PanDA-iDDS: {tf['status']}")
            else:
                self.logger.info(f"Get transform status (request_id: {request_id}, transform_id: {transform_id}, internal_id: {self.internal_id}) from PanDA-iDDS: {tf}")
        self.num_checks += 1

        if type(tf) in [dict] and 'status' in tf:
            return tf['status']
        else:
            self.logger.info(f"Get transform status (request_id: {request_id}, transform_id: {transform_id}, internal_id: {self.internal_id}) from PanDA-iDDS: {tf}")
            raise Exception(f"Wrong transform status: {tf}")

    def get_status_from_idds_server(self):
        from idds.client.clientmanager import ClientManager
        client = ClientManager(host=self._context.get_idds_server(), timeout=60)

        request_id = self._context.request_id
        transform_id = self._context.transform_id
        if not transform_id:
            log_msg = f"No transform id defined (request_id: {request_id}, transform_id: {transform_id}, internal_id: {self.internal_id})"
            self.logger.error(log_msg)
            return exceptions.IDDSException(log_msg)

        tf = client.get_transform(request_id=request_id, transform_id=transform_id)
        if not tf:
            self.logger.info(f"Get transform (request_id: {request_id}, transform_id: {transform_id}, internal_id: {self.internal_id}) from iDDS: {tf}")
            return None

        self.logger.info(f"Get transform status (request_id: {request_id}, transform_id: {transform_id}, internal_id: {self.internal_id}) from iDDS: {tf['status']}")
        return tf['status']

    def get_status(self):
        try:
            if self._context.get_service() == 'panda':
                return self.get_status_from_panda_server()
            return self.get_status_from_idds_server()
        except Exception as ex:
            self.logger.info("Failed to get transform status: %s" % str(ex))

    def cancel_from_panda_server(self):
        import idds.common.utils as idds_utils
        import pandaclient.idds_api as idds_api

        idds_server = self._context.get_idds_server()
        client = idds_api.get_api(idds_utils.json_dumps,
                                  idds_host=idds_server,
                                  compress=True,
                                  verbose=is_panda_client_verbose(),
                                  manager=True)

        request_id = self._context.request_id

        workload_id = self._context.workload_id
        if not workload_id:
            log_msg = f"No workload id defined (request_id: {request_id}, workload_id: {workload_id}, internal_id: {self.internal_id})"
            self.logger.error(log_msg)
            return exceptions.IDDSException(log_msg)

        ret = client.abort_task(request_id=request_id, workload_id=workload_id)
        if ret[0] == 0 and ret[1][0]:
            ret_value = ret[1][1]
            if type(ret_value) in [dict]:
                ret_value = json_loads(json.dumps(ret_value))
            elif type(ret_value) in [str]:
                try:
                    ret_value = json_loads(ret_value)
                except Exception as ex:
                    self.logger.warn(f"Failed to json-load the cancel response ({ret_value}): {ex}")
        else:
            ret_value = None
            self.logger.error(f"Failed to cancel transform (request_id: {request_id}, workload_id: {workload_id}, internal_id: {self.internal_id}) via PanDA-iDDS: {ret}")
            return TransformStatus.Transforming

        if not ret_value:
            self.logger.info(f"Failed to cancel transform (request_id: {request_id}, workload_id: {workload_id}, internal_id: {self.internal_id}) from PanDA-iDDS: {ret_value}")
            return None

        self.logger.info(f"Cancel transform (request_id: {request_id}, workload_id: {workload_id}, internal_id: {self.internal_id}) from PanDA-iDDS: {ret_value}")

        return ret_value

    def cancel_from_idds_server(self):
        from idds.client.clientmanager import ClientManager
        client = ClientManager(host=self._context.get_idds_server(), timeout=60)

        request_id = self._context.request_id

        workload_id = self._context.workload_id
        if not workload_id:
            log_msg = f"No workload id defined (request_id: {request_id}, workload_id: {workload_id}, internal_id: {self.internal_id})"
            self.logger.error(log_msg)
            return exceptions.IDDSException(log_msg)

        ret = client.abort_task(request_id=request_id, workload_id=workload_id)
        if not ret:
            self.logger.info(f"Cancel transform (request_id: {request_id}, workload_id: {workload_id}, internal_id: {self.internal_id}) from iDDS: {ret}")
            return None

        self.logger.info(f"Cancel transform (request_id: {request_id}, workload_id: {workload_id}, internal_id: {self.internal_id}) from iDDS: {ret}")
        return ret

    def cancel(self):
        try:
            if self._context.get_service() == 'panda':
                return self.cancel_from_panda_server()
            return self.cancel_from_idds_server()
        except Exception as ex:
            self.logger.info("Failed to cancel transform: %s" % str(ex))

    def get_finished_status(self):
        return [TransformStatus.Finished]

    def get_subfinished_status(self):
        return [TransformStatus.SubFinished]

    def get_failed_status(self):
        return [None, TransformStatus.Failed, TransformStatus.Cancelled,
                TransformStatus.Suspended, TransformStatus.Expired]

    def get_terminated_status(self):
        return [None, TransformStatus.Finished, TransformStatus.SubFinished,
                TransformStatus.Failed, TransformStatus.Cancelled,
                TransformStatus.Suspended, TransformStatus.Expired]

    def is_terminated(self, status=None):
        if status is None:
            status = self.get_status()
        if status in self.get_terminated_status():
            self.stop_async_result()
            return True
        if self._async_ret:
            self._async_ret.get_results(nologs=True)
            if self._async_ret.is_terminated:
                self.stop_async_result()
                return True
        if self._async_result_status in [AsyncResultStatus.Finished, AsyncResultStatus.SubFinished, AsyncResultStatus.Failed]:
            return True
        return False

    def is_finished(self, status=None):
        if status is None:
            status = self.get_status()
        if status in self.get_finished_status():
            self.stop_async_result()
            return True
        if self._async_ret:
            self._async_ret.get_results(nologs=True)
            if self._async_ret.is_finished:
                self.stop_async_result()
                return True
        if self._async_result_status in [AsyncResultStatus.Finished]:
            return True
        return False

    def is_subfinished(self, status=None):
        if status is None:
            status = self.get_status()
        if status in self.get_subfinished_status():
            self.stop_async_result()
            return True
        if self._async_ret:
            self._async_ret.get_results(nologs=True)
            if self._async_ret.is_subfinished:
                self.stop_async_result()
                return True
        if self._async_result_status in [AsyncResultStatus.SubFinished]:
            return True
        return False

    def is_failed(self, status=None):
        if status is None:
            status = self.get_status()
        if status in self.get_failed_status():
            self.stop_async_result()
            return True
        if self._async_ret:
            self._async_ret.get_results(nologs=True)
            if self._async_ret.is_failed:
                self.stop_async_result()
                return True
        if self._async_result_status in [AsyncResultStatus.Failed]:
            return True
        return False

    def get_func_name(self):
        func_name = self._func_name_and_args[0]
        return func_name

    def get_multi_jobs_kwargs_list(self):
        multi_jobs_kwargs_list = self.multi_jobs_kwargs_list
        multi_jobs_kwargs_list = [pickle.loads(zlib.decompress(base64.b64decode(k))) for k in multi_jobs_kwargs_list]
        return multi_jobs_kwargs_list

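    # Encoding sketch: entries of multi_jobs_kwargs_list (like
    # _current_job_kwargs in __init__) are pickled, zlib-compressed,
    # base64-encoded strings, so the decode above is the exact inverse:
    #   encoded = base64.b64encode(zlib.compress(pickle.dumps(kwargs))).decode("utf-8")
    #   kwargs = pickle.loads(zlib.decompress(base64.b64decode(encoded)))
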
    def init_async_result(self):
        if not self._async_result_initialized:
            multi_jobs_kwargs_list = self.get_multi_jobs_kwargs_list()
            if multi_jobs_kwargs_list:
                self._async_ret = AsyncResult(self._context, name=self.get_func_name(), multi_jobs_kwargs_list=multi_jobs_kwargs_list,
                                              map_results=self.map_results, internal_id=self.internal_id)
            else:
                self._async_ret = AsyncResult(self._context, name=self.get_func_name(), wait_num=1, internal_id=self.internal_id,
                                              map_results=self.map_results)

            self._async_result_initialized = True
            self._async_result_status = AsyncResultStatus.Running
            self._async_ret.subscribe()

    def stop_async_result(self):
        if self._async_ret:
            self._async_ret.stop()
            self._results = self._async_ret.get_results()
            if self._async_ret.is_finished:
                self._async_result_status = AsyncResultStatus.Finished
            elif self._async_ret.is_subfinished:
                self._async_result_status = AsyncResultStatus.SubFinished
            elif self._async_ret.is_failed:
                self._async_result_status = AsyncResultStatus.Failed
            self._async_ret = None

    def wait_results(self):
        try:
            terminated_status = self.get_terminated_status()

            self.init_async_result()

            status = self.get_status()
            time_last_check_status = time.time()
            self.logger.info("waiting for results")
            while status not in terminated_status:
                ret = self._async_ret.wait_results(timeout=10)
                if ret:
                    self.logger.info("Received result: %s" % str(ret))
                    break
                if self._async_ret.waiting_result_terminated:
                    self.logger.info("waiting_result_terminated is set, received result: %s" % str(ret))
                if time.time() - time_last_check_status > 600:
                    status = self.get_status()
                    time_last_check_status = time.time()

            self._results = self._async_ret.wait_results(force_return_results=True)
            self.stop_async_result()
            return self._results
        except Exception as ex:
            self.logger.error("wait_results got some errors: %s" % str(ex))
            self.stop_async_result()
            return ex

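    # Polling sketch: wait_results() blocks on AsyncResult messages in 10s
    # slices and re-checks the transform status roughly every 600s, e.g.
    #   w.submit()
    #   results = w.wait_results()   # the results, or the exception on error
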
    def get_results(self):
        if self._async_ret:
            self._results = self._async_ret.get_results()
        return self._results

    def setup(self):
        """
        :returns command: `str` to setup the workflow.
        """
        return self._context.setup()

    @property
    def post_script(self):
        """
        Return the post script bash code to be appended after workflow execution.
        Override or set self._post_script in subclasses or instances as needed.
        """
        return self._context.post_script

    def get_clean_env(self):
        """
        :returns command: `str` to clean the workflow.
        """
        return self._context.get_clean_env()

    def load_func(self, func_name):
        """
        Load the function from the source files.

        :raise Exception
        """
        with modified_environ(IDDS_IGNORE_WORK_DECORATOR='true'):
            func = super(Work, self).load_func(func_name)

        return func

    def pre_run(self):
        a_ret = None
        try:
            workflow_context = self._context
            if workflow_context.distributed:
                self.logger.info("Test AsyncResult")
                a_ret = AsyncResult(workflow_context, wait_num=1, timeout=30)
                ret = a_ret.is_ok()
                a_ret.stop()
                self.logger.info(f"pre_run asyncresult test is_ok: {ret}")
                return ret
            return True
        except Exception as ex:
            self.logger.error(f"pre_run failed with error: {ex}")
            self.logger.error(traceback.format_exc())
            if a_ret:
                a_ret.stop()
            return False

    def run(self):
        self.logger.info("Start work run().")
        ret = None
        try:
            ret = self.run_local()
        except Exception as ex:
            self.logger.error(f"Failed to run function: {ex}")
            self.logger.error(traceback.format_exc())
        except:
            self.logger.error("Unknown error")
            self.logger.error(traceback.format_exc())
        self.logger.info(f"finish work run() with ret: {ret}")
        return ret

    def run_local(self):
        """
        Run the work.
        """
        is_ok = self.pre_run()
        if not is_ok:
            self.logger.error(f"pre_run is_ok: {is_ok}, will exit.")
            raise Exception("work pre_run failed")

        self._start_time = datetime.datetime.utcnow()

        func_name, pre_kwargs, args, kwargs = self._func_name_and_args
        multi_jobs_kwargs_list = self.multi_jobs_kwargs_list
        current_job_kwargs = self._current_job_kwargs

        if args:
            args = pickle.loads(zlib.decompress(base64.b64decode(args)))
        if pre_kwargs:
            pre_kwargs = pickle.loads(zlib.decompress(base64.b64decode(pre_kwargs)))
        if kwargs:
            kwargs = pickle.loads(zlib.decompress(base64.b64decode(kwargs)))
        if multi_jobs_kwargs_list:
            multi_jobs_kwargs_list = [pickle.loads(zlib.decompress(base64.b64decode(k))) for k in multi_jobs_kwargs_list]
        if self._current_job_kwargs:
            current_job_kwargs = pickle.loads(zlib.decompress(base64.b64decode(current_job_kwargs)))

        if self._func is None:
            func = self.load_func(func_name)
            self._func = func

        if self._context.distributed:
            args_copy = copy.deepcopy(args)
            pre_kwargs_copy = copy.deepcopy(pre_kwargs)
            kwargs_copy = copy.deepcopy(kwargs)
            if current_job_kwargs and type(current_job_kwargs) in [dict]:
                kwargs_copy.update(current_job_kwargs)
            elif current_job_kwargs and type(current_job_kwargs) in [tuple, list]:
                args_copy = copy.deepcopy(current_job_kwargs)

            if self.inputs and self.input_map:
                new_kwargs = {self.input_map: ",".join(self.inputs)}
                kwargs_copy.update(new_kwargs)
            if self.inputs_group:
                new_kwargs = {k: v for k, v in self.inputs_group.items()}
                kwargs_copy.update(new_kwargs)

            ret_status, ret_output, ret_err = self.run_func(self._func, pre_kwargs_copy, args_copy, kwargs_copy)

            self._end_time = datetime.datetime.utcnow()

            request_id = self._context.request_id
            transform_id = self._context.transform_id
            ret_log = f"(status: {ret_status}, return: {ret_output}, error: {ret_err})"
            self.logger.info(f"publishing AsyncResult to (request_id: {request_id}, transform_id: {transform_id}): {ret_log}")
            async_ret = AsyncResult(self._context, name=self.get_func_name(), internal_id=self.internal_id, current_job_kwargs=current_job_kwargs)
            async_ret.publish(
                ret_output,
                ret_status=ret_status,
                ret_error=ret_err,
                key=self.job_key,
                metrics={"start_time": self._start_time, "end_time": self._end_time}
            )

            if not self.map_results:
                self._results = ret_output
            else:
                self._results = MapResult()
                self._results.add_result(name=self.get_func_name(), args=current_job_kwargs, result=ret_output, key=self.job_key)
            return ret_status
        else:
            if not multi_jobs_kwargs_list:
                ret_status, rets, ret_err = self.run_func(self._func, pre_kwargs, args, kwargs)
                self._end_time = datetime.datetime.utcnow()
                if not self.map_results:
                    self._results = rets
                else:
                    self._results = MapResult()
                    self._results.add_result(name=self.get_func_name(), args=kwargs, result=rets, key=self.job_key)
                return ret_status
            else:
                if not self.map_results:
                    self._results = []
                    for one_job_kwargs in multi_jobs_kwargs_list:
                        kwargs_copy = copy.deepcopy(kwargs)
                        args_copy = copy.deepcopy(args)
                        pre_kwargs_copy = copy.deepcopy(pre_kwargs)
                        if type(one_job_kwargs) in [dict]:
                            kwargs_copy.update(one_job_kwargs)
                        elif type(one_job_kwargs) in [tuple, list]:
                            args_copy = copy.deepcopy(one_job_kwargs)

                        ret_status, rets, ret_error = self.run_func(self._func, pre_kwargs_copy, args_copy, kwargs_copy)
                        self._end_time = datetime.datetime.utcnow()
                        self._results.append(rets)
                else:
                    self._results = MapResult()
                    for one_job_kwargs in multi_jobs_kwargs_list:
                        kwargs_copy = copy.deepcopy(kwargs)
                        args_copy = copy.deepcopy(args)
                        pre_kwargs_copy = copy.deepcopy(pre_kwargs)
                        if type(one_job_kwargs) in [dict]:
                            kwargs_copy.update(one_job_kwargs)
                        elif type(one_job_kwargs) in [tuple, list]:
                            args_copy = copy.deepcopy(one_job_kwargs)

                        ret_status, rets, ret_error = self.run_func(self._func, pre_kwargs_copy, args_copy, kwargs_copy)
                        self._end_time = datetime.datetime.utcnow()
                        self._results.add_result(name=self.get_func_name(), args=one_job_kwargs, result=rets, key=self.job_key)
                return ret_status

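    # run_local() dispatch summary: a distributed context runs exactly one job
    # (current_job_kwargs) and publishes its result through AsyncResult; a
    # non-distributed context runs a single call, or loops over
    # multi_jobs_kwargs_list, collecting results in a list or a MapResult
    # depending on map_results.
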
    def get_run_command(self):
        cmd = f"run_workflow --type work --name {self.name} --key {self.job_key} "
        cmd += "--context %s --original_args %s " % (encode_base64(json_dumps(self._context)),
                                                     encode_base64(json_dumps(self._func_name_and_args)))
        cmd += '--current_job_kwargs "${IN/T}"'
        return cmd

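    # Illustrative output (values are placeholders): for a work named
    # "my_func_2024_01_01_00_00_00" this yields roughly
    #   run_workflow --type work --name my_func_2024_01_01_00_00_00 --key None
    #     --context <base64 json> --original_args <base64 json>
    #     --current_job_kwargs "${IN/T}"
    # where "${IN/T}" is a placeholder the PanDA side is expected to substitute
    # with the per-job kwargs at runtime.
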
    def get_run_args_to_file_cmd(self):
        args = {'type': 'work',
                'name': self.name,
                'context': self._context,
                'original_args': self._func_name_and_args,
                'current_job_kwargs': '"${IN/T}"'}
        args_json = encode_base64(json_dumps(args))
        cmd = 'echo ' + args_json + ' > run_workflow_args; '
        return cmd

    def get_run_command_test(self):
        cmd = "run_workflow.sh"
        return cmd

    def get_runner(self):
        setup = self.setup()
        cmd = ""

        run_command = self.get_run_command()

        if setup:
            pre_setup, main_setup = self.split_setup(setup)
            pre_setup = encode_base64(json_dumps(pre_setup))
            main_setup = encode_base64(json_dumps(main_setup))
            if pre_setup:
                cmd = " --pre_setup " + pre_setup + " "
            cmd = cmd + " --setup " + main_setup + " "
        post_script = self.post_script
        if post_script:
            post_script = encode_base64(json_dumps(post_script))
            cmd = cmd + " --post_script " + post_script + " "
        if cmd:
            cmd = cmd + " " + run_command
        else:
            cmd = run_command

        clean_env = self.get_clean_env()
        if clean_env:
            cmd = cmd + "; ret=$?; " + clean_env + "; exit $ret"

        return cmd

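# Runner composition sketch (assuming a non-empty setup and clean_env): the
# final command looks roughly like
#   --pre_setup <b64> --setup <b64> --post_script <b64> run_workflow ...; ret=$?; <clean_env>; exit $ret
# so the wrapped command's exit code is preserved across the cleanup step.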

def run_work_distributed(w):
    try:
        logger = logging.getLogger("run_work_distributed")
        tf_id = w.submit()
        if tf_id:
            logger.info("wait for results")
            rets = w.wait_results()
            logger.info("Got results: %s" % rets)
            return rets
        else:
            logger.error("Failed to distribute work: %s" % w.name)
            return None
    except Exception as ex:
        logger.error("Failed to run the work in distributed mode: %s" % ex)
        logger.error(traceback.format_exc())
        return None


def work(func=None, *, workflow=None, pre_kwargs={}, name=None, return_work=False, map_results=False, lazy=False, init_env=None, no_wraps=False,
         container_options=None, parent_workload_id=None, no_wait_parent=False, input_datasets=None, output_file_name=None,
         enable_separate_log=False, output_dataset_name=None, log_dataset_name=None, num_events=None, num_events_per_job=None,
         parent_transform_id=None, parent_internal_id=None, job_key=None, post_script=None):
    if func is None:
        return functools.partial(work, workflow=workflow, pre_kwargs=pre_kwargs, return_work=return_work, no_wraps=no_wraps,
                                 name=name, map_results=map_results, lazy=lazy, init_env=init_env, container_options=container_options,
                                 parent_workload_id=parent_workload_id, no_wait_parent=no_wait_parent, parent_transform_id=parent_transform_id,
                                 input_datasets=input_datasets, output_file_name=output_file_name, output_dataset_name=output_dataset_name,
                                 log_dataset_name=log_dataset_name, job_key=job_key,
                                 enable_separate_log=enable_separate_log, num_events=num_events, num_events_per_job=num_events_per_job,
                                 parent_internal_id=parent_internal_id, post_script=post_script)

    if 'IDDS_IGNORE_WORK_DECORATOR' in os.environ:
        return func

    def wrapper(*args, **kwargs):
        try:
            logger = logging.getLogger("work_def")
            f = workflow or kwargs.pop('workflow', None) or WorkflowCanvas.get_current_workflow()
            workflow_context = f._context
            multi_jobs_kwargs_list = kwargs.pop('multi_jobs_kwargs_list', [])
            logger.debug("workflow context: %s" % workflow_context)

            logger.debug("work decorator: func: %s, map_results: %s" % (func, map_results))
            if workflow_context:
                logger.debug("setup work")
                w = Work(workflow_context=workflow_context, func=func, pre_kwargs=pre_kwargs, args=args, kwargs=kwargs,
                         name=name, multi_jobs_kwargs_list=multi_jobs_kwargs_list, map_results=map_results, init_env=init_env,
                         container_options=container_options, parent_workload_id=parent_workload_id, no_wait_parent=no_wait_parent,
                         parent_transform_id=parent_transform_id, input_datasets=input_datasets, output_file_name=output_file_name,
                         output_dataset_name=output_dataset_name, num_events=num_events, num_events_per_job=num_events_per_job,
                         parent_internal_id=parent_internal_id, enable_separate_log=enable_separate_log, log_dataset_name=log_dataset_name,
                         job_key=job_key, post_script=post_script)

                if return_work:
                    return w

                if workflow_context.distributed:
                    ret = run_work_distributed(w)
                    return ret

                return w.run()
            else:
                logger.info("workflow context is not defined, run function locally")
                if not multi_jobs_kwargs_list:
                    kwargs_copy = copy.deepcopy(pre_kwargs)
                    kwargs_copy.update(kwargs)
                    return func(*args, **kwargs_copy)

                if not kwargs:
                    kwargs = {}
                if not map_results:
                    rets = []
                    for one_job_kwargs in multi_jobs_kwargs_list:
                        kwargs_copy = copy.deepcopy(kwargs)
                        args_copy = copy.deepcopy(args)
                        pre_kwargs_copy = copy.deepcopy(pre_kwargs)
                        if type(one_job_kwargs) in [dict]:
                            kwargs_copy.update(one_job_kwargs)
                        elif type(one_job_kwargs) in [tuple, list]:
                            args_copy = copy.deepcopy(one_job_kwargs)

                        pre_kwargs_copy.update(kwargs_copy)

                        ret = func(*args_copy, **pre_kwargs_copy)
                        rets.append(ret)
                    return rets
                else:
                    rets = MapResult()
                    for one_job_kwargs in multi_jobs_kwargs_list:
                        kwargs_copy = copy.deepcopy(kwargs)
                        args_copy = copy.deepcopy(args)
                        pre_kwargs_copy = copy.deepcopy(pre_kwargs)
                        if type(one_job_kwargs) in [dict]:
                            kwargs_copy.update(one_job_kwargs)
                        elif type(one_job_kwargs) in [tuple, list]:
                            args_copy = copy.deepcopy(one_job_kwargs)

                        pre_kwargs_copy.update(kwargs_copy)

                        ret = func(*args_copy, **pre_kwargs_copy)
                        rets.add_result(name=get_func_name(func), args=one_job_kwargs, result=ret, key=job_key)
                    return rets
        except Exception as ex:
            logger.error("Failed to run workflow %s: %s" % (func, ex))
            raise ex
        except:
            raise

    if no_wraps:
        return wrapper
    else:
        return functools.wraps(func)(wrapper)
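

# Usage sketch for the @work decorator (function and values are illustrative):
#
#   @work
#   def double(x):
#       return x * 2
#
#   # Called while a workflow context is active, double(3) builds a Work and
#   # either runs it locally or distributes it via iDDS/PanDA (submit + wait),
#   # depending on workflow_context.distributed.
#   #
#   # When the current workflow carries no context, the wrapped function is
#   # simply called in-process, so double(3) returns 6; passing
#   # multi_jobs_kwargs_list=[{'x': 1}, {'x': 2}] fans out to one call per
#   # entry and returns [2, 4] (or a MapResult when map_results=True).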