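"""
Utilities for converting old-format iDDS request metadata (plain JSON) into the
new workflow-based request format, and for offloading large request data to an
additional storage area.
"""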
import copy
import json
import os
import traceback

from idds.common.constants import RequestType, RequestStatus, Sections
from idds.common.config import config_has_section, config_has_option, config_get, config_get_int
from idds.common.utils import is_new_version

from idds.workflow.work import Collection, Processing
from idds.workflow.workflow import Workflow


def convert_stagein_request_metadata_to_workflow(scope, name, workload_id, request_metadata):
    """
    Convert old-format (JSON) stage-in request metadata to new-format request metadata based on a workflow.

    :param scope: The collection scope.
    :param name: The collection name.
    :param workload_id: The workload id.
    :param request_metadata: The request metadata.
    """
    from idds.atlas.workflow.atlasstageinwork import ATLASStageinWork

    work = ATLASStageinWork(executable=None, arguments=None, parameters=None, setup=None,
                            exec_type='local', sandbox=None,
                            primary_input_collection={'scope': scope, 'name': name},
                            other_input_collections=None,
                            output_collections={'scope': scope, 'name': name},
                            log_collections=None,
                            logger=None,
                            max_waiting_time=request_metadata.get('max_waiting_time', 3600 * 7 * 24),
                            src_rse=request_metadata.get('src_rse', None),
                            dest_rse=request_metadata.get('dest_rse', None),
                            rule_id=request_metadata.get('rule_id', None))
    wf = Workflow()
    wf.set_workload_id(workload_id)
    wf.add_work(work)

    return wf


def convert_hpo_request_metadata_to_workflow(scope, name, workload_id, request_metadata):
    """
    Convert old-format (JSON) HPO request metadata to new-format request metadata based on a workflow.

    :param scope: The collection scope.
    :param name: The collection name.
    :param workload_id: The workload id.
    :param request_metadata: The request metadata.
    """
    from idds.atlas.workflow.atlashpowork import ATLASHPOWork

    work = ATLASHPOWork(executable=request_metadata.get('executable', None),
                        arguments=request_metadata.get('arguments', None),
                        parameters=request_metadata.get('parameters', None),
                        setup=None, exec_type='local',
                        sandbox=request_metadata.get('sandbox', None),
                        method=request_metadata.get('method', None),
                        container_workdir=request_metadata.get('workdir', None),
                        output_json=request_metadata.get('output_json', None),
                        opt_space=request_metadata.get('opt_space', None),
                        workload_id=workload_id,
                        initial_points=request_metadata.get('initial_points', None),
                        max_points=request_metadata.get('max_points', None),
                        num_points_per_iteration=request_metadata.get('num_points_per_iteration', 10))
    wf = Workflow()
    wf.set_workload_id(workload_id)
    wf.add_work(work)
    return wf


def convert_old_workflow_to_new_workflow(data):
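    """
    Convert the dict-based collections and processings stored in an old-format
    workflow's works_template into Collection and Processing objects.

    :param data: The request data, possibly containing request_metadata['workflow'].
    """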
    if ('request_metadata' in data and data['request_metadata'] and 'workflow' in data['request_metadata']):
        workflow = data['request_metadata']['workflow']
        if workflow:
            for work_key in workflow.works_template:
                work = workflow.works_template[work_key]
                for coll_key in work.collections:
                    coll = work.collections[coll_key]
                    if type(coll) not in [Collection]:
                        coll_metadata = copy.copy(coll)
                        del coll_metadata['scope']
                        del coll_metadata['name']
                        new_coll = Collection(scope=coll['scope'], name=coll['name'], coll_metadata=coll_metadata)
                        new_coll.internal_id = coll_key
                        work.collections[coll_key] = new_coll
                for proc_key in work.processings:
                    proc = work.processings[proc_key]
                    if type(proc) not in [Processing]:
                        proc_metadata = proc['processing_metadata']
                        new_proc = Processing(processing_metadata=proc_metadata)
                        new_proc.internal_id = proc_key
                        if 'rule_id' in proc_metadata:
                            new_proc.external_id = proc_metadata['rule_id']
                        work.processings[proc_key] = new_proc
    return data


def convert_old_req_2_workflow_req(data):
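    """
    Convert an old-format request (StageIn, HyperParameterOpt or a pre-0.2.9
    Workflow request) into a new workflow-based request.

    :param data: The request data.
    """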
    if not data:
        return data

    if data['request_type'] == RequestType.Workflow:
        if ('request_metadata' in data and data['request_metadata'] and 'version' in data['request_metadata']
                and data['request_metadata']['version'] and is_new_version(data['request_metadata']['version'], '0.2.9')):
            return data
        else:
            data = convert_old_workflow_to_new_workflow(data)
            return data

    workload_id = None
    if 'workload_id' in data and data['workload_id']:
        workload_id = data['workload_id']
    elif 'request_metadata' in data and data['request_metadata'] and 'workload_id' in data['request_metadata'] and data['request_metadata']['workload_id']:
        workload_id = data['request_metadata']['workload_id']

    if data['request_type'] in [RequestType.StageIn, RequestType.StageIn.value]:
        wf = convert_stagein_request_metadata_to_workflow(data['scope'], data['name'], workload_id,
                                                          data['request_metadata'])
        data['request_type'] = RequestType.Workflow
        data['transform_tag'] = 'workflow'
        data['status'] = RequestStatus.New
        data['workload_id'] = wf.get_workload_id()
        data['request_metadata'] = {'workload_id': wf.get_workload_id(),
                                    'workflow': wf}
        return data
    if data['request_type'] in [RequestType.HyperParameterOpt, RequestType.HyperParameterOpt.value]:
        wf = convert_hpo_request_metadata_to_workflow(data['scope'] if 'scope' in data else None,
                                                      data['name'] if 'name' in data else None,
                                                      workload_id,
                                                      data['request_metadata'])
        primary_init_work = wf.get_primary_initial_collection()
        if primary_init_work:
            if type(primary_init_work) in [dict]:
                data['scope'] = primary_init_work['scope']
                data['name'] = primary_init_work['name']
            elif type(primary_init_work) in [Collection]:
                data['scope'] = primary_init_work.scope
                data['name'] = primary_init_work.name

        data['request_type'] = RequestType.Workflow
        data['transform_tag'] = 'workflow'
        data['status'] = RequestStatus.New
        data['workload_id'] = wf.get_workload_id()
        data['request_metadata'] = {'workload_id': wf.get_workload_id(),
                                    'workflow': wf}
        return data
    return data


def convert_old_request_metadata(req):
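    """
    Replace the request_metadata dict of an old-format request with the workflow
    object it contains.

    :param req: The request.
    """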
    if 'request_metadata' in req and req['request_metadata']:
        wf = req['request_metadata']['workflow']
        req['request_metadata'] = wf
        return req
    return req


def get_additional_request_data_storage(data, workflow, logger):
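    """
    Decide whether the request data should be stored in additional storage, based
    on the configured maximum request data length.

    :param data: The serialized request data.
    :param workflow: The workflow object.
    :param logger: The logger.

    :returns: A tuple (use_additional_storage, additional_storage_path).
    """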
    try:
        if config_has_section(Sections.Rest) and config_has_option(Sections.Rest, 'max_request_data_length'):
            max_request_data_length = config_get_int(Sections.Rest, 'max_request_data_length')
        else:
            max_request_data_length = 10000000

        if config_has_section(Sections.Rest) and config_has_option(Sections.Rest, 'additional_storage'):
            additional_storage = config_get(Sections.Rest, 'additional_storage')
        else:
            additional_storage = '/tmp'

        if not workflow or not hasattr(workflow, "is_with_steps"):
            return False, additional_storage

        if workflow and (workflow.is_with_steps() or workflow.is_workflow_step):
            return False, additional_storage

        data_length = len(data)
        logger.info(f"max_request_data_length: {max_request_data_length}, len(data): {data_length}")
        if data_length > max_request_data_length:
            return True, additional_storage
        return False, additional_storage
    except Exception as ex:
        logger.warning(f"get_additional_request_data_storage raised exception: {ex}: {traceback.format_exc()}")
        return False, None


def convert_data_to_use_additional_storage(data, additional_data_storage, with_add_storage, logger):
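    """
    Convert the workflow in the request data so that its (large) data is kept in
    the additional storage area.

    :param data: The request data.
    :param additional_data_storage: The base path of the additional storage.
    :param with_add_storage: Whether the additional storage should be used.
    :param logger: The logger.

    :returns: The (possibly updated) request data.
    """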
    if not data:
        return data

    if ('request_metadata' in data and isinstance(data['request_metadata'], dict) and data['request_metadata'].get('workflow')):
        workflow = data['request_metadata']['workflow']
        logger.debug(f"convert_data_to_use_additional_storage get workflow: {workflow}")
        if with_add_storage:
            internal_id = workflow.get_internal_id()
            storage = os.path.join(additional_data_storage, internal_id)
            if not os.path.exists(storage):
                os.makedirs(storage, exist_ok=True)

            data['additional_data_storage'] = storage
            workflow.set_additional_data_storage(storage)
            workflow.convert_data_to_additional_data_storage(storage)
            data['request_metadata']['workflow'] = workflow
            logger.info(f"Converted workflow {workflow.name} to use storage {storage}")
        elif hasattr(workflow, "is_with_steps") and workflow.is_with_steps():
            wf_storage = workflow.get_additional_data_storage()
            logger.info(f"Workflow {workflow.name} has steps with additional storage {wf_storage}")
            if wf_storage == "IDDS_WORKFLOW_ADDITIONAL_STORAGE":
                internal_id = workflow.get_internal_id()
                storage = os.path.join(additional_data_storage, internal_id)

                workflow.convert_data_to_additional_data_storage(storage, storage_name=wf_storage, replace_storage_name=True)
                logger.info(f"Replaced workflow {workflow.name} {wf_storage} with {storage}")
    return data


def store_data_to_use_additional_storage(internal_id, data, additional_data_storage, logger):
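    """
    Dump the per-work data as JSON files under the additional storage area.

    :param internal_id: The workflow internal id, used as the subdirectory name.
    :param data: A dict mapping work names to the data to store.
    :param additional_data_storage: The base path of the additional storage.
    :param logger: The logger.
    """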
    data_storage = os.path.join(additional_data_storage, internal_id)
    if not os.path.exists(data_storage):
        os.makedirs(data_storage, exist_ok=True)

    for work_name, work_data in data.items():
        data_file = os.path.join(data_storage, work_name)
        with open(data_file, 'w') as fd:
            json.dump(work_data, fd)
        logger.info(f"store data of {work_name} to {data_file}")


def get_workflow_item(data, item_name, logger):
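    """
    Look up the named method on the workflow (or build_workflow) in the request
    metadata and return the result of calling it.

    :param data: The request data.
    :param item_name: The name of the workflow method to call.
    :param logger: The logger.

    :returns: The result of the call, or None.
    """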
    try:
        if not data:
            return None

        workflow = None
        request_metadata = data.get('request_metadata') or {}
        if 'workflow' in request_metadata:
            workflow = request_metadata.get('workflow')
        elif 'build_workflow' in request_metadata:
            workflow = request_metadata.get('build_workflow')

        if workflow:
            item_func = getattr(workflow, item_name, None)
            if item_func:
                return item_func()
    except Exception as ex:
        logger.warning(f"failed to get workflow item {item_name}: {ex}")
    return None