File indexing completed on 2026-04-09 07:58:32
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012 import base64
0013 import collections
0014 import datetime
0015 import functools
0016 import logging
0017 import inspect
0018 import os
0019 import pickle
0020 import tarfile
0021 import traceback
0022 import uuid
0023 import zlib
0024
0025
0026
0027 from idds.common import exceptions
0028 from idds.common.constants import WorkflowType
0029 from idds.common.utils import setup_logging, create_archive_file, json_dumps, json_loads, encode_base64, modified_environ, is_panda_client_verbose
0030 from .asyncresult import AsyncResult
0031 from .base import Base, Context
0032
0033
# Set up logging for this module at import time.
# NOTE(review): setup_logging comes from idds.common.utils; what exactly it configures is not visible here.
setup_logging(__name__)
0035
0036
class WorkflowCanvas(object):
    """
    Class-level stack of the workflows that are currently being managed.

    The deque is a class attribute, so it is shared by every user of this
    class. The workflow pushed last is the "current" one.
    """

    _managed_workflows = collections.deque()

    @classmethod
    def push_managed_workflow(cls, workflow: Base):
        """Put ``workflow`` on top of the stack, making it the current workflow."""
        cls._managed_workflows.appendleft(workflow)

    @classmethod
    def pop_managed_workflow(cls):
        """
        Remove the current workflow from the stack and return it.

        :raises IndexError: when the stack is empty.
        """
        return cls._managed_workflows.popleft()

    @classmethod
    def get_current_workflow(cls):
        """Return the current workflow without removing it, or None when the stack is empty."""
        stack = cls._managed_workflows
        return stack[0] if stack else None
