File indexing completed on 2026-04-09 07:58:32
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011 import copy
0012 import base64
0013 import logging
0014 import inspect
0015 import json
0016 import os
0017 import pickle
0018 import traceback
0019 import uuid
0020 import zlib
0021
0022 from typing import Any, Dict, List, Optional, Tuple, Union
0023
0024 from idds.common.dict_class import DictMetadata, DictBase
0025 from idds.common.imports import import_func, get_func_name
0026
0027
class IDDSDict(dict):
    """A ``dict`` subclass whose ``__setitem__`` discards the key ``'test'``.

    Only ``__setitem__`` is overridden; every other ``dict`` method is
    inherited unchanged.
    """

    def __setitem__(self, key, value):
        """Store ``value`` under ``key`` unless ``key`` equals ``'test'``."""
        # Assignments to 'test' are dropped silently.
        if key == 'test':
            return
        super().__setitem__(key, value)
0034
0035
class IDDSMetadata(DictMetadata):
    """Metadata container; adds nothing beyond ``DictMetadata`` construction."""

    def __init__(self):
        """Initialise by delegating to ``DictMetadata.__init__``."""
        super().__init__()
0039
0040
class Base(DictBase):
    """Shared base for iworkflow objects.

    Provides a short random internal id, a helper that encodes a function
    call (name plus pickled arguments), JSON persistence of a context, and
    helpers to import a function by name and run it.
    """

    def __init__(self):
        super(Base, self).__init__()
        # The first 8 characters of a random UUID4 string act as a short id.
        self._internal_id = str(uuid.uuid4())[:8]
        self._template_id = self._internal_id
        self._sequence_id = 0

    @property
    def internal_id(self):
        """The short internal id of this object."""
        return self._internal_id

    @internal_id.setter
    def internal_id(self, value):
        self._internal_id = value

    def get_func_name_and_args(self,
                               func,
                               pre_kwargs=None,
                               args=None,
                               kwargs=None,
                               base_dir=None,
                               multi_jobs_kwargs_list=None):
        """Resolve ``func`` to a name and encode its call arguments.

        :param func: a function name, or a method/function/builtin. For a
            callable the name is obtained from ``get_func_name``; anything
            else is used as the name unchanged.
        :param pre_kwargs: dict of keyword arguments (``None`` means empty).
        :param args: tuple or list of positional arguments (``None`` means empty).
        :param kwargs: dict of keyword arguments (``None`` means empty).
        :param base_dir: forwarded to ``get_func_name``.
        :param multi_jobs_kwargs_list: list of per-job arguments (``None`` means empty).

        :returns: ``(func_call, (func_name, pre_kwargs, args, kwargs),
            multi_jobs_kwargs_list)``. ``func_call`` is the callable itself,
            or ``None`` when ``func`` was not a method/function/builtin.
            Every non-empty argument container (and every item of a
            non-empty ``multi_jobs_kwargs_list``) is replaced by a string:
            pickled, zlib-compressed, then base64-encoded. Empty containers
            are returned as they are.
        :raise TypeError: when an argument container has the wrong type.
        """

        def encode(obj):
            # pickle -> zlib -> base64 -> str, so the payload is plain text.
            return base64.b64encode(zlib.compress(pickle.dumps(obj))).decode("utf-8")

        if args is None:
            args = ()
        if pre_kwargs is None:
            pre_kwargs = {}
        if kwargs is None:
            kwargs = {}
        if multi_jobs_kwargs_list is None:
            multi_jobs_kwargs_list = []
        if not isinstance(args, (tuple, list)):
            raise TypeError('{0!r} is not a valid args list'.format(args))
        if not isinstance(pre_kwargs, dict):
            raise TypeError('{0!r} is not a valid pre_kwargs dict'.format(pre_kwargs))
        if not isinstance(kwargs, dict):
            raise TypeError('{0!r} is not a valid kwargs dict'.format(kwargs))
        if not isinstance(multi_jobs_kwargs_list, list):
            raise TypeError('{0!r} is not a valid multi_jobs_kwargs_list list'.format(multi_jobs_kwargs_list))

        if inspect.ismethod(func) or inspect.isfunction(func) or inspect.isbuiltin(func):
            func_name = get_func_name(func, base_dir=base_dir)
            func_call = func
        else:
            # A string, or any other object, is taken as the name as-is.
            func_name = func
            func_call = None

        # Only non-empty containers are encoded; empty ones pass through.
        if args:
            args = encode(args)
        if pre_kwargs:
            pre_kwargs = encode(pre_kwargs)
        if kwargs:
            kwargs = encode(kwargs)
        if multi_jobs_kwargs_list:
            multi_jobs_kwargs_list = [encode(k) for k in multi_jobs_kwargs_list]

        return func_call, (func_name, pre_kwargs, args, kwargs), multi_jobs_kwargs_list

    @property
    def logger(self):
        """Logger named after the concrete class of this object."""
        return logging.getLogger(self.__class__.__name__)

    @logger.setter
    def logger(self, value):
        # Assignment is accepted but ignored; the getter always derives the
        # logger from the class name.
        pass

    def get_internal_id(self):
        """Return the short internal id."""
        return self._internal_id

    def get_template_work_id(self):
        """Return the template id (initialised to the internal id)."""
        return self._template_id

    def get_sequence_id(self):
        """Return the sequence id (initialised to 0)."""
        return self._sequence_id

    def get_input_collections(self):
        """Return the input collections; the base class has none."""
        return []

    def get_output_collections(self):
        """Return the output collections; the base class has none."""
        return []

    def get_log_collections(self):
        """Return the log collections; the base class has none."""
        return []

    def save_context(self, source_dir, name, context):
        """Write ``context`` as JSON to ``<source_dir>/<name>.json``.

        Nothing is written when any of the three arguments is falsy.
        Failures are logged and not raised.
        """
        if not (source_dir and name and context):
            return

        # Bound before the try so the error message below can always be
        # formatted, even when building the path itself fails.
        file_name = None
        try:
            file_name = name + ".json"
            file_name = os.path.join(source_dir, file_name)
            with open(file_name, 'w') as f:
                json.dump(context, f)
            self.logger.info(f"Saved context to file {file_name}")
        except Exception as ex:
            self.logger.error(f"Failed to save context to file {file_name}: {ex}")

    def load_context(self, source_dir, name):
        """Load the JSON context stored at ``<source_dir>/<name>.json``.

        :returns: the decoded JSON content; ``None`` when the file does not
            exist; ``[]`` when ``source_dir`` or ``name`` is falsy or when
            loading failed (the failure is logged, not raised).
        """
        # NOTE(review): the None-vs-[] distinction follows the statement
        # nesting of the original implementation - confirm callers rely on it.
        if source_dir and name:
            try:
                context = None
                file_name = name + ".json"
                file_name = os.path.join(source_dir, file_name)
                if os.path.exists(file_name):
                    with open(file_name, 'r') as f:
                        context = json.load(f)
                    self.logger.info(f"Loading context from file {file_name}")
                return context
            except Exception as ex:
                self.logger.error(f"Failed to load context from file: {ex}")
        return []

    def prepare(self):
        """
        Prepare the workflow: upload the source files to server.

        This base implementation does nothing and returns ``None``.

        :returns id: The workflow id.
        :raise Exception when failing to prepare the workflow.
        """

    def submit(self):
        """
        Submit the workflow to the iDDS server.

        This base implementation only calls ``prepare()`` and returns ``None``.

        :returns id: The workflow id.
        :raise Exception when failing to submit the workflow.
        """
        self.prepare()
        return None

    def split_setup(self, setup):
        """
        Split a setup string at its last ``;``.

        Example: ``"a;b;c"`` gives ``("a; b; ", "c")``. A string without
        ``;`` gives ``("", setup)``.

        :returns: ``(pre_setup, main_setup)``.
        """
        if ";" not in setup:
            return "", setup

        *pre_parts, main_setup = setup.split(";")
        # The leading commands are re-joined and keep a trailing separator.
        pre_setup = "; ".join(pre_parts) + "; "
        return pre_setup, main_setup

    def setup(self):
        """
        :returns command: `str` to setup the workflow.
            The base implementation returns ``None``.
        """
        return None

    def load_func(self, func_name):
        """
        Load the function from the source files.

        ``IDDS_IWORKFLOW_LOAD`` is set to ``'true'`` in the environment for
        the duration of the import and removed afterwards, also when the
        import fails.

        :returns: whatever ``import_func(func_name)`` returns.
        :raise Exception: anything raised by ``import_func``.
        """
        os.environ['IDDS_IWORKFLOW_LOAD'] = 'true'
        try:
            return import_func(func_name)
        finally:
            # Always clear the flag so a failed import cannot leak it into
            # later code; pop() tolerates the variable already being gone.
            os.environ.pop('IDDS_IWORKFLOW_LOAD', None)

    def run_func(self, func, pre_kwargs, args, kwargs):
        """
        Run the function.

        ``pre_kwargs`` is deep-copied and then updated with ``kwargs``, so
        ``kwargs`` wins on duplicate keys and the caller's ``pre_kwargs`` is
        not modified. ``None`` for ``pre_kwargs``, ``args`` or ``kwargs`` is
        treated as empty.

        :returns: ``(True, return_value, None)`` on success,
            ``(False, None, error_message)`` when an exception was raised
            (it is logged with its traceback and not propagated).
        """
        try:
            logging.info(f"func type: {type(func)}: {str(func)}")
            logging.info(f"pre_kwargs type: {type(pre_kwargs)}: {str(pre_kwargs)}")
            logging.info(f"args type: {type(args)}: {str(args)}")
            logging.info(f"kwargs type: {type(kwargs)}: {str(kwargs)}")
            merged_kwargs = copy.deepcopy(pre_kwargs) if pre_kwargs is not None else {}
            if kwargs is not None:
                merged_kwargs.update(kwargs)
            call_args = args if args is not None else ()
            logging.info(f"start to run function: {str(func)}")
            ret = func(*call_args, **merged_kwargs)
            logging.info(f"Successfully run function, ret: {ret}")
            return True, ret, None
        except Exception as ex:
            logging.error(f"Failed to run the function: {str(ex)}")
            logging.error(traceback.format_exc())
            return False, None, str(ex)
0230
0231
class Context(DictBase):
    """Context object identified by a short random internal id."""

    def __init__(self):
        super().__init__()
        # First 8 hex digits of a random UUID4.
        self._internal_id = uuid.uuid4().hex[:8]

    @property
    def internal_id(self):
        """The short internal id of this context."""
        return self._internal_id

    @internal_id.setter
    def internal_id(self, value):
        self._internal_id = value

    def prepare(self):
        """
        Prepare the workflow.

        The base implementation does nothing and returns ``None``.
        """

    def setup(self):
        """
        :returns command: `str` to setup the workflow.
            The base implementation returns ``None``.
        """
0256
0257
class CollectionBase(DictBase):
    """Base class for collections; adds nothing beyond ``DictBase`` construction."""

    def __init__(self):
        """Initialise by delegating to ``DictBase.__init__``."""
        super().__init__()