Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-09 07:58:32

0001 #!/usr/bin/env python
0002 #
0003 # Licensed under the Apache License, Version 2.0 (the "License");
0004 # You may not use this file except in compliance with the License.
0005 # You may obtain a copy of the License at
0006 # http://www.apache.org/licenses/LICENSE-2.0
0007 #
0008 # Authors:
0009 # - Wen Guan, <wen.guan@cern.ch>, 2024
0010 
0011 import copy
0012 import base64
0013 import logging
0014 import inspect
0015 import json
0016 import os
0017 import pickle
0018 import traceback
0019 import uuid
0020 import zlib
0021 
0022 from typing import Any, Dict, List, Optional, Tuple, Union    # noqa F401
0023 
0024 from idds.common.dict_class import DictMetadata, DictBase
0025 from idds.common.imports import import_func, get_func_name
0026 
0027 
class IDDSDict(dict):
    """
    Dictionary that silently drops item assignments to the key ``'test'``.

    Only ``__setitem__`` is overridden here; every other key is stored
    through the regular ``dict`` implementation.
    """

    def __setitem__(self, key, value):
        """Store ``value`` under ``key`` unless ``key`` equals ``'test'``."""
        if key != 'test':
            super().__setitem__(key, value)
0034 
0035 
class IDDSMetadata(DictMetadata):
    """Metadata container; adds nothing on top of ``DictMetadata``."""

    def __init__(self):
        """Initialize the object through the ``DictMetadata`` constructor."""
        super().__init__()
