Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-09 07:58:21

0001 #!/usr/bin/env python
0002 #
0003 # Licensed under the Apache License, Version 2.0 (the "License");
0004 # You may not use this file except in compliance with the License.
0005 # You may obtain a copy of the License at
0006 # http://www.apache.org/licenses/LICENSE-2.0
0007 #
0008 # Authors:
0009 # - Wen Guan, <wen.guan@cern.ch>, 2019 - 2021
0010 
0011 
0012 """
0013 Test client.
0014 """
0015 import json
0016 import re
0017 import time
0018 # import traceback
0019 
0020 try:
0021     from urllib import quote
0022 except ImportError:
0023     from urllib.parse import quote
0024 
0025 from pandaclient import Client
0026 
0027 # from idds.client.client import Client
0028 from idds.client.clientmanager import ClientManager
0029 # from idds.common.constants import RequestType, RequestStatus
0030 from idds.common.utils import get_rest_host, run_command
0031 # from idds.common.utils import json_dumps
0032 # from idds.tests.common import get_example_real_tape_stagein_request
0033 # from idds.tests.common import get_example_prodsys2_tape_stagein_request
0034 
0035 # from idds.workflowv2.work import Work, Parameter, WorkStatus
0036 from idds.workflowv2.workflow import Condition, Workflow
0037 # from idds.workflowv2.workflow import Workflow
0038 # from idds.atlas.workflowv2.atlasstageinwork import ATLASStageinWork
0039 from idds.atlas.workflowv2.atlaspandawork import ATLASPandaWork
0040 from idds.atlas.workflowv2.atlasactuatorwork import ATLASActuatorWork
0041 
0042 
def get_task_id(output, error):
    """
    Extract the JEDI task id from the text produced by a submission command.

    The two texts are concatenated (output first, then error) and searched
    for the first ``jediTaskID=<digits>`` token.

    :param output: text of the command's output stream.
    :param error: text of the command's error stream.

    :returns: the task id as an int.
    :raises ValueError: if no ``jediTaskID=<digits>`` token is found.
    """
    text = output + error
    m = re.search(r'jediTaskID=(\d+)', text)
    if m is None:
        # Without this check the failure would be an opaque
        # "'NoneType' object has no attribute 'group'".
        raise ValueError("No jediTaskID found in command output: %s" % text)
    return int(m.group(1))
0047 
0048 
def submit_processing_task():
    """
    Submit a PanDA task by running a hard-coded ``prun`` command.

    The output dataset name is made unique by appending the current
    unix timestamp.

    :returns: the task id parsed from the command output by ``get_task_id``.
    :raises Exception: with the concatenated output and error text when
                       the command status is not 0.
    """
    out_dataset = "user.wguan.altest%s" % str(int(time.time()))
    cmd_template = ("cd /afs/cern.ch/user/w/wguan/workdisk/iDDS/main/lib/idds/tests/activelearning_test_codes; "
                    "prun --exec 'python simplescript.py 0.5 0.5 200 output.json' "
                    "--outDS %s  --outputs output.json --nJobs=10")
    status, output, error = run_command(cmd_template % out_dataset)

    # Example of what a successful run returns:
    #
    #   status:
    #   0
    #   output:
    #
    #   error:
    #   INFO : gathering files under /afs/cern.ch/user/w/wguan/workdisk/iDDS/main/lib/idds/tests/activelearning_test_codes
    #   INFO : upload source files
    #   INFO : submit user.wguan.altest1234/
    #   INFO : succeeded. new jediTaskID=23752996
    #
    # Note that the task id shows up in the error text, which is why both
    # texts are handed to get_task_id.
    if status != 0:
        raise Exception(output + error)
    return get_task_id(output, error)
0076 
0077 
def get_panda_task_paramsmap(panda_task_id):
    """
    Fetch the parameter map of a PanDA task.

    :param panda_task_id: id of the task, passed to ``Client.getTaskParamsMap``.

    :returns: the JSON-decoded task parameter map when the client call
              reports status 0, otherwise None.
    """
    status, raw_param_map = Client.getTaskParamsMap(panda_task_id)
    if status != 0:
        return None
    # The client hands the map back as a JSON string.
    return json.loads(raw_param_map)
