Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-09 07:58:21

0001 #!/usr/bin/env python
0002 #
0003 # Licensed under the Apache License, Version 2.0 (the "License");
0004 # You may not use this file except in compliance with the License.
0005 # You may obtain a copy of the License at
0006 # http://www.apache.org/licenses/LICENSE-2.0
0007 #
0008 # Authors:
0009 # - Wen Guan, <wen.guan@cern.ch>, 2020 - 2025
0010 
0011 import copy
0012 import json
0013 import os
0014 import traceback
0015 
0016 from idds.common.constants import RequestType, RequestStatus, Sections
0017 from idds.common.config import config_has_section, config_has_option, config_get, config_get_int
0018 from idds.common.utils import is_new_version
0019 
0020 from idds.workflow.work import Collection, Processing
0021 from idds.workflow.workflow import Workflow
0022 
0023 # from idds.atlas.workflow.atlasstageinwork import ATLASStageInWork
0024 # from idds.atlas.workflow.atlashpowork import ATLASHPOWork
0025 
0026 
def convert_stagein_request_metadata_to_workflow(scope, name, workload_id, request_metadata):
    """
    Convert old format stagein request metadata of json to new format request metadata based on workflow.

    :param scope: The collection scope.
    :param name: The collection name.
    :param workload_id: The workload id.
    :param request_metadata: The request metadata.
    """
    # 'request_metadata': {'workload_id': '20776840', 'max_waiting_time': 3600, 'src_rse': 'NDGF-T1_DATATAPE', 'dest_rse': 'NDGF-T1_DATADISK', 'rule_id': '236e4bf87e11490291e3259b14724e30'}  # noqa: E501

    # imported lazily to avoid a hard dependency at module import time
    from idds.atlas.workflow.atlasstageinwork import ATLASStageinWork

    stagein_work = ATLASStageinWork(
        executable=None, arguments=None, parameters=None, setup=None,
        exec_type='local', sandbox=None,
        primary_input_collection={'scope': scope, 'name': name},
        other_input_collections=None,
        output_collections={'scope': scope, 'name': name},
        log_collections=None,
        logger=None,
        # default waiting time: one week (in seconds)
        max_waiting_time=request_metadata.get('max_waiting_time', 3600 * 7 * 24),
        src_rse=request_metadata.get('src_rse', None),
        dest_rse=request_metadata.get('dest_rse', None),
        rule_id=request_metadata.get('rule_id', None))

    workflow = Workflow()
    workflow.set_workload_id(workload_id)
    workflow.add_work(stagein_work)
    return workflow