0056
0057
class WorkflowContext(Context):
    """
    Execution context of a workflow.

    It stores the submission settings (service, site/queue/cloud, resource
    requests, broker credentials, campaign attributes), keeps snapshots of the
    PanDA and iDDS related environment variables, and implements the upload and
    download of the workflow source archive through the PanDA cache.
    """

    def __init__(self, name=None, service='panda', source_dir=None, workflow_type=WorkflowType.iWorkflow, distributed=True,
                 max_walltime=24 * 3600, init_env=None, exclude_source_files=None, clean_env=None, enable_separate_log=False,
                 cloud=None, site=None, queue=None, vo=None, container_options=None, task_type=None, working_group=None,
                 processing_type=None, post_script=None):
        """
        Init a workflow context.

        :param name: workflow name; the part after the last ':' names the source archive.
        :param service: 'panda', 'idds' or 'sharefs'; any other value is handled like 'sharefs'.
        :param source_dir: directory that is archived and uploaded by `prepare`.
        :param workflow_type: the `WorkflowType` of the workflow.
        :param distributed: stored as-is and exposed through the `distributed` property.
        :param max_walltime: stored as-is and exposed through the `max_walltime` property.
        :param init_env: command appended to the setup command returned by `setup`.
        :param exclude_source_files: a single entry or a list/tuple of entries passed as
                                     `exclude_files` when the source archive is created.
        :param clean_env: stored as-is and returned by `get_clean_env`.
        :param enable_separate_log: stored as-is.
        :param cloud, site, queue, vo, working_group: when not set, they fall back to the
               PANDA_CLOUD, PANDA_SITE, PANDA_QUEUE, PANDA_VO and PANDA_WORKING_GROUP
               environment variables.
        :param container_options, task_type, processing_type, post_script: stored as-is.
        """
        super(WorkflowContext, self).__init__()
        self._service = service
        self._request_id = None
        self._workflow_type = workflow_type

        self._idds_env = {}
        self._panda_env = {}

        self._name = name
        self._source_dir = source_dir
        self.remote_source_file = None

        self._vo = vo

        self._queue = queue
        self._site = site
        self._cloud = cloud

        self._working_group = working_group
        self._task_type = task_type
        self._processing_type = processing_type

        self._priority = 500
        self._core_count = 1
        self._total_memory = 1000
        self._max_walltime = max_walltime
        self._max_attempt = 5

        self._username = None
        self._userdn = None
        self._lifetime = 7 * 24 * 3600
        self._workload_id = None

        self.distributed = distributed

        self._broker_initialized = False
        self._brokers = None
        self._broker_timeout = 180
        self._broker_username = None
        self._broker_password = None
        self._broker_destination = None

        self._token = str(uuid.uuid4())

        # The init_* calls read site/queue/cloud/vo/working_group, so they must run after those are set.
        self._panda_initialized = False
        self.init_panda()
        self._idds_initialized = False
        self.init_idds()

        # init_env goes through its property setter (a trailing space is appended);
        # clean_env is deliberately stored untouched here, as before.
        self.init_env = init_env
        self._clean_env = clean_env
        self._post_script = post_script

        if not exclude_source_files:
            self._exclude_source_files = []
        elif isinstance(exclude_source_files, (list, tuple)):
            self._exclude_source_files = exclude_source_files
        else:
            self._exclude_source_files = [exclude_source_files]

        self._enable_separate_log = enable_separate_log

        self._container_options = container_options

        self._campaign = None
        self._campaign_scope = None
        self._campaign_group = None
        self._campaign_tag = None
        self._max_processing_requests = -1

        self.logger = logging.getLogger(self.__class__.__name__)

    @property
    def distributed(self):
        """The `distributed` flag given at construction."""
        return self._distributed

    @distributed.setter
    def distributed(self, value):
        self._distributed = value

    @property
    def service(self):
        """Name of the service used to run the workflow ('panda', 'idds', 'sharefs', ...)."""
        return self._service

    @service.setter
    def service(self, value):
        self._service = value

    @property
    def init_env(self):
        """Environment initialization command; a non-empty value is stored with a trailing space."""
        return self._init_env

    @init_env.setter
    def init_env(self, value):
        self._init_env = value
        if self._init_env:
            self._init_env = self._init_env + " "

    @property
    def clean_env(self):
        """Environment cleanup command; a non-empty value set through this property gets a trailing space."""
        return self._clean_env

    @clean_env.setter
    def clean_env(self, value):
        self._clean_env = value
        if self._clean_env:
            self._clean_env = self._clean_env + " "

    @property
    def post_script(self):
        """The post script setting."""
        return self._post_script

    @post_script.setter
    def post_script(self, value):
        self._post_script = value

    @property
    def vo(self):
        """The VO setting."""
        return self._vo

    @vo.setter
    def vo(self, value):
        self._vo = value

    @property
    def queue(self):
        """The queue setting."""
        return self._queue

    @queue.setter
    def queue(self, value):
        self._queue = value

    @property
    def site(self):
        """The site setting."""
        return self._site

    @site.setter
    def site(self, value):
        self._site = value

    @property
    def cloud(self):
        """The cloud setting."""
        return self._cloud

    @cloud.setter
    def cloud(self, value):
        self._cloud = value

    @property
    def working_group(self):
        """The working group setting."""
        return self._working_group

    @working_group.setter
    def working_group(self, value):
        self._working_group = value

    @property
    def task_type(self):
        """The task type setting."""
        return self._task_type

    @task_type.setter
    def task_type(self, value):
        self._task_type = value

    @property
    def processing_type(self):
        """The processing type setting."""
        return self._processing_type

    @processing_type.setter
    def processing_type(self, value):
        self._processing_type = value

    @property
    def priority(self):
        """The priority setting (500 by default)."""
        return self._priority

    @priority.setter
    def priority(self, value):
        self._priority = value

    @property
    def core_count(self):
        """The core count setting (1 by default)."""
        return self._core_count

    @core_count.setter
    def core_count(self, value):
        self._core_count = value

    @property
    def total_memory(self):
        """The total memory setting (1000 by default)."""
        return self._total_memory

    @total_memory.setter
    def total_memory(self, value):
        self._total_memory = value

    @property
    def max_walltime(self):
        """The maximum walltime setting."""
        return self._max_walltime

    @max_walltime.setter
    def max_walltime(self, value):
        self._max_walltime = value

    @property
    def max_attempt(self):
        """The maximum attempt setting (5 by default)."""
        return self._max_attempt

    @max_attempt.setter
    def max_attempt(self, value):
        self._max_attempt = value

    @property
    def username(self):
        """The user name."""
        return self._username

    @username.setter
    def username(self, value):
        self._username = value

    @property
    def userdn(self):
        """The user DN."""
        return self._userdn

    @userdn.setter
    def userdn(self, value):
        self._userdn = value

    @property
    def workflow_type(self):
        """The workflow type."""
        return self._workflow_type

    @workflow_type.setter
    def workflow_type(self, value):
        self._workflow_type = value

    @property
    def lifetime(self):
        """The lifetime setting (7 * 24 * 3600 by default)."""
        return self._lifetime

    @lifetime.setter
    def lifetime(self, value):
        self._lifetime = value

    @property
    def request_id(self):
        """The request id; values assigned through the setter are converted with `int`."""
        return self._request_id

    @request_id.setter
    def request_id(self, value):
        self._request_id = int(value)

    @property
    def workload_id(self):
        """The workload id."""
        return self._workload_id

    @workload_id.setter
    def workload_id(self, value):
        self._workload_id = value

    @property
    def enable_separate_log(self):
        """The `enable_separate_log` flag."""
        return self._enable_separate_log

    @enable_separate_log.setter
    def enable_separate_log(self, value):
        self._enable_separate_log = value

    @property
    def brokers(self):
        """The brokers setting."""
        return self._brokers

    @brokers.setter
    def brokers(self, value):
        self._brokers = value

    @property
    def broker_timeout(self):
        """The broker timeout (180 by default)."""
        return self._broker_timeout

    @broker_timeout.setter
    def broker_timeout(self, value):
        self._broker_timeout = value

    @property
    def broker_username(self):
        """The broker user name."""
        return self._broker_username

    @broker_username.setter
    def broker_username(self, value):
        self._broker_username = value

    @property
    def broker_password(self):
        """The broker password, or None when it is unset or empty."""
        if self._broker_password:
            return self._broker_password
        return None

    @broker_password.setter
    def broker_password(self, value):
        self._broker_password = value

    @property
    def broker_destination(self):
        """The broker destination."""
        return self._broker_destination

    @broker_destination.setter
    def broker_destination(self, value):
        self._broker_destination = value

    def get_source_dir(self):
        """Return the source directory of the workflow."""
        return self._source_dir

    @property
    def container_options(self):
        """The container options."""
        return self._container_options

    @container_options.setter
    def container_options(self, value):
        self._container_options = value

    @property
    def token(self):
        """The context token; a random UUID4 string is generated at construction."""
        return self._token

    @token.setter
    def token(self, value):
        self._token = value

    @property
    def campaign(self):
        """The campaign."""
        return self._campaign

    @campaign.setter
    def campaign(self, value):
        self._campaign = value

    @property
    def campaign_scope(self):
        """The campaign scope."""
        return self._campaign_scope

    @campaign_scope.setter
    def campaign_scope(self, value):
        self._campaign_scope = value

    @property
    def campaign_group(self):
        """The campaign group."""
        return self._campaign_group

    @campaign_group.setter
    def campaign_group(self, value):
        self._campaign_group = value

    @property
    def campaign_tag(self):
        """The campaign tag."""
        return self._campaign_tag

    @campaign_tag.setter
    def campaign_tag(self, value):
        self._campaign_tag = value

    @property
    def max_processing_requests(self):
        """The maximum number of processing requests (-1 by default)."""
        return self._max_processing_requests

    @max_processing_requests.setter
    def max_processing_requests(self, value):
        self._max_processing_requests = value

    @property
    def panda_env(self):
        """Snapshot of the PanDA related environment variables."""
        return self._panda_env

    @panda_env.setter
    def panda_env(self, value):
        self._panda_env = value

    @property
    def idds_env(self):
        """Snapshot of the iDDS related environment variables."""
        return self._idds_env

    @idds_env.setter
    def idds_env(self, value):
        self._idds_env = value

    def _apply_broker_settings(self, brokers, destination, timeout, username, password):
        """
        Store the broker settings when all mandatory ones are present.

        :returns: True when the settings were stored and the broker is marked as
                  initialized; False (nothing changed) when a mandatory value is missing.
        :raises ValueError: when `timeout` cannot be converted to an int.
        """
        if not (brokers and destination and username and password):
            return False
        self._brokers = brokers
        self._broker_timeout = int(timeout)
        self._broker_username = username
        self._broker_password = password
        self._broker_destination = destination
        self._broker_initialized = True
        return True

    def init_brokers(self):
        """
        Initialize the broker settings once.

        The IDDS_BROKERS, IDDS_BROKER_DESTINATION, IDDS_BROKER_TIMEOUT,
        IDDS_BROKER_USERNAME and IDDS_BROKER_PASSWORD environment variables are
        tried first; when they are incomplete the information is requested
        through `get_broker_info`.

        :returns: True when the broker settings are initialized, otherwise False.
        """
        if not self._broker_initialized:
            self.logger.info("To initialize broker")
            env = os.environ
            if self._apply_broker_settings(env.get("IDDS_BROKERS", None),
                                           env.get("IDDS_BROKER_DESTINATION", None),
                                           env.get("IDDS_BROKER_TIMEOUT", 180),
                                           env.get("IDDS_BROKER_USERNAME", None),
                                           env.get("IDDS_BROKER_PASSWORD", None)):
                self.logger.info("Initialized brokers from environment")
            else:
                self.logger.info("Getting brokers information from central service")
                broker_info = self.get_broker_info()
                # Anything that is not a dict (e.g. an undecoded string) cannot be used.
                if isinstance(broker_info, dict) and self._apply_broker_settings(broker_info.get("brokers", None),
                                                                                 broker_info.get("broker_destination", None),
                                                                                 broker_info.get("broker_timeout", 180),
                                                                                 broker_info.get("broker_username", None),
                                                                                 broker_info.get("broker_password", None)):
                    self.logger.info("Initialized brokers from central service")
                elif broker_info:
                    self.logger.warning("Broker information from the central service is missing, will not initialize it")
        if not self._broker_initialized:
            self.logger.warning("Broker is not initialized")
        return self._broker_initialized

    def get_broker_info_from_idds_server(self):
        """
        Get broker information from the iDDS server.

        :returns: the second item of the client reply when the first item is True,
                  otherwise None (a warning is logged).
        """
        self.logger.info("Getting broker information through idds server.")

        from idds.client.clientmanager import ClientManager

        client = ClientManager(host=self.get_idds_server(), timeout=60)
        ret = client.get_metainfo(name='asyncresult_config')
        if isinstance(ret, (list, tuple)) and ret and ret[0] is True:
            return ret[1]
        self.logger.warning(f"Failed to get broker info: {ret}")
        return None

    def get_broker_info_from_panda_server(self):
        """
        Get broker information from the iDDS server through the PanDA service.

        :returns: the meta information (a string reply is decoded with `json_loads`),
                  or None when the request failed or the reply could not be decoded.
        """
        self.logger.info("Get broker information through panda server.")

        import idds.common.utils as idds_utils
        import pandaclient.idds_api as idds_api

        idds_server = self.get_idds_server()
        client = idds_api.get_api(idds_utils.json_dumps,
                                  idds_host=idds_server,
                                  compress=True,
                                  verbose=is_panda_client_verbose(),
                                  manager=True)
        ret = client.get_metainfo(name='asyncresult_config')
        if ret[0] == 0 and ret[1][0]:
            idds_ret = ret[1][1]
            if isinstance(idds_ret, (list, tuple)) and idds_ret and idds_ret[0] is True:
                meta_info = idds_ret[1]
                if isinstance(meta_info, str):
                    try:
                        meta_info = json_loads(meta_info)
                    except Exception as ex:
                        self.logger.warning("Failed to json loads meta info(%s): %s" % (meta_info, ex))
                        # An undecodable string is useless to the caller; report "no information".
                        return None
                return meta_info
        self.logger.warning("Failed to get meta info: %s" % str(ret))
        return None

    def get_broker_info(self):
        """
        Get broker information through the service of this context.

        :returns: the broker information, or None when it could not be retrieved
                  (the error is logged, not raised).
        """
        try:
            if self.service == 'panda':
                return self.get_broker_info_from_panda_server()
            return self.get_broker_info_from_idds_server()
        except Exception as ex:
            self.logger.error("Failed to get broker info: %s" % str(ex))
        return None

    def init_idds(self):
        """Take the snapshot of the iDDS environment variables once."""
        if not self._idds_initialized:
            self._idds_initialized = True
            self._idds_env = self.get_idds_env()

    def init_panda(self):
        """
        Take the snapshot of the PanDA environment variables once and fill the unset
        site, queue, cloud, vo and working group from the PANDA_* environment variables.
        """
        if not self._panda_initialized:
            self._panda_initialized = True
            self._panda_env = self.get_panda_env()
            if not self.site:
                self.site = os.environ.get("PANDA_SITE", None)
            if not self.queue:
                self.queue = os.environ.get("PANDA_QUEUE", None)
            if not self.cloud:
                self.cloud = os.environ.get("PANDA_CLOUD", None)
            if not self.vo:
                self.vo = os.environ.get("PANDA_VO", None)
            if not self.working_group:
                self.working_group = os.environ.get("PANDA_WORKING_GROUP", None)

    def initialize(self):
        """
        Export the stored iDDS/PanDA environment snapshot to `os.environ`.

        Variables already present in the environment are never overwritten.
        PANDA_CONFIG_ROOT defaults to the current working directory.
        """
        for env in ['IDDS_HOST', 'IDDS_AUTH_NO_VERIFY']:
            if env not in os.environ and env in self._idds_env:
                os.environ[env] = self._idds_env[env]

        for env in ['PANDA_URL_SSL', 'PANDA_URL', 'PANDACACHE_URL', 'PANDAMON_URL',
                    'PANDA_VERIFY_HOST', 'PANDA_BEHIND_REAL_LB', 'PANDA_CLIENT_VERBOSE']:
            if env not in os.environ and env in self._panda_env:
                os.environ[env] = self._panda_env[env]
        if 'PANDA_CONFIG_ROOT' not in os.environ:
            os.environ['PANDA_CONFIG_ROOT'] = os.getcwd()

    def global_setup(self):
        """
        Run the service specific setup.

        :returns: the setup command of the service, or None.
        """
        if self.service == 'panda':
            return self.setup_panda()
        if self.service == 'idds':
            return self.setup_idds()
        # 'sharefs' and every unknown service share the same setup.
        return self.setup_sharefs()

    def setup(self):
        """
        :returns command: `str` to setup the workflow (the service setup and `init_env`
                          joined with "; "), or None when neither is set.
        """
        set_up = self.global_setup()
        init_env = self.init_env

        if set_up and init_env:
            return set_up + "; " + init_env
        if set_up:
            return set_up
        if init_env:
            return init_env
        return None

    def get_clean_env(self):
        """Return the environment cleanup command."""
        return self.clean_env

    def setup_source_files(self):
        """
        Setup source files with the service specific method.
        """
        if self.service == 'panda':
            return self.setup_panda_source_files()
        if self.service == 'idds':
            return self.setup_idds_source_files()
        # 'sharefs' and every unknown service share the same implementation.
        return self.setup_sharefs_source_files()

    def download_source_files_from_panda(self, filename):
        """
        Download the tarball from the pandacache and extract it into the current directory.

        :param filename: archive name or https url; for an https url PANDACACHE_URL is
                         derived from it (two path levels above the file).
        :returns: the directory the archive was extracted to.
        :raises RuntimeError: when the download failed after 3 attempts.
        """
        archive_basename = os.path.basename(filename)
        target_dir = os.getcwd()
        full_output_filename = os.path.join(target_dir, archive_basename)
        self.logger.info("Downloading %s to %s" % (filename, full_output_filename))

        if filename.startswith("https:"):
            panda_cache_url = os.path.dirname(os.path.dirname(filename))
            os.environ["PANDACACHE_URL"] = panda_cache_url
        elif "PANDACACHE_URL" not in os.environ and "PANDA_URL_SSL" in os.environ:
            os.environ["PANDACACHE_URL"] = os.environ["PANDA_URL_SSL"]
        self.logger.info("PANDACACHE_URL: %s" % os.environ.get("PANDACACHE_URL", None))

        # Imported only after PANDACACHE_URL has been prepared, as before.
        from pandaclient import Client

        max_attempts = 3
        status, output = None, None
        for _ in range(max_attempts):
            status, output = Client.getFile(archive_basename, output_path=full_output_filename)
            if status == 0:
                break
        self.logger.info(f"Download archive file from pandacache status: {status}, output: {output}")
        if status != 0:
            raise RuntimeError("Failed to download archive file from pandacache")
        with tarfile.open(full_output_filename, "r:gz") as f:
            # The archive comes from the network: use the 'data' extraction filter when this
            # Python provides it, so that members cannot be written outside of target_dir.
            if hasattr(tarfile, "data_filter"):
                f.extractall(target_dir, filter="data")
            else:
                f.extractall(target_dir)
        self.logger.info(f"Extract {full_output_filename} to {target_dir}")
        os.remove(full_output_filename)
        self.logger.info("Remove %s" % full_output_filename)
        return target_dir

    def setup_panda(self):
        """
        Setup for the panda service.

        :returns command: currently always None (no setup command is needed).
        """
        return None

    def setup_idds(self):
        """
        Setup for the idds service.

        :returns command: currently always None (no setup command is needed).
        """
        return None

    def setup_sharefs(self):
        """
        Setup for the share file system service.

        :returns command: currently always None (no setup command is needed).
        """
        return None

    def setup_panda_source_files(self):
        """
        Download the source files from the panda cache when a remote source file is set,
        and use the extraction directory as the source directory.

        :returns: None.
        """
        if self.remote_source_file:
            target_dir = self.download_source_files_from_panda(self.remote_source_file)
            self._source_dir = target_dir
        return None

    def setup_idds_source_files(self):
        """
        Download the source files when a remote source file is set.
        The panda cache is used for the idds service too; the source directory is not changed.

        :returns: None.
        """
        if self.remote_source_file:
            self.download_source_files_from_panda(self.remote_source_file)

        return None

    def setup_sharefs_source_files(self):
        """
        Setup the source files on the share file system; currently nothing is done.

        :returns: None.
        """
        return None

    def get_panda_env(self):
        """Return a dict with the PanDA related variables found in the current environment."""
        env_list = ['PANDA_CONFIG_ROOT', 'PANDA_URL_SSL', 'PANDA_URL', 'PANDACACHE_URL', 'PANDAMON_URL',
                    'PANDA_AUTH', 'PANDA_VERIFY_HOST', 'PANDA_AUTH_VO', 'PANDA_BEHIND_REAL_LB']
        return {env: os.environ[env] for env in env_list if env in os.environ}

    def get_archive_name(self):
        """Return the source archive name: the part of the name after the last ':' plus '.tar.gz'."""
        name = self._name.split(":")[-1]
        return "%s.tar.gz" % name

    def upload_source_files_to_panda(self):
        """
        Archive the source directory and upload the archive to the panda cache.

        :returns: the cache location of the uploaded archive, or None when there is no
                  source directory or the upload failed (the error is logged).
        :raises KeyError: when PANDACACHE_URL is not in the environment.
        """
        if not self._source_dir:
            return None

        archive_name = self.get_archive_name()
        archive_file = create_archive_file('/tmp', archive_name, [self._source_dir], exclude_files=self._exclude_source_files)
        self.logger.info(f"created archive file {archive_file} from [{self._source_dir}]")
        from pandaclient import Client

        max_attempts = 3
        status, out = None, None
        for _ in range(max_attempts):
            status, out = Client.putFile(archive_file, True)
            if status == 0:
                break
        self.logger.info(f"copy_files_to_pandacache: status: {status}, out: {out}")
        # A non-zero final status or a non-text reply is a failed upload.
        if status != 0 or not isinstance(out, str):
            self.logger.error(out)
            return None
        if out.startswith("NewFileName:"):
            # The server stored the file under another name.
            archive_file = out.split(":")[-1]
        elif out != "True":
            self.logger.error(out)
            return None

        cache_path = os.path.join(os.environ["PANDACACHE_URL"], "cache")
        return os.path.join(cache_path, os.path.basename(archive_file))

    def prepare_with_panda(self):
        """
        Upload the source files to the panda server.

        :raise IDDSException when the upload failed.
        """
        self.logger.info("preparing workflow with PanDA")
        self._panda_env = self.get_panda_env()
        remote_file_name = self.upload_source_files_to_panda()
        self.remote_source_file = remote_file_name
        self.logger.info("remote source file: %s" % self.remote_source_file)
        if self.remote_source_file is None:
            raise exceptions.IDDSException("Failed to upload source files to PanDA cache. Please check log file ${IDDS_LOG_FILE} or direct output for details.")
        self.logger.info("prepared workflow with PanDA")

    def get_idds_env(self):
        """Return a dict with the iDDS related variables found in the current environment."""
        env_list = ['IDDS_HOST', 'IDDS_AUTH_TYPE', 'IDDS_VO', 'IDDS_AUTH_NO_VERIFY',
                    'OIDC_AUTH_ID_TOKEN', 'OIDC_AUTH_VO', 'IDDS_CONFIG']
        return {env: os.environ[env] for env in env_list if env in os.environ}

    def get_idds_server(self):
        """Return IDDS_HOST from the stored snapshot, else from the environment, else None."""
        if 'IDDS_HOST' in self._idds_env:
            return self._idds_env['IDDS_HOST']
        return os.environ.get('IDDS_HOST', None)

    def prepare_with_idds(self):
        """
        Upload the source files to the idds server; currently nothing is done.
        """
        pass

    def prepare_with_sharefs(self):
        """
        Upload the source files to the share file system or directly use the
        source files on it; currently nothing is done.
        """
        pass

    def prepare(self):
        """
        Prepare the workflow.
        """
        # The idds service uploads through the panda cache too.
        if self.service in ('panda', 'idds'):
            return self.prepare_with_panda()
        # 'sharefs' and every unknown service share the same implementation.
        return self.prepare_with_sharefs()