0084 
0085 
def define_panda_task_paramsmap():
    """
    Build a PanDA task parameter map whose command line is a template.

    A real task is submitted, its parameter map is fetched back, and the
    concrete arguments ``0.5 0.5 200`` of the script are replaced by the
    placeholders ``{m1} {m2} {nevents}`` both in ``cliParams`` and in
    every ``jobParameters`` entry that has a ``value``.

    :returns: the modified task parameter map (dict).
    :raises Exception: if the parameter map of the submitted task cannot
                       be retrieved.
    """
    # here is using a fake method by submitting a panda task.
    # Users should define the task params map by themselves.

    # (0, '{"buildSpec": {"jobParameters": "-i ${IN} -o ${OUT} --sourceURL ${SURL} -r . ", "archiveName": "sources.0ca6a2fb-4ad0-42d0-979d-aa7c284f1ff7.tar.gz", "prodSourceLabel": "panda"}, "sourceURL": "https://aipanda048.cern.ch:25443", "cliParams": "prun --exec \\"python simplescript.py 0.5 0.5 200 output.json\\" --outDS user.wguan.altest1234 --outputs output.json --nJobs=10", "site": null, "vo": "atlas", "respectSplitRule": true, "osInfo": "Linux-3.10.0-1127.19.1.el7.x86_64-x86_64-with-centos-7.9.2009-Core", "log": {"type": "template", "param_type": "log", "container": "user.wguan.altest1234.log/", "value": "user.wguan.altest1234.log.$JEDITASKID.${SN}.log.tgz", "dataset": "user.wguan.altest1234.log/"}, "transUses": "", "excludedSite": [], "nMaxFilesPerJob": 200, "uniqueTaskName": true, "noInput": true, "taskName": "user.wguan.altest1234/", "transHome": null, "includedSite": null, "nEvents": 10, "nEventsPerJob": 1, "jobParameters": [{"type": "constant", "value": "-j \\"\\" --sourceURL ${SURL}"}, {"type": "constant", "value": "-r ."}, {"padding": false, "type": "constant", "value": "-p \\""}, {"padding": false, "type": "constant", "value": "python%20simplescript.py%200.5%200.5%20200%20output.json"}, {"type": "constant", "value": "\\""}, {"type": "constant", "value": "-l ${LIB}"}, {"container": "user.wguan.altest1234_output.json/", "value": "user.wguan.$JEDITASKID._${SN/P}.output.json", "dataset": "user.wguan.altest1234_output.json/", "param_type": "output", "hidden": true, "type": "template"}, {"type": "constant", "value": "-o \\"{\'output.json\': \'user.wguan.$JEDITASKID._${SN/P}.output.json\'}\\""}], "prodSourceLabel": "user", "processingType": "panda-client-1.4.47-jedi-run", "architecture": "@centos7", "userName": "Wen Guan", "taskType": "anal", "taskPriority": 1000, "countryGroup": "us"}')  # noqa E501

    task_id = submit_processing_task()
    task_param_map = get_panda_task_paramsmap(task_id)
    if task_param_map is None:
        # get_panda_task_paramsmap returns None on a non-zero client status;
        # fail here with a clear message instead of a TypeError below.
        raise Exception("Failed to get the task params map of panda task %s" % task_id)

    cmd_to_arguments = {'arguments': 'python simplescript.py 0.5 0.5 200',
                        'parameters': 'python simplescript.py {m1} {m2} {nevents}'}

    # update the cliParams to have undefined parameters, these parameters {m1}, {m2}, {nevents} will be the outputs of learning script.
    task_param_map['cliParams'] = task_param_map['cliParams'].replace(cmd_to_arguments['arguments'], cmd_to_arguments['parameters'])

    # The command is stored URL-quoted inside jobParameters (see the example
    # above), so the quoted forms are substituted there. They do not change
    # between entries, so compute them once.
    quoted_arguments = quote(cmd_to_arguments['arguments'])
    quoted_parameters = quote(cmd_to_arguments['parameters'])
    for p in task_param_map['jobParameters']:
        if 'value' in p:
            p['value'] = p['value'].replace(quoted_arguments, quoted_parameters)
    return task_param_map
