Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-09 07:58:32

0001 #!/usr/bin/env python
0002 #
0003 # Licensed under the Apache License, Version 2.0 (the "License");
0004 # You may not use this file except in compliance with the License.
0005 # You may obtain a copy of the License at
0006 # http://www.apache.org/licenses/LICENSE-2.0
0007 #
0008 # Authors:
0009 # - Wen Guan, <wen.guan@cern.ch>, 2023 - 2025
0010 # - Lino Oscar Gerlach, <lino.oscar.gerlach@cern.ch>, 2024
0011 
0012 import base64
0013 import copy
0014 import datetime
0015 import functools
0016 import json
0017 import logging
0018 import os
0019 import pickle
0020 import time
0021 import traceback
0022 import zlib
0023 
0024 from idds.common import exceptions
0025 from idds.common.constants import WorkflowType, TransformStatus, AsyncResultStatus
0026 from idds.common.imports import get_func_name
0027 from idds.common.utils import setup_logging, json_dumps, json_loads, encode_base64, modified_environ, is_panda_client_verbose
0028 from .asyncresult import AsyncResult, MapResult
0029 from .base import Base, Context
0030 from .workflow import WorkflowCanvas
0031 
0032 setup_logging(__name__)
0033 
0034 
class WorkContext(Context):
    """Per-work context: settings for a single work (task) in an iDDS workflow.

    Wraps a parent workflow context. Shared attributes (vo, cloud, brokers,
    ...) read and write straight through to the workflow context; a few
    attributes (site, queue, priority, core_count, total_memory, max_walltime,
    max_attempt, container_options) can be overridden per work and fall back
    to the workflow context when the work-level value is unset/falsy.
    """

    def __init__(self, name=None, workflow_context=None, source_dir=None, init_env=None, container_options=None, post_script=None):
        # NOTE(review): source_dir is accepted but unused here; the source
        # directory is resolved through the workflow context — confirm intended.
        super(WorkContext, self).__init__()
        self._workflow_context = workflow_context
        self._transform_id = None
        self._processing_id = None
        self._workflow_type = WorkflowType.iWork

        self._name = name
        self._site = None
        self._queue = None

        self._priority = 500
        self._core_count = 1
        self._total_memory = None        # MB
        self._max_walltime = 7 * 24 * 3600
        self._max_attempt = 5

        self._map_results = False

        self.init_env = init_env
        self.container_options = container_options
        self._post_script = post_script

        self._workload_id = None
        self._parent_workload_id = None
        self._no_wait_parent = None

        self._other_attributes = {}
        self.logger = logging.getLogger(self.__class__.__name__)

        self._start_time = None
        self._end_time = None

    def get_service(self):
        # Service ('panda' or plain iDDS) is decided by the workflow context.
        return self._workflow_context.service

    # --- Pass-through properties: delegate to the workflow context. ---

    @property
    def distributed(self):
        return self._workflow_context.distributed

    @distributed.setter
    def distributed(self, value):
        self._workflow_context.distributed = value

    @property
    def service(self):
        return self._workflow_context.service

    @service.setter
    def service(self, value):
        self._workflow_context.service = value

    @property
    def vo(self):
        return self._workflow_context.vo

    @vo.setter
    def vo(self, value):
        self._workflow_context.vo = value

    # --- Overridable properties: work-level value wins when truthy,
    # otherwise fall back to the workflow context. ---

    @property
    def site(self):
        if self._site:
            return self._site
        return self._workflow_context.site

    @site.setter
    def site(self, value):
        self._site = value

    @property
    def queue(self):
        if self._queue:
            return self._queue
        return self._workflow_context.queue

    @queue.setter
    def queue(self, value):
        self._queue = value

    @property
    def cloud(self):
        return self._workflow_context.cloud

    @cloud.setter
    def cloud(self, value):
        self._workflow_context.cloud = value

    @property
    def working_group(self):
        return self._workflow_context.working_group

    @working_group.setter
    def working_group(self, value):
        self._workflow_context.working_group = value

    @property
    def task_type(self):
        return self._workflow_context.task_type

    @task_type.setter
    def task_type(self, value):
        self._workflow_context.task_type = value

    @property
    def processing_type(self):
        return self._workflow_context.processing_type

    @processing_type.setter
    def processing_type(self, value):
        self._workflow_context.processing_type = value

    @property
    def priority(self):
        if self._priority:
            return self._priority
        return self._workflow_context.priority

    @priority.setter
    def priority(self, value):
        self._priority = value

    @property
    def core_count(self):
        if self._core_count:
            return self._core_count
        return self._workflow_context.core_count

    @core_count.setter
    def core_count(self, value):
        self._core_count = value

    @property
    def total_memory(self):
        # Memory in MB (see __init__).
        if self._total_memory:
            return self._total_memory
        return self._workflow_context.total_memory

    @total_memory.setter
    def total_memory(self, value):
        self._total_memory = value

    @property
    def max_walltime(self):
        if self._max_walltime:
            return self._max_walltime
        return self._workflow_context.max_walltime

    @max_walltime.setter
    def max_walltime(self, value):
        self._max_walltime = value

    @property
    def max_attempt(self):
        if self._max_attempt:
            return self._max_attempt
        return self._workflow_context.max_attempt

    @max_attempt.setter
    def max_attempt(self, value):
        self._max_attempt = value

    @property
    def username(self):
        return self._workflow_context.username

    @username.setter
    def username(self, value):
        self._workflow_context.username = value

    @property
    def userdn(self):
        return self._workflow_context.userdn

    @userdn.setter
    def userdn(self, value):
        self._workflow_context.userdn = value

    @property
    def workflow_type(self):
        # Work-local; defaults to WorkflowType.iWork (see __init__).
        return self._workflow_type

    @workflow_type.setter
    def workflow_type(self, value):
        self._workflow_type = value

    @property
    def lifetime(self):
        return self._workflow_context.lifetime

    @lifetime.setter
    def lifetime(self, value):
        self._workflow_context.lifetime = value

    @property
    def request_id(self):
        return self._workflow_context.request_id

    @request_id.setter
    def request_id(self, value):
        self._workflow_context.request_id = value

    @property
    def workload_id(self):
        return self._workload_id

    @workload_id.setter
    def workload_id(self, value):
        self._workload_id = value

    @property
    def parent_workload_id(self):
        return self._parent_workload_id

    @parent_workload_id.setter
    def parent_workload_id(self, value):
        self._parent_workload_id = value

    @property
    def no_wait_parent(self):
        return self._no_wait_parent

    @no_wait_parent.setter
    def no_wait_parent(self, value):
        self._no_wait_parent = value

    @property
    def other_attributes(self):
        return self._other_attributes

    @other_attributes.setter
    def other_attributes(self, value):
        self._other_attributes = value

    @property
    def transform_id(self):
        return self._transform_id

    @transform_id.setter
    def transform_id(self, value):
        # Coerced to int; non-numeric input raises ValueError/TypeError.
        self._transform_id = int(value)

    @property
    def processing_id(self):
        return self._processing_id

    @processing_id.setter
    def processing_id(self, value):
        self._processing_id = value

    @property
    def enable_separate_log(self):
        # Returns None when there is no workflow context.
        if self._workflow_context:
            return self._workflow_context.enable_separate_log

    @enable_separate_log.setter
    def enable_separate_log(self, value):
        if self._workflow_context:
            self._workflow_context.enable_separate_log = value

    @property
    def brokers(self):
        return self._workflow_context.brokers

    @brokers.setter
    def brokers(self, value):
        self._workflow_context.brokers = value

    @property
    def broker_timeout(self):
        return self._workflow_context.broker_timeout

    @broker_timeout.setter
    def broker_timeout(self, value):
        self._workflow_context.broker_timeout = value

    @property
    def broker_username(self):
        return self._workflow_context.broker_username

    @broker_username.setter
    def broker_username(self, value):
        self._workflow_context.broker_username = value

    @property
    def broker_password(self):
        return self._workflow_context.broker_password

    @broker_password.setter
    def broker_password(self, value):
        self._workflow_context.broker_password = value

    @property
    def broker_destination(self):
        return self._workflow_context.broker_destination

    @broker_destination.setter
    def broker_destination(self, value):
        self._workflow_context.broker_destination = value

    def get_source_dir(self):
        """Return the workflow's source directory, or None without a context."""
        if self._workflow_context:
            return self._workflow_context.get_source_dir()
        return None

    @property
    def token(self):
        return self._workflow_context.token

    @token.setter
    def token(self, value):
        self._workflow_context.token = value

    @property
    def map_results(self):
        return self._map_results

    @map_results.setter
    def map_results(self, value):
        self._map_results = value

    @property
    def init_env(self):
        return self._init_env

    @init_env.setter
    def init_env(self, value):
        self._init_env = value
        if self._init_env:
            # Trailing space so it can be prepended directly to a command line.
            self._init_env = self._init_env + " "

    @property
    def container_options(self):
        if self._container_options:
            return self._container_options
        return self._workflow_context.container_options

    @container_options.setter
    def container_options(self, value):
        self._container_options = value

    @property
    def panda_env(self):
        return self._workflow_context.panda_env

    @panda_env.setter
    def panda_env(self, value):
        self._workflow_context.panda_env = value

    @property
    def idds_env(self):
        return self._workflow_context.idds_env

    @idds_env.setter
    def idds_env(self, value):
        self._workflow_context.idds_env = value

    def get_panda_idds_env(self):
        """Collect the iDDS/PanDA environment variables to forward to jobs.

        Only variables that are present in the respective env dicts and are
        not None are copied into the returned dict.
        """
        idds_env = self.idds_env
        panda_env = self.panda_env

        ret_env = {}
        env_list = ['IDDS_HOST', 'IDDS_AUTH_NO_VERIFY']
        for env in env_list:
            if env in idds_env and idds_env[env] is not None:
                ret_env[env] = idds_env[env]

        # env_list = ['PANDA_CONFIG_ROOT', 'PANDA_URL_SSL', 'PANDA_URL', 'PANDACACHE_URL', 'PANDAMON_URL',
        #             'PANDA_AUTH', 'PANDA_VERIFY_HOST', 'PANDA_AUTH_VO', 'PANDA_BEHIND_REAL_LB', 'PANDA_CLIENT_VERBOSE']
        env_list = ['PANDA_URL_SSL', 'PANDA_URL', 'PANDACACHE_URL', 'PANDAMON_URL',
                    'PANDA_VERIFY_HOST', 'PANDA_BEHIND_REAL_LB']
        for env in env_list:
            if env in panda_env and panda_env[env] is not None:
                ret_env[env] = panda_env[env]

        return ret_env

    def get_idds_server(self):
        return self._workflow_context.get_idds_server()

    def init_brokers(self):
        return self._workflow_context.init_brokers()

    def initialize(self):
        return self._workflow_context.initialize()

    def setup_source_files(self):
        """
        Setup source files.
        """
        return self._workflow_context.setup_source_files()

    def setup(self):
        """
        :returns command: `str` to setup the workflow.
        """
        # Without a work-level init_env, defer entirely to the workflow setup.
        if not self.init_env:
            return self._workflow_context.setup()

        # Otherwise join the workflow's global setup and the work init_env
        # with "; " (either part may be empty/None).
        global_set_up = self._workflow_context.global_setup()
        init_env = self.init_env
        ret = None
        if global_set_up:
            ret = global_set_up
        if init_env:
            if ret:
                ret = ret + "; " + init_env
            else:
                ret = init_env
        return ret

    @property
    def post_script(self):
        """
        Return the post script bash code to be appended after workflow execution.
        Falls back to the workflow context's post_script when no work-level
        value is set. Override or set self._post_script as needed.
        """
        post_script = self._post_script
        if post_script:
            return post_script
        return self._workflow_context.post_script

    @post_script.setter
    def post_script(self, value):
        self._post_script = value

    def get_clean_env(self):
        return self._workflow_context.get_clean_env()
