File indexing completed on 2026-04-09 07:58:21
0001 import sys
0002 import datetime
0003
0004 from idds.common.utils import json_dumps, setup_logging
0005 from idds.common.constants import ContentStatus, ContentType, ContentRelationType, ContentLocking
0006 from idds.core.requests import get_requests
0007 from idds.core.messages import retrieve_messages
0008 from idds.core.transforms import get_transforms, get_transform
0009 from idds.core.workprogress import get_workprogresses
0010 from idds.core.processings import get_processings
0011 from idds.core import transforms as core_transforms
0012 from idds.orm.contents import get_contents
0013 from idds.core.transforms import release_inputs_by_collection, release_inputs_by_collection_old
0014 from idds.workflowv2.workflow import Workflow
0015 from idds.workflowv2.work import Work
0016
0017
# Configure logging through idds.common.utils.setup_logging, keyed on this module's name.
setup_logging(__name__)
0019
0020
def _default_release_inputs():
    """Build the sample ``{coll_id: [content_dict, ...]}`` mapping used by release_inputs_test.

    The seven sample contents (collection 4042, request 107, transform 2021) share
    every field except ``map_id``, ``content_id``, ``name`` and
    ``content_metadata['panda_id']``, so they are generated from one template
    instead of being spelled out as seven near-identical literals.
    """
    coll_id = 4042
    created_at = datetime.datetime(2021, 7, 14, 21, 48, 10)
    touched_at = datetime.datetime(2021, 7, 14, 22, 8, 30)  # used for both updated_at and accessed_at
    expired_at = datetime.datetime(2021, 8, 13, 21, 48, 10)
    name_prefix = 'u_huanlin_panda_test_ci_imsim_w26_20210714T214732Z.qgraph+'
    name_suffix = '+1626299263.3909254-24148+'
    # (node id, first number after '_isr_', second number after '_isr_') for map_id 1..7, in order.
    name_parts = [(13, 257768, 161), (2, 212071, 54), (10, 456716, 99), (34, 407919, 130),
                  (23, 254379, 48), (11, 37657, 141), (31, 226983, 36)]

    contents = []
    for map_id, (node_id, isr_a, isr_b) in enumerate(name_parts, start=1):
        contents.append({
            'map_id': map_id,
            'status': ContentStatus.Available,
            'retries': 0,
            'scope': 'pseudo_dataset',
            'substatus': ContentStatus.Available,
            'path': None,
            'name': '%s%d_isr_%d_%d%s%d' % (name_prefix, node_id, isr_a, isr_b, name_suffix, node_id),
            'locking': ContentLocking.Idle,
            'created_at': created_at,
            'content_id': 2254912 + map_id,  # 2254913 .. 2254919
            'min_id': 0,
            'bytes': 1,
            'updated_at': touched_at,
            'coll_id': coll_id,
            'max_id': 1,
            'md5': None,
            'accessed_at': touched_at,
            'request_id': 107,
            'content_type': ContentType.File,
            'adler32': '12345678',
            'expired_at': expired_at,
            'workload_id': 1626299273,
            'content_relation_type': ContentRelationType.Output,
            'processing_id': None,
            'content_metadata': {'events': 1, 'panda_id': 1412271 + map_id},  # 1412272 .. 1412278
            'transform_id': 2021,
            'storage_id': None,
        })
    return {coll_id: contents}


def release_inputs_test(to_release_inputs=None):
    """Exercise release_inputs_by_collection on a set of released output contents.

    For each collection in ``to_release_inputs``, fetch its contents with
    get_contents (using the request_id of the first listed content). Print how
    many contents there are and how many of them are InputDependency contents.
    Then call release_inputs_by_collection twice, once with defaults and once
    with ``final=True``, and print each result.

    :param to_release_inputs: optional ``{coll_id: [content_dict, ...]}`` mapping.
        Each content dict must carry a ``request_id``. Defaults to the sample
        collection 4042 built by ``_default_release_inputs()``.
    """
    if to_release_inputs is None:
        to_release_inputs = _default_release_inputs()

    for coll_id, released_contents in to_release_inputs.items():
        if not released_contents:
            # No content to take the request_id from; skip instead of raising IndexError.
            continue
        contents = get_contents(request_id=released_contents[0]['request_id'],
                                coll_id=coll_id,
                                name=None)
        print(len(contents))
        in_dep_contents = [content for content in contents
                           if content['content_relation_type'] == ContentRelationType.InputDependency]
        print(len(in_dep_contents))

    update_contents = release_inputs_by_collection(to_release_inputs)
    print(update_contents)

    update_contents = release_inputs_by_collection(to_release_inputs, final=True)
    print(update_contents)