0056 
0057 
def convert_hpo_request_metadata_to_workflow(scope, name, workload_id, request_metadata):
    """
    Convert old format hpo request metadata of json to new format request metadata based on workflow.

    :param scope: The collection scope.
    :param name: The collection name.
    :param workload_id: The workload id.
    :param request_metadata: The request metadata.
    """
    # 'request_metadata': {'workload_id': '20525134', 'sandbox': None, 'method': 'bayesian', 'opt_space': {'A': (1, 4), 'B': (1, 10)}, 'initial_points': [({'A': 1, 'B': 2}, 0.3), ({'A': 1, 'B': 3}, None)], 'max_points': 20, 'num_points_per_generation': 10}  # noqa: E501
    # 'request_metadata': {'workload_id': '20525135', 'sandbox': None, 'method': 'nevergrad', 'opt_space': {"A": {"type": "Choice", "params": {"choices": [1, 4]}}, "B": {"type": "Scalar", "bounds": [0, 5]}}, 'initial_points': [({'A': 1, 'B': 2}, 0.3), ({'A': 1, 'B': 3}, None)], 'max_points': 20, 'num_points_per_generation': 10}  # noqa: E501
    # 'request_metadata': {'workload_id': '20525134', 'sandbox': 'wguanicedew/idds_hpo_nevergrad', 'workdir': '/data', 'executable': 'docker', 'arguments': 'python /opt/hyperparameteropt_nevergrad.py --max_points=%MAX_POINTS --num_points=%NUM_POINTS --input=/data/%IN --output=/data/%OUT', 'output_json': 'output.json', 'opt_space': {"A": {"type": "Choice", "params": {"choices": [1, 4]}}, "B": {"type": "Scalar", "bounds": [0, 5]}}, 'initial_points': [({'A': 1, 'B': 2}, 0.3), ({'A': 1, 'B': 3}, None)], 'max_points': 20, 'num_points_per_generation': 10}  # noqa: E501

    # imported lazily to avoid a hard dependency at module import time
    from idds.atlas.workflow.atlashpowork import ATLASHPOWork

    # Old-format requests use the key 'num_points_per_generation' (see the
    # examples above); accept it as a fallback so the configured value is
    # not silently replaced by the default of 10.
    num_points_per_iteration = request_metadata.get(
        'num_points_per_iteration',
        request_metadata.get('num_points_per_generation', 10))

    work = ATLASHPOWork(executable=request_metadata.get('executable', None),
                        arguments=request_metadata.get('arguments', None),
                        parameters=request_metadata.get('parameters', None),
                        setup=None, exec_type='local',
                        sandbox=request_metadata.get('sandbox', None),
                        method=request_metadata.get('method', None),
                        container_workdir=request_metadata.get('workdir', None),
                        output_json=request_metadata.get('output_json', None),
                        opt_space=request_metadata.get('opt_space', None),
                        workload_id=workload_id,
                        initial_points=request_metadata.get('initial_points', None),
                        max_points=request_metadata.get('max_points', None),
                        num_points_per_iteration=num_points_per_iteration)
    wf = Workflow()
    wf.set_workload_id(workload_id)
    wf.add_work(work)
    return wf
0090 
0091 
def convert_old_workflow_to_new_workflow(data):
    """
    Upgrade an old-format workflow stored in data['request_metadata'] in place.

    Plain-dict collections and processings held in the workflow's work
    templates are replaced by Collection / Processing objects; entries that
    are already converted are left untouched.

    :param data: The request dictionary.
    :returns: The same dictionary, with its workflow upgraded when present.
    """
    metadata = data['request_metadata'] if 'request_metadata' in data else None
    workflow = metadata['workflow'] if metadata and 'workflow' in metadata else None
    if not workflow:
        return data

    for work in workflow.works_template.values():
        for coll_key, coll in list(work.collections.items()):
            # exact type check on purpose: only plain old-format entries are converted
            if type(coll) is Collection:
                continue
            coll_metadata = copy.copy(coll)
            del coll_metadata['scope']
            del coll_metadata['name']
            new_coll = Collection(scope=coll['scope'], name=coll['name'], coll_metadata=coll_metadata)
            new_coll.internal_id = coll_key
            work.collections[coll_key] = new_coll
        for proc_key, proc in list(work.processings.items()):
            if type(proc) is Processing:
                continue
            proc_metadata = proc['processing_metadata']
            new_proc = Processing(processing_metadata=proc_metadata)
            new_proc.internal_id = proc_key
            if 'rule_id' in proc_metadata:
                new_proc.external_id = proc_metadata['rule_id']
            work.processings[proc_key] = new_proc
    return data
