File indexing completed on 2026-04-09 07:58:21
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012 """
0013 Test client.
0014 """
0015
0016 import traceback
0017
0018 from rucio.client.client import Client as Rucio_Client
0019 from rucio.common.exception import CannotAuthenticate
0020
0021
0022 from idds.client.clientmanager import ClientManager
0023 from idds.common.constants import RequestType, RequestStatus
0024 from idds.common.utils import get_rest_host
0025
0026
0027
0028
0029
0030 from idds.workflowv2.workflow import Workflow
0031 from idds.atlas.workflowv2.atlasstageinwork import ATLASStageinWork
0032
0033
def get_rucio_client():
    """
    Instantiate a Rucio client with its default configuration.

    :returns: The newly created ``Rucio_Client`` instance.
    :raises Exception: When Rucio raises ``CannotAuthenticate``; the message
                       carries the original error text followed by the traceback.
    """
    try:
        return Rucio_Client()
    except CannotAuthenticate as error:
        # Capture the traceback once so the printed and the raised text are the same
        trace = traceback.format_exc()
        print(trace)
        raise Exception('%s: %s' % (str(error), trace))
0041
0042
def get_rule(scope, name, rucio_client, src_rse, dest_rse, account='ddmadmin'):
    """
    Look up an existing replication rule of a DID that matches both RSE expressions.

    :param scope: Scope of the DID.
    :param name: Name of the DID.
    :param rucio_client: Client object providing ``list_did_rules(scope=..., name=...)``.
    :param src_rse: Value the rule's ``source_replica_expression`` must equal (may be None).
    :param dest_rse: Value the rule's ``rse_expression`` must equal.
    :param account: Currently not used by the lookup.

    :returns: The ``id`` of the first matching rule, or None when no rule matches.
    """
    for rule in rucio_client.list_did_rules(scope=scope, name=name):
        # Both ends have to match: a rule that replicates to another destination
        # must not be reused for this request.
        if rule['source_replica_expression'] == src_rse and rule['rse_expression'] == dest_rse:
            print(rule['id'])
            return rule['id']

    return None
0051
0052
def create_rule(scope, name, rucio_client, src_rse, dest_rse, account='ddmadmin'):
    """
    Add a single-copy replication rule for one DID.

    :param scope: Scope of the DID.
    :param name: Name of the DID.
    :param rucio_client: Client object providing ``add_replication_rule``.
    :param src_rse: Passed as ``source_replica_expression``.
    :param dest_rse: Passed as ``rse_expression``.
    :param account: Account the rule is created for.

    :returns: Whatever ``add_replication_rule`` returns.
    """
    one_week = 7 * 24 * 3600  # rule lifetime handed to Rucio
    rule_options = {
        'dids': [{'scope': scope, 'name': name}],
        'copies': 1,
        'rse_expression': dest_rse,
        'source_replica_expression': src_rse,
        'lifetime': one_week,
        'locked': False,
        'account': account,
        'grouping': 'DATASET',
        'ask_approval': False,
    }
    return rucio_client.add_replication_rule(**rule_options)
0065
0066
0067
def get_req_properties():
    """
    Return the fixed properties of a sample stage-in request.

    :returns: A dict describing a new ``RequestType.StageIn`` request for a
              data16_13TeV RAW dataset, including its ``request_metadata``.
    """
    metadata = {
        'workload_id': '20776840',
        'max_waiting_time': 3600,
        'src_rse': 'NDGF-T1_DATATAPE',
        'dest_rse': 'NDGF-T1_DATADISK',
        'rule_id': '236e4bf87e11490291e3259b14724e30',
    }
    return {
        'scope': 'data16_13TeV',
        'name': 'data16_13TeV.00298862.physics_Main.daq.RAW',
        'requester': 'panda',
        'request_type': RequestType.StageIn,
        'transform_tag': 'prodsys2',
        'status': RequestStatus.New,
        'priority': 0,
        'lifetime': 30,
        'request_metadata': metadata,
    }
0081
0082
def get_rule_id(scope, name, src_rse, dest_rse):
    """
    Return the id of a replication rule for the DID, creating the rule if needed.

    :param scope: Scope of the DID.
    :param name: Name of the DID.
    :param src_rse: Source replica expression of the rule.
    :param dest_rse: Destination RSE expression of the rule.

    :returns: The result of ``get_rule`` when it is truthy, otherwise the
              result of ``create_rule``.
    """
    client = get_rucio_client()
    # Prefer an existing rule; only ask Rucio for a new one when the lookup is empty
    found = get_rule(scope, name, client, src_rse, dest_rse) or create_rule(scope, name, client, src_rse, dest_rse)
    if found:
        print("new rule id: %s" % found)
    return found
0091
0092
def get_workflow():
    """
    Build a one-step workflow that stages in a fixed mc16_13TeV EVNT dataset.

    A replication rule towards the destination RSE is looked up or created
    through ``get_rule_id`` and its id is handed to the ``ATLASStageinWork``.

    :returns: A ``Workflow`` holding the single stage-in work.
    """
    scope = 'mc16_13TeV'
    name = 'mc16_13TeV.411332.PhHerwig7EG_ttbar_hdamp258p75_713_dil_BBFilt.merge.EVNT.e7800_e5984_tid19396149_00'

    # No source replica expression is imposed on the rule
    src_rse = None
    dest_rse = 'UKI-SCOTGRID-GLASGOW-CEPH_DATADISK'
    rule_id = get_rule_id(scope, name, src_rse, dest_rse)

    work = ATLASStageinWork(executable=None, arguments=None, parameters=None, setup=None,
                            exec_type='local', sandbox=None, work_id=None,
                            primary_input_collection={'scope': scope, 'name': name},
                            other_input_collections=None,
                            output_collections={'scope': scope, 'name': name + '.idds.stagein'},
                            log_collections=None,
                            logger=None,
                            max_waiting_time=3600 * 7 * 24, src_rse=src_rse, dest_rse=dest_rse, rule_id=rule_id)
    wf = Workflow()
    wf.add_work(work)

    return wf
0117
0118
def pre_check(req):
    """
    Make sure a replication rule exists for the request and record its id.

    :param req: Request dict with ``scope``, ``name`` and a ``request_metadata``
                dict holding ``src_rse`` and ``dest_rse``.

    :returns: The same ``req`` object; when a rule id was found or created it
              is stored under ``req['request_metadata']['rule_id']``.
    """
    client = get_rucio_client()
    scope = req['scope']
    name = req['name']
    metadata = req['request_metadata']
    src_rse = metadata['src_rse']
    dest_rse = metadata['dest_rse']

    # Reuse an existing rule when there is one, otherwise create it
    rule_id = get_rule(scope, name, client, src_rse, dest_rse)
    if not rule_id:
        rule_id = create_rule(scope, name, client, src_rse, dest_rse)

    if not rule_id:
        return req

    print("new rule id: %s" % rule_id)
    req['request_metadata']['rule_id'] = rule_id
    return req
0128
0129
if __name__ == '__main__':
    # Resolve the REST endpoint before building the workflow
    rest_host = get_rest_host()

    # Building the workflow looks up or creates the replication rule
    stagein_workflow = get_workflow()

    # Placeholder identity attached to the submitted workflow
    stagein_workflow.username = 'abdc'
    stagein_workflow.userdn = '/DC=abc/DC=def'

    # Submit through the client manager and show the id it returns
    client_manager = ClientManager(host=rest_host)
    submitted_id = client_manager.submit(stagein_workflow)
    print(submitted_id)
0145 print(request_id)