0838
0839
0840 class Workflow(Base):
0841
def __init__(self, func=None, service='panda', context=None, source_dir=None, local=False, distributed=True,
             pre_kwargs=None, args=None, kwargs=None, multi_jobs_kwargs_list=None, current_job_kwargs=None, name=None,
             init_env=None, is_unique_func_name=False, max_walltime=24 * 3600, source_dir_parent_level=None,
             enable_separate_log=False, exclude_source_files=None, clean_env=None, container_options=None,
             use_func_dir=False, json_load=False, post_script=None):
    """
    Init a workflow.

    :param func: the function of the workflow; dropped when `local` is set.
    :param service: service name handed to the context that is created when `context` is None.
    :param context: an existing context to use instead of creating a new one.
    :param source_dir: source directory; when empty it is resolved by `get_source_dir`.
    :param local: when set, the workflow type is `WorkflowType.iWorkflowLocal`.
    :param pre_kwargs, args, kwargs, multi_jobs_kwargs_list: forwarded to `get_func_name_and_args`;
           None stands for an empty dict/list.
    :param current_job_kwargs: when set, stored pickled, zlib compressed and base64 encoded.
    :param name: workflow name; defaults to the first item of the function name and args.
    :param is_unique_func_name: when not set, a UTC timestamp is appended to the name.
    :param source_dir_parent_level, use_func_dir: forwarded to `get_source_dir`.
    :param json_load: when set, the function details are not logged.
    :param distributed, init_env, max_walltime, enable_separate_log, exclude_source_files, clean_env,
           container_options, post_script: handed to the context that is created when `context` is None.
    """
    super(Workflow, self).__init__()
    self.prepared = False

    self.logger = logging.getLogger(self.__class__.__name__)

    # Fresh containers per call: mutable defaults in the signature would be shared between workflows.
    pre_kwargs = {} if pre_kwargs is None else pre_kwargs
    kwargs = {} if kwargs is None else kwargs
    multi_jobs_kwargs_list = [] if multi_jobs_kwargs_list is None else multi_jobs_kwargs_list
    exclude_source_files = [] if exclude_source_files is None else exclude_source_files

    # use_func_dir was accepted but never forwarded, so it had no effect.
    source_dir = self.get_source_dir(func, source_dir, source_dir_parent_level=source_dir_parent_level,
                                     use_func_dir=use_func_dir)
    self._func, self._func_name_and_args, self._multi_jobs_kwargs_list = self.get_func_name_and_args(
        func=func,
        pre_kwargs=pre_kwargs,
        args=args,
        kwargs=kwargs,
        base_dir=source_dir,
        multi_jobs_kwargs_list=multi_jobs_kwargs_list
    )
    if not json_load:
        self.logger.info(f"func: {self._func}, func_name_and_args: {self._func_name_and_args}, multi_jobs_kwargs_list: {self._multi_jobs_kwargs_list}")

    self._current_job_kwargs = current_job_kwargs
    if self._current_job_kwargs:
        self._current_job_kwargs = base64.b64encode(zlib.compress(pickle.dumps(self._current_job_kwargs))).decode("utf-8")

    if name:
        self._name = name
    else:
        self._name = self._func_name_and_args[0]
    if self._name:
        # Reduce the name to characters that are safe in file names.
        self._name = self._name.replace('__main__:', '').replace('.py', '').replace(':', '.')
        self._name = self._name.replace("/", "_").replace(".", "_").replace(":", "_")
    if not is_unique_func_name and self._name:
        # Timezone-aware replacement of the deprecated utcnow(); the formatted text is the same.
        timestamp = datetime.datetime.now(datetime.timezone.utc).strftime("%Y_%m_%d_%H_%M_%S")
        self._name = self._name + "_" + timestamp

    workflow_type = WorkflowType.iWorkflow
    if local:
        self._func = None
        workflow_type = WorkflowType.iWorkflowLocal

    if context is not None:
        self._context = context
    else:
        self._context = WorkflowContext(name=self._name, service=service, workflow_type=workflow_type, source_dir=source_dir,
                                        distributed=distributed, init_env=init_env, max_walltime=max_walltime,
                                        exclude_source_files=exclude_source_files, clean_env=clean_env,
                                        enable_separate_log=enable_separate_log, container_options=container_options,
                                        post_script=post_script)