0104 
0105 
def test_panda_work():
    """
    Exercise ATLASPandaWork with a templated task parameter map.

    A work is created from the map returned by
    ``define_panda_task_paramsmap`` and initialized; its class name,
    sandbox and output collections are printed. A second work is then
    generated from that template, given concrete values for ``m1``,
    ``m2`` and ``nevents``, and initialized as well.
    """
    params_map = define_panda_task_paramsmap()
    template_work = ATLASPandaWork(panda_task_paramsmap=params_map)
    template_work.initialize_work()

    print(template_work.__class__.__name__)
    print('sandbox: %s' % template_work.sandbox)
    collections = template_work.get_output_collections()
    print('output_collections: %s' % str(collections))

    print("new work")
    derived_work = template_work.generate_work_from_template()
    derived_work.set_parameters(dict(m1=0.5, m2=0.5, nevents=100))
    derived_work.initialize_work()

    # Debugging aids that were used here previously:
    #   print(json_dumps(derived_work, sort_keys=True, indent=4))
    #   print('output_collections: %s' % str(derived_work.get_output_collections()))
    #   from pandaclient import Client
    #   ret = Client.getJediTaskDetails({'jediTaskID': panda_task_id}, False, True)
    #   print(ret)
0127 
0128 
def get_workflow():
    """
    Assemble a workflow in which a PanDA work and an actuator work form a loop.

    The PanDA work is built from the templated task parameter map. The
    actuator work runs ``merge.py`` on the first output collection of the
    PanDA work. Two conditions connect them: one on
    ``work.is_finished`` leading to the actuator, and one on
    ``actuator.generate_new_task`` leading back to the PanDA work.

    :returns: the configured Workflow, with the PanDA work registered as
              the initial work.
    """
    params_map = define_panda_task_paramsmap()
    panda_work = ATLASPandaWork(panda_task_paramsmap=params_map)

    # Initializing here parses the panda task parameters (for example the
    # output dataset name), which the next task needs. If that information is
    # not needed, there is no need to call it manually: iDDS will call it
    # internally.
    panda_work.initialize_work()

    produced_coll = panda_work.get_output_collections()[0]
    scope = produced_coll['scope']
    name = produced_coll['name']

    # The actuator reads what the panda work produced ...
    input_coll = {'scope': scope,
                  'name': name,
                  'coll_metadata': {'force_close': True}}
    # ... and writes to a collection made unique with the current timestamp.
    output_coll = {'scope': scope,
                   'name': name + "." + str(int(time.time()))}

    # An alternative arguments template that was tried before:
    #   'merge.py {output_json} {events} {dataset}/{filename}'
    actuator_parameters = {'output_json': 'merge.json',
                           'events': 200,
                           'dataset': '{scope}:{name}'.format(scope=input_coll['scope'], name=input_coll['name']),
                           'filename': 'output*.json'}
    actuator = ATLASActuatorWork(executable='python',
                                 arguments='merge.py {output_json} {events} {dataset}',
                                 parameters=actuator_parameters,
                                 sandbox=panda_work.sandbox,
                                 primary_input_collection=input_coll,
                                 output_collections=output_coll,
                                 output_json='merge.json')

    wf = Workflow()
    for member in (panda_work, actuator):
        wf.add_work(member)

    # panda work --(is_finished)--> actuator
    wf.add_condition(Condition(panda_work.is_finished, current_work=panda_work,
                               true_work=actuator, false_work=None))
    # actuator --(generate_new_task)--> panda work
    wf.add_condition(Condition(actuator.generate_new_task, current_work=actuator,
                               true_work=panda_work, false_work=None))

    # Because the two works are in a loop, they are not independent. This call
    # tells which one to start with; otherwise idds will use the first one.
    wf.add_initial_works(panda_work)

    # panda_work.set_workflow(wf)
    return wf
0168 
0169 
if __name__ == '__main__':
    # Script entry point: run the ATLASPandaWork check, then build the looping
    # workflow and submit it through the iDDS client manager.
    rest_host = get_rest_host()

    # To experiment with an existing task instead of submitting a new one:
    #   panda_task_id = submit_processing_task()
    #   panda_task_id = 23752996
    #   panda_task_id = 23810059
    #   panda_task_id = 23818866
    #   print(panda_task_id)
    test_panda_work()

    wf_to_submit = get_workflow()
    client_manager = ClientManager(host=rest_host)
    submitted_request_id = client_manager.submit(wf_to_submit)
    print(submitted_request_id)