0465 
0466 
0467 class Work(Base):
0468 
    def __init__(self, func=None, workflow_context=None, context=None, pre_kwargs=None, args=None, kwargs=None, multi_jobs_kwargs_list=None,
                 current_job_kwargs=None, map_results=False, source_dir=None, init_env=None, is_unique_func_name=False, name=None,
                 parent_workload_id=None, no_wait_parent=False, container_options=None, input_datasets=None, output_file_name=None,
                 output_dataset_name=None, num_events=None, num_events_per_job=None, parent_transform_id=None, parent_internal_id=None,
                 log_dataset_name=None, inputs=None, input_map=None, inputs_group=None, enable_separate_log=False, job_key=None,
                 json_load=False, post_script=None):
        """
        Init a work: a single function to be executed as an iDDS transform.

        :param func: the Python function to run; its name seeds the work name.
        :param workflow_context: parent workflow context, used to build a
            WorkContext when no explicit ``context`` is supplied.
        :param context: pre-built WorkContext; takes precedence over
            ``workflow_context``.
        :param current_job_kwargs: kwargs of the currently executing job;
            stored pickled + zlib-compressed + base64-encoded so the payload
            survives JSON transport.
        :param is_unique_func_name: when False, a UTC timestamp is appended to
            the derived name to make it unique.
        :param json_load: when True, skip the construction-time info log
            (used when the object is being rebuilt from JSON).
        """
        super(Work, self).__init__()
        self.prepared = False
        self.logger = logging.getLogger(self.__class__.__name__)

        self._current_job_kwargs = current_job_kwargs
        if self._current_job_kwargs:
            # pickle -> zlib -> base64 so arbitrary kwargs become a compact,
            # JSON-safe string.
            self._current_job_kwargs = base64.b64encode(zlib.compress(pickle.dumps(self._current_job_kwargs))).decode("utf-8")

        if name:
            self._name = name
        else:
            # Derive the name from the function, sanitizing path/module
            # separators so it is safe as an identifier/file name.
            self._name = func.__name__ if func else None
            if self._name:
                self._name = self._name.replace('__main__:', '').replace('.py', '').replace(':', '.')
                self._name = self._name.replace("/", "_").replace(".", "_").replace(":", "_")
            if not is_unique_func_name:
                if self._name:
                    self._name = self._name + "_" + datetime.datetime.utcnow().strftime("%Y_%m_%d_%H_%M_%S")

        if context:
            self._context = context
        else:
            self._context = WorkContext(name=self._name, workflow_context=workflow_context, init_env=init_env, container_options=container_options, post_script=post_script)

        # self._func = func
        self._func, self._func_name_and_args, self._multi_jobs_kwargs_list = self.get_func_name_and_args(
            func=func,
            pre_kwargs=pre_kwargs,
            args=args,
            kwargs=kwargs,
            base_dir=self._context.get_source_dir(),
            multi_jobs_kwargs_list=multi_jobs_kwargs_list
        )
        # The live function object is dropped; only the serialized name/args
        # captured above are kept.
        self._func = None
        if not json_load:
            self.logger.info(f"func: {self._func}, func_name_and_args: {self._func_name_and_args}, multi_jobs_kwargs_list: {self._multi_jobs_kwargs_list}")

        self._async_ret = None

        self.enable_separate_log = enable_separate_log
        self.map_results = map_results
        self._results = None
        self._async_result_initialized = False
        self._async_result_status = None

        self.parent_workload_id = parent_workload_id
        self.no_wait_parent = no_wait_parent

        # All of these land in the context via the other_attributes property.
        self.other_attributes = {'input_datasets': input_datasets,
                                 'output_file_name': output_file_name,
                                 'output_dataset_name': output_dataset_name,
                                 'log_dataset_name': log_dataset_name,
                                 'num_events': num_events,
                                 'num_events_per_job': num_events_per_job,
                                 'parent_transform_id': parent_transform_id,
                                 'parent_internal_id': parent_internal_id,
                                 'job_key': job_key}
        self.inputs = inputs
        self.input_map = input_map
        self.inputs_group = inputs_group

        self.num_checks = 0
