File indexing completed on 2026-04-09 07:58:21
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012 """
0013 Test client.
0014 """
0015 import json
0016 import re
0017 import time
0018
0019
0020 try:
0021 from urllib import quote
0022 except ImportError:
0023 from urllib.parse import quote
0024
0025 from pandaclient import Client
0026
0027
0028 from idds.client.clientmanager import ClientManager
0029
0030 from idds.common.utils import get_rest_host, run_command
0031
0032
0033
0034
0035
0036 from idds.workflowv2.workflow import Condition, Workflow
0037
0038
0039 from idds.atlas.workflowv2.atlaspandawork import ATLASPandaWork
0040 from idds.atlas.workflowv2.atlasactuatorwork import ATLASActuatorWork
0041
0042
def get_task_id(output, error):
    """Extract the JEDI task id from prun's combined stdout/stderr.

    prun reports "succeeded. new jediTaskID=<id>"; the marker may appear in
    either stream, so both are searched.

    :param output: stdout text from the submission command.
    :param error: stderr text from the submission command.
    :returns: the task id as an int.
    :raises Exception: if no jediTaskID marker is found in either stream.
    """
    # Raw string avoids the invalid-escape-sequence warning for \d.
    m = re.search(r'jediTaskID=(\d+)', output + error)
    if m is None:
        # Fail with the captured output instead of an opaque AttributeError.
        raise Exception("jediTaskID not found in submission output: %s" % (output + error))
    return int(m.group(1))
0047
0048
def submit_processing_task():
    """Submit the active-learning test task to PanDA via prun.

    Builds a time-stamped output dataset name, runs prun from the test
    sandbox directory, and returns the new JEDI task id.

    :returns: the submitted task id (int).
    :raises Exception: with the combined command output when prun fails.
    """
    out_ds = "user.wguan.altest%s" % str(int(time.time()))
    cmd = "cd /afs/cern.ch/user/w/wguan/workdisk/iDDS/main/lib/idds/tests/activelearning_test_codes; prun --exec 'python simplescript.py 0.5 0.5 200 output.json' --outDS %s --outputs output.json --nJobs=10" % out_ds
    status, output, error = run_command(cmd)
    # Sample successful run (status 0, task id reported on stderr):
    #   INFO : gathering files under .../idds/tests/activelearning_test_codes
    #   INFO : upload source files
    #   INFO : submit user.wguan.altest1234/
    #   INFO : succeeded. new jediTaskID=23752996
    if status != 0:
        raise Exception(output + error)
    return get_task_id(output, error)
0076
0077
def get_panda_task_paramsmap(panda_task_id):
    """Fetch the task parameters map for a PanDA task.

    :param panda_task_id: JEDI task id to look up.
    :returns: the decoded paramsmap dict, or None when the lookup fails.
    """
    status, raw_params = Client.getTaskParamsMap(panda_task_id)
    if status != 0:
        return None
    # The client returns the map JSON-encoded; decode before use.
    return json.loads(raw_params)
0084
0085
def define_panda_task_paramsmap():
    """Build a template paramsmap with {m1}/{m2}/{nevents} placeholders.

    Submits a concrete task, retrieves its paramsmap, then swaps the
    hard-coded simplescript arguments for named placeholders — both in
    cliParams and in the URL-quoted job parameter values — so the map can
    be re-instantiated with different parameter values later.

    :returns: the parameterized task paramsmap dict.
    """
    task_param_map = get_panda_task_paramsmap(submit_processing_task())

    old_args = 'python simplescript.py 0.5 0.5 200'
    new_args = 'python simplescript.py {m1} {m2} {nevents}'

    task_param_map['cliParams'] = task_param_map['cliParams'].replace(old_args, new_args)
    # Job parameter values are URL-quoted, so compare the quoted forms.
    for param in task_param_map['jobParameters']:
        if 'value' in param:
            param['value'] = param['value'].replace(quote(old_args), quote(new_args))
    return task_param_map
0104
0105
def test_panda_work():
    """Smoke-test ATLASPandaWork: build it from a paramsmap and clone it.

    Prints the work's class, sandbox and output collections, then
    generates a fresh work from the template with concrete parameter
    values and initializes it.
    """
    params = define_panda_task_paramsmap()
    work = ATLASPandaWork(panda_task_paramsmap=params)
    work.initialize_work()
    print(work.__class__.__name__)
    print('sandbox: %s' % work.sandbox)
    print('output_collections: %s' % str(work.get_output_collections()))

    print("new work")
    clone = work.generate_work_from_template()
    clone.set_parameters({'m1': 0.5, 'm2': 0.5, 'nevents': 100})
    clone.initialize_work()
0119
0120
0121
0122
0123
0124
0125
0126
0127
0128
def get_workflow():
    """Assemble the active-learning workflow.

    Wires a PanDA processing work and an actuator (merge) work into a
    loop: the actuator runs once processing finishes, and processing is
    re-triggered when the actuator decides a new task is needed.

    :returns: the configured Workflow instance.
    """
    task_param_map = define_panda_task_paramsmap()
    processing = ATLASPandaWork(panda_task_paramsmap=task_param_map)
    processing.initialize_work()

    # The actuator reads the processing work's output collection; the
    # time-stamped suffix keeps its own output dataset unique.
    proc_out = processing.get_output_collections()[0]
    input_coll = {'scope': proc_out['scope'],
                  'name': proc_out['name'],
                  'coll_metadata': {'force_close': True}}
    output_coll = {'scope': proc_out['scope'],
                   'name': proc_out['name'] + "." + str(int(time.time()))}

    actuator = ATLASActuatorWork(executable='python', arguments='merge.py {output_json} {events} {dataset}',
                                 parameters={'output_json': 'merge.json',
                                             'events': 200,
                                             'dataset': '{scope}:{name}'.format(**input_coll),
                                             'filename': 'output*.json'},
                                 sandbox=processing.sandbox, primary_input_collection=input_coll,
                                 output_collections=output_coll, output_json='merge.json')

    wf = Workflow()
    wf.add_work(processing)
    wf.add_work(actuator)
    # Forward edge: actuator fires when processing is finished.
    wf.add_condition(Condition(processing.is_finished, current_work=processing, true_work=actuator, false_work=None))
    # Back edge: processing re-runs when the actuator requests a new task.
    wf.add_condition(Condition(actuator.generate_new_task, current_work=actuator, true_work=processing, false_work=None))
    wf.add_initial_works(processing)
    return wf
0168
0169
if __name__ == '__main__':
    # Resolve the iDDS REST endpoint from the local client configuration.
    host = get_rest_host()

    # Smoke-test ATLASPandaWork template generation; note this submits a
    # real PanDA task via prun (see submit_processing_task).
    test_panda_work()
    # Build the active-learning workflow (this submits another task) and
    # hand it to the iDDS server; print the resulting request id.
    workflow = get_workflow()
    wm = ClientManager(host=host)
    request_id = wm.submit(workflow)
    print(request_id)