0896
@property
def service(self):
    """Service name; read from and written to the workflow context."""
    return self._context.service

@service.setter
def service(self, value):
    self._context.service = value

@property
def internal_id(self):
    """Internal id; read from and written to the workflow context."""
    return self._context.internal_id

@internal_id.setter
def internal_id(self, value):
    self._context.internal_id = value
0912
@property
def name(self):
    """Name of the workflow (stored on the workflow itself)."""
    return self._name

@name.setter
def name(self, value):
    self._name = value
0920
@property
def request_id(self):
    """Request id; read from and written to the workflow context."""
    return self._context.request_id

@request_id.setter
def request_id(self, value):
    self._context.request_id = value
0928
def set_request_id(self, request_id):
    """Method form of assigning the `request_id` property."""
    self.request_id = request_id
0931
@property
def vo(self):
    """VO; read from and written to the workflow context."""
    return self._context.vo

@vo.setter
def vo(self, value):
    self._context.vo = value
0939
@property
def site(self):
    """Site; read from and written to the workflow context."""
    return self._context.site

@site.setter
def site(self, value):
    self._context.site = value
0947
@property
def queue(self):
    """Queue; read from and written to the workflow context."""
    return self._context.queue

@queue.setter
def queue(self, value):
    self._context.queue = value