0083
0084
0085
0086
0087
def show_works(req):
    """Dump the work bookkeeping of the workflow held in a request's processing metadata.

    Prints ``independent_works``, then the keys of ``works_template``,
    ``work_sequence`` and ``works``, each followed by its length. For every
    entry of ``works`` it prints the key, ``work_name``, ``task_name`` and
    ``work_id``, and finally the list of all collected work ids.

    :param req: request dict whose workflow sits at ``req['processing_metadata']['workflow']``.
    """
    wf = req['processing_metadata']['workflow']

    def _show_with_len(container):
        print(container)
        print(len(container))

    _show_with_len(wf.independent_works)
    for attr_name in ('works_template', 'work_sequence', 'works'):
        _show_with_len(getattr(wf, attr_name).keys())

    collected_ids = []
    for key in wf.works:
        entry = wf.works[key]
        for value in (key, entry.work_name, entry.task_name, entry.work_id):
            print(value)
        collected_ids.append(entry.work_id)
    print(collected_ids)
0108
0109
def print_workflow(workflow, layers=0):
    """Recursively print each run of ``workflow`` and the state of every work in it.

    Nested Workflow objects are printed by recursing with ``layers + 1``, which
    adds 4 more spaces of indentation. For every other work, the matching
    transform is fetched with ``get_transform(transform_id=work.get_work_id())``.
    When one is found, its status, substatus, work and workload_id are synced
    into the work before its details are printed. After a run's works, the run's
    loop condition status and loop condition are printed if it has one.

    :param workflow: object exposing ``runs``, a mapping of run id -> run object with ``works``.
    :param layers: nesting depth; only affects the indentation prefix.
    """
    prefix = " " * layers * 4
    for run_id in workflow.runs:
        run = workflow.runs[run_id]
        print(prefix + "run: " + str(run_id) + ", has_loop_condition: " + str(run.has_loop_condition()))

        for work_id in run.works:
            work = run.works[work_id]
            print(prefix + " " + str(work_id) + " " + str(type(work)))
            if isinstance(work, Workflow):
                # str() guards against a non-str parent_num_run; plain + concatenation would raise TypeError.
                print(prefix + " parent_num_run: " + str(work.parent_num_run) + ", num_run: " + str(work.num_run))
                print_workflow(work, layers=layers + 1)
            else:
                tf = get_transform(transform_id=work.get_work_id())
                if tf:
                    transform_work = tf['transform_metadata']['work']
                    work.sync_work_data(status=tf['status'], substatus=tf['substatus'],
                                        work=transform_work, workload_id=tf['workload_id'])

                print(prefix + " or: " + str(work.or_custom_conditions) + " and: " + str(work.and_custom_conditions))
                print(prefix + " output: " + str(work.output_data))
                print(prefix + " " + work.task_name + ", num_run: " + str(work.num_run))
                print(prefix + " workload_id: " + str(work.workload_id))
                print(prefix + " is_terminated: " + str(work.is_terminated()))
                print(prefix + " is_finished: " + str(work.is_finished()))

        if run.has_loop_condition():
            print(prefix + " Loop condition status: %s" % run.get_loop_condition_status())
            print(prefix + " Loop condition: %s" % json_dumps(run.loop_condition, sort_keys=True, indent=4))
0141
0142
def print_workflow_template(workflow, layers=0):
    """Recursively print the template of ``workflow`` and of any nested workflows.

    Prints the template's ``internal_id`` and whether it has a loop condition.
    It then prints one entry per template work. Nested Workflow objects are
    printed with their run counters and recursed into with ``layers + 1``; other
    works are printed with their ``task_name`` and ``num_run``.

    :param workflow: object exposing ``template`` with ``internal_id``, ``works`` and ``has_loop_condition()``.
    :param layers: nesting depth; only affects the indentation prefix (4 spaces per level).
    """
    prefix = " " * layers * 4
    template = workflow.template
    print(prefix + str(template.internal_id) + ", has_loop_condition: " + str(template.has_loop_condition()))
    for work_id in template.works:
        item = template.works[work_id]
        print(prefix + " " + str(work_id) + " " + str(type(item)))
        # isinstance (rather than an exact type check) also recurses into Workflow subclasses.
        if isinstance(item, Workflow):
            print(prefix + " parent_num_run: " + str(item.parent_num_run) + ", num_run: " + str(item.num_run))
            print_workflow_template(item, layers=layers + 1)
        else:
            print(prefix + " " + item.task_name + ", num_run: " + str(item.num_run))
