Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-09 07:58:17

0001 #!/usr/bin/env python
0002 #
0003 # Licensed under the Apache License, Version 2.0 (the "License");
0004 # You may not use this file except in compliance with the License.
0005 # You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
0007 #
0008 # Authors:
0009 # - Wen Guan, <wen.guan@cern.ch>, 2024 - 2025
0010 
0011 import logging
0012 import json
0013 import re
0014 
0015 from idds.common.constants import WorkflowType
0016 from idds.common.utils import encode_base64, setup_logging
0017 
0018 
0019 setup_logging(__name__)
0020 
0021 
class BaseSubmitter(object):
    """Base class translating an iDDS work object into a PanDA/JEDI task
    parameter map.

    Subclasses implement :meth:`submit`; :meth:`get_task_params` builds the
    dictionary accepted by the PanDA/JEDI task-insertion API.
    """

    def __init__(self, *args, **kwargs):
        pass

    def get_task_params(self, work):
        """Build the JEDI task parameter map for *work*.

        :param work: an iDDS work object (iWork / iWorkflow / iWorkflowLocal);
                     its attributes (queue, site, cloud, memory, datasets, ...)
                     are mapped onto JEDI task parameters.
        :returns: dict of task parameters for the PanDA/JEDI server.
        """
        # Embed request (and transform) ids in the task name so repeated
        # submissions of the same workflow yield distinct task names.
        if work.workflow_type in [WorkflowType.iWork]:
            task_name = work.name + "_" + str(work.request_id) + "_" + str(work.transform_id)
        elif work.workflow_type in [WorkflowType.iWorkflow, WorkflowType.iWorkflowLocal]:
            task_name = work.name + "_" + str(work.request_id)
        else:
            task_name = work.name

        # Each per-job kwargs dict becomes one base64-encoded pseudo input
        # "file"; JEDI then creates one job per entry (nFilesPerJob=1 below).
        in_files = []
        for p in work.multi_jobs_kwargs_list:
            in_files.append(encode_base64(json.dumps(p)))

        task_param_map = {}
        task_param_map['vo'] = work.vo if work.vo else 'wlcg'
        if work.queue and len(work.queue) > 0:
            task_param_map['site'] = work.queue
        if work.site and len(work.site) > 0:
            task_param_map['PandaSite'] = work.site
        if work.cloud and len(work.cloud) > 0:
            task_param_map['cloud'] = work.cloud

        if work.no_wait_parent:
            task_param_map['noWaitParent'] = work.no_wait_parent

        task_param_map['workingGroup'] = work.working_group

        if work.num_events:
            task_param_map['nEvents'] = work.num_events
            # Default to a single job holding all events when no per-job
            # split is requested.
            if work.num_events_per_job:
                task_param_map['nEventsPerJob'] = work.num_events_per_job
            else:
                task_param_map['nEventsPerJob'] = work.num_events

        task_param_map['taskName'] = task_name
        task_param_map['userName'] = work.username if work.username else 'iDDS'
        task_param_map['taskPriority'] = work.priority
        # task_param_map['architecture'] = ''
        task_param_map['architecture'] = '@el9'
        task_param_map['transUses'] = ''
        task_param_map['transHome'] = None

        # executable = work.executable
        executable = work.get_runner()
        # task_param_map['transPath'] = 'https://wguan-idds.web.cern.ch/run_workflow_wrapper'
        # task_param_map['transPath'] = 'https://pandaserver-doma.cern.ch/trf/user/run_workflow_wrapper'
        task_param_map['transPath'] = 'https://storage.googleapis.com/drp-us-central1-containers/run_workflow_wrapper'

        task_param_map['processingType'] = work.processing_type
        task_param_map['prodSourceLabel'] = 'managed'   # managed, test, ptest

        # task_param_map['noWaitParent'] = True
        task_param_map['taskType'] = work.task_type if work.task_type else 'iDDS'
        task_param_map['coreCount'] = work.core_count
        task_param_map['skipScout'] = True
        # ramCount is interpreted per core ('MBPerCoreFixed'), so divide the
        # total memory by the core count when one is given.
        task_param_map['ramCount'] = work.total_memory / work.core_count if work.core_count else work.total_memory
        # task_param_map['ramUnit'] = 'MB'
        task_param_map['ramUnit'] = 'MBPerCoreFixed'

        # task_param_map['inputPreStaging'] = True
        task_param_map['prestagingRuleID'] = 123
        task_param_map['nChunksToWait'] = 1
        task_param_map['maxCpuCount'] = work.core_count
        task_param_map['maxWalltime'] = work.max_walltime
        # Default to 5 attempts when the work object does not specify one.
        # (The previous "if task_param_map['maxAttempt'] < work.max_attempt"
        # follow-ups were dead code when max_attempt was set, and raised
        # TypeError comparing int with None when it was not.)
        max_attempt = work.max_attempt if work.max_attempt else 5
        task_param_map['maxFailure'] = max_attempt
        task_param_map['maxAttempt'] = max_attempt

        task_param_map['jobParameters'] = [
            {'type': 'constant',
             'value': executable,  # noqa: E501
             },
        ]

        # task_param_map['nFilesPerJob'] = 1
        if in_files:
            # Pseudo input files: one job per base64-encoded kwargs entry.
            task_param_map['nFiles'] = len(in_files)
            task_param_map['noInput'] = True
            task_param_map['pfnList'] = in_files
            task_param_map['nFilesPerJob'] = 1
        elif work.input_datasets:
            # Real input datasets: one template job parameter per dataset,
            # mapping dataset entries to the named input file.
            for i, (input_file_name, input_dataset_name) in enumerate(work.input_datasets.items()):
                # Both "$WORKFLOWID" and "${WORKFLOWID}" placeholders are
                # substituted with the request id.
                input_dataset_name = input_dataset_name.replace("$WORKFLOWID", str(work.request_id))
                input_dataset_name = input_dataset_name.replace("${WORKFLOWID}", str(work.request_id))
                tmp_dict = {
                    "type": "template",
                    "param_type": "input",
                    "exclude": r"\.log\.tgz(\.\d+)*$",
                    "expand": True,
                    "value": f'--inputs{i} "${{IN/T}}" --input_map{i} {input_file_name}',
                    "dataset": input_dataset_name
                }
                task_param_map['jobParameters'].append(tmp_dict)
                # NOTE(review): with several datasets only the last one ends
                # up as dsForIN — presumably intended; confirm with callers.
                task_param_map['dsForIN'] = input_dataset_name
        else:
            # No input at all: submit a single job driven by one pseudo file.
            in_files = [json.dumps('pseudo_file')]
            task_param_map['nFiles'] = len(in_files)
            task_param_map['noInput'] = True
            task_param_map['pfnList'] = in_files
            task_param_map['nFilesPerJob'] = 1

        # Output container names must end with "/".
        if work.output_dataset_name:
            if not work.output_dataset_name.endswith("/"):
                work.output_dataset_name = work.output_dataset_name + "/"

        if work.output_dataset_name and work.output_file_name:
            output_dataset_name = work.output_dataset_name.replace("$WORKFLOWID", str(work.request_id))
            output_dataset_name_no_scope = output_dataset_name.split(":")[-1]
            # ${SN/P} is expanded by JEDI to the padded serial number, making
            # per-job output file names unique.
            output_file_name = f"{output_dataset_name_no_scope[:-1]}_${{SN/P}}.{work.output_file_name}"
            tmp_dict = {"dataset": output_dataset_name,
                        "container": output_dataset_name,
                        "param_type": "output",
                        "type": "template",
                        "value": output_file_name
                        }

            task_param_map['jobParameters'].append(tmp_dict)

            # Tell the wrapper how the user-visible output file name maps to
            # the uniquified one registered with JEDI.
            output_map = {work.output_file_name: output_file_name}
            task_param_map["jobParameters"] += [
                {
                    "type": "constant",
                    "value": ' --output_map "{0}"'.format(str(output_map)),
                },
            ]

        # if work.enable_separate_log:
        if True:
            # Derive the log dataset name: explicit > from output dataset
            # (with ".log/" suffix) > generated from the request id.
            if work.log_dataset_name:
                log_dataset_name = work.log_dataset_name
            elif work.output_dataset_name:
                log_dataset_name = re.sub('/$', '.log/', work.output_dataset_name)
            else:
                log_dataset_name = f"Panda.iworkflow.{work.request_id}/"   # "PandaJob_#{pandaid}/"

            log_dataset_name = log_dataset_name.replace("$WORKFLOWID", str(work.request_id))
            log_dataset_name_no_scope = log_dataset_name.split(":")[-1]

            logging.debug(f"BaseSubmitter enable_separate_log: {work.enable_separate_log}")
            task_param_map['log'] = {"dataset": log_dataset_name,
                                     "container": log_dataset_name,
                                     "param_type": "log",
                                     "type": "template",
                                     'value': '{0}.${{SN}}.log.tgz'.format(log_dataset_name_no_scope[:-1])
                                     }

        task_param_map['reqID'] = work.request_id

        if work.container_options:
            if isinstance(work.container_options, dict) and work.container_options.get('container_image', None):
                container_image = work.container_options.get('container_image', None)
                task_param_map['container_name'] = container_image

        return task_param_map

    def submit(self, *args, **kwargs):
        pass
0207 
0208 
class BasePoller(object):
    """No-op base class defining the poller interface.

    Concrete pollers override :meth:`poll` to check task status.
    """

    def __init__(self, *args, **kwargs):
        # Accept and ignore arbitrary arguments so subclasses may pass
        # anything through.
        pass

    def poll(self, *args, **kwargs):
        # Interface stub; subclasses implement the actual polling.
        pass
0216 
0217 
class BaseSubmitterPoller(BaseSubmitter):
    """Submitter that additionally exposes the poller interface."""

    def __init__(self, *args, **kwargs):
        # Delegate initialization to BaseSubmitter unchanged.
        super(BaseSubmitterPoller, self).__init__(*args, **kwargs)

    def poll(self, *args, **kwargs):
        # Interface stub; concrete submitter-pollers implement polling.
        pass