0955
def get_site(self):
    """Method form of reading the `site` property."""
    return self.site
0958
@property
def cloud(self):
    """Cloud; read from and written to the workflow context."""
    return self._context.cloud

@cloud.setter
def cloud(self, value):
    self._context.cloud = value
0966
@property
def working_group(self):
    """Working group; read from and written to the workflow context."""
    return self._context.working_group

@working_group.setter
def working_group(self, value):
    self._context.working_group = value
0974
@property
def task_type(self):
    """Task type; read from and written to the workflow context."""
    return self._context.task_type

@task_type.setter
def task_type(self, value):
    self._context.task_type = value
0982
@property
def processing_type(self):
    """Processing type; read from and written to the workflow context."""
    return self._context.processing_type

@processing_type.setter
def processing_type(self, value):
    self._context.processing_type = value
0990
@property
def priority(self):
    """Priority; read from and written to the workflow context."""
    return self._context.priority

@priority.setter
def priority(self, value):
    self._context.priority = value
0998
@property
def core_count(self):
    """Core count; read from and written to the workflow context."""
    return self._context.core_count

@core_count.setter
def core_count(self, value):
    self._context.core_count = value
1006
@property
def total_memory(self):
    """Total memory; read from and written to the workflow context."""
    return self._context.total_memory

@total_memory.setter
def total_memory(self, value):
    self._context.total_memory = value
1014
@property
def max_walltime(self):
    """Maximum walltime; read from and written to the workflow context."""
    return self._context.max_walltime

@max_walltime.setter
def max_walltime(self, value):
    self._context.max_walltime = value
1022
@property
def max_attempt(self):
    """Maximum attempt; read from and written to the workflow context."""
    return self._context.max_attempt

@max_attempt.setter
def max_attempt(self, value):
    self._context.max_attempt = value
1030
@property
def username(self):
    """User name; read from and written to the workflow context."""
    return self._context.username

@username.setter
def username(self, value):
    self._context.username = value
1038
@property
def userdn(self):
    """User DN; read from and written to the workflow context."""
    return self._context.userdn

@userdn.setter
def userdn(self, value):
    self._context.userdn = value
1046
@property
def workflow_type(self):
    """Workflow type; read from and written to the workflow context."""
    return self._context.workflow_type

@workflow_type.setter
def workflow_type(self, value):
    self._context.workflow_type = value
1054
@property
def lifetime(self):
    """Lifetime; read from and written to the workflow context."""
    return self._context.lifetime

@lifetime.setter
def lifetime(self, value):
    self._context.lifetime = value
1062
@property
def workload_id(self):
    """Workload id; read from and written to the workflow context."""
    return self._context.workload_id

@workload_id.setter
def workload_id(self, value):
    self._context.workload_id = value
1070
def get_workload_id(self):
    """Method form of reading the `workload_id` property."""
    return self.workload_id
1073
@property
def token(self):
    """Token; read from and written to the workflow context."""
    return self._context.token

@token.setter
def token(self, value):
    self._context.token = value
1081
@property
def enable_separate_log(self):
    """The `enable_separate_log` flag; read from and written to the workflow context."""
    return self._context.enable_separate_log

@enable_separate_log.setter
def enable_separate_log(self, value):
    self._context.enable_separate_log = value