0153
0154
0155
0156
0157
0158
0159
0160
0161
0162
0163
0164
0165
0166
0167
0168
0169
0170
0171
0172
0173
0174
# Inspect the workflow stored in one request's metadata, then stop the script.
# Request ids inspected earlier with this script: 479187, 4498, 3244, 6082. Their
# get_requests() calls were removed because each result was immediately overwritten.
reqs = get_requests(request_id=13798, with_request=True, with_detail=False, with_metadata=True)
for req in reqs:
    request_metadata = req['request_metadata']
    if 'build_workflow' in request_metadata:
        workflow = request_metadata['build_workflow']
        print(workflow.runs.keys())
        print(json_dumps(workflow.runs["1"], sort_keys=True, indent=4))
    elif 'workflow' in request_metadata:
        workflow = request_metadata['workflow']
        print(workflow.runs.keys())
    else:
        # Without this branch `workflow` would be unbound (NameError) or left over
        # from a previous request in the loop.
        print("request_metadata has no workflow, keys: %s" % list(request_metadata.keys()))
        continue

    print("workflow")
    print_workflow(workflow)
    new_works = workflow.get_new_works()
    print('new_works:' + str(new_works))
    all_works = workflow.get_all_works()
    print('all_works:' + str(all_works))
    for work in all_works:
        print("work %s signature: %s" % (work.get_work_id(), work.signature))

    print_workflow_template(workflow)

    print("workflow template")
    print(json_dumps(workflow.template, sort_keys=True, indent=4))

sys.exit(0)
0233
0234 """
0235 reqs = get_requests(request_id=28182323, with_request=False, with_detail=True, with_metadata=False)
0236 for req in reqs:
0237 print(json_dumps(req, sort_keys=True, indent=4))
0238
0239 # sys.exit(0)
0240 """
0241
0242 """
0243 # reqs = get_requests()
0244 # print(len(reqs))
0245 for req in reqs:
0246 if req['request_id'] == 113:
0247 # print(req)
0248 # print(req['request_metadata']['workflow'].to_dict())
0249 # print(json_dumps(req, sort_keys=True, indent=4))
0250 pass
0251
0252 sys.exit(0)
0253
0254 """
0255
0256 """
0257 tfs = get_transforms(request_id=470)
0258 # tfs = get_transforms(transform_id=350723)
0259 for tf in tfs:
0260 # print(tf)
0261 # print(tf['transform_metadata']['work'].to_dict())
0262 # print(tf)
0263 print(json_dumps(tf, sort_keys=True, indent=4))
0264 print(tf['request_id'], tf['workload_id'])
0265 print(tf['transform_metadata']['work_name'])
0266 print(tf['transform_metadata']['work'].num_run)
0267 print(tf['transform_metadata']['work'].task_name)
0268 print(tf['transform_metadata']['work'].output_data)
0269 pass
0270
0271 sys.exit(0)
0272 """
0273
0274 """
0275 msgs = retrieve_messages(workload_id=25972557)
0276 number_contents = 0
0277 for msg in msgs:
0278 # if msg['msg_id'] in [323720]:
0279 # if True:
0280 # if msg['request_id'] in [208]:
0281 print(json_dumps(msg['msg_content'], sort_keys=True, indent=4))
0282 if msg['msg_content']['msg_type'] == 'file_stagein' and msg['msg_content']['relation_type'] == 'output':
0283 # number_contents += len(msg['msg_content']['files'])
0284 for i_file in msg['msg_content']['files']:
0285 if i_file['status'] == 'Available':
0286 number_contents += 1
0287 pass
0288 print(number_contents)
0289
0290 sys.exit(0)
0291 """
0292
# Dump every processing of request 4615 as JSON, numbered from 0, then stop the script.
prs = get_processings(request_id=4615)

for i, pr in enumerate(prs):
    print("processing_number: %s" % i)
    print(json_dumps(pr, sort_keys=True, indent=4))

sys.exit(0)
0304
# Sample input record to release: the qgraph file of collection 3425 in request 248, marked Available.
to_release_inputs = [
    dict(request_id=248,
         coll_id=3425,
         name='shared_pipecheck_20210407T110240Z.qgraph',
         status=ContentStatus.Available,
         substatus=ContentStatus.Available),
]
0310
0311