0540 
    # --- Delegating properties: forward to the underlying WorkContext. ---

    @property
    def internal_id(self):
        return self._context.internal_id

    @internal_id.setter
    def internal_id(self, value):
        self._context.internal_id = value

    @property
    def service(self):
        return self._context.service

    @service.setter
    def service(self, value):
        self._context.service = value

    @property
    def name(self):
        # The work name is stored locally, not on the context.
        return self._name

    @name.setter
    def name(self, value):
        self._name = value

    @property
    def request_id(self):
        return self._context.request_id

    @request_id.setter
    def request_id(self, value):
        self._context.request_id = value

    @property
    def transform_id(self):
        return self._context.transform_id

    @transform_id.setter
    def transform_id(self, value):
        # Coerced to int before delegating; non-numeric input raises.
        self._context.transform_id = int(value)

    @property
    def processing_id(self):
        return self._context.processing_id

    @processing_id.setter
    def processing_id(self, value):
        self._context.processing_id = value

    @property
    def vo(self):
        return self._context.vo

    @vo.setter
    def vo(self, value):
        self._context.vo = value

    @property
    def queue(self):
        return self._context.queue

    @queue.setter
    def queue(self, value):
        self._context.queue = value

    @property
    def site(self):
        return self._context.site

    @site.setter
    def site(self, value):
        self._context.site = value

    def get_site(self):
        """Return the site, honoring any work-level override in the context."""
        return self.site
0615 
    # --- More context-delegating properties (scheduling/resource knobs). ---

    @property
    def cloud(self):
        return self._context.cloud

    @cloud.setter
    def cloud(self, value):
        self._context.cloud = value

    @property
    def working_group(self):
        return self._context.working_group

    @working_group.setter
    def working_group(self, value):
        self._context.working_group = value

    @property
    def priority(self):
        return self._context.priority

    @priority.setter
    def priority(self, value):
        self._context.priority = value

    @property
    def core_count(self):
        return self._context.core_count

    @core_count.setter
    def core_count(self, value):
        self._context.core_count = value

    @property
    def total_memory(self):
        # Memory in MB (see WorkContext).
        return self._context.total_memory

    @total_memory.setter
    def total_memory(self, value):
        self._context.total_memory = value

    @property
    def max_walltime(self):
        return self._context.max_walltime

    @max_walltime.setter
    def max_walltime(self, value):
        self._context.max_walltime = value

    @property
    def max_attempt(self):
        return self._context.max_attempt

    @max_attempt.setter
    def max_attempt(self, value):
        self._context.max_attempt = value

    @property
    def username(self):
        return self._context.username

    @username.setter
    def username(self, value):
        self._context.username = value

    @property
    def userdn(self):
        return self._context.userdn

    @userdn.setter
    def userdn(self, value):
        self._context.userdn = value

    @property
    def workflow_type(self):
        return self._context.workflow_type

    @workflow_type.setter
    def workflow_type(self, value):
        self._context.workflow_type = value

    @property
    def task_type(self):
        return self._context.task_type

    @task_type.setter
    def task_type(self, value):
        self._context.task_type = value

    @property
    def processing_type(self):
        return self._context.processing_type

    @processing_type.setter
    def processing_type(self, value):
        self._context.processing_type = value

    @property
    def map_results(self):
        return self._context.map_results

    @map_results.setter
    def map_results(self, value):
        self._context.map_results = value

    @property
    def lifetime(self):
        return self._context.lifetime

    @lifetime.setter
    def lifetime(self, value):
        self._context.lifetime = value

    @property
    def workload_id(self):
        return self._context.workload_id

    @workload_id.setter
    def workload_id(self, value):
        self._context.workload_id = value

    def get_workload_id(self):
        """Return the workload id stored on the context."""
        return self.workload_id
0738 
    # --- Parent/logging/auth-related context-delegating properties. ---

    @property
    def parent_workload_id(self):
        return self._context.parent_workload_id

    @parent_workload_id.setter
    def parent_workload_id(self, value):
        self._context.parent_workload_id = value

    def get_parent_workload_id(self):
        """Return the parent workload id stored on the context."""
        return self.parent_workload_id

    @property
    def no_wait_parent(self):
        return self._context.no_wait_parent

    @no_wait_parent.setter
    def no_wait_parent(self, value):
        self._context.no_wait_parent = value

    @property
    def enable_separate_log(self):
        return self._context.enable_separate_log

    @enable_separate_log.setter
    def enable_separate_log(self, value):
        self._context.enable_separate_log = value

    @property
    def container_options(self):
        return self._context.container_options

    @container_options.setter
    def container_options(self, value):
        self._context.container_options = value

    @property
    def other_attributes(self):
        # Free-form per-work attributes (datasets, job_key, ...) live on the
        # context, not on the Work object itself.
        return self._context.other_attributes

    @other_attributes.setter
    def other_attributes(self, value):
        self._context.other_attributes = value

    @property
    def token(self):
        return self._context.token

    @token.setter
    def token(self, value):
        self._context.token = value
0789 
0790     @property
0791     def multi_jobs_kwargs_list(self):
0792         return self._multi_jobs_kwargs_list
0793 
0794     @multi_jobs_kwargs_list.setter
0795     def multi_jobs_kwargs_list(self, value):
0796         raise Exception("Not allwed to update multi_jobs_kwargs_list")
0797 
    def get_work_tag(self):
        """Return the work tag: the name of the context's workflow type."""
        return self._context.workflow_type.name

    def get_work_type(self):
        """Return the work type; same value as get_work_tag."""
        return self._context.workflow_type.name

    def get_work_name(self):
        """Return this work's (possibly timestamp-suffixed) name."""
        return self._name
0806 
0807     def add_other_attributes(self, other_attributes):
0808         for k, v in other_attributes.items():
0809             self._other_attributes[k] = v
0810         self.other_attributes = self._other_attributes
0811 
    def get_parent_transform_id(self):
        """Return the parent transform id from other_attributes, or None."""
        if not self.other_attributes:
            return None
        return self.other_attributes.get("parent_transform_id", None)

    def set_other_attribute(self, name, value):
        """Set a single entry in other_attributes, creating the dict if needed."""
        if self.other_attributes is None:
            self.other_attributes = {}
        self.other_attributes[name] = value

    def get_other_attribute(self, name):
        """Return a single entry from other_attributes, or None if unset."""
        if not self.other_attributes:
            return None
        return self.other_attributes.get(name, None)