1089
@property
def container_options(self):
    """Container options; read from and written to the workflow context."""
    return self._context.container_options

@container_options.setter
def container_options(self, value):
    self._context.container_options = value
1097
def get_work_tag(self):
    """Return the `name` attribute of the workflow type held by the context."""
    workflow_type = self._context.workflow_type
    return workflow_type.name
1100
def get_work_type(self):
    """Return the workflow type held by the context."""
    return self._context.workflow_type
1103
def get_work_name(self):
    """Return the name of the workflow."""
    return self._name
1106
@property
def campaign(self):
    """Campaign; read from and written to the workflow context."""
    return self._context.campaign

@campaign.setter
def campaign(self, value):
    self._context.campaign = value
1114
@property
def campaign_scope(self):
    """Campaign scope; read from and written to the workflow context."""
    return self._context.campaign_scope

@campaign_scope.setter
def campaign_scope(self, value):
    self._context.campaign_scope = value
1122
@property
def campaign_group(self):
    """Campaign group; read from and written to the workflow context."""
    return self._context.campaign_group

@campaign_group.setter
def campaign_group(self, value):
    self._context.campaign_group = value
1130
@property
def campaign_tag(self):
    """Campaign tag; read from and written to the workflow context."""
    return self._context.campaign_tag

@campaign_tag.setter
def campaign_tag(self, value):
    self._context.campaign_tag = value
1138
@property
def max_processing_requests(self):
    """Maximum processing requests; read from and written to the workflow context."""
    return self._context.max_processing_requests

@max_processing_requests.setter
def max_processing_requests(self, value):
    self._context.max_processing_requests = value
1146
@property
def multi_jobs_kwargs_list(self):
    """The list of kwargs of the multiple jobs; read-only."""
    return self._multi_jobs_kwargs_list

@multi_jobs_kwargs_list.setter
def multi_jobs_kwargs_list(self, value):
    # Assignment is rejected on purpose: the list is only set internally.
    raise Exception("Not allwed to update multi_jobs_kwargs_list")
1154
def to_dict(self):
    """
    Serialize the workflow through the parent ``to_dict`` without the function object.

    ``self._func`` is set to ``None`` for the duration of the parent call and
    is restored afterwards, including when the parent call raises.

    :returns: whatever the parent class ``to_dict`` returns.
    """
    func = self._func
    self._func = None
    try:
        return super(Workflow, self).to_dict()
    finally:
        # Always put the callable back; otherwise a serialization failure
        # would leave the workflow with ``_func`` permanently cleared.
        self._func = func
1161
def get_source_dir(self, func, source_dir, source_dir_parent_level=None, use_func_dir=False):
    """
    Work out the directory that holds the workflow's source files.

    :param func: the workflow function; only inspected when ``use_func_dir`` is true.
    :param source_dir: an explicit directory; returned as-is when truthy.
    :param source_dir_parent_level: how many parent levels to climb from the
        directory of the file defining ``func`` (ignored unless positive).
    :param use_func_dir: derive the directory from the file defining ``func``.
    :returns: the selected directory, or None when ``func`` is a builtin or
        has no source file. Falls back to the absolute current working directory.
    """
    if source_dir:
        return source_dir

    if not (func and use_func_dir):
        return os.path.abspath(os.getcwd())

    if inspect.isbuiltin(func):
        return None
    func_file = inspect.getsourcefile(func)
    if not func_file:
        return None

    directory = os.path.dirname(os.path.abspath(func_file))
    levels = source_dir_parent_level if source_dir_parent_level and source_dir_parent_level > 0 else 0
    for _ in range(levels):
        directory = os.path.dirname(directory)
    return directory
1179
def store(self):
    """
    Serialize the workflow state and save it under the context's source directory.

    The saved content holds the name, the context, the original function
    arguments, the multi-jobs kwargs list and the current job kwargs.
    Nothing is done when the workflow has no context.
    """
    if not self._context:
        return

    payload = json_dumps({
        'type': 'work',
        'name': self.name,
        'context': self._context,
        'original_args': self._func_name_and_args,
        'multi_jobs_kwargs_list': self._multi_jobs_kwargs_list,
        'current_job_kwargs': self._current_job_kwargs,
    })
    target_dir = self._context.get_source_dir()
    self.save_context(target_dir, self._name, payload)
1191
def load(self, source_dir=None):
    """
    Reload a previously saved context and restore ``_multi_jobs_kwargs_list`` from it.

    :param source_dir: directory to load from; when empty, the context's source
        directory is used, and the current working directory after that.
    """
    directory = source_dir or self._context.get_source_dir() or os.getcwd()
    raw = self.load_context(directory, self._name)
    if not raw:
        return

    loaded = json_loads(raw)
    self.logger.info(f"Loaded context: {loaded}")
    if 'multi_jobs_kwargs_list' in loaded:
        self._multi_jobs_kwargs_list = loaded['multi_jobs_kwargs_list']
1203
def prepare(self):
    """
    Prepare the workflow once (for example uploading the source codes to a cache server).

    The work is delegated to the context's ``prepare``; ``self.prepared`` is
    then set so later calls do nothing. No value is returned.
    """
    if self.prepared:
        return
    self._context.prepare()
    self.prepared = True
1212
def submit_to_idds_server(self):
    """
    Submit the workflow directly to the iDDS server.

    :returns id: the request id returned by the client.
    :raise Exception when the client fails to submit the workflow.
    """
    # Imported here rather than at module level, as in the original code.
    from idds.client.clientmanager import ClientManager

    idds_host = self._context.get_idds_server()
    manager = ClientManager(host=idds_host, timeout=60)
    request_id = manager.submit(self, use_dataset_name=False)

    self.logger.info("Submitted into iDDS with request id=%s", str(request_id))
    return request_id
1228
def submit_to_panda_server(self):
    """
    Submit the workflow to the iDDS server through the PanDA service.

    :returns id: the request id taken from the client reply, or None when the
        reply does not signal success (the failure is logged).
    :raise Exception when the client call itself fails.
    """
    import idds.common.utils as idds_utils
    import pandaclient.idds_api as idds_api

    api_options = dict(dumper=idds_utils.json_dumps,
                       idds_host=self._context.get_idds_server(),
                       compress=True,
                       verbose=is_panda_client_verbose(),
                       manager=True)
    client = idds_api.get_api(**api_options)
    ret = client.submit(self, username=None, use_dataset_name=False)

    # The reply is accepted only when ret[0] is 0 and ret[1][0] is truthy.
    failed = ret[0] != 0 or not ret[1][0]
    if failed:
        request_id = None
        self.logger.error("Failed to submit workflow to PanDA-iDDS with error: %s" % str(ret))
    else:
        request_id = ret[1][1]

    self.logger.info("Submitted into PanDA-iDDS with request id=%s", str(request_id))
    return request_id