0121 
0122 
def convert_old_req_2_workflow_req(data):
    """
    Convert an old format request to a new workflow based request.

    :param data: The request dictionary; returned unchanged when falsy or
                 already in the new format.
    :returns: The (possibly converted) request dictionary.
    """
    if not data:
        return data

    # 'request_metadata' may be missing or None in very old requests; fall
    # back to an empty dict so the lookups below cannot raise KeyError.
    request_metadata = data.get('request_metadata') or {}

    if data['request_type'] == RequestType.Workflow:
        if ('version' in request_metadata and request_metadata['version']
            and is_new_version(request_metadata['version'], '0.2.9')):  # noqa W503
            # produced by a new (>= 0.2.9) client; nothing to convert
            return data
        else:
            data = convert_old_workflow_to_new_workflow(data)
            return data

    workload_id = None
    if 'workload_id' in data and data['workload_id']:
        workload_id = data['workload_id']
    elif 'workload_id' in request_metadata and request_metadata['workload_id']:
        workload_id = request_metadata['workload_id']

    if data['request_type'] in [RequestType.StageIn, RequestType.StageIn.value]:
        wf = convert_stagein_request_metadata_to_workflow(data['scope'], data['name'], workload_id,
                                                          request_metadata)
        data['request_type'] = RequestType.Workflow
        data['transform_tag'] = 'workflow'
        data['status'] = RequestStatus.New
        data['workload_id'] = wf.get_workload_id()
        data['request_metadata'] = {'workload_id': wf.get_workload_id(),
                                    'workflow': wf}
        return data
    if data['request_type'] in [RequestType.HyperParameterOpt, RequestType.HyperParameterOpt.value]:
        wf = convert_hpo_request_metadata_to_workflow(data.get('scope', None),
                                                      data.get('name', None),
                                                      workload_id,
                                                      request_metadata)
        # a dict-shaped primary collection comes from old-format works,
        # a Collection object from already-converted ones
        primary_init_work = wf.get_primary_initial_collection()
        if primary_init_work:
            if type(primary_init_work) in [dict]:
                data['scope'] = primary_init_work['scope']
                data['name'] = primary_init_work['name']
            elif type(primary_init_work) in [Collection]:
                data['scope'] = primary_init_work.scope
                data['name'] = primary_init_work.name

        data['request_type'] = RequestType.Workflow
        data['transform_tag'] = 'workflow'
        data['status'] = RequestStatus.New
        data['workload_id'] = wf.get_workload_id()
        data['request_metadata'] = {'workload_id': wf.get_workload_id(),
                                    'workflow': wf}
        return data
    return data
0173 
0174 
def convert_old_request_metadata(req):
    """
    Flatten an old-format request: replace the request_metadata dict with
    the workflow object it wraps.

    :param req: The request dictionary.
    :returns: The same dictionary, flattened when request_metadata is set.
    """
    if req.get('request_metadata'):
        req['request_metadata'] = req['request_metadata']['workflow']
    return req
0181 
0182 
def get_additional_request_data_storage(data, workflow, logger):
    """
    Decide whether the serialized request data must be moved to additional storage.

    :param data: The serialized request data (its len() is compared to the limit).
    :param workflow: The workflow object; stepped workflows are excluded.
    :param logger: Logger for diagnostics.
    :returns: Tuple (needs_additional_storage, storage_path);
              (False, None) when any exception occurs.
    """
    try:
        # configured limits, with hard-coded fallbacks
        max_request_data_length = 10000000
        if config_has_section(Sections.Rest) and config_has_option(Sections.Rest, 'max_request_data_length'):
            max_request_data_length = config_get_int(Sections.Rest, 'max_request_data_length')

        additional_storage = '/tmp'
        if config_has_section(Sections.Rest) and config_has_option(Sections.Rest, 'additional_storage'):
            additional_storage = config_get(Sections.Rest, 'additional_storage')

        # stepped workflows (or workflow steps) never move data to additional storage
        if not workflow or not hasattr(workflow, "is_with_steps"):
            return False, additional_storage
        if workflow.is_with_steps() or workflow.is_workflow_step:
            return False, additional_storage

        payload_length = len(data)
        logger.info(f"max_request_data_length: {max_request_data_length}, len(data): {payload_length}")
        return payload_length > max_request_data_length, additional_storage
    except Exception as ex:
        logger.warning(f"get_additional_request_data_storage raise exception: {ex}: {traceback.format_exc()}")
    return False, None