0826 
    # --- Convenience properties backed by the other_attributes dict. ---

    @property
    def job_key(self):
        return self.get_other_attribute('job_key')

    @job_key.setter
    def job_key(self, value):
        self.set_other_attribute('job_key', value)

    @property
    def parent_internal_id(self):
        return self.get_other_attribute('parent_internal_id')

    @parent_internal_id.setter
    def parent_internal_id(self, value):
        self.set_other_attribute('parent_internal_id', value)

    def get_parent_internal_id(self):
        """Return the parent work's internal id, or None if unset."""
        return self.get_other_attribute('parent_internal_id')

    @property
    def input_datasets(self):
        return self.get_other_attribute('input_datasets')

    @input_datasets.setter
    def input_datasets(self, value):
        self.set_other_attribute('input_datasets', value)

    @property
    def output_file_name(self):
        return self.get_other_attribute('output_file_name')

    @output_file_name.setter
    def output_file_name(self, value):
        self.set_other_attribute('output_file_name', value)

    @property
    def output_dataset_name(self):
        return self.get_other_attribute('output_dataset_name')

    @output_dataset_name.setter
    def output_dataset_name(self, value):
        self.set_other_attribute('output_dataset_name', value)

    @property
    def log_dataset_name(self):
        return self.get_other_attribute('log_dataset_name')

    @log_dataset_name.setter
    def log_dataset_name(self, value):
        self.set_other_attribute('log_dataset_name', value)

    @property
    def num_events(self):
        return self.get_other_attribute('num_events')

    @num_events.setter
    def num_events(self, value):
        self.set_other_attribute('num_events', value)

    @property
    def num_events_per_job(self):
        return self.get_other_attribute('num_events_per_job')

    @num_events_per_job.setter
    def num_events_per_job(self, value):
        self.set_other_attribute('num_events_per_job', value)

    @property
    def parent_transform_id(self):
        return self.get_other_attribute('parent_transform_id')

    @parent_transform_id.setter
    def parent_transform_id(self, value):
        self.set_other_attribute('parent_transform_id', value)
0901 
0902     def to_dict(self):
0903         func = self._func
0904         self._func = None
0905         obj = super(Work, self).to_dict()
0906         self._func = func
0907         return obj
0908 
    def store(self):
        """Persist this work's context and arguments as a JSON record.

        Writes a 'work'-typed record (context, serialized function arguments,
        per-job kwargs) into the workflow source directory via save_context.
        No-op when there is no context.
        """
        if self._context:
            content = {'type': 'work',
                       'name': self.name,
                       'context': self._context,
                       'original_args': self._func_name_and_args,
                       'multi_jobs_kwargs_list': self._multi_jobs_kwargs_list,
                       'current_job_kwargs': self._current_job_kwargs}
            content = json_dumps(content)
            source_dir = self._context.get_source_dir()
            self.save_context(source_dir, self._name, content)
0920 
    def load(self, source_dir=None):
        """Reload the previously stored context record for this work.

        :param source_dir: directory to read from; falls back to the context's
            source dir, then to the current working directory.
        """
        if not source_dir:
            source_dir = self._context.get_source_dir()
            if not source_dir:
                source_dir = os.getcwd()
        ret = self.load_context(source_dir, self._name)
        if ret:
            ret = json_loads(ret)
            self.logger.info(f"Loaded context: {ret}")
            # Only the multi-jobs kwargs list is restored from disk; all other
            # fields keep their in-memory values.
            if 'multi_jobs_kwargs_list' in ret:
                self._multi_jobs_kwargs_list = ret['multi_jobs_kwargs_list']
0932 
    def submit_to_idds_server(self):
        """
        Submit the work directly to the iDDS server.

        :returns transform_id: the transform id assigned by the server.
        :raise Exception when failing to submit the work.
        """
        # iDDS ClientManager (imported lazily so the dependency is only
        # needed on the submission path).
        from idds.client.clientmanager import ClientManager
        client = ClientManager(host=self._context.get_idds_server(), timeout=60)
        request_id = self._context.request_id
        transform_id = client.submit_work(request_id, self, use_dataset_name=False)
        self.logger.info("Submitted into iDDS with transform id=%s", str(transform_id))
        return transform_id
0947 
    def submit_to_panda_server(self):
        """
        Submit the work to the iDDS server through the PanDA service.

        :returns transform_id: the transform id on success, None on failure
            (the error is logged, not raised).
        """
        import idds.common.utils as idds_utils
        import pandaclient.idds_api as idds_api
        idds_server = self._context.get_idds_server()
        request_id = self._context.request_id
        client = idds_api.get_api(idds_utils.json_dumps,
                                  idds_host=idds_server,
                                  compress=True,
                                  verbose=is_panda_client_verbose(),
                                  manager=True)
        ret = client.submit_work(request_id, self, use_dataset_name=False)
        # PanDA API convention: ret = (status_code, (success_flag, payload)).
        if ret[0] == 0 and ret[1][0]:
            transform_id = ret[1][1]
        else:
            transform_id = None
            self.logger.error("Failed to submit work to PanDA-iDDS with error: %s" % str(ret))

        self.logger.info("Submitted work into PanDA-iDDS with transform id=%s", str(transform_id))
        return transform_id
