File indexing completed on 2026-04-09 07:58:21
0001 import sys
0002 import datetime
0003
0004 from idds.common.utils import json_dumps, setup_logging
0005 from idds.common.constants import ContentStatus, ContentType, ContentRelationType, ContentLocking
0006 from idds.core.requests import get_requests
0007 from idds.core.messages import retrieve_messages
0008 from idds.core.transforms import get_transforms, get_transform
0009 from idds.core.workprogress import get_workprogresses
0010 from idds.core.processings import get_processings
0011 from idds.core import transforms as core_transforms
0012 from idds.orm.contents import get_contents
0013 from idds.core.transforms import release_inputs_by_collection, release_inputs_by_collection_old
0014 from idds.workflowv2.workflow import Workflow
0015 from idds.workflowv2.work import Work
0016
0017
0018 setup_logging(__name__)
0019
0020
0021 def show_works(req):
0022 workflow = req['processing_metadata']['workflow']
0023 print(workflow.independent_works)
0024 print(len(workflow.independent_works))
0025 print(workflow.works_template.keys())
0026 print(len(workflow.works_template.keys()))
0027 print(workflow.work_sequence.keys())
0028 print(len(workflow.work_sequence.keys()))
0029 print(workflow.works.keys())
0030 print(len(workflow.works.keys()))
0031
0032 work_ids = []
0033 for i_id in workflow.works:
0034 work = workflow.works[i_id]
0035 print(i_id)
0036 print(work.work_name)
0037 print(work.task_name)
0038 print(work.work_id)
0039 work_ids.append(work.work_id)
0040 print(work_ids)
0041
0042
0043 def print_workflow(workflow, layers=0):
0044 prefix = " " * layers * 4
0045 for run in workflow.runs:
0046 print(prefix + "run: " + str(run) + ", has_loop_condition: " + str(workflow.runs[run].has_loop_condition()))
0047
0048
0049 for work_id in workflow.runs[run].works:
0050 print(prefix + " " + str(work_id) + " " + str(type(workflow.runs[run].works[work_id])))
0051 if type(workflow.runs[run].works[work_id]) in [Workflow]:
0052 print(prefix + " parent_num_run: " + workflow.runs[run].works[work_id].parent_num_run + ", num_run: " + str(workflow.runs[run].works[work_id].num_run))
0053 print_workflow(workflow.runs[run].works[work_id], layers=layers + 1)
0054
0055
0056
0057 else:
0058 work = workflow.runs[run].works[work_id]
0059 tf = get_transform(transform_id=work.get_work_id())
0060 if tf:
0061 transform_work = tf['transform_metadata']['work']
0062
0063 work.sync_work_data(status=tf['status'], substatus=tf['substatus'], work=transform_work, workload_id=tf['workload_id'])
0064
0065 print(prefix + " or: " + str(work.or_custom_conditions) + " and: " + str(work.and_custom_conditions))
0066 print(prefix + " output: " + str(work.output_data))
0067 print(prefix + " " + workflow.runs[run].works[work_id].task_name + ", num_run: " + str(workflow.runs[run].works[work_id].num_run))
0068 print(prefix + " workload_id: " + str(work.workload_id))
0069 print(prefix + " is_terminated: " + str(workflow.runs[run].works[work_id].is_terminated()))
0070 print(prefix + " is_finished: " + str(workflow.runs[run].works[work_id].is_finished()))
0071 if workflow.runs[run].has_loop_condition():
0072 print(prefix + " Loop condition status: %s" % workflow.runs[run].get_loop_condition_status())
0073 print(prefix + " Loop condition: %s" % json_dumps(workflow.runs[run].loop_condition, sort_keys=True, indent=4))
0074
0075
0076 def print_workflow_template(workflow, layers=0):
0077 prefix = " " * layers * 4
0078 print(prefix + str(workflow.template.internal_id) + ", has_loop_condition: " + str(workflow.template.has_loop_condition()))
0079 for work_id in workflow.template.works:
0080 print(prefix + " " + str(work_id) + " " + str(type(workflow.template.works[work_id])))
0081 if type(workflow.template.works[work_id]) in [Workflow]:
0082 print(prefix + " parent_num_run: " + str(workflow.template.works[work_id].parent_num_run) + ", num_run: " + str(workflow.template.works[work_id].num_run))
0083 print_workflow_template(workflow.template.works[work_id], layers=layers + 1)
0084 else:
0085 print(prefix + " " + workflow.template.works[work_id].task_name + ", num_run: " + str(workflow.template.works[work_id].num_run))
0086
0087
0088
0089
0090
0091
0092
0093
0094
0095
0096
0097
0098
0099
0100
0101
0102
0103
0104
0105
0106
0107
0108 reqs = get_requests(request_id=545851, with_request=True, with_detail=False, with_metadata=True)
0109 for req in reqs:
0110
0111
0112
0113 print(json_dumps(req, sort_keys=True, indent=4))
0114
0115 pass
0116 if 'build_workflow' in req['request_metadata']:
0117 workflow = req['request_metadata']['build_workflow']
0118
0119 print(workflow.runs.keys())
0120
0121 print(json_dumps(workflow.runs["1"], sort_keys=True, indent=4))
0122 elif 'workflow' in req['request_metadata']:
0123 workflow = req['request_metadata']['workflow']
0124
0125 print(workflow.runs.keys())
0126
0127
0128
0129
0130
0131
0132
0133
0134
0135
0136
0137
0138 if hasattr(workflow, 'get_relation_map'):
0139
0140 pass
0141
0142 print("workflow")
0143 print_workflow(workflow)
0144 new_works = workflow.get_new_works()
0145 print('new_works:' + str(new_works))
0146 all_works = workflow.get_all_works()
0147 print('all_works:' + str(all_works))
0148 for work in all_works:
0149 print("work %s signature: %s" % (work.get_work_id(), work.signature))
0150
0151
0152 print_workflow_template(workflow)
0153
0154
0155
0156 print("workflow template")
0157 print(json_dumps(workflow.template, sort_keys=True, indent=4))