Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-09 07:58:32

0001 #!/usr/bin/env python
0002 #
0003 # Licensed under the Apache License, Version 2.0 (the "License");
0004 # You may not use this file except in compliance with the License.
0005 # You may obtain a copy of the License at
0006 # http://www.apache.org/licenses/LICENSE-2.0OA
0007 #
0008 # Authors:
0009 # - Wen Guan, <wen.guan@cern.ch>, 2023 - 2025
0010 # - Lino Oscar Gerlach, <lino.oscar.gerlach@cern.ch>, 2024
0011 
0012 import base64
0013 import collections
0014 import datetime
0015 import functools
0016 import logging
0017 import inspect
0018 import os
0019 import pickle
0020 import tarfile
0021 import traceback
0022 import uuid
0023 import zlib
0024 
0025 # from types import ModuleType
0026 
0027 from idds.common import exceptions
0028 from idds.common.constants import WorkflowType
0029 from idds.common.utils import setup_logging, create_archive_file, json_dumps, json_loads, encode_base64, modified_environ, is_panda_client_verbose
0030 from .asyncresult import AsyncResult
0031 from .base import Base, Context
0032 
0033 
0034 setup_logging(__name__)
0035 
0036 
0037 class WorkflowCanvas(object):
0038 
0039     _managed_workflows = collections.deque()
0040 
0041     @classmethod
0042     def push_managed_workflow(cls, workflow: Base):
0043         cls._managed_workflows.appendleft(workflow)
0044 
0045     @classmethod
0046     def pop_managed_workflow(cls):
0047         workflow = cls._managed_workflows.popleft()
0048         return workflow
0049 
0050     @classmethod
0051     def get_current_workflow(cls):
0052         try:
0053             return cls._managed_workflows[0]
0054         except IndexError:
0055             return None
0056 
0057 
0058 class WorkflowContext(Context):
0059     def __init__(self, name=None, service='panda', source_dir=None, workflow_type=WorkflowType.iWorkflow, distributed=True,
0060                  max_walltime=24 * 3600, init_env=None, exclude_source_files=[], clean_env=None, enable_separate_log=False,
0061                  cloud=None, site=None, queue=None, vo=None, container_options=None, task_type=None, working_group=None,
0062                  processing_type=None, post_script=None):
0063         super(WorkflowContext, self).__init__()
0064         self._service = service     # panda, idds, sharefs
0065         self._request_id = None
0066         self._workflow_type = workflow_type
0067 
0068         # self.idds_host = None
0069         # self.idds_async_host = None
0070         self._idds_env = {}
0071         self._panda_env = {}
0072 
0073         self._name = name
0074         self._source_dir = source_dir
0075         self.remote_source_file = None
0076 
0077         self._vo = vo
0078 
0079         self._queue = queue
0080         self._site = site
0081         self._cloud = cloud
0082 
0083         self._working_group = working_group
0084         self._task_type = task_type
0085         self._processing_type = processing_type
0086 
0087         self._priority = 500
0088         self._core_count = 1
0089         self._total_memory = 1000          # MB
0090         self._max_walltime = max_walltime
0091         self._max_attempt = 5
0092 
0093         self._username = None
0094         self._userdn = None
0095         self._workflow_type = workflow_type
0096         self._lifetime = 7 * 24 * 3600
0097         self._workload_id = None
0098         self._request_id = None
0099 
0100         self.distributed = distributed
0101 
0102         self._broker_initialized = False
0103         self._brokers = None
0104         self._broker_timeout = 180
0105         self._broker_username = None
0106         self._broker_password = None
0107         self._broker_destination = None
0108 
0109         # self.init_brokers()
0110 
0111         self._token = str(uuid.uuid4())
0112 
0113         self._panda_initialized = False
0114         self.init_panda()
0115         self._idds_initialized = False
0116         self.init_idds()
0117 
0118         self.init_env = init_env
0119         self._clean_env = clean_env
0120         self._post_script = post_script
0121 
0122         self._exclude_source_files = []
0123         if exclude_source_files:
0124             if type(exclude_source_files) in [list, tuple]:
0125                 self._exclude_source_files = exclude_source_files
0126             else:
0127                 self._exclude_source_files = [exclude_source_files]
0128 
0129         self._enable_separate_log = enable_separate_log
0130 
0131         self._container_options = container_options
0132 
0133         self._campaign = None
0134         self._campaign_scope = None
0135         self._campaign_group = None
0136         self._campaign_tag = None
0137         self._max_processing_requests = -1
0138 
0139         self.logger = logging.getLogger(self.__class__.__name__)
0140 
0141     @property
0142     def distributed(self):
0143         return self._distributed
0144 
0145     @distributed.setter
0146     def distributed(self, value):
0147         self._distributed = value
0148 
0149     @property
0150     def service(self):
0151         return self._service
0152 
0153     @service.setter
0154     def service(self, value):
0155         self._service = value
0156 
0157     @property
0158     def init_env(self):
0159         return self._init_env
0160 
0161     @init_env.setter
0162     def init_env(self, value):
0163         self._init_env = value
0164         if self._init_env:
0165             self._init_env = self._init_env + " "
0166 
0167     @property
0168     def clean_env(self):
0169         return self._clean_env
0170 
0171     @clean_env.setter
0172     def clean_env(self, value):
0173         self._clean_env = value
0174         if self._clean_env:
0175             self._clean_env = self._clean_env + " "
0176 
0177     @property
0178     def post_script(self):
0179         return self._post_script
0180 
0181     @post_script.setter
0182     def post_script(self, value):
0183         self._post_script = value
0184 
0185     @property
0186     def vo(self):
0187         return self._vo
0188 
0189     @vo.setter
0190     def vo(self, value):
0191         self._vo = value
0192 
0193     @property
0194     def queue(self):
0195         return self._queue
0196 
0197     @queue.setter
0198     def queue(self, value):
0199         self._queue = value
0200 
0201     @property
0202     def site(self):
0203         return self._site
0204 
0205     @site.setter
0206     def site(self, value):
0207         self._site = value
0208 
0209     @property
0210     def cloud(self):
0211         return self._cloud
0212 
0213     @cloud.setter
0214     def cloud(self, value):
0215         self._cloud = value
0216 
0217     @property
0218     def working_group(self):
0219         return self._working_group
0220 
0221     @working_group.setter
0222     def working_group(self, value):
0223         self._working_group = value
0224 
0225     @property
0226     def task_type(self):
0227         return self._task_type
0228 
0229     @task_type.setter
0230     def task_type(self, value):
0231         self._task_type = value
0232 
0233     @property
0234     def processing_type(self):
0235         return self._processing_type
0236 
0237     @processing_type.setter
0238     def processing_type(self, value):
0239         self._processing_type = value
0240 
0241     @property
0242     def priority(self):
0243         return self._priority
0244 
0245     @priority.setter
0246     def priority(self, value):
0247         self._priority = value
0248 
0249     @property
0250     def core_count(self):
0251         return self._core_count
0252 
0253     @core_count.setter
0254     def core_count(self, value):
0255         self._core_count = value
0256 
0257     @property
0258     def total_memory(self):
0259         return self._total_memory
0260 
0261     @total_memory.setter
0262     def total_memory(self, value):
0263         self._total_memory = value
0264 
0265     @property
0266     def max_walltime(self):
0267         return self._max_walltime
0268 
0269     @max_walltime.setter
0270     def max_walltime(self, value):
0271         self._max_walltime = value
0272 
0273     @property
0274     def max_attempt(self):
0275         return self._max_attempt
0276 
0277     @max_attempt.setter
0278     def max_attempt(self, value):
0279         self._max_attempt = value
0280 
0281     @property
0282     def username(self):
0283         return self._username
0284 
0285     @username.setter
0286     def username(self, value):
0287         self._username = value
0288 
0289     @property
0290     def userdn(self):
0291         return self._userdn
0292 
0293     @userdn.setter
0294     def userdn(self, value):
0295         self._userdn = value
0296 
0297     @property
0298     def workflow_type(self):
0299         return self._workflow_type
0300 
0301     @workflow_type.setter
0302     def workflow_type(self, value):
0303         self._workflow_type = value
0304 
0305     @property
0306     def lifetime(self):
0307         return self._lifetime
0308 
0309     @lifetime.setter
0310     def lifetime(self, value):
0311         self._lifetime = value
0312 
0313     @property
0314     def request_id(self):
0315         return self._request_id
0316 
0317     @request_id.setter
0318     def request_id(self, value):
0319         self._request_id = int(value)
0320 
0321     @property
0322     def workload_id(self):
0323         return self._workload_id
0324 
0325     @workload_id.setter
0326     def workload_id(self, value):
0327         self._workload_id = value
0328 
0329     @property
0330     def enable_separate_log(self):
0331         return self._enable_separate_log
0332 
0333     @enable_separate_log.setter
0334     def enable_separate_log(self, value):
0335         self._enable_separate_log = value
0336 
0337     @property
0338     def brokers(self):
0339         return self._brokers
0340 
0341     @brokers.setter
0342     def brokers(self, value):
0343         self._brokers = value
0344 
0345     @property
0346     def broker_timeout(self):
0347         return self._broker_timeout
0348 
0349     @broker_timeout.setter
0350     def broker_timeout(self, value):
0351         self._broker_timeout = value
0352 
0353     @property
0354     def broker_username(self):
0355         return self._broker_username
0356 
0357     @broker_username.setter
0358     def broker_username(self, value):
0359         self._broker_username = value
0360 
0361     @property
0362     def broker_password(self):
0363         if self._broker_password:
0364             return self._broker_password
0365         return None
0366 
0367     @broker_password.setter
0368     def broker_password(self, value):
0369         self._broker_password = value
0370 
0371     @property
0372     def broker_destination(self):
0373         return self._broker_destination
0374 
0375     @broker_destination.setter
0376     def broker_destination(self, value):
0377         self._broker_destination = value
0378 
0379     def get_source_dir(self):
0380         return self._source_dir
0381 
0382     @property
0383     def container_options(self):
0384         return self._container_options
0385 
0386     @container_options.setter
0387     def container_options(self, value):
0388         self._container_options = value
0389 
0390     @property
0391     def token(self):
0392         return self._token
0393 
0394     @token.setter
0395     def token(self, value):
0396         self._token = value
0397 
0398     @property
0399     def campaign(self):
0400         return self._campaign
0401 
0402     @campaign.setter
0403     def campaign(self, value):
0404         self._campaign = value
0405 
0406     @property
0407     def campaign_scope(self):
0408         return self._campaign_scope
0409 
0410     @campaign_scope.setter
0411     def campaign_scope(self, value):
0412         self._campaign_scope = value
0413 
0414     @property
0415     def campaign_group(self):
0416         return self._campaign_group
0417 
0418     @campaign_group.setter
0419     def campaign_group(self, value):
0420         self._campaign_group = value
0421 
0422     @property
0423     def campaign_tag(self):
0424         return self._campaign_tag
0425 
0426     @campaign_tag.setter
0427     def campaign_tag(self, value):
0428         self._campaign_tag = value
0429 
0430     @property
0431     def max_processing_requests(self):
0432         return self._max_processing_requests
0433 
0434     @max_processing_requests.setter
0435     def max_processing_requests(self, value):
0436         self._max_processing_requests = value
0437 
0438     @property
0439     def panda_env(self):
0440         return self._panda_env
0441 
0442     @panda_env.setter
0443     def panda_env(self, value):
0444         self._panda_env = value
0445 
0446     @property
0447     def idds_env(self):
0448         return self._idds_env
0449 
0450     @idds_env.setter
0451     def idds_env(self, value):
0452         self._idds_env = value
0453 
0454     def init_brokers(self):
0455         if not self._broker_initialized:
0456             self.logger.info("To initialize broker")
0457             brokers = os.environ.get("IDDS_BROKERS", None)
0458             broker_destination = os.environ.get("IDDS_BROKER_DESTINATION", None)
0459             broker_timeout = os.environ.get("IDDS_BROKER_TIMEOUT", 180)
0460             broker_username = os.environ.get("IDDS_BROKER_USERNAME", None)
0461             broker_password = os.environ.get("IDDS_BROKER_PASSWORD", None)
0462             if brokers and broker_destination and broker_username and broker_password:
0463                 self._brokers = brokers
0464                 self._broker_timeout = int(broker_timeout)
0465                 self._broker_username = broker_username
0466                 self._broker_password = broker_password
0467                 self._broker_destination = broker_destination
0468 
0469                 self._broker_initialized = True
0470                 self.logger.info("Initialized brokers from environment")
0471             else:
0472                 self.logger.info("Getting brokers information from central service")
0473                 broker_info = self.get_broker_info()
0474                 if broker_info:
0475                     brokers = broker_info.get("brokers", None)
0476                     broker_destination = broker_info.get("broker_destination", None)
0477                     broker_timeout = broker_info.get("broker_timeout", 180)
0478                     broker_username = broker_info.get("broker_username", None)
0479                     broker_password = broker_info.get("broker_password", None)
0480                     if brokers and broker_destination and broker_username and broker_password:
0481                         self._brokers = brokers
0482                         self._broker_timeout = int(broker_timeout)
0483                         self._broker_username = broker_username
0484                         self._broker_password = broker_password
0485                         self._broker_destination = broker_destination
0486 
0487                         self._broker_initialized = True
0488                         self.logger.info("Initialized brokers from central service")
0489                     else:
0490                         self.logger.warn("Broker information from the central service is missing, will not initialize it")
0491         if not self._broker_initialized:
0492             self.logger.warn("Broker is not initialized")
0493         return self._broker_initialized
0494 
0495     def get_broker_info_from_idds_server(self):
0496         """
0497         Get broker infomation from the iDDS server.
0498 
0499         :raise Exception when failing to get broker information.
0500         """
0501         self.logger.info("Getting broker information through idds server.")
0502         # iDDS ClientManager
0503         from idds.client.clientmanager import ClientManager
0504 
0505         client = ClientManager(host=self.get_idds_server(), timeout=60)
0506         ret = client.get_metainfo(name='asyncresult_config')
0507         if type(ret) in (list, tuple) and ret[0] is True:
0508             return ret[1]
0509         else:
0510             self.logger.warn(f"Failed to get broker info: {ret}")
0511             return None
0512 
0513         return ret
0514 
0515     def get_broker_info_from_panda_server(self):
0516         """
0517         Get broker infomation from the iDDS server through PanDA service.
0518 
0519         :raise Exception when failing to get broker information.
0520         """
0521         self.logger.info("Get broker information through panda server.")
0522 
0523         import idds.common.utils as idds_utils
0524         import pandaclient.idds_api as idds_api
0525 
0526         idds_server = self.get_idds_server()
0527         client = idds_api.get_api(idds_utils.json_dumps,
0528                                   idds_host=idds_server,
0529                                   compress=True,
0530                                   verbose=is_panda_client_verbose(),
0531                                   manager=True)
0532         ret = client.get_metainfo(name='asyncresult_config')
0533         if ret[0] == 0 and ret[1][0]:
0534             idds_ret = ret[1][1]
0535             if type(idds_ret) in (list, tuple) and idds_ret[0] is True:
0536                 meta_info = idds_ret[1]
0537                 if type(meta_info) in [dict]:
0538                     pass
0539                 elif type(meta_info) in [str]:
0540                     try:
0541                         meta_info = json_loads(meta_info)
0542                     except Exception as ex:
0543                         self.logger.warn("Failed to json loads meta info(%s): %s" % (meta_info, ex))
0544             else:
0545                 meta_info = None
0546                 self.logger.warn("Failed to get meta info: %s" % str(ret))
0547         else:
0548             meta_info = None
0549             self.logger.warn("Failed to get meta info: %s" % str(ret))
0550 
0551         return meta_info
0552 
0553     def get_broker_info(self):
0554         try:
0555             if self.service == 'panda':
0556                 return self.get_broker_info_from_panda_server()
0557             return self.get_broker_info_from_idds_server()
0558         except Exception as ex:
0559             self.logger.error("Failed to get broker info: %s" % str(ex))
0560 
0561     def init_idds(self):
0562         if not self._idds_initialized:
0563             self._idds_initialized = True
0564             self._idds_env = self.get_idds_env()
0565 
0566     def init_panda(self):
0567         if not self._panda_initialized:
0568             self._panda_initialized = True
0569             self._panda_env = self.get_panda_env()
0570             if not self.site:
0571                 self.site = os.environ.get("PANDA_SITE", None)
0572             if not self.queue:
0573                 self.queue = os.environ.get("PANDA_QUEUE", None)
0574             if not self.cloud:
0575                 self.cloud = os.environ.get("PANDA_CLOUD", None)
0576             if not self.vo:
0577                 self.vo = os.environ.get("PANDA_VO", None)
0578             if not self.working_group:
0579                 self.working_group = os.environ.get("PANDA_WORKING_GROUP", None)
0580 
0581     def initialize(self):
0582         # env_list = ['IDDS_HOST', 'IDDS_AUTH_TYPE', 'IDDS_VO', 'IDDS_AUTH_NO_VERIFY',
0583         #             'OIDC_AUTH_ID_TOKEN', 'OIDC_AUTH_VO']
0584         env_list = ['IDDS_HOST', 'IDDS_AUTH_NO_VERIFY']
0585         for env in env_list:
0586             if env not in os.environ and env in self._idds_env:
0587                 os.environ[env] = self._idds_env[env]
0588 
0589         # env_list = ['PANDA_CONFIG_ROOT', 'PANDA_URL_SSL', 'PANDA_URL', 'PANDACACHE_URL', 'PANDAMON_URL',
0590         #             'PANDA_AUTH', 'PANDA_VERIFY_HOST', 'PANDA_AUTH_VO', 'PANDA_BEHIND_REAL_LB']
0591         env_list = ['PANDA_URL_SSL', 'PANDA_URL', 'PANDACACHE_URL', 'PANDAMON_URL',
0592                     'PANDA_VERIFY_HOST', 'PANDA_BEHIND_REAL_LB', 'PANDA_CLIENT_VERBOSE']
0593         for env in env_list:
0594             if env not in os.environ and env in self._panda_env:
0595                 os.environ[env] = self._panda_env[env]
0596         if 'PANDA_CONFIG_ROOT' not in os.environ:
0597             os.environ['PANDA_CONFIG_ROOT'] = os.getcwd()
0598 
0599     def global_setup(self):
0600         if self.service == 'panda':
0601             set_up = self.setup_panda()
0602         elif self.service == 'idds':
0603             set_up = self.setup_idds()
0604         elif self.service == 'sharefs':
0605             set_up = self.setup_sharefs()
0606         else:
0607             set_up = self.setup_sharefs()
0608         return set_up
0609 
0610     def setup(self):
0611         """
0612         :returns command: `str` to setup the workflow.
0613         """
0614         set_up = self.global_setup()
0615 
0616         init_env = self.init_env
0617         ret = None
0618         if set_up:
0619             ret = set_up
0620         if init_env:
0621             if ret:
0622                 ret = ret + "; " + init_env
0623             else:
0624                 ret = init_env
0625         return ret
0626 
0627     def get_clean_env(self):
0628         return self.clean_env
0629 
0630     def setup_source_files(self):
0631         """
0632         Setup source files.
0633         """
0634         if self.service == 'panda':
0635             return self.setup_panda_source_files()
0636         elif self.service == 'idds':
0637             return self.setup_idds_source_files()
0638         elif self.service == 'sharefs':
0639             return self.setup_sharefs_source_files()
0640         return self.setup_sharefs_source_files()
0641 
0642     def download_source_files_from_panda(self, filename):
0643         """Download and extract the tarball from pandacache"""
0644         archive_basename = os.path.basename(filename)
0645         target_dir = os.getcwd()
0646         full_output_filename = os.path.join(target_dir, archive_basename)
0647         self.logger.info("Downloading %s to %s" % (filename, full_output_filename))
0648 
0649         if filename.startswith("https:"):
0650             panda_cache_url = os.path.dirname(os.path.dirname(filename))
0651             os.environ["PANDACACHE_URL"] = panda_cache_url
0652         elif "PANDACACHE_URL" not in os.environ and "PANDA_URL_SSL" in os.environ:
0653             os.environ["PANDACACHE_URL"] = os.environ["PANDA_URL_SSL"]
0654         self.logger.info("PANDACACHE_URL: %s" % os.environ.get("PANDACACHE_URL", None))
0655 
0656         from pandaclient import Client
0657 
0658         attempt = 0
0659         max_attempts = 3
0660         done = False
0661         while attempt < max_attempts and not done:
0662             attempt += 1
0663             status, output = Client.getFile(archive_basename, output_path=full_output_filename)
0664             if status == 0:
0665                 done = True
0666         self.logger.info(f"Download archive file from pandacache status: {status}, output: {output}")
0667         if status != 0:
0668             raise RuntimeError("Failed to download archive file from pandacache")
0669         with tarfile.open(full_output_filename, "r:gz") as f:
0670             f.extractall(target_dir)
0671         self.logger.info(f"Extract {full_output_filename} to {target_dir}")
0672         os.remove(full_output_filename)
0673         self.logger.info("Remove %s" % full_output_filename)
0674         return target_dir
0675 
0676     def setup_panda(self):
0677         """
0678         Download source files from the panda cache and return the setup env.
0679 
0680         :returns command: `str` to setup the workflow.
0681         """
0682         # setup = 'source setup.sh'
0683         # return setup
0684         return None
0685 
0686     def setup_idds(self):
0687         """
0688         Download source files from the idds cache and return the setup env.
0689 
0690         :returns command: `str` to setup the workflow.
0691         """
0692         return None
0693 
0694     def setup_sharefs(self):
0695         """
0696         Download source files from the share file system or use the codes from the share file system.
0697         Return the setup env.
0698 
0699         :returns command: `str` to setup the workflow.
0700         """
0701         return None
0702 
0703     def setup_panda_source_files(self):
0704         """
0705         Download source files from the panda cache and return the setup env.
0706 
0707         :returns command: `str` to setup the workflow.
0708         """
0709         if self.remote_source_file:
0710             target_dir = self.download_source_files_from_panda(self.remote_source_file)
0711             self._source_dir = target_dir
0712         return None
0713 
0714     def setup_idds_source_files(self):
0715         """
0716         Download source files from the idds cache and return the setup env.
0717 
0718         :returns command: `str` to setup the workflow.
0719         """
0720         if self.remote_source_file:
0721             self.download_source_files_from_panda(self.remote_source_file)
0722 
0723         return None
0724 
0725     def setup_sharefs_source_files(self):
0726         """
0727         Download source files from the share file system or use the codes from the share file system.
0728         Return the setup env.
0729 
0730         :returns command: `str` to setup the workflow.
0731         """
0732         return None
0733 
0734     def get_panda_env(self):
0735         env_list = ['PANDA_CONFIG_ROOT', 'PANDA_URL_SSL', 'PANDA_URL', 'PANDACACHE_URL', 'PANDAMON_URL',
0736                     'PANDA_AUTH', 'PANDA_VERIFY_HOST', 'PANDA_AUTH_VO', 'PANDA_BEHIND_REAL_LB']
0737         ret_envs = {}
0738         for env in env_list:
0739             if env in os.environ:
0740                 ret_envs[env] = os.environ[env]
0741         return ret_envs
0742 
0743     def get_archive_name(self):
0744         name = self._name.split(":")[-1]
0745         # name = name + "_" + datetime.datetime.utcnow().strftime("%Y_%m_%d_%H_%M_%S")
0746         archive_name = "%s.tar.gz" % name
0747         return archive_name
0748 
0749     def upload_source_files_to_panda(self):
0750         if not self._source_dir:
0751             return None
0752 
0753         archive_name = self.get_archive_name()
0754         archive_file = create_archive_file('/tmp', archive_name, [self._source_dir], exclude_files=self._exclude_source_files)
0755         self.logger.info(f"created archive file {archive_file} from [{self._source_dir}]")
0756         from pandaclient import Client
0757 
0758         attempt = 0
0759         max_attempts = 3
0760         done = False
0761         while attempt < max_attempts and not done:
0762             attempt += 1
0763             status, out = Client.putFile(archive_file, True)
0764             if status == 0:
0765                 done = True
0766         self.logger.info(f"copy_files_to_pandacache: status: {status}, out: {out}")
0767         if out.startswith("NewFileName:"):
0768             # found the same input sandbox to reuse
0769             archive_file = out.split(":")[-1]
0770         elif out != "True":
0771             self.logger.error(out)
0772             return None
0773 
0774         filename = os.path.basename(archive_file)
0775         cache_path = os.path.join(os.environ["PANDACACHE_URL"], "cache")
0776         filename = os.path.join(cache_path, filename)
0777         return filename
0778 
0779     def prepare_with_panda(self):
0780         """
0781         Upload the source files to the panda server.
0782 
0783         :raise Exception when failed.
0784         """
0785         self.logger.info("preparing workflow with PanDA")
0786         self._panda_env = self.get_panda_env()
0787         remote_file_name = self.upload_source_files_to_panda()
0788         self.remote_source_file = remote_file_name
0789         self.logger.info("remote source file: %s" % self.remote_source_file)
0790         if self.remote_source_file is None:
0791             raise exceptions.IDDSException("Failed to upload source files to PanDA cache. Please check log file ${IDDS_LOG_FILE} or direct output for details.")
0792         self.logger.info("prepared workflow with PanDA")
0793 
0794     def get_idds_env(self):
0795         env_list = ['IDDS_HOST', 'IDDS_AUTH_TYPE', 'IDDS_VO', 'IDDS_AUTH_NO_VERIFY',
0796                     'OIDC_AUTH_ID_TOKEN', 'OIDC_AUTH_VO', 'IDDS_CONFIG']
0797         ret_envs = {}
0798         for env in env_list:
0799             if env in os.environ:
0800                 ret_envs[env] = os.environ[env]
0801         return ret_envs
0802 
0803     def get_idds_server(self):
0804         if 'IDDS_HOST' in self._idds_env:
0805             return self._idds_env['IDDS_HOST']
0806         return os.environ.get('IDDS_HOST', None)
0807 
0808     def prepare_with_idds(self):
0809         """
0810         Upload the source files to the idds server.
0811 
0812         :raise Exception when failed.
0813         """
0814         # idds_env = self.get_idds_env()
0815         pass
0816 
0817     def prepare_with_sharefs(self):
0818         """
0819         Upload the source files to the share file system
0820         Or directly use the source files on the share file system..
0821 
0822         :raise Exception when failed.
0823         """
0824         pass
0825 
0826     def prepare(self):
0827         """
0828         Prepare the workflow.
0829         """
0830         if self.service == 'panda':
0831             return self.prepare_with_panda()
0832         elif self.service == 'idds':
0833             # return self.prepare_with_idds()
0834             return self.prepare_with_panda()
0835         elif self.service == 'sharefs':
0836             return self.prepare_with_sharefs()
0837         return self.prepare_with_sharefs()
0838 
0839 
0840 class Workflow(Base):
0841 
0842     def __init__(self, func=None, service='panda', context=None, source_dir=None, local=False, distributed=True,
0843                  pre_kwargs={}, args=None, kwargs={}, multi_jobs_kwargs_list=[], current_job_kwargs=None, name=None,
0844                  init_env=None, is_unique_func_name=False, max_walltime=24 * 3600, source_dir_parent_level=None,
0845                  enable_separate_log=False, exclude_source_files=[], clean_env=None, container_options=None,
0846                  use_func_dir=False, json_load=False, post_script=None):
0847         """
0848         Init a workflow.
0849         """
0850         super(Workflow, self).__init__()
0851         self.prepared = False
0852 
0853         self.logger = logging.getLogger(self.__class__.__name__)
0854 
0855         # self._func = func
0856         source_dir = self.get_source_dir(func, source_dir, source_dir_parent_level=source_dir_parent_level)
0857         self._func, self._func_name_and_args, self._multi_jobs_kwargs_list = self.get_func_name_and_args(
0858             func=func,
0859             pre_kwargs=pre_kwargs,
0860             args=args,
0861             kwargs=kwargs,
0862             base_dir=source_dir,
0863             multi_jobs_kwargs_list=multi_jobs_kwargs_list
0864         )
0865         if not json_load:
0866             self.logger.info(f"func: {self._func}, func_name_and_args: {self._func_name_and_args}, multi_jobs_kwargs_list: {self._multi_jobs_kwargs_list}")
0867 
0868         self._current_job_kwargs = current_job_kwargs
0869         if self._current_job_kwargs:
0870             self._current_job_kwargs = base64.b64encode(zlib.compress(pickle.dumps(self._current_job_kwargs))).decode("utf-8")
0871 
0872         if name:
0873             self._name = name
0874         else:
0875             self._name = self._func_name_and_args[0]
0876             if self._name:
0877                 self._name = self._name.replace('__main__:', '').replace('.py', '').replace(':', '.')
0878                 self._name = self._name.replace("/", "_").replace(".", "_").replace(":", "_")
0879             if not is_unique_func_name:
0880                 if self._name:
0881                     self._name = self._name + "_" + datetime.datetime.utcnow().strftime("%Y_%m_%d_%H_%M_%S")
0882 
0883         workflow_type = WorkflowType.iWorkflow
0884         if local:
0885             self._func = None
0886             workflow_type = WorkflowType.iWorkflowLocal
0887 
0888         if context is not None:
0889             self._context = context
0890         else:
0891             self._context = WorkflowContext(name=self._name, service=service, workflow_type=workflow_type, source_dir=source_dir,
0892                                             distributed=distributed, init_env=init_env, max_walltime=max_walltime,
0893                                             exclude_source_files=exclude_source_files, clean_env=clean_env,
0894                                             enable_separate_log=enable_separate_log, container_options=container_options,
0895                                             post_script=post_script)
0896 
0897     @property
0898     def service(self):
0899         return self._context.service
0900 
0901     @property
0902     def internal_id(self):
0903         return self._context.internal_id
0904 
0905     @internal_id.setter
0906     def internal_id(self, value):
0907         self._context.internal_id = value
0908 
0909     @service.setter
0910     def service(self, value):
0911         self._context.service = value
0912 
0913     @property
0914     def name(self):
0915         return self._name
0916 
0917     @name.setter
0918     def name(self, value):
0919         self._name = value
0920 
0921     @property
0922     def request_id(self):
0923         return self._context.request_id
0924 
0925     @request_id.setter
0926     def request_id(self, value):
0927         self._context.request_id = value
0928 
0929     def set_request_id(self, request_id):
0930         self.request_id = request_id
0931 
0932     @property
0933     def vo(self):
0934         return self._context.vo
0935 
0936     @vo.setter
0937     def vo(self, value):
0938         self._context.vo = value
0939 
0940     @property
0941     def site(self):
0942         return self._context.site
0943 
0944     @site.setter
0945     def site(self, value):
0946         self._context.site = value
0947 
0948     @property
0949     def queue(self):
0950         return self._context.queue
0951 
0952     @queue.setter
0953     def queue(self, value):
0954         self._context.queue = value
0955 
0956     def get_site(self):
0957         return self.site
0958 
0959     @property
0960     def cloud(self):
0961         return self._context.cloud
0962 
0963     @cloud.setter
0964     def cloud(self, value):
0965         self._context.cloud = value
0966 
0967     @property
0968     def working_group(self):
0969         return self._context.working_group
0970 
0971     @working_group.setter
0972     def working_group(self, value):
0973         self._context.working_group = value
0974 
0975     @property
0976     def task_type(self):
0977         return self._context.task_type
0978 
0979     @task_type.setter
0980     def task_type(self, value):
0981         self._context.task_type = value
0982 
0983     @property
0984     def processing_type(self):
0985         return self._context.processing_type
0986 
0987     @processing_type.setter
0988     def processing_type(self, value):
0989         self._context.processing_type = value
0990 
0991     @property
0992     def priority(self):
0993         return self._context.priority
0994 
0995     @priority.setter
0996     def priority(self, value):
0997         self._context.priority = value
0998 
0999     @property
1000     def core_count(self):
1001         return self._context.core_count
1002 
1003     @core_count.setter
1004     def core_count(self, value):
1005         self._context.core_count = value
1006 
1007     @property
1008     def total_memory(self):
1009         return self._context.total_memory
1010 
1011     @total_memory.setter
1012     def total_memory(self, value):
1013         self._context.total_memory = value
1014 
1015     @property
1016     def max_walltime(self):
1017         return self._context.max_walltime
1018 
1019     @max_walltime.setter
1020     def max_walltime(self, value):
1021         self._context.max_walltime = value
1022 
1023     @property
1024     def max_attempt(self):
1025         return self._context.max_attempt
1026 
1027     @max_attempt.setter
1028     def max_attempt(self, value):
1029         self._context.max_attempt = value
1030 
1031     @property
1032     def username(self):
1033         return self._context.username
1034 
1035     @username.setter
1036     def username(self, value):
1037         self._context.username = value
1038 
1039     @property
1040     def userdn(self):
1041         return self._context.userdn
1042 
1043     @userdn.setter
1044     def userdn(self, value):
1045         self._context.userdn = value
1046 
1047     @property
1048     def workflow_type(self):
1049         return self._context.workflow_type
1050 
1051     @workflow_type.setter
1052     def workflow_type(self, value):
1053         self._context.workflow_type = value
1054 
1055     @property
1056     def lifetime(self):
1057         return self._context.lifetime
1058 
1059     @lifetime.setter
1060     def lifetime(self, value):
1061         self._context.lifetime = value
1062 
1063     @property
1064     def workload_id(self):
1065         return self._context.workload_id
1066 
1067     @workload_id.setter
1068     def workload_id(self, value):
1069         self._context.workload_id = value
1070 
1071     def get_workload_id(self):
1072         return self.workload_id
1073 
1074     @property
1075     def token(self):
1076         return self._context.token
1077 
1078     @token.setter
1079     def token(self, value):
1080         self._context.token = value
1081 
1082     @property
1083     def enable_separate_log(self):
1084         return self._context.enable_separate_log
1085 
1086     @enable_separate_log.setter
1087     def enable_separate_log(self, value):
1088         self._context.enable_separate_log = value
1089 
1090     @property
1091     def container_options(self):
1092         return self._context.container_options
1093 
1094     @container_options.setter
1095     def container_options(self, value):
1096         self._context.container_options = value
1097 
1098     def get_work_tag(self):
1099         return self._context.workflow_type.name
1100 
1101     def get_work_type(self):
1102         return self._context.workflow_type
1103 
1104     def get_work_name(self):
1105         return self._name
1106 
1107     @property
1108     def campaign(self):
1109         return self._context.campaign
1110 
1111     @campaign.setter
1112     def campaign(self, value):
1113         self._context.campaign = value
1114 
1115     @property
1116     def campaign_scope(self):
1117         return self._context.campaign_scope
1118 
1119     @campaign_scope.setter
1120     def campaign_scope(self, value):
1121         self._context.campaign_scope = value
1122 
1123     @property
1124     def campaign_group(self):
1125         return self._context.campaign_group
1126 
1127     @campaign_group.setter
1128     def campaign_group(self, value):
1129         self._context.campaign_group = value
1130 
1131     @property
1132     def campaign_tag(self):
1133         return self._context.campaign_tag
1134 
1135     @campaign_tag.setter
1136     def campaign_tag(self, value):
1137         self._context.campaign_tag = value
1138 
1139     @property
1140     def max_processing_requests(self):
1141         return self._context.max_processing_requests
1142 
1143     @max_processing_requests.setter
1144     def max_processing_requests(self, value):
1145         self._context.max_processing_requests = value
1146 
1147     @property
1148     def multi_jobs_kwargs_list(self):
1149         return self._multi_jobs_kwargs_list
1150 
1151     @multi_jobs_kwargs_list.setter
1152     def multi_jobs_kwargs_list(self, value):
1153         raise Exception("Not allwed to update multi_jobs_kwargs_list")
1154 
1155     def to_dict(self):
1156         func = self._func
1157         self._func = None
1158         obj = super(Workflow, self).to_dict()
1159         self._func = func
1160         return obj
1161 
1162     def get_source_dir(self, func, source_dir, source_dir_parent_level=None, use_func_dir=False):
1163         if source_dir:
1164             return source_dir
1165         if func and use_func_dir:
1166             if inspect.isbuiltin(func):
1167                 return None
1168             source_file = inspect.getsourcefile(func)
1169             if not source_file:
1170                 return None
1171             file_path = os.path.abspath(source_file)
1172             source_dir = os.path.dirname(file_path)
1173             if source_dir_parent_level and source_dir_parent_level > 0:
1174                 for _ in range(0, source_dir_parent_level):
1175                     source_dir = os.path.dirname(source_dir)
1176             return source_dir
1177         current_dir = os.path.abspath(os.getcwd())
1178         return current_dir
1179 
1180     def store(self):
1181         if self._context:
1182             content = {'type': 'work',
1183                        'name': self.name,
1184                        'context': self._context,
1185                        'original_args': self._func_name_and_args,
1186                        'multi_jobs_kwargs_list': self._multi_jobs_kwargs_list,
1187                        'current_job_kwargs': self._current_job_kwargs}
1188             content = json_dumps(content)
1189             source_dir = self._context.get_source_dir()
1190             self.save_context(source_dir, self._name, content)
1191 
1192     def load(self, source_dir=None):
1193         if not source_dir:
1194             source_dir = self._context.get_source_dir()
1195             if not source_dir:
1196                 source_dir = os.getcwd()
1197         ret = self.load_context(source_dir, self._name)
1198         if ret:
1199             ret = json_loads(ret)
1200             self.logger.info(f"Loaded context: {ret}")
1201             if 'multi_jobs_kwargs_list' in ret:
1202                 self._multi_jobs_kwargs_list = ret['multi_jobs_kwargs_list']
1203 
1204     def prepare(self):
1205         """
1206         Prepare the workflow: for example uploading the source codes to cache server.
1207         :returns command: `str` to setup the workflow.
1208         """
1209         if not self.prepared:
1210             self._context.prepare()
1211             self.prepared = True
1212 
1213     def submit_to_idds_server(self):
1214         """
1215         Submit the workflow to the iDDS server.
1216 
1217         :returns id: the workflow id.
1218         :raise Exception when failing to submit the workflow.
1219         """
1220         # iDDS ClientManager
1221         from idds.client.clientmanager import ClientManager
1222 
1223         client = ClientManager(host=self._context.get_idds_server(), timeout=60)
1224         request_id = client.submit(self, use_dataset_name=False)
1225 
1226         self.logger.info("Submitted into iDDS with request id=%s", str(request_id))
1227         return request_id
1228 
1229     def submit_to_panda_server(self):
1230         """
1231         Submit the workflow to the iDDS server through PanDA service.
1232 
1233         :returns id: the workflow id.
1234         :raise Exception when failing to submit the workflow.
1235         """
1236         import idds.common.utils as idds_utils
1237         import pandaclient.idds_api as idds_api
1238 
1239         idds_server = self._context.get_idds_server()
1240         client = idds_api.get_api(dumper=idds_utils.json_dumps,
1241                                   idds_host=idds_server,
1242                                   compress=True,
1243                                   verbose=is_panda_client_verbose(),
1244                                   manager=True)
1245         ret = client.submit(self, username=None, use_dataset_name=False)
1246         if ret[0] == 0 and ret[1][0]:
1247             request_id = ret[1][1]
1248         else:
1249             request_id = None
1250             self.logger.error("Failed to submit workflow to PanDA-iDDS with error: %s" % str(ret))
1251 
1252         self.logger.info("Submitted into PanDA-iDDS with request id=%s", str(request_id))
1253         return request_id
1254 
1255     def submit(self):
1256         """
1257         Submit the workflow to the iDDS server.
1258 
1259         :returns id: the workflow id.
1260         :raise Exception when failing to submit the workflow.
1261         """
1262         try:
1263             request_id = None
1264             self.prepare()
1265             if self.service == 'panda':
1266                 request_id = self.submit_to_panda_server()
1267             else:
1268                 request_id = self.submit_to_idds_server()
1269         except Exception as ex:
1270             self.logger.error("Failed to submit workflow: %s" % str(ex))
1271 
1272         try:
1273             self._context.request_id = int(request_id)
1274             return request_id
1275         except Exception as ex:
1276             self.logger.error("Request id (%s) is not integer, there should be some submission errors: %s" % (request_id, str(ex)))
1277 
1278         return None
1279 
1280     def close_to_idds_server(self, request_id):
1281         """
1282         close the workflow to the iDDS server.
1283 
1284         :param request_id: the workflow id.
1285         :raise Exception when failing to close the workflow.
1286         """
1287         # iDDS ClientManager
1288         from idds.client.clientmanager import ClientManager
1289 
1290         client = ClientManager(host=self._context.get_idds_server(), timeout=60)
1291         ret = client.close(request_id)
1292 
1293         self.logger.info("Close request id=%s to iDDS server: %s", str(request_id), str(ret))
1294         return request_id
1295 
1296     def close_to_panda_server(self, request_id):
1297         """
1298         close the workflow to the iDDS server through PanDA service.
1299 
1300         :param request_id: the workflow id.
1301         :raise Exception when failing to closet the workflow.
1302         """
1303         import idds.common.utils as idds_utils
1304         import pandaclient.idds_api as idds_api
1305 
1306         idds_server = self._context.get_idds_server()
1307         client = idds_api.get_api(idds_utils.json_dumps,
1308                                   idds_host=idds_server,
1309                                   compress=True,
1310                                   verbose=is_panda_client_verbose(),
1311                                   manager=True)
1312         ret = client.close(request_id)
1313 
1314         self.logger.info("Close request id=%s through PanDA-iDDS: %s", str(request_id), str(ret))
1315         return request_id
1316 
1317     def close(self):
1318         """
1319         close the workflow to the iDDS server.
1320 
1321         :raise Exception when failing to close the workflow.
1322         """
1323         if self._context.request_id is not None:
1324             try:
1325                 if self.service == 'panda':
1326                     self.close_to_panda_server(self._context.request_id)
1327                 else:
1328                     self.close_to_idds_server(self._context.request_id)
1329             except Exception as ex:
1330                 self.logger.error("Failed to close request(%s): %s" % (self._context.request_id, str(ex)))
1331 
1332     def __del__(self):
1333         # self.close()
1334         pass
1335 
1336     def setup(self):
1337         """
1338         :returns command: `str` to setup the workflow.
1339         """
1340         return self._context.setup()
1341 
1342     def get_clean_env(self):
1343         """
1344         :returns command: `str` to clean the workflow.
1345         """
1346         return self._context.get_clean_env()
1347 
1348     def setup_source_files(self):
1349         """
1350         Setup location of source files
1351         """
1352         return self._context.setup_source_files()
1353 
1354     def load_func(self, func_name):
1355         """
1356         Load the function from the source files.
1357 
1358         :raise Exception
1359         """
1360         with modified_environ(IDDS_IGNORE_WORKFLOW_DECORATOR='true'):
1361             func = super(Workflow, self).load_func(func_name)
1362 
1363         return func
1364 
1365     def pre_run(self):
1366         # test AsyncResult
1367         a_ret = None
1368         try:
1369             workflow_context = self._context
1370             if workflow_context.distributed:
1371                 self.logger.info("Test AsyncResult")
1372                 a_ret = AsyncResult(workflow_context, wait_num=1, timeout=30)
1373                 ret = a_ret.is_ok()
1374                 a_ret.stop()
1375                 self.logger.info(f"pre_run asyncresult test is_ok: {ret}")
1376                 return ret
1377             return True
1378         except Exception as ex:
1379             self.logger.error(f"pre_run failed with error: {ex}")
1380             self.logger.error(traceback.format_exc())
1381         if a_ret:
1382             a_ret.stop()
1383         return False
1384 
1385     def run(self):
1386         self.logger.info("Start workflow run().")
1387         ret = None
1388         try:
1389             ret = self.run_local()
1390         except Exception as ex:
1391             self.logger.error(f"Failed to run function: {ex}")
1392             self.logger.error(traceback.format_exc())
1393         except:
1394             self.logger.error("Unknow error")
1395             self.logger.error(traceback.format_exc())
1396         self.logger.info(f"finish workflow run() with ret: {ret}.")
1397         return ret
1398 
1399     def run_local(self):
1400         """
1401         Run the workflow.
1402         """
1403         # with self:
1404         if True:
1405             is_ok = self.pre_run()
1406             if not is_ok:
1407                 self.logger.error(f"pre_run is_ok: {is_ok}, will exit.")
1408                 raise Exception("workflow pre_run failed")
1409 
1410             func_name, pre_kwargs, args, kwargs = self._func_name_and_args
1411             multi_jobs_kwargs_list = self.multi_jobs_kwargs_list
1412             if args:
1413                 args = pickle.loads(zlib.decompress(base64.b64decode(args)))
1414             if pre_kwargs:
1415                 pre_kwargs = pickle.loads(zlib.decompress(base64.b64decode(pre_kwargs)))
1416             if kwargs:
1417                 kwargs = pickle.loads(zlib.decompress(base64.b64decode(kwargs)))
1418             if multi_jobs_kwargs_list:
1419                 multi_jobs_kwargs_list = [pickle.loads(zlib.decompress(base64.b64decode(k))) for k in multi_jobs_kwargs_list]
1420 
1421             if self._func is None:
1422                 func = self.load_func(func_name)
1423                 self._func = func
1424             status, output, error = self.run_func(self._func, pre_kwargs, args, kwargs)
1425             if status:
1426                 self.logger.info(f"run workflow successfully. output: {output}, error: {error}")
1427             else:
1428                 self.logger.error(f"run workflow failed. output: {output}, error: {error}")
1429             return status
1430 
1431     # Context Manager -----------------------------------------------
1432     def __enter__(self):
1433         WorkflowCanvas.push_managed_workflow(self)
1434         return self
1435 
1436     def __exit__(self, _type, _value, _tb):
1437         w = WorkflowCanvas.pop_managed_workflow()
1438         if w is not None:
1439             w.close()
1440 
1441     # /Context Manager ----------------------------------------------
1442 
1443     def get_run_command(self):
1444         cmd = "run_workflow --type workflow --name %s " % self.name
1445         cmd += "--context %s --original_args %s " % (encode_base64(json_dumps(self._context)),
1446                                                      encode_base64(json_dumps(self._func_name_and_args)))
1447         cmd += "--current_job_kwargs ${IN/L}"
1448         return cmd
1449 
1450     def get_runner(self):
1451         setup = self.setup()
1452         cmd = ""
1453         run_command = self.get_run_command()
1454 
1455         if setup:
1456             pre_setup, main_setup = self.split_setup(setup)
1457             pre_setup = encode_base64(json_dumps(pre_setup))
1458             main_setup = encode_base64(json_dumps(main_setup))
1459             if pre_setup:
1460                 cmd = " --pre_setup " + pre_setup + " "
1461             cmd = cmd + " --setup " + main_setup + " "
1462         if cmd:
1463             cmd = cmd + " " + run_command
1464         else:
1465             cmd = run_command
1466 
1467         clean_env = self.get_clean_env()
1468         if clean_env:
1469             cmd = cmd + "; ret=$?; " + clean_env + "; exit $ret"
1470 
1471         return cmd
1472 
1473     def get_func_name(self):
1474         func_name = self._func_name_and_args[0]
1475         return func_name
1476 
1477 
1478 # foo = workflow(arg)(foo)
1479 def workflow(func=None, *, local=False, service='idds', source_dir=None, primary=False, queue=None, site=None, cloud=None,
1480              max_walltime=24 * 3600, distributed=True, init_env=None, pre_kwargs={}, return_workflow=False, no_wraps=False,
1481              source_dir_parent_level=None, exclude_source_files=[], enable_separate_log=False, clean_env=None,
1482              name=None, core_count=1, total_memory=4000,    # MB
1483              container_options=None, use_func_dir=False, post_script=None):
1484     if func is None:
1485         return functools.partial(workflow, local=local, service=service, source_dir=source_dir, primary=primary, queue=queue, site=site, cloud=cloud,
1486                                  max_walltime=max_walltime, distributed=distributed, init_env=init_env, pre_kwargs=pre_kwargs, no_wraps=no_wraps,
1487                                  return_workflow=return_workflow, source_dir_parent_level=source_dir_parent_level,
1488                                  exclude_source_files=exclude_source_files, clean_env=clean_env, enable_separate_log=enable_separate_log,
1489                                  name=name, core_count=core_count, total_memory=total_memory, use_func_dir=use_func_dir,
1490                                  container_options=container_options, post_script=post_script)
1491 
1492     if 'IDDS_IGNORE_WORKFLOW_DECORATOR' in os.environ:
1493         return func
1494 
1495     # @functools.wraps(func)
1496     def wrapper(*args, **kwargs):
1497         try:
1498             logger = logging.getLogger("workflow_def")
1499             f = Workflow(func, service=service, source_dir=source_dir, local=local, max_walltime=max_walltime, distributed=distributed,
1500                          pre_kwargs=pre_kwargs, args=args, kwargs=kwargs, init_env=init_env, source_dir_parent_level=source_dir_parent_level,
1501                          exclude_source_files=exclude_source_files, clean_env=clean_env, enable_separate_log=enable_separate_log,
1502                          name=name, container_options=container_options, use_func_dir=use_func_dir, post_script=post_script)
1503 
1504             f.queue = queue
1505             f.site = site
1506             f.cloud = cloud
1507             f.core_count = core_count
1508             f.total_memory = total_memory
1509 
1510             logger.info("return_workflow %s" % return_workflow)
1511             if return_workflow:
1512                 return f
1513 
1514             logger.info("Prepare workflow")
1515             f.prepare()
1516             logger.info("Prepared workflow")
1517 
1518             logger.info("Registering workflow")
1519             f.submit()
1520 
1521             if not local:
1522                 logger.info("Run workflow at remote sites")
1523                 return f
1524             else:
1525                 logger.info("Run workflow locally")
1526                 with f:
1527                     ret = f.run()
1528                 return ret
1529         except Exception as ex:
1530             logger.error("Failed to run workflow %s: %s" % (func, ex))
1531             raise ex
1532         except:
1533             raise
1534     if no_wraps:
1535         return wrapper
1536     else:
1537         return functools.wraps(func)(wrapper)
1538 
1539 
1540 def workflow_old(func=None, *, lazy=False, service='panda', source_dir=None, primary=False, distributed=True):
1541 
1542     def decorator(func):
1543         @functools.wraps(func)
1544         def wrapper(*args, **kwargs):
1545             try:
1546                 f = Workflow(func, service=service, source_dir=source_dir, distributed=distributed)
1547 
1548                 if lazy:
1549                     return f
1550 
1551                 return f.run()
1552             except Exception as ex:
1553                 logging.error("Failed to run workflow %s: %s" % (func, ex))
1554                 raise ex
1555             except:
1556                 raise
1557         return wrapper
1558 
1559     if func is None:
1560         return decorator
1561     else:
1562         return decorator(func)