0039 
0040 
class Base(DictBase):
    """
    Base object carrying identifiers plus helpers to describe, load and run functions.

    Besides the identifier accessors, it offers helpers to serialize a
    function call (name and arguments), to save/load a JSON context file,
    and to import and execute a function by name.
    """

    def __init__(self):
        super(Base, self).__init__()
        # Short identifier: the first 8 characters of a random UUID4 string.
        self._internal_id = str(uuid.uuid4())[:8]
        # The template id initially mirrors the internal id.
        self._template_id = self._internal_id
        self._sequence_id = 0

    @property
    def internal_id(self):
        """Return the internal identifier of this object."""
        return self._internal_id

    @internal_id.setter
    def internal_id(self, value):
        self._internal_id = value

    def get_func_name_and_args(self,
                               func,
                               pre_kwargs=None,
                               args=None,
                               kwargs=None,
                               base_dir=None,
                               multi_jobs_kwargs_list=None):
        """
        Resolve the name of a function and serialize its call arguments.

        Non-empty ``args``, ``pre_kwargs`` and ``kwargs``, and every item of a
        non-empty ``multi_jobs_kwargs_list``, are pickled, zlib-compressed and
        base64-encoded into text. Empty (or None) values are returned as the
        corresponding empty container.

        :param func: a function/method/builtin, a function name string, or any
                     other object (which is then used as the name as is).
        :param pre_kwargs: `dict` of keyword arguments, or None.
        :param args: `tuple` or `list` of positional arguments, or None.
        :param kwargs: `dict` of keyword arguments, or None.
        :param base_dir: forwarded to `get_func_name` when `func` is a callable.
        :param multi_jobs_kwargs_list: `list` of objects encoded one by one, or None.

        :returns: (func_call, (func_name, pre_kwargs, args, kwargs), multi_jobs_kwargs_list),
                  where func_call is `func` itself when it is a function, method
                  or builtin, and None otherwise.
        :raise TypeError: when one of the argument containers has an unexpected type.
        """

        def encode(obj):
            # pickle -> zlib -> base64 -> text, so the payload can be carried as a string.
            return base64.b64encode(zlib.compress(pickle.dumps(obj))).decode("utf-8")

        if args is None:
            args = ()
        if pre_kwargs is None:
            pre_kwargs = {}
        if kwargs is None:
            kwargs = {}
        if multi_jobs_kwargs_list is None:
            multi_jobs_kwargs_list = []
        if not isinstance(args, (tuple, list)):
            raise TypeError('{0!r} is not a valid args list'.format(args))
        if not isinstance(pre_kwargs, dict):
            raise TypeError('{0!r} is not a valid pre_kwargs dict'.format(pre_kwargs))
        if not isinstance(kwargs, dict):
            raise TypeError('{0!r} is not a valid kwargs dict'.format(kwargs))
        if not isinstance(multi_jobs_kwargs_list, list):
            raise TypeError('{0!r} is not a valid multi_jobs_kwargs_list list'.format(multi_jobs_kwargs_list))

        # Strings and any other non-callable objects are used as the name as
        # is; no TypeError is raised for them.
        func_call, func_name = None, func
        if not isinstance(func, str) and (inspect.ismethod(func) or inspect.isfunction(func) or inspect.isbuiltin(func)):
            func_name = get_func_name(func, base_dir=base_dir)
            func_call = func

        # Only non-empty containers are encoded; empty ones are returned untouched.
        if args:
            args = encode(args)
        if pre_kwargs:
            pre_kwargs = encode(pre_kwargs)
        if kwargs:
            kwargs = encode(kwargs)
        if multi_jobs_kwargs_list:
            multi_jobs_kwargs_list = [encode(k) for k in multi_jobs_kwargs_list]

        return func_call, (func_name, pre_kwargs, args, kwargs), multi_jobs_kwargs_list

    @property
    def logger(self):
        """Return the logger named after the concrete class."""
        return logging.getLogger(self.__class__.__name__)

    @logger.setter
    def logger(self, value):
        # Assignments are ignored: the getter always looks the logger up by class name.
        pass

    def get_internal_id(self):
        """Return the internal identifier of this object."""
        return self._internal_id

    def get_template_work_id(self):
        """Return the template identifier of this object."""
        return self._template_id

    def get_sequence_id(self):
        """Return the sequence identifier of this object."""
        return self._sequence_id

    def get_input_collections(self):
        """Return the input collections; the base implementation has none."""
        return []

    def get_output_collections(self):
        """Return the output collections; the base implementation has none."""
        return []

    def get_log_collections(self):
        """Return the log collections; the base implementation has none."""
        return []

    def save_context(self, source_dir, name, context):
        """
        Dump `context` as JSON into ``<source_dir>/<name>.json``.

        Nothing is written when any of the three arguments is falsy.
        Failures are logged and not re-raised.

        :param source_dir: directory in which the file is written.
        :param name: file name without the ``.json`` suffix.
        :param context: JSON-serializable object to store.
        """
        if source_dir and name and context:
            # Pre-bind the name so the error message below never hits an
            # unbound variable when building the path itself fails.
            file_name = None
            try:
                file_name = name + ".json"
                file_name = os.path.join(source_dir, file_name)
                with open(file_name, 'w') as f:
                    json.dump(context, f)
                self.logger.info(f"Saved context to file {file_name}")
            except Exception as ex:
                self.logger.error(f"Failed to save context to file {file_name}: {ex}")

    def load_context(self, source_dir, name):
        """
        Load the JSON context stored in ``<source_dir>/<name>.json``.

        :param source_dir: directory containing the file.
        :param name: file name without the ``.json`` suffix.

        :returns: the decoded JSON content; None when the file does not exist;
                  an empty list when `source_dir` or `name` is falsy or when
                  loading fails (the failure is logged, not re-raised).
        """
        if source_dir and name:
            try:
                context = None
                file_name = name + ".json"
                file_name = os.path.join(source_dir, file_name)
                if os.path.exists(file_name):
                    with open(file_name, 'r') as f:
                        context = json.load(f)
                self.logger.info(f"Loading context from file {file_name}")
                return context
            except Exception as ex:
                self.logger.error(f"Failed to load context from file: {ex}")
        return []

    def prepare(self):
        """
        Prepare the workflow: upload the source files to server.

        The base implementation does nothing and returns None.

        :returns id: The workflow id.
        :raise Exception when failing to prepare the workflow.
        """

    def submit(self):
        """
        Submit the workflow to the iDDS server.

        The base implementation only calls `prepare` and returns None.

        :returns id: The workflow id.
        :raise Exception when failing to submit the workflow.
        """
        self.prepare()
        return None

    def split_setup(self, setup):
        """
        Split a setup string at its last ';'.

        :param setup: `str` with commands separated by ';'.

        :returns: (pre_setup, main_setup). main_setup is the text after the
                  last ';'. pre_setup is the preceding parts re-joined with
                  '; ' and terminated with '; ', or '' when `setup` contains
                  no ';'. The parts are not stripped.
        """
        if ";" not in setup:
            return "", setup

        setup_list = setup.split(";")
        main_setup = setup_list[-1]
        pre_setup = "; ".join(setup_list[:-1])
        pre_setup = pre_setup + "; "
        return pre_setup, main_setup

    def setup(self):
        """
        :returns command: `str` to setup the workflow (None in the base implementation).
        """
        return None

    def load_func(self, func_name):
        """
        Load the function from the source files.

        The environment variable IDDS_IWORKFLOW_LOAD is set to 'true' while
        the import runs and is removed afterwards, also when the import fails.

        :param func_name: name of the function, passed to `import_func`.

        :returns: whatever `import_func` returns for `func_name`.
        :raise Exception
        """
        os.environ['IDDS_IWORKFLOW_LOAD'] = 'true'
        try:
            func = import_func(func_name)
        finally:
            # Always clean up, so a failed import does not leave the flag set
            # for the rest of the process.
            os.environ.pop('IDDS_IWORKFLOW_LOAD', None)

        return func

    def run_func(self, func, pre_kwargs, args, kwargs):
        """
        Run the function.

        `pre_kwargs` is deep-copied and then updated with `kwargs`, so entries
        in `kwargs` take precedence. None is accepted for `pre_kwargs`, `args`
        and `kwargs` and treated as empty.

        :param func: the callable to run.
        :param pre_kwargs: `dict` of keyword arguments, or None.
        :param args: positional arguments, or None.
        :param kwargs: `dict` of keyword arguments, or None.

        :returns: status, output, error. (True, return value, None) on success;
                  (False, None, error message) when an exception is raised,
                  which is logged and not re-raised.
        """
        try:
            logging.info(f"func type: {type(func)}: {str(func)}")
            logging.info(f"pre_kwargs type: {type(pre_kwargs)}: {str(pre_kwargs)}")
            logging.info(f"args type: {type(args)}: {str(args)}")
            logging.info(f"kwargs type: {type(kwargs)}: {str(kwargs)}")
            if args is None:
                args = ()
            # Work on a copy so the caller's pre_kwargs is never modified.
            kwargs_copy = {} if pre_kwargs is None else copy.deepcopy(pre_kwargs)
            if kwargs is not None:
                kwargs_copy.update(kwargs)
            logging.info(f"start to run function: {str(func)}")
            if kwargs_copy:
                ret = func(*args, **kwargs_copy)
            else:
                ret = func(*args)
            logging.info(f"Successfully run function, ret: {ret}")
            return True, ret, None
        except Exception as ex:
            logging.error(f"Failed to run the function: {str(ex)}")
            logging.error(traceback.format_exc())
            return False, None, str(ex)
0230 
0231 
class Context(DictBase):
    """Context object identified by a short random internal id."""

    def __init__(self):
        super().__init__()
        # Keep only the first 8 characters of a random UUID4 string.
        random_token = str(uuid.uuid4())
        self._internal_id = random_token[:8]

    @property
    def internal_id(self):
        """Return the internal identifier of this context."""
        return self._internal_id

    @internal_id.setter
    def internal_id(self, value):
        self._internal_id = value

    def prepare(self):
        """
        Prepare the workflow.

        The base implementation does nothing and returns None.
        """
        return None

    def setup(self):
        """
        :returns command: `str` to setup the workflow (None in this implementation).
        """
        return None
0256 
0257 
class CollectionBase(DictBase):
    """Base class for collections; adds nothing on top of ``DictBase``."""

    def __init__(self):
        """Initialize the object through the ``DictBase`` constructor."""
        super().__init__()