0973 
0974     def submit(self):
0975         """
0976         Submit the workflow to the iDDS server.
0977 
0978         :returns id: The workflow id.
0979         :raise Exception when failing to submit the workflow.
0980         """
0981         try:
0982             # self._func = None
0983             if self._context.get_service() == 'panda':
0984                 tf_id = self.submit_to_panda_server()
0985             else:
0986                 tf_id = self.submit_to_idds_server()
0987         except Exception as ex:
0988             self.logger.error("Failed to submit work: %s" % str(ex))
0989 
0990         try:
0991             self._context.transform_id = int(tf_id)
0992             return tf_id
0993         except Exception as ex:
0994             self.logger.error("Transform id (%s) is not integer, there should be some submission errors: %s" % (tf_id, str(ex)))
0995 
0996         return None
0997 
0998     def get_status_from_panda_server(self):
0999         import idds.common.utils as idds_utils
1000         import pandaclient.idds_api as idds_api
1001 
1002         idds_server = self._context.get_idds_server()
1003         client = idds_api.get_api(idds_utils.json_dumps,
1004                                   idds_host=idds_server,
1005                                   compress=True,
1006                                   verbose=is_panda_client_verbose(),
1007                                   manager=True)
1008 
1009         request_id = self._context.request_id
1010         transform_id = self._context.transform_id
1011         if not transform_id:
1012             log_msg = f"No transform id defined (request_id: {request_id}, transform_id: {transform_id}, internal_id: {self.internal_id})"
1013             self.logger.error(log_msg)
1014             return exceptions.IDDSException(log_msg)
1015 
1016         ret = client.get_transform(request_id=request_id, transform_id=transform_id)
1017         if ret[0] == 0 and ret[1][0]:
1018             tf = ret[1][1]
1019             if type(tf) in [dict]:
1020                 tf = json_loads(json.dumps(tf))
1021             elif type(tf) in [str]:
1022                 try:
1023                     tf = json_loads(tf)
1024                 except Exception as ex:
1025                     self.logger.warn(f"Failed to json loads transform({tf}): {ex}")
1026         else:
1027             tf = None
1028             self.logger.error(f"Failed to get transform (request_id: {request_id}, transform_id: {transform_id}, internal_id: {self.internal_id}) status from PanDA-iDDS: {ret}")
1029             return TransformStatus.Transforming
1030 
1031         if not tf:
1032             self.logger.info(f"Get transform (request_id: {request_id}, transform_id: {transform_id}, internal_id: {self.internal_id}) from PanDA-iDDS: {tf}")
1033             return None
1034 
1035         if self.num_checks % 60 == 0:
1036             if type(tf) in [dict] and "status" in tf:
1037                 self.logger.info(f"Get transform status (request_id: {request_id}, transform_id: {transform_id}, internal_id: {self.internal_id}) from PanDA-iDDS: {tf['status']}")
1038             else:
1039                 self.logger.info(f"Get transform status (request_id: {request_id}, transform_id: {transform_id}, internal_id: {self.internal_id}) from PanDA-iDDS: {tf}")
1040         self.num_checks += 1
1041 
1042         if type(tf) in [dict] and 'status' in tf:
1043             return tf['status']
1044         else:
1045             self.logger.info(f"Get transform status (request_id: {request_id}, transform_id: {transform_id}, internal_id: {self.internal_id}) from PanDA-iDDS: {tf}")
1046             raise Exception(f"Wrong transfrom status: {tf}")
1047 
1048     def get_status_from_idds_server(self):
1049         from idds.client.clientmanager import ClientManager
1050         client = ClientManager(host=self._context.get_idds_server(), timeout=60)
1051 
1052         request_id = self._context.request_id
1053         transform_id = self._context.transform_id
1054         if not transform_id:
1055             log_msg = f"No transform id defined (request_id: {request_id}, transform_id: {transform_id}, internal_id: {self.internal_id})"
1056             self.logger.error(log_msg)
1057             return exceptions.IDDSException(log_msg)
1058 
1059         tf = client.get_transform(request_id=request_id, transform_id=transform_id)
1060         if not tf:
1061             self.logger.info(f"Get transform (request_id: {request_id}, transform_id: {transform_id}, internal_id: {self.internal_id}) from iDDS: {tf}")
1062             return None
1063 
1064         self.logger.info(f"Get transform status (request_id: {request_id}, transform_id: {transform_id}, internal_id: {self.internal_id}) from iDDS: {tf['status']}")
1065         return tf['status']
1066 
1067     def get_status(self):
1068         if self._context.get_service() == 'panda':
1069             return self.get_status_from_panda_server()
1070         return self.get_status_from_idds_server()
1071 
1072         try:
1073             if self._context.get_service() == 'panda':
1074                 return self.get_status_from_panda_server()
1075             return self.get_status_from_idds_server()
1076         except Exception as ex:
1077             self.logger.info("Failed to get transform status: %s" % str(ex))
1078 
1079     def cancel_from_panda_server(self):
1080         import idds.common.utils as idds_utils
1081         import pandaclient.idds_api as idds_api
1082 
1083         idds_server = self._context.get_idds_server()
1084         client = idds_api.get_api(idds_utils.json_dumps,
1085                                   idds_host=idds_server,
1086                                   compress=True,
1087                                   verbose=is_panda_client_verbose(),
1088                                   manager=True)
1089 
1090         request_id = self._context.request_id
1091         # transform_id = self._context.transform_id
1092         workload_id = self._context.workload_id
1093         if not workload_id:
1094             log_msg = f"No workload id defined (request_id: {request_id}, workload_id: {workload_id}, internal_id: {self.internal_id})"
1095             self.logger.error(log_msg)
1096             return exceptions.IDDSException(log_msg)
1097 
1098         ret = client.abort_task(request_id=request_id, workload_id=workload_id)
1099         if ret[0] == 0 and ret[1][0]:
1100             ret_value = ret[1][1]
1101             if type(ret_value) in [dict]:
1102                 ret_value = json_loads(json.dumps(ret_value))
1103             elif type(ret_value) in [str]:
1104                 try:
1105                     ret_value = json_loads(ret_value)
1106                 except Exception as ex:
1107                     self.logger.warn(f"Failed to json cancel transform({ret_value}): {ex}")
1108         else:
1109             ret_value = None
1110             self.logger.error(f"Failed to cancel transform (request_id: {request_id}, worklaod_id: {workload_id}, internal_id: {self.internal_id}) status from PanDA-iDDS: {ret}")
1111             return TransformStatus.Transforming
1112 
1113         if not ret_value:
1114             self.logger.info(f"Failed to cancel transform (request_id: {request_id}, workload_id: {workload_id}, internal_id: {self.internal_id}) from PanDA-iDDS: {ret_value}")
1115             return None
1116 
1117         self.logger.info(f"Cancel transform (request_id: {request_id}, workload_id: {workload_id}, internal_id: {self.internal_id}) from PanDA-iDDS: {ret_value}")
1118 
1119         return ret_value
1120 
1121     def cancel_from_idds_server(self):
1122         from idds.client.clientmanager import ClientManager
1123         client = ClientManager(host=self._context.get_idds_server(), timeout=60)
1124 
1125         request_id = self._context.request_id
1126         # transform_id = self._context.transform_id
1127         workload_id = self._context.workload_id
1128         if not workload_id:
1129             log_msg = f"No workload id defined (request_id: {request_id}, workload_id: {workload_id}, internal_id: {self.internal_id})"
1130             self.logger.error(log_msg)
1131             return exceptions.IDDSException(log_msg)
1132 
1133         ret = client.abort_task(request_id=request_id, workload_id=workload_id)
1134         if not ret:
1135             self.logger.info(f"Cancel transform (request_id: {request_id}, workload_id: {workload_id}, internal_id: {self.internal_id}) from iDDS: {ret}")
1136             return None
1137 
1138         self.logger.info(f"Cancel transform (request_id: {request_id}, workload_id: {workload_id}, internal_id: {self.internal_id}) from iDDS: {ret}")
1139         return ret
1140 
1141     def cancel(self):
1142         try:
1143             if self._context.get_service() == 'panda':
1144                 return self.cancel_from_panda_server()
1145             return self.cancel_from_idds_server()
1146         except Exception as ex:
1147             self.logger.info("Failed to get transform status: %s" % str(ex))
1148 
    def get_finished_status(self):
        """Statuses meaning the transform finished fully."""
        return [TransformStatus.Finished]
1151 
    def get_subfinished_status(self):
        """Statuses meaning the transform finished only partially."""
        return [TransformStatus.SubFinished]
1154 
1155     def get_failed_status(self):
1156         return [None, TransformStatus.Failed, TransformStatus.Cancelled,
1157                 TransformStatus.Suspended, TransformStatus.Expired]
1158 
1159     def get_terminated_status(self):
1160         return [None, TransformStatus.Finished, TransformStatus.SubFinished,
1161                 TransformStatus.Failed, TransformStatus.Cancelled,
1162                 TransformStatus.Suspended, TransformStatus.Expired]
1163 
1164     def is_terminated(self, status=None):
1165         if status is None:
1166             status = self.get_status()
1167         if status in self.get_terminated_status():
1168             self.stop_async_result()
1169             return True
1170         if self._async_ret:
1171             self._async_ret.get_results(nologs=True)
1172             if self._async_ret.is_terminated:
1173                 self.stop_async_result()
1174                 return True
1175         if self._async_result_status in [AsyncResultStatus.Finished, AsyncResultStatus.SubFinished, AsyncResultStatus.Failed]:
1176             return True
1177         return False
1178 
1179     def is_finished(self, status=None):
1180         if status is None:
1181             status = self.get_status()
1182         if status in self.get_finished_status():
1183             self.stop_async_result()
1184             return True
1185         if self._async_ret:
1186             self._async_ret.get_results(nologs=True)
1187             if self._async_ret.is_finished:
1188                 self.stop_async_result()
1189                 return True
1190         if self._async_result_status in [AsyncResultStatus.Finished]:
1191             return True
1192         return False
1193 
1194     def is_subfinished(self, status=None):
1195         if status is None:
1196             status = self.get_status()
1197         if status in self.get_subfinished_status():
1198             self.stop_async_result()
1199             return True
1200         if self._async_ret:
1201             self._async_ret.get_results(nologs=True)
1202             if self._async_ret.is_subfinished:
1203                 self.stop_async_result()
1204                 return True
1205         if self._async_result_status in [AsyncResultStatus.SubFinished]:
1206             return True
1207         return False
1208 
1209     def is_failed(self, status=None):
1210         if status is None:
1211             status = self.get_status()
1212         if status in self.get_failed_status():
1213             self.stop_async_result()
1214             return True
1215         if self._async_ret:
1216             self._async_ret.get_results(nologs=True)
1217             if self._async_ret.is_failed:
1218                 self.stop_async_result()
1219                 return True
1220         if self._async_result_status in [AsyncResultStatus.Failed]:
1221             return True
1222         return False
1223 
    def get_func_name(self):
        """Return the function name recorded in _func_name_and_args."""
        func_name = self._func_name_and_args[0]
        return func_name