0209 
0210 
def convert_data_to_use_additional_storage(data, additional_data_storage, with_add_storage, logger):
    """
    Move a request's workflow data into the additional storage area when needed.

    :param data: The request dictionary (may be falsy; returned unchanged then).
    :param additional_data_storage: Base directory of the additional storage.
    :param with_add_storage: When True, force the workflow onto additional storage;
                             otherwise only replace the placeholder storage name.
    :param logger: Logger for diagnostics.
    :returns: The (possibly updated) request dictionary.
    """
    if not data:
        return data
    if not ('request_metadata' in data and isinstance(data['request_metadata'], dict)):
        return data
    workflow = data['request_metadata'].get('workflow')
    if not workflow:
        return data

    logger.debug(f"convert_data_to_use_additional_storage get workflow: {workflow}")
    if with_add_storage:
        storage = os.path.join(additional_data_storage, workflow.get_internal_id())
        if not os.path.exists(storage):
            os.makedirs(storage, exist_ok=True)

        data['additional_data_storage'] = storage
        workflow.set_additional_data_storage(storage)
        workflow.convert_data_to_additional_data_storage(storage)
        data['request_metadata']['workflow'] = workflow
        logger.info(f"Converted workflow {workflow.name} to use storage {storage}")
        return data

    if hasattr(workflow, "is_with_steps") and workflow.is_with_steps():
        wf_storage = workflow.get_additional_data_storage()
        logger.info(f"Workflow {workflow.name} has steps with additional storage {wf_storage}")
        if wf_storage == "IDDS_WORKFLOW_ADDITIONAL_STORAGE":
            storage = os.path.join(additional_data_storage, workflow.get_internal_id())
            # convert IDDS_WORKFLOW_ADDITIONAL_STORAGE to additional_data_storage
            workflow.convert_data_to_additional_data_storage(storage, storage_name=wf_storage, replace_storage_name=True)
            logger.info(f"Replaced workflow {workflow.name} {wf_storage} with {storage}")
    return data
0240 
0241 
def store_data_to_use_additional_storage(internal_id, data, additional_data_storage, logger):
    """
    Persist per-work data dictionaries as JSON files under the additional storage area.

    :param internal_id: Internal id of the workflow; used as the sub-directory name.
    :param data: Dict mapping work name -> JSON-serializable work data.
    :param additional_data_storage: Base directory of the additional storage.
    :param logger: Logger used to report each stored file.
    """
    data_storage = os.path.join(additional_data_storage, internal_id)
    # exist_ok avoids the TOCTOU race of an exists() check followed by makedirs()
    os.makedirs(data_storage, exist_ok=True)

    for work_name, work_data in data.items():
        data_file = os.path.join(data_storage, work_name)
        with open(data_file, 'w') as fd:
            json.dump(work_data, fd)
        logger.info(f"store data of {work_name} to {data_file}")
0252 
0253 
def get_workflow_item(data, item_name, logger):
    """
    Call the zero-argument accessor item_name on the request's workflow.

    The workflow is looked up under request_metadata['workflow'] or, failing
    that, request_metadata['build_workflow'].

    :param data: The request dictionary (may be falsy).
    :param item_name: Name of the workflow method to call.
    :param logger: Logger used to report failures.
    :returns: The method's return value, or None when the workflow or the
              method is unavailable or an error occurs.
    """
    try:
        if not data:
            return None

        # request_metadata may be absent or explicitly None
        request_metadata = data.get('request_metadata') or {}
        # initialize so a request without any workflow falls through cleanly
        # (previously this raised an UnboundLocalError that was only masked
        # by the broad except below)
        workflow = None
        if 'workflow' in request_metadata:
            workflow = request_metadata.get('workflow')
        elif 'build_workflow' in request_metadata:
            workflow = request_metadata.get('build_workflow')

        if workflow:
            item = getattr(workflow, item_name, None)
            # the accessor may not exist on this workflow type; don't call None
            if callable(item):
                return item()
        return None
    except Exception as ex:
        logger.warning(f"failed to get workflow item {item_name}: {ex}")
    return None