1254
def submit(self):
    """
    Prepare the workflow and submit it through PanDA or directly to iDDS.

    The PanDA path is used when ``self.service`` is ``'panda'``. Submission
    errors are logged, not raised.

    :returns id: the request id as returned by the submission call, after its
        integer form has been stored in the context; None when the submission
        failed or the id cannot be converted to an integer.
    """
    request_id = None
    try:
        self.prepare()
        submit_func = self.submit_to_panda_server if self.service == 'panda' else self.submit_to_idds_server
        request_id = submit_func()
    except Exception as ex:
        self.logger.error("Failed to submit workflow: %s" % str(ex))

    try:
        self._context.request_id = int(request_id)
    except Exception as ex:
        self.logger.error("Request id (%s) is not integer, there should be some submission errors: %s" % (request_id, str(ex)))
        return None
    return request_id
1279
def close_to_idds_server(self, request_id):
    """
    Ask the iDDS server directly to close a request.

    :param request_id: the workflow (request) id.
    :returns: the ``request_id`` that was passed in.
    :raise Exception when the client fails to close the workflow.
    """
    from idds.client.clientmanager import ClientManager

    idds_host = self._context.get_idds_server()
    manager = ClientManager(host=idds_host, timeout=60)
    outcome = manager.close(request_id)

    self.logger.info("Close request id=%s to iDDS server: %s", str(request_id), str(outcome))
    return request_id
1295
def close_to_panda_server(self, request_id):
    """
    Ask the iDDS server, through the PanDA service, to close a request.

    :param request_id: the workflow (request) id.
    :returns: the ``request_id`` that was passed in.
    :raise Exception when the client fails to close the workflow.
    """
    import idds.common.utils as idds_utils
    import pandaclient.idds_api as idds_api

    server = self._context.get_idds_server()
    verbose = is_panda_client_verbose()
    api = idds_api.get_api(idds_utils.json_dumps,
                           idds_host=server,
                           compress=True,
                           verbose=verbose,
                           manager=True)
    outcome = api.close(request_id)

    self.logger.info("Close request id=%s through PanDA-iDDS: %s", str(request_id), str(outcome))
    return request_id
1316
def close(self):
    """
    Close the workflow's request on the server, if a request id is known.

    The PanDA path is used when ``self.service`` is ``'panda'``, otherwise the
    iDDS server is contacted directly. Failures are logged, not raised.
    """
    request_id = self._context.request_id
    if request_id is None:
        return

    try:
        closer = self.close_to_panda_server if self.service == 'panda' else self.close_to_idds_server
        closer(request_id)
    except Exception as ex:
        self.logger.error("Failed to close request(%s): %s" % (request_id, str(ex)))
1331
def __del__(self):
    """Finalizer hook; performs no action."""
    return None
1335
1336 def setup(self):
1337 """
1338 :returns command: `str` to setup the workflow.
1339 """
1340 return self._context.setup()
1341
def get_clean_env(self):
    """
    Delegate to the context's ``get_clean_env``.

    :returns command: `str` to clean the workflow.
    """
    context = self._context
    return context.get_clean_env()
1347
def setup_source_files(self):
    """
    Setup location of source files by delegating to the context.

    :returns: whatever the context's ``setup_source_files`` returns.
    """
    context = self._context
    return context.setup_source_files()
1353
def load_func(self, func_name):
    """
    Load the function from the source files via the parent class.

    ``IDDS_IGNORE_WORKFLOW_DECORATOR`` is set to ``'true'`` in the environment
    for the duration of the parent call.

    :param func_name: passed unchanged to the parent ``load_func``.
    :returns: the function returned by the parent ``load_func``.
    :raise Exception
    """
    env_override = modified_environ(IDDS_IGNORE_WORKFLOW_DECORATOR='true')
    with env_override:
        loaded = super(Workflow, self).load_func(func_name)
    return loaded
1364
def pre_run(self):
    """
    Check that the workflow can run before the function is executed.

    For a distributed context an ``AsyncResult`` is created, queried with
    ``is_ok()`` and stopped. Any exception is logged and reported as failure.

    :returns: True for a non-distributed context, the ``is_ok()`` result for a
        distributed one, and False when an exception occurred.
    """
    probe = None
    try:
        context = self._context
        if not context.distributed:
            return True

        self.logger.info("Test AsyncResult")
        probe = AsyncResult(context, wait_num=1, timeout=30)
        is_ok = probe.is_ok()
        probe.stop()
        self.logger.info(f"pre_run asyncresult test is_ok: {is_ok}")
        return is_ok
    except Exception as ex:
        self.logger.error(f"pre_run failed with error: {ex}")
        self.logger.error(traceback.format_exc())
        # Stop the probe if it was created before the failure.
        if probe:
            probe.stop()
        return False
1384
def run(self):
    """
    Run the workflow locally, logging any error without raising it.

    :returns: the value returned by ``run_local``, or None when it raised.
    """
    self.logger.info("Start workflow run().")
    result = None
    try:
        result = self.run_local()
    except Exception as ex:
        self.logger.error(f"Failed to run function: {ex}")
        self.logger.error(traceback.format_exc())
    except BaseException:
        # Everything that is not an ``Exception`` subclass is logged and swallowed too.
        self.logger.error("Unknow error")
        self.logger.error(traceback.format_exc())
    self.logger.info(f"finish workflow run() with ret: {result}.")
    return result
1398
def run_local(self):
    """
    Run the workflow function in the current process.

    The stored arguments are decoded, the function is loaded by name if it is
    not already set, and it is executed through ``run_func``.

    :returns: the status element returned by ``run_func``.
    :raise Exception when ``pre_run`` reports a failure.
    """
    is_ok = self.pre_run()
    if not is_ok:
        self.logger.error(f"pre_run is_ok: {is_ok}, will exit.")
        raise Exception("workflow pre_run failed")

    def unpack(blob):
        # base64 -> zlib -> pickle.
        # NOTE(review): pickle.loads can execute arbitrary code from its input;
        # confirm these payloads only ever come from a trusted source.
        return pickle.loads(zlib.decompress(base64.b64decode(blob)))

    func_name, pre_kwargs, args, kwargs = self._func_name_and_args
    multi_jobs_kwargs_list = self.multi_jobs_kwargs_list
    args = unpack(args) if args else args
    pre_kwargs = unpack(pre_kwargs) if pre_kwargs else pre_kwargs
    kwargs = unpack(kwargs) if kwargs else kwargs
    if multi_jobs_kwargs_list:
        # Decoded like the other arguments; the result is not used below.
        multi_jobs_kwargs_list = [unpack(item) for item in multi_jobs_kwargs_list]

    if self._func is None:
        self._func = self.load_func(func_name)

    status, output, error = self.run_func(self._func, pre_kwargs, args, kwargs)
    if status:
        self.logger.info(f"run workflow successfully. output: {output}, error: {error}")
    else:
        self.logger.error(f"run workflow failed. output: {output}, error: {error}")
    return status
1430
1431
def __enter__(self):
    """Push this workflow onto ``WorkflowCanvas`` as a managed workflow and return it."""
    canvas = WorkflowCanvas
    canvas.push_managed_workflow(self)
    return self