1227 
1228     def get_multi_jobs_kwargs_list(self):
1229         multi_jobs_kwargs_list = self.multi_jobs_kwargs_list
1230         multi_jobs_kwargs_list = [pickle.loads(zlib.decompress(base64.b64decode(k))) for k in multi_jobs_kwargs_list]
1231         return multi_jobs_kwargs_list
1232 
1233     def init_async_result(self):
1234         if not self._async_result_initialized:
1235             multi_jobs_kwargs_list = self.get_multi_jobs_kwargs_list()
1236             if multi_jobs_kwargs_list:
1237                 self._async_ret = AsyncResult(self._context, name=self.get_func_name(), multi_jobs_kwargs_list=multi_jobs_kwargs_list,
1238                                               map_results=self.map_results, internal_id=self.internal_id)
1239             else:
1240                 self._async_ret = AsyncResult(self._context, name=self.get_func_name(), wait_num=1, internal_id=self.internal_id,
1241                                               map_results=self.map_results)
1242 
1243             self._async_result_initialized = True
1244             self._async_result_status = AsyncResultStatus.Running
1245             self._async_ret.subscribe()
1246 
1247     def stop_async_result(self):
1248         if self._async_ret:
1249             self._async_ret.stop()
1250             self._results = self._async_ret.get_results()
1251             if self._async_ret.is_finished:
1252                 self._async_result_status = AsyncResultStatus.Finished
1253             elif self._async_ret.is_subfinished:
1254                 self._async_result_status = AsyncResultStatus.SubFinished
1255             elif self._async_ret.is_failed:
1256                 self._async_result_status = AsyncResultStatus.Failed
1257             self._async_ret = None
1258             # self._async_result_initialized = False
1259 
1260     def wait_results(self):
1261         try:
1262             terminated_status = self.get_terminated_status()
1263 
1264             # multi_jobs_kwargs_list = self.get_multi_jobs_kwargs_list()
1265             # if multi_jobs_kwargs_list:
1266             #     async_ret = AsyncResult(self._context, name=self.get_func_name(), multi_jobs_kwargs_list=multi_jobs_kwargs_list,
1267             #                             map_results=self.map_results, internal_id=self.internal_id)
1268             # else:
1269             #     async_ret = AsyncResult(self._context, name=self.get_func_name(), wait_num=1, internal_id=self.internal_id)
1270 
1271             # async_ret.subscribe()
1272             self.init_async_result()
1273 
1274             status = self.get_status()
1275             time_last_check_status = time.time()
1276             self.logger.info("waiting for results")
1277             while status not in terminated_status:
1278                 # time.sleep(10)
1279                 ret = self._async_ret.wait_results(timeout=10)
1280                 if ret:
1281                     self.logger.info("Recevied result: %s" % str(ret))
1282                     break
1283                 if self._async_ret.waiting_result_terminated:
1284                     self.logger.info("waiting_result_terminated is set, Received result is: %s" % str(ret))
1285                 if time.time() - time_last_check_status > 600:   # 10 minutes
1286                     status = self.get_status()
1287                     time_last_check_status = time.time()
1288 
1289             self._results = self._async_ret.wait_results(force_return_results=True)
1290             self.stop_async_result()
1291             return self._results
1292         except Exception as ex:
1293             self.logger.error("wait_results got some errors: %s" % str(ex))
1294             self.stop_async_result()
1295             return ex
1296 
    def get_results(self):
        """Return collected results, refreshing from the async collector when active."""
        if self._async_ret:
            self._results = self._async_ret.get_results()
        return self._results
1301 
    def setup(self):
        """
        Return the setup command, delegated to the workflow context.

        :returns command: `str` to setup the workflow.
        """
        return self._context.setup()
1307 
    def post_script(self):
        """
        Return the post script bash code to be appended after workflow execution,
        delegated to the workflow context.
        Override or set self._post_script in subclasses or instances as needed.
        """
        return self._context.post_script()
1314 
    def get_clean_env(self):
        """
        Return the environment cleanup command, delegated to the workflow context.

        :returns command: `str` to clean the workflow.
        """
        return self._context.get_clean_env()
1320 
1321     def load_func(self, func_name):
1322         """
1323         Load the function from the source files.
1324 
1325         :raise Exception
1326         """
1327         with modified_environ(IDDS_IGNORE_WORK_DECORATOR='true'):
1328             func = super(Work, self).load_func(func_name)
1329 
1330         return func
1331 
    def pre_run(self):
        """
        Sanity-check the messaging channel before running the work.

        In a distributed workflow context, create a throwaway AsyncResult and
        verify the result channel end to end via is_ok().

        :returns: True when the check passes (or is not needed), else False.
        """
        # test AsyncResult
        a_ret = None
        try:
            workflow_context = self._context
            if workflow_context.distributed:
                self.logger.info("Test AsyncResult")
                a_ret = AsyncResult(workflow_context, wait_num=1, timeout=30)
                ret = a_ret.is_ok()
                a_ret.stop()
                self.logger.info(f"pre_run asyncresult test is_ok: {ret}")
                return ret
            return True
        except Exception as ex:
            self.logger.error(f"pre_run failed with error: {ex}")
            self.logger.error(traceback.format_exc())
        # Only reached on the exception path: stop the still-running AsyncResult.
        if a_ret:
            a_ret.stop()
        return False
