File indexing completed on 2026-04-09 07:58:17
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011 import logging
0012 import json
0013 import re
0014
0015 from idds.common.constants import WorkflowType
0016 from idds.common.utils import encode_base64, setup_logging
0017
0018
0019 setup_logging(__name__)
0020
0021
class BaseSubmitter(object):
    """Base class that converts an iDDS work/workflow object into a PanDA
    (JEDI) task parameter map.

    Concrete backends override :meth:`submit`; :meth:`get_task_params`
    holds the shared mapping logic from iDDS ``work`` attributes to the
    JEDI task parameter dictionary.
    """

    def __init__(self, *args, **kwargs):
        # Stateless base; args/kwargs are accepted for subclass compatibility.
        pass

    def get_task_params(self, work):
        """Build the JEDI task parameter dictionary for *work*.

        :param work: iDDS work/workflow object (iWork / iWorkflow /
            iWorkflowLocal) whose submission attributes are read below.
        :returns: dict suitable for a PanDA task submission call.
        """
        # Suffix the task name with request (and transform) ids so that
        # resubmissions of the same named work stay unique on the PanDA side.
        if work.workflow_type in [WorkflowType.iWork]:
            task_name = work.name + "_" + str(work.request_id) + "_" + str(work.transform_id)
        elif work.workflow_type in [WorkflowType.iWorkflow, WorkflowType.iWorkflowLocal]:
            task_name = work.name + "_" + str(work.request_id)
        else:
            task_name = work.name

        # Each multi-jobs kwargs dict is JSON-serialized and base64-encoded,
        # then fed to JEDI as one pseudo input "file" (one job per entry).
        in_files = [encode_base64(json.dumps(p)) for p in work.multi_jobs_kwargs_list]

        task_param_map = {}
        task_param_map['vo'] = work.vo if work.vo else 'wlcg'
        if work.queue and len(work.queue) > 0:
            task_param_map['site'] = work.queue
        if work.site and len(work.site) > 0:
            task_param_map['PandaSite'] = work.site
        if work.cloud and len(work.cloud) > 0:
            task_param_map['cloud'] = work.cloud

        if work.no_wait_parent:
            task_param_map['noWaitParent'] = work.no_wait_parent

        task_param_map['workingGroup'] = work.working_group

        if work.num_events:
            task_param_map['nEvents'] = work.num_events
            # Without an explicit per-job split, one job covers all events.
            if work.num_events_per_job:
                task_param_map['nEventsPerJob'] = work.num_events_per_job
            else:
                task_param_map['nEventsPerJob'] = work.num_events

        task_param_map['taskName'] = task_name
        task_param_map['userName'] = work.username if work.username else 'iDDS'
        task_param_map['taskPriority'] = work.priority

        task_param_map['architecture'] = '@el9'
        task_param_map['transUses'] = ''
        task_param_map['transHome'] = None

        # The runner command line becomes the constant part of jobParameters.
        executable = work.get_runner()

        task_param_map['transPath'] = 'https://storage.googleapis.com/drp-us-central1-containers/run_workflow_wrapper'

        task_param_map['processingType'] = work.processing_type
        task_param_map['prodSourceLabel'] = 'managed'

        task_param_map['taskType'] = work.task_type if work.task_type else 'iDDS'
        task_param_map['coreCount'] = work.core_count
        task_param_map['skipScout'] = True
        # ramUnit is MBPerCoreFixed, so express total memory per core.
        task_param_map['ramCount'] = work.total_memory / work.core_count if work.core_count else work.total_memory
        task_param_map['ramUnit'] = 'MBPerCoreFixed'

        # NOTE(review): prestagingRuleID is hard-coded — confirm it is still a
        # valid rule id for the target PanDA instance.
        task_param_map['prestagingRuleID'] = 123
        task_param_map['nChunksToWait'] = 1
        task_param_map['maxCpuCount'] = work.core_count
        task_param_map['maxWalltime'] = work.max_walltime
        # Fix: the previous code compared the default (5) against
        # work.max_attempt even when it was None, raising TypeError on
        # Python 3 ("'<' not supported between instances of 'int' and
        # 'NoneType'"). When max_attempt is set, both limits equal it;
        # otherwise they default to 5 — same effective values, no crash.
        max_attempt = work.max_attempt if work.max_attempt else 5
        task_param_map['maxFailure'] = max_attempt
        task_param_map['maxAttempt'] = max_attempt

        task_param_map['jobParameters'] = [
            {'type': 'constant',
             'value': executable,
             },
        ]

        if in_files:
            # Pseudo-file input: one job per encoded kwargs entry.
            task_param_map['nFiles'] = len(in_files)
            task_param_map['noInput'] = True
            task_param_map['pfnList'] = in_files
            task_param_map['nFilesPerJob'] = 1
        elif work.input_datasets:
            for i, (input_file_name, input_dataset_name) in enumerate(work.input_datasets.items()):
                # Expand the workflow-id placeholder (both spellings).
                input_dataset_name = input_dataset_name.replace("$WORKFLOWID", str(work.request_id))
                input_dataset_name = input_dataset_name.replace("${WORKFLOWID}", str(work.request_id))
                tmp_dict = {
                    "type": "template",
                    "param_type": "input",
                    # Raw string: identical pattern without the invalid
                    # "\." / "\d" escape-sequence warnings.
                    "exclude": r"\.log\.tgz(\.\d+)*$",
                    "expand": True,
                    "value": f'--inputs{i} "${{IN/T}}" --input_map{i} {input_file_name}',
                    "dataset": input_dataset_name
                }
                task_param_map['jobParameters'].append(tmp_dict)
            # dsForIN points at the last dataset processed in the loop above.
            task_param_map['dsForIN'] = input_dataset_name
        else:
            # No input at all: feed a single pseudo file so one job runs.
            in_files = [json.dumps('pseudo_file')]
            task_param_map['nFiles'] = len(in_files)
            task_param_map['noInput'] = True
            task_param_map['pfnList'] = in_files
            task_param_map['nFilesPerJob'] = 1

        if work.output_dataset_name:
            # Output datasets must be container-style names ending in "/".
            if not work.output_dataset_name.endswith("/"):
                work.output_dataset_name = work.output_dataset_name + "/"

        if work.output_dataset_name and work.output_file_name:
            output_dataset_name = work.output_dataset_name.replace("$WORKFLOWID", str(work.request_id))
            output_dataset_name_no_scope = output_dataset_name.split(":")[-1]
            # ${SN/P} is the serial-number placeholder JEDI substitutes per job.
            output_file_name = f"{output_dataset_name_no_scope[:-1]}_${{SN/P}}.{work.output_file_name}"
            tmp_dict = {"dataset": output_dataset_name,
                        "container": output_dataset_name,
                        "param_type": "output",
                        "type": "template",
                        "value": output_file_name
                        }
            task_param_map['jobParameters'].append(tmp_dict)

            # Tell the wrapper how the user's file name maps to the produced one.
            output_map = {work.output_file_name: output_file_name}
            task_param_map["jobParameters"] += [
                {
                    "type": "constant",
                    "value": ' --output_map "{0}"'.format(str(output_map)),
                },
            ]

        # A log dataset is always attached; derive it from the explicit log
        # dataset name, else from the output dataset, else from the request id.
        if work.log_dataset_name:
            log_dataset_name = work.log_dataset_name
        elif work.output_dataset_name:
            log_dataset_name = re.sub('/$', '.log/', work.output_dataset_name)
        else:
            log_dataset_name = f"Panda.iworkflow.{work.request_id}/"

        log_dataset_name = log_dataset_name.replace("$WORKFLOWID", str(work.request_id))
        log_dataset_name_no_scope = log_dataset_name.split(":")[-1]

        logging.debug(f"BaseSubmitter enable_separate_log: {work.enable_separate_log}")
        task_param_map['log'] = {"dataset": log_dataset_name,
                                 "container": log_dataset_name,
                                 "param_type": "log",
                                 "type": "template",
                                 'value': '{0}.${{SN}}.log.tgz'.format(log_dataset_name_no_scope[:-1])
                                 }

        task_param_map['reqID'] = work.request_id

        if work.container_options:
            # isinstance() instead of type(...) in [dict].
            if isinstance(work.container_options, dict) and work.container_options.get('container_image', None):
                container_image = work.container_options.get('container_image', None)
                task_param_map['container_name'] = container_image

        return task_param_map

    def submit(self, *args, **kwargs):
        """Submit the task; concrete backends override this (no-op here)."""
        pass
0207
0208
class BasePoller(object):
    """Base class for task pollers; concrete backends implement poll()."""

    def __init__(self, *args, **kwargs):
        # Stateless base; arguments are accepted for subclass compatibility.
        pass

    def poll(self, *args, **kwargs):
        """Poll the backend for task status; overridden by subclasses (no-op here)."""
        pass
0216
0217
class BaseSubmitterPoller(BaseSubmitter):
    """Combined submitter/poller base built on top of BaseSubmitter."""

    def __init__(self, *args, **kwargs):
        super(BaseSubmitterPoller, self).__init__(*args, **kwargs)

    def poll(self, *args, **kwargs):
        """Poll the backend for task status; overridden by subclasses (no-op here)."""
        pass