1435
def __exit__(self, _type, _value, _tb):
    """
    Pop the most recently managed workflow from ``WorkflowCanvas`` and close it.

    The exception arguments are ignored and nothing is returned, so an
    exception raised inside the ``with`` body is not suppressed.
    """
    managed = WorkflowCanvas.pop_managed_workflow()
    if managed is None:
        return
    managed.close()
1440
1441
1442
def get_run_command(self):
    """
    Build the ``run_workflow`` command line for this workflow.

    The context and the original function arguments are JSON-dumped and
    base64-encoded; ``${IN/L}`` is emitted literally as the value of
    ``--current_job_kwargs``.

    :returns command: `str` to run the workflow.
    """
    name_part = "run_workflow --type workflow --name %s " % self.name
    encoded_context = encode_base64(json_dumps(self._context))
    encoded_args = encode_base64(json_dumps(self._func_name_and_args))
    payload_part = "--context %s --original_args %s " % (encoded_context, encoded_args)
    return "".join([name_part, payload_part, "--current_job_kwargs ${IN/L}"])
1449
def get_runner(self):
    """
    Assemble the full command used to run the workflow.

    When a setup is defined, it is split and passed base64-encoded through
    ``--pre_setup`` / ``--setup`` ahead of the run command. When a clean-env
    command is defined, it is appended so that it runs after the run command
    while the run command's exit code is kept.

    :returns command: `str` to execute.
    """
    setup = self.setup()
    run_command = self.get_run_command()

    options = ""
    if setup:
        pre_setup, main_setup = self.split_setup(setup)
        pre_setup = encode_base64(json_dumps(pre_setup))
        main_setup = encode_base64(json_dumps(main_setup))
        # NOTE(review): this truthiness test is applied to the encoded value,
        # not to the raw pre_setup - confirm that is intended.
        if pre_setup:
            options = " --pre_setup " + pre_setup + " "
        options = options + " --setup " + main_setup + " "

    cmd = (options + " " + run_command) if options else run_command

    clean_env = self.get_clean_env()
    if clean_env:
        cmd = cmd + "; ret=$?; " + clean_env + "; exit $ret"

    return cmd
1472
def get_func_name(self):
    """Return the first element of ``self._func_name_and_args`` (the function name)."""
    name_and_args = self._func_name_and_args
    return name_and_args[0]
1476
1477
1478
def workflow(func=None, *, local=False, service='idds', source_dir=None, primary=False, queue=None, site=None, cloud=None,
             max_walltime=24 * 3600, distributed=True, init_env=None, pre_kwargs={}, return_workflow=False, no_wraps=False,
             source_dir_parent_level=None, exclude_source_files=[], enable_separate_log=False, clean_env=None,
             name=None, core_count=1, total_memory=4000,
             container_options=None, use_func_dir=False, post_script=None):
    """
    Decorator that turns a function into a ``Workflow``.

    Usable both as ``@workflow`` and as ``@workflow(...)``. When the
    ``IDDS_IGNORE_WORKFLOW_DECORATOR`` environment variable is set, the
    function is returned undecorated.

    Calling the decorated function builds a ``Workflow`` from the options and
    the call arguments, then:

    * with ``return_workflow`` true, returns the workflow without submitting it;
    * otherwise prepares and submits it, and either returns the workflow
      (``local`` false) or runs it in a ``with`` block and returns the result
      of ``run()`` (``local`` true).

    ``queue``, ``site``, ``cloud``, ``core_count`` and ``total_memory`` are set
    as attributes on the workflow; ``primary`` is accepted but not used here.
    Most of the remaining options are forwarded to the ``Workflow`` constructor.

    :param func: the function to decorate, or None for ``@workflow(...)`` usage.
    :param no_wraps: when true, return the raw wrapper without ``functools.wraps``.
    :returns: the decorated function (or a partial awaiting ``func``).
    :raise Exception: errors raised while building/running the workflow are
        logged and re-raised.
    """
    # NOTE(review): pre_kwargs={} and exclude_source_files=[] are shared
    # mutable defaults; they are only passed on here, not mutated.
    if func is None:
        options = dict(local=local, service=service, source_dir=source_dir, primary=primary, queue=queue, site=site,
                       cloud=cloud, max_walltime=max_walltime, distributed=distributed, init_env=init_env,
                       pre_kwargs=pre_kwargs, no_wraps=no_wraps, return_workflow=return_workflow,
                       source_dir_parent_level=source_dir_parent_level, exclude_source_files=exclude_source_files,
                       clean_env=clean_env, enable_separate_log=enable_separate_log, name=name,
                       core_count=core_count, total_memory=total_memory, use_func_dir=use_func_dir,
                       container_options=container_options, post_script=post_script)
        return functools.partial(workflow, **options)

    if 'IDDS_IGNORE_WORKFLOW_DECORATOR' in os.environ:
        return func

    def wrapper(*args, **kwargs):
        try:
            logger = logging.getLogger("workflow_def")
            wf = Workflow(func, service=service, source_dir=source_dir, local=local, max_walltime=max_walltime,
                          distributed=distributed, pre_kwargs=pre_kwargs, args=args, kwargs=kwargs, init_env=init_env,
                          source_dir_parent_level=source_dir_parent_level, exclude_source_files=exclude_source_files,
                          clean_env=clean_env, enable_separate_log=enable_separate_log, name=name,
                          container_options=container_options, use_func_dir=use_func_dir, post_script=post_script)

            wf.queue = queue
            wf.site = site
            wf.cloud = cloud
            wf.core_count = core_count
            wf.total_memory = total_memory

            logger.info("return_workflow %s" % return_workflow)
            if return_workflow:
                return wf

            logger.info("Prepare workflow")
            wf.prepare()
            logger.info("Prepared workflow")

            logger.info("Registering workflow")
            wf.submit()

            if local:
                logger.info("Run workflow locally")
                with wf:
                    ret = wf.run()
                return ret

            logger.info("Run workflow at remote sites")
            return wf
        except Exception as ex:
            logger.error("Failed to run workflow %s: %s" % (func, ex))
            raise ex

    return wrapper if no_wraps else functools.wraps(func)(wrapper)
1538
1539
def workflow_old(func=None, *, lazy=False, service='panda', source_dir=None, primary=False, distributed=True):
    """
    Older form of the workflow decorator.

    Usable both as ``@workflow_old`` and as ``@workflow_old(...)``. Calling the
    decorated function builds a ``Workflow`` around the function and either
    returns it (``lazy`` true) or returns the result of its ``run()``. The
    call arguments are not passed to the ``Workflow``, and ``primary`` is
    accepted but not used.

    :param func: the function to decorate, or None for ``@workflow_old(...)`` usage.
    :returns: the decorated function, or the decorator when ``func`` is None.
    :raise Exception: errors are logged and re-raised.
    """

    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            try:
                wf = Workflow(func, service=service, source_dir=source_dir, distributed=distributed)
                return wf if lazy else wf.run()
            except Exception as ex:
                logging.error("Failed to run workflow %s: %s" % (func, ex))
                raise ex
        return wrapper

    return decorator if func is None else decorator(func)