1351 
1352     def run(self):
1353         self.logger.info("Start work run().")
1354         ret = None
1355         try:
1356             ret = self.run_local()
1357         except Exception as ex:
1358             self.logger.error(f"Failed to run function: {ex}")
1359             self.logger.error(traceback.format_exc())
1360         except:
1361             self.logger.error("Unknow error")
1362             self.logger.error(traceback.format_exc())
1363         self.logger.info(f"finish work run() with ret: {ret}")
1364         return ret
1365 
1366     def run_local(self):
1367         """
1368         Run the work.
1369         """
1370         is_ok = self.pre_run()
1371         if not is_ok:
1372             self.logger.error(f"pre_run is_ok: {is_ok}, will exit.")
1373             raise Exception("work pre_run failed")
1374 
1375         self._start_time = datetime.datetime.utcnow()
1376 
1377         func_name, pre_kwargs, args, kwargs = self._func_name_and_args
1378         multi_jobs_kwargs_list = self.multi_jobs_kwargs_list
1379         current_job_kwargs = self._current_job_kwargs
1380 
1381         if args:
1382             args = pickle.loads(zlib.decompress(base64.b64decode(args)))
1383         if pre_kwargs:
1384             pre_kwargs = pickle.loads(zlib.decompress(base64.b64decode(pre_kwargs)))
1385         if kwargs:
1386             kwargs = pickle.loads(zlib.decompress(base64.b64decode(kwargs)))
1387         if multi_jobs_kwargs_list:
1388             multi_jobs_kwargs_list = [pickle.loads(zlib.decompress(base64.b64decode(k))) for k in multi_jobs_kwargs_list]
1389         if self._current_job_kwargs:
1390             current_job_kwargs = pickle.loads(zlib.decompress(base64.b64decode(current_job_kwargs)))
1391 
1392         if self._func is None:
1393             func = self.load_func(func_name)
1394             self._func = func
1395 
1396         if self._context.distributed:
1397             args_copy = copy.deepcopy(args)
1398             pre_kwargs_copy = copy.deepcopy(pre_kwargs)
1399             kwargs_copy = copy.deepcopy(kwargs)
1400             if current_job_kwargs and type(current_job_kwargs) in [dict]:
1401                 kwargs_copy.update(current_job_kwargs)
1402             elif current_job_kwargs and type(current_job_kwargs) in [tuple, list]:
1403                 args_copy = copy.deepcopy(current_job_kwargs)
1404 
1405             if self.inputs and self.input_map:
1406                 new_kwargs = {self.input_map: ",".join[self.inputs]}
1407                 kwargs_copy.update(new_kwargs)
1408             if self.inputs_group:
1409                 # new_kwargs = {
1410                 #     k: ",".join(v) if isinstance(v, (list, tuple)) else str(v)
1411                 #     for k, v in self.inputs_group.items()
1412                 # }
1413                 new_kwargs = {k: v for k, v in self.inputs_group.items()}
1414                 kwargs_copy.update(new_kwargs)
1415 
1416             ret_status, ret_output, ret_err = self.run_func(self._func, pre_kwargs_copy, args_copy, kwargs_copy)
1417 
1418             self._end_time = datetime.datetime.utcnow()
1419 
1420             request_id = self._context.request_id
1421             transform_id = self._context.transform_id
1422             ret_log = f"(status: {ret_status}, return: {ret_output}, error: {ret_err})"
1423             self.logger.info(f"publishing AsyncResult to (request_id: {request_id}, transform_id: {transform_id}): {ret_log}")
1424             async_ret = AsyncResult(self._context, name=self.get_func_name(), internal_id=self.internal_id, current_job_kwargs=current_job_kwargs)
1425             async_ret.publish(
1426                 ret_output,
1427                 ret_status=ret_status,
1428                 ret_error=ret_err,
1429                 key=self.job_key,
1430                 metrics={"start_time": self._start_time, "end_time": self._end_time}
1431             )
1432 
1433             if not self.map_results:
1434                 self._results = ret_output
1435             else:
1436                 self._results = MapResult()
1437                 self._results.add_result(name=self.get_func_name(), args=current_job_kwargs, result=ret_output, key=self.job_key)
1438             return ret_status
1439         else:
1440             if not multi_jobs_kwargs_list:
1441                 ret_status, rets, ret_err = self.run_func(self._func, pre_kwargs, args, kwargs)
1442                 self._end_time = datetime.datetime.utcnow()
1443                 if not self.map_results:
1444                     self._results = rets
1445                 else:
1446                     self._results = MapResult()
1447                     self._results.add_result(name=self.get_func_name(), args=kwargs, result=rets, key=self.job_key)
1448                 return ret_status
1449             else:
1450                 if not self.map_results:
1451                     self._results = []
1452                     for one_job_kwargs in multi_jobs_kwargs_list:
1453                         kwargs_copy = copy.deepcopy(kwargs)
1454                         args_copy = copy.deepcopy(args)
1455                         pre_kwargs_copy = copy.deepcopy(pre_kwargs)
1456                         if type(one_job_kwargs) in [dict]:
1457                             kwargs_copy.update(one_job_kwargs)
1458                         elif type(one_job_kwargs) in [tuple, list]:
1459                             args_copy = copy.deepcopy(one_job_kwargs)
1460 
1461                         ret_status, rets, ret_error = self.run_func(self._func, pre_kwargs_copy, args_copy, kwargs_copy)
1462                         self._end_time = datetime.datetime.utcnow()
1463                         self._results.append(rets)
1464                 else:
1465                     self._results = MapResult()
1466                     for one_job_kwargs in multi_jobs_kwargs_list:
1467                         kwargs_copy = copy.deepcopy(kwargs)
1468                         args_copy = copy.deepcopy(args)
1469                         pre_kwargs_copy = copy.deepcopy(pre_kwargs)
1470                         if type(one_job_kwargs) in [dict]:
1471                             kwargs_copy.update(one_job_kwargs)
1472                         elif type(one_job_kwargs) in [tuple, list]:
1473                             args_copy = copy.deepcopy(one_job_kwargs)
1474 
1475                         ret_status, rets, ret_error = self.run_func(self._func, pre_kwargs_copy, args_copy, kwargs_copy)
1476                         self._end_time = datetime.datetime.utcnow()
1477                         self._results.add_result(name=self.get_func_name(), args=one_job_kwargs, result=rets, key=self.job_key)
1478                 return ret_status
1479 
1480     def get_run_command(self):
1481         cmd = f"run_workflow --type work --name {self.name} --key {self.job_key} "
1482         cmd += "--context %s --original_args %s " % (encode_base64(json_dumps(self._context)),
1483                                                      encode_base64(json_dumps(self._func_name_and_args)))
1484         cmd += '--current_job_kwargs "${IN/T}"'
1485         return cmd
1486 
1487     def get_run_args_to_file_cmd(self):
1488         args = {'type': 'work',
1489                 'name': self.name,
1490                 'context': self._context,
1491                 'original_args': self._func_name_and_args,
1492                 'current_job_kwargs': '"${IN/T}"'}
1493         args_json = encode_base64(json_dumps(args))
1494         cmd = 'echo ' + args_json + ' > run_workflow_args; '
1495         return cmd
1496 
1497     def get_run_command_test(self):
1498         cmd = "run_workflow.sh"
1499         return cmd
1500 
1501     def get_runner(self):
1502         setup = self.setup()
1503         cmd = ""
1504 
1505         run_command = self.get_run_command()
1506 
1507         if setup:
1508             pre_setup, main_setup = self.split_setup(setup)
1509             pre_setup = encode_base64(json_dumps(pre_setup))
1510             main_setup = encode_base64(json_dumps(main_setup))
1511             if pre_setup:
1512                 cmd = " --pre_setup " + pre_setup + " "
1513             cmd = cmd + " --setup " + main_setup + " "
1514         post_script = self.post_script
1515         if post_script:
1516             post_script = encode_base64(json_dumps(post_script))
1517             cmd = cmd + " --post_script " + post_script + " "
1518         if cmd:
1519             cmd = cmd + " " + run_command
1520         else:
1521             cmd = run_command
1522 
1523         clean_env = self.get_clean_env()
1524         if clean_env:
1525             # cmd = cmd + "; " + clean_env
1526             cmd = cmd + "; ret=$?; " + clean_env + "; exit $ret"
1527 
1528         return cmd
1529 
1530 
def run_work_distributed(w):
    """
    Submit a work item and wait for its results.

    :param w: the Work instance to submit.
    :returns: the collected results on success, None on submission failure
              or error (errors are logged, not raised).
    """
    # Bind the logger outside the try so the except handler can always use it.
    logger = logging.getLogger("run_work_distributed")
    try:
        tf_id = w.submit()
        if tf_id:
            logger.info("wait for results")
            rets = w.wait_results()
            logger.info("Got results: %s" % rets)
            return rets
        logger.error("Failed to distribute work: %s" % w.name)
        return None
    except Exception as ex:
        logger.error("Failed to run the work distributedly: %s" % ex)
        logger.error(traceback.format_exc())
        return None
