Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-09 07:58:22

0001 #!/usr/bin/env python
0002 #
0003 # Licensed under the Apache License, Version 2.0 (the "License");
0004 # You may not use this file except in compliance with the License.
0005 # You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
0007 #
0008 # Authors:
0009 # - Sergey Padolski, <spadolski@bnl.gov>, 2021
0010 # - Wen Guan, <wen.guan@cern.ch>, 2021
0011 
0012 
0013 """
0014 Test client.
0015 """
0016 
0017 import string
0018 import random
0019 
0020 # import traceback
0021 
0022 # from rucio.client.client import Client as Rucio_Client
0023 # from rucio.common.exception import CannotAuthenticate
0024 
0025 from idds.common.utils import json_dumps                 # noqa F401
0026 from idds.common.constants import ContentStatus, RequestType, RequestStatus          # noqa F401
0027 from idds.core.requests import get_requests, add_request              # noqa F401
0028 from idds.core.messages import retrieve_messages         # noqa F401
0029 from idds.core.transforms import get_transforms          # noqa F401
0030 from idds.core.workprogress import get_workprogresses    # noqa F401
0031 from idds.core.processings import get_processings        # noqa F401
0032 from idds.core import transforms as core_transforms      # noqa F401
0033 
0034 # from idds.client.client import Client
0035 from idds.client.clientmanager import ClientManager                           # noqa F401
0036 # from idds.common.constants import RequestType, RequestStatus                # noqa F401
0037 from idds.common.utils import get_rest_host                                   # noqa F401
0038 # from idds.tests.common import get_example_real_tape_stagein_request
0039 # from idds.tests.common import get_example_prodsys2_tape_stagein_request
0040 
0041 # from idds.workflowv2.work import Work, Parameter, WorkStatus
0042 # from idds.workflowv2.workflow import Condition, Workflow
0043 from idds.workflowv2.workflow import Workflow
0044 # from idds.atlas.workflowv2.atlasstageinwork import ATLASStageinWork
0045 from idds.doma.workflowv2.domapandawork import DomaPanDAWork
0046 
0047 
0048 task_queue = 'DOMA_LSST_GOOGLE_TEST'
0049 
0050 
0051 def randStr(chars=string.ascii_lowercase + string.digits, N=10):
0052     return ''.join(random.choice(chars) for _ in range(N))
0053 
0054 
0055 class PanDATask(object):
0056     name = None
0057     step = None
0058     dependencies = []
0059 
0060 
0061 def get_workflow():
0062 
0063     taskN1 = PanDATask()
0064     taskN1.step = "step1"
0065     taskN1.name = taskN1.step + "_" + randStr()
0066     taskN1.dependencies = [
0067         {"name": "00000" + str(k),
0068          "dependencies": [],
0069          "submitted": False} for k in range(6)
0070     ]
0071 
0072     taskN2 = PanDATask()
0073     taskN2.step = "step2"
0074     taskN2.name = taskN2.step + "_" + randStr()
0075     taskN2.dependencies = [
0076         {
0077             "name": "000010",
0078             "dependencies": [{"task": taskN1.name, "inputname": "000001", "available": False},
0079                              {"task": taskN1.name, "inputname": "000002", "available": False}],
0080             "submitted": False
0081         },
0082         {
0083             "name": "000011",
0084             "dependencies": [{"task": taskN1.name, "inputname": "000001", "available": False},
0085                              {"task": taskN1.name, "inputname": "000002", "available": False}],
0086             "submitted": False
0087         },
0088         {
0089             "name": "000012",
0090             "dependencies": [{"task": taskN1.name, "inputname": "000001", "available": False},
0091                              {"task": taskN1.name, "inputname": "000002", "available": False}],
0092             "submitted": False
0093         }
0094     ]
0095 
0096     taskN3 = PanDATask()
0097     taskN3.step = "step3"
0098     taskN3.name = taskN3.step + "_" + randStr()
0099     taskN3.dependencies = [
0100         {
0101             "name": "000020",
0102             "dependencies": [],
0103             "submitted": False
0104         },
0105         {
0106             "name": "000021",
0107             "dependencies": [{"task": taskN2.name, "inputname": "000010", "available": False},
0108                              {"task": taskN2.name, "inputname": "000011", "available": False}],
0109             "submitted": False
0110         },
0111         {
0112             "name": "000022",
0113             "dependencies": [{"task": taskN2.name, "inputname": "000011", "available": False},
0114                              {"task": taskN2.name, "inputname": "000012", "available": False}],
0115             "submitted": False
0116         },
0117         {
0118             "name": "000023",
0119             "dependencies": [],
0120             "submitted": False
0121         },
0122         {
0123             "name": "000024",
0124             "dependencies": [{"task": taskN3.name, "inputname": "000021", "available": False},
0125                              {"task": taskN3.name, "inputname": "000023", "available": False}],
0126             "submitted": False
0127         },
0128     ]
0129 
0130     work1 = DomaPanDAWork(executable='echo',
0131                           primary_input_collection={'scope': 'pseudo_dataset', 'name': 'pseudo_input_collection#1'},
0132                           output_collections=[{'scope': 'pseudo_dataset', 'name': 'pseudo_output_collection#1'}],
0133                           log_collections=[], dependency_map=taskN1.dependencies, task_name=taskN1.name, task_queue=task_queue)
0134     work2 = DomaPanDAWork(executable='echo',
0135                           primary_input_collection={'scope': 'pseudo_dataset', 'name': 'pseudo_input_collection#2'},
0136                           output_collections=[{'scope': 'pseudo_dataset', 'name': 'pseudo_output_collection#2'}],
0137                           log_collections=[], dependency_map=taskN2.dependencies, task_name=taskN2.name, task_queue=task_queue)
0138     work3 = DomaPanDAWork(executable='echo',
0139                           primary_input_collection={'scope': 'pseudo_dataset', 'name': 'pseudo_input_collection#3'},
0140                           output_collections=[{'scope': 'pseudo_dataset', 'name': 'pseudo_output_collection#3'}],
0141                           log_collections=[], dependency_map=taskN3.dependencies, task_name=taskN3.name, task_queue=task_queue)
0142 
0143     workflow = Workflow()
0144     workflow.add_work(work1)
0145     workflow.add_work(work2)
0146     workflow.add_work(work3)
0147     return workflow
0148 
0149 
0150 def get_workflow_props(workflow):
0151     props = {
0152         'scope': 'workflow',
0153         'name': workflow.name,
0154         'requester': 'panda',
0155         'request_type': RequestType.Workflow,
0156         'transform_tag': 'workflow',
0157         'status': RequestStatus.New,
0158         'priority': 0,
0159         'lifetime': 30,
0160         'workload_id': workflow.get_workload_id(),
0161         'request_metadata': {'workload_id': workflow.get_workload_id(), 'workflow': workflow}
0162     }
0163     # workflow.add_proxy()
0164     primary_init_work = workflow.get_primary_initial_collection()
0165     if primary_init_work:
0166         props['scope'] = primary_init_work['scope']
0167         props['name'] = primary_init_work['name']
0168     return props
0169 
0170 
0171 def test_running_data():
0172     workflow = get_workflow()
0173     props = get_workflow_props(workflow)
0174     request_id = add_request(**props)
0175     return request_id
0176 
0177 
0178 def show_request(req):
0179     workflow = req['processing_metadata']['workflow']
0180     print(workflow.independent_works)
0181     print(len(workflow.independent_works))
0182     print(workflow.works_template.keys())
0183     print(len(workflow.works_template.keys()))
0184     print(workflow.work_sequence.keys())
0185     print(len(workflow.work_sequence.keys()))
0186     print(workflow.works.keys())
0187     print(len(workflow.works.keys()))
0188 
0189     work_ids = []
0190     for i_id in workflow.works:
0191         work = workflow.works[i_id]
0192         print(i_id)
0193         print(work.work_name)
0194         print(work.task_name)
0195         print(work.work_id)
0196         work_ids.append(work.work_id)
0197     print(work_ids)
0198 
0199 
0200 def test_get_requests(request_id):
0201     reqs = get_requests(request_id=request_id, with_detail=False)
0202     for req in reqs:
0203         # show_request(req)
0204         print(req)
0205         print(json_dumps(req, sort_keys=True, indent=4))
0206         pass
0207 
0208 
0209 if __name__ == '__main__':
0210     request_id = test_running_data()
0211     print(request_id)
0212     test_get_requests(request_id=3)