1547 
1548 
1549 # foo = work(arg)(foo)
1550 def work(func=None, *, workflow=None, pre_kwargs={}, name=None, return_work=False, map_results=False, lazy=False, init_env=None, no_wraps=False,
1551          container_options=None, parent_workload_id=None, no_wait_parent=False, input_datasets=None, output_file_name=None,
1552          enable_separate_log=False, output_dataset_name=None, log_dataset_name=None, num_events=None, num_events_per_job=None,
1553          parent_transform_id=None, parent_internal_id=None, job_key=None, post_script=None):
1554     if func is None:
1555         return functools.partial(work, workflow=workflow, pre_kwargs=pre_kwargs, return_work=return_work, no_wraps=no_wraps,
1556                                  name=name, map_results=map_results, lazy=lazy, init_env=init_env, container_options=container_options,
1557                                  parent_workload_id=parent_workload_id, no_wait_parent=no_wait_parent, parent_transform_id=parent_transform_id,
1558                                  input_datasets=input_datasets, output_file_name=output_file_name, output_dataset_name=output_dataset_name,
1559                                  log_dataset_name=log_dataset_name, job_key=job_key,
1560                                  enable_separate_log=enable_separate_log, num_events=num_events, num_events_per_job=num_events_per_job,
1561                                  parent_internal_id=parent_internal_id, post_script=post_script)
1562 
1563     if 'IDDS_IGNORE_WORK_DECORATOR' in os.environ:
1564         return func
1565 
1566     # @functools.wraps(func)
1567     def wrapper(*args, **kwargs):
1568         try:
1569             logger = logging.getLogger("work_def")
                 # Resolve the owning workflow: explicit `workflow` arg, a
                 # 'workflow' keyword smuggled in via kwargs, or the current
                 # workflow on the WorkflowCanvas stack (in that priority order).
1570             f = workflow or kwargs.pop('workflow', None) or WorkflowCanvas.get_current_workflow()
1571             workflow_context = f._context
                 # Per-job kwargs list for fan-out ("multi jobs") execution;
                 # popped so it is not forwarded to the user function.
1572             multi_jobs_kwargs_list = kwargs.pop('multi_jobs_kwargs_list', [])
1573             logger.debug("workflow context: %s" % workflow_context)
1574 
1575             logger.debug("work decorator: func: %s, map_results: %s" % (func, map_results))
1576             if workflow_context:
                     # Remote path: wrap the call in a Work object so it can be
                     # submitted/executed through the workflow machinery.
1577                 logger.debug("setup work")
1578                 w = Work(workflow_context=workflow_context, func=func, pre_kwargs=pre_kwargs, args=args, kwargs=kwargs,
1579                          name=name, multi_jobs_kwargs_list=multi_jobs_kwargs_list, map_results=map_results, init_env=init_env,
1580                          container_options=container_options, parent_workload_id=parent_workload_id, no_wait_parent=no_wait_parent,
1581                          parent_transform_id=parent_transform_id, input_datasets=input_datasets, output_file_name=output_file_name,
1582                          output_dataset_name=output_dataset_name, num_events=num_events, num_events_per_job=num_events_per_job,
1583                          parent_internal_id=parent_internal_id, enable_separate_log=enable_separate_log, log_dataset_name=log_dataset_name,
1584                          job_key=job_key, post_script=post_script)
1585                 # if distributed:
1586 
                     # Caller asked for the Work object itself instead of a result.
1587                 if return_work:
1588                     return w
1589 
                     # Distributed contexts hand the Work to the remote runner;
                     # otherwise it is executed in-process via w.run().
1590                 if workflow_context.distributed:
1591                     ret = run_work_distributed(w)
1592                     return ret
1593 
1594                 return w.run()
1595             else:
                     # Local fallback: no workflow context, run the function directly.
1596                 logger.info("workflow context is not defined, run function locally")
1597                 if not multi_jobs_kwargs_list:
                         # Single invocation: call kwargs override pre_kwargs.
                         # NOTE(review): crashes with AttributeError if pre_kwargs
                         # is None — presumably the decorator defaults it to {};
                         # confirm at the (out-of-view) decorator signature.
1598                     kwargs_copy = copy.deepcopy(pre_kwargs)
1599                     kwargs_copy.update(kwargs)
1600                     return func(*args, **kwargs_copy)
1601 
                     # NOTE(review): dead guard — kwargs was already .pop()'ed
                     # above, so it is a dict here; `not kwargs` only hits when
                     # it is empty, and reassigning {} is then a no-op.
1602                 if not kwargs:
1603                     kwargs = {}
1604                 if not map_results:
                         # Fan-out without result mapping: run once per job-kwargs
                         # entry and collect plain return values in order.
1605                     rets = []
1606                     for one_job_kwargs in multi_jobs_kwargs_list:
                             # Deep-copy per iteration so one job's mutations
                             # cannot leak into the next job's arguments.
1607                         kwargs_copy = copy.deepcopy(kwargs)
1608                         args_copy = copy.deepcopy(args)
1609                         pre_kwargs_copy = copy.deepcopy(pre_kwargs)
                             # A dict entry extends kwargs; a tuple/list entry
                             # REPLACES the positional args for this job.
                             # NOTE(review): isinstance() would be the idiomatic
                             # check instead of `type(...) in [dict]`.
1610                         if type(one_job_kwargs) in [dict]:
1611                             kwargs_copy.update(one_job_kwargs)
1612                         elif type(one_job_kwargs) in [tuple, list]:
1613                             args_copy = copy.deepcopy(one_job_kwargs)
1614 
                             # Merge order: per-call kwargs win over pre_kwargs.
1615                         pre_kwargs_copy.update(kwargs_copy)
1616 
1617                         ret = func(*args_copy, **pre_kwargs_copy)
1618                         rets.append(ret)
1619                     return rets
1620                 else:
                         # Fan-out with result mapping: same loop as above
                         # (NOTE(review): near-duplicate — a shared helper would
                         # remove the duplication), but results are keyed into a
                         # MapResult by function name / job kwargs / job_key.
1621                     rets = MapResult()
1622                     for one_job_kwargs in multi_jobs_kwargs_list:
1623                         kwargs_copy = copy.deepcopy(kwargs)
1624                         args_copy = copy.deepcopy(args)
1625                         pre_kwargs_copy = copy.deepcopy(pre_kwargs)
1626                         if type(one_job_kwargs) in [dict]:
1627                             kwargs_copy.update(one_job_kwargs)
1628                         elif type(one_job_kwargs) in [tuple, list]:
1629                             args_copy = copy.deepcopy(one_job_kwargs)
1630 
1631                         pre_kwargs_copy.update(kwargs_copy)
1632 
1633                         ret = func(*args_copy, **pre_kwargs_copy)
1634                         rets.add_result(name=get_func_name(func), args=one_job_kwargs, result=ret, key=job_key)
1635                     return rets
             # Log and propagate any failure.
             # NOTE(review): bare `raise` would preserve the active exception
             # more idiomatically than `raise ex`; the trailing `except: raise`
             # clause is redundant (it catches BaseException only to re-raise
             # unchanged) and could be dropped.
1636         except Exception as ex:
1637             logger.error("Failed to run workflow %s: %s" % (func, ex))
1638             raise ex
1639         except:
1640             raise
         # Return the closure: either raw (no_wraps) or wrapped with
         # functools.wraps so the decorated function keeps func's
         # __name__/__doc__/__module__ metadata.
1641     if no_wraps:
1642         return wrapper
1643     else:
1644         return functools.wraps(func)(wrapper)