# File indexing completed on 2026-04-10 08:39:07
0001 import json
0002 import os.path
0003 import sys
0004 import tempfile
0005 import traceback
0006
0007 import requests
0008 from idds.client.clientmanager import ClientManager
0009 from idds.common.utils import get_rest_host
0010 from pandacommon.pandalogger.LogWrapper import LogWrapper
0011
0012
0013 from pandacommon.pandalogger.PandaLogger import PandaLogger
0014 from ruamel.yaml import YAML
0015
0016 from pandaserver.srvcore.CoreUtils import clean_user_id, commands_get_status_output
0017 from pandaserver.srvcore.MailUtils import MailUtils
0018 from pandaserver.workflow import pcwl_utils, workflow_utils
0019 from pandaserver.workflow.snakeparser import Parser
0020
0021 _logger = PandaLogger().getLogger("workflow_processor")
0022
0023
0024 SUPPORTED_WORKFLOW_LANGUAGES = ["cwl", "snakemake"]
0025
0026
0027
0028 class WorkflowProcessor(object):
0029
0030 def __init__(self, task_buffer=None, log_stream=None):
0031 self.taskBuffer = task_buffer
0032 self.log = _logger
0033
0034
0035 def process(
0036 self,
0037 file_name,
0038 to_delete=False,
0039 test_mode=False,
0040 get_log=False,
0041 dump_workflow=False,
0042 ):
0043 is_fatal = False
0044 request_id = None
0045 dump_str = None
0046 try:
0047 with open(file_name) as f:
0048 ops = json.load(f)
0049 user_name = clean_user_id(ops["userName"])
0050 base_platform = ops["data"].get("base_platform")
0051 for task_type in ops["data"]["taskParams"]:
0052 ops["data"]["taskParams"][task_type]["userName"] = user_name
0053 if base_platform:
0054 ops["data"]["taskParams"][task_type]["basePlatform"] = base_platform
0055 log_token = f"< id=\"{user_name}\" test={test_mode} outDS={ops['data']['outDS']} >"
0056 tmpLog = LogWrapper(self.log, log_token)
0057 tmpLog.info(f"start {file_name}")
0058 sandbox_url = os.path.join(ops["data"]["sourceURL"], "cache", ops["data"]["sandbox"])
0059
0060 ops_file = tempfile.NamedTemporaryFile(delete=False, mode="w")
0061 json.dump(ops, ops_file)
0062 ops_file.close()
0063
0064 tmp_stat, tmp_out = commands_get_status_output(
0065 f"python {__file__} {sandbox_url} '{log_token}' {dump_workflow} {ops_file.name} '{user_name}' {test_mode}"
0066 )
0067 if tmp_stat:
0068 is_OK = False
0069 tmpLog.error(f"main execution failed with {tmp_stat}:{tmp_out}")
0070 else:
0071 with open(tmp_out.split("\n")[-1]) as tmp_out_file:
0072 is_OK, is_fatal, request_id, dump_str = json.load(tmp_out_file)
0073 try:
0074 os.remove(tmp_out)
0075 except Exception:
0076 pass
0077 if is_OK:
0078 tmpLog.info(f"is_OK={is_OK} request_id={request_id}")
0079 else:
0080 tmpLog.info(f"is_OK={is_OK} is_fatal={is_fatal} request_id={request_id}")
0081 if to_delete or (not test_mode and (is_OK or is_fatal)):
0082 if dump_str:
0083 dump_str = dump_str + tmpLog.dumpToString()
0084 else:
0085 dump_str = tmpLog.dumpToString()
0086 tmpLog.debug(f"delete {file_name}")
0087 try:
0088 os.remove(file_name)
0089 except Exception:
0090 pass
0091
0092 if not test_mode and self.taskBuffer is not None:
0093 toAdder = self.taskBuffer.getEmailAddr(user_name)
0094 if toAdder is None or toAdder.startswith("notsend"):
0095 tmpLog.debug("skip to send notification since suppressed")
0096 else:
0097
0098 if is_OK:
0099 mailSubject = f"PANDA Notification for Workflow {ops['data']['outDS']}"
0100 mailBody = f"Hello,\n\nWorkflow:{ops['data']['outDS']} has been accepted with RequestID:{request_id}\n\n"
0101 else:
0102 mailSubject = f"PANDA WARNING for Workflow={ops['data']['outDS']}"
0103 mailBody = "Hello,\n\nWorkflow {} was not accepted\n\n".format(ops["data"]["outDS"], request_id)
0104 mailBody += f"Reason : {dump_str}\n"
0105
0106 tmpSM = MailUtils().send(toAdder, mailSubject, mailBody)
0107 tmpLog.debug(f"sent message with {tmpSM}")
0108 except Exception as e:
0109 is_OK = False
0110 tmpLog.error(f"failed to run with {str(e)} {traceback.format_exc()}")
0111 if get_log:
0112 ret_val = {"status": is_OK, "request_id": request_id}
0113 if is_OK:
0114 ret_val["log"] = dump_str
0115 else:
0116 if dump_str is None:
0117 ret_val["log"] = tmpLog.dumpToString()
0118 else:
0119 ret_val["log"] = dump_str
0120 return ret_val
0121
0122
0123
0124 def core_exec(sandbox_url, log_token, dump_workflow, ops_file, user_name, test_mode):
0125 tmpLog = LogWrapper(_logger, log_token)
0126 is_OK = True
0127 is_fatal = False
0128 request_id = None
0129 if dump_workflow == "True":
0130 dump_workflow = True
0131 else:
0132 dump_workflow = False
0133 if test_mode == "True":
0134 test_mode = True
0135 else:
0136 test_mode = False
0137 try:
0138 with open(ops_file) as f:
0139 ops = json.load(f)
0140 try:
0141 os.remove(ops_file)
0142 except Exception:
0143 pass
0144
0145 cur_dir = os.getcwd()
0146 with tempfile.TemporaryDirectory() as tmp_dirname:
0147 os.chdir(tmp_dirname)
0148
0149 tmpLog.info(f"downloading sandbox from {sandbox_url}")
0150 with requests.get(sandbox_url, allow_redirects=True, verify=False, stream=True) as r:
0151 if r.status_code == 400:
0152 tmpLog.error("not found")
0153 is_fatal = True
0154 is_OK = False
0155 elif r.status_code != 200:
0156 tmpLog.error(f"bad HTTP response {r.status_code}")
0157 is_OK = False
0158
0159 if is_OK:
0160 with open(ops["data"]["sandbox"], "wb") as fs:
0161 for chunk in r.raw.stream(1024, decode_content=False):
0162 if chunk:
0163 fs.write(chunk)
0164 fs.close()
0165 tmp_stat, tmp_out = commands_get_status_output(f"tar xvfz {ops['data']['sandbox']}")
0166 if tmp_stat != 0:
0167 tmpLog.error(tmp_out)
0168 dump_str = f"failed to extract {ops['data']['sandbox']}"
0169 tmpLog.error(dump_str)
0170 is_fatal = True
0171 is_OK = False
0172
0173 if is_OK:
0174 tmpLog.info("parse workflow")
0175 workflow_name = None
0176 if (wf_lang := ops["data"]["language"]) in SUPPORTED_WORKFLOW_LANGUAGES:
0177 if wf_lang == "cwl":
0178 workflow_name = ops["data"].get("workflow_name")
0179 nodes, root_in = pcwl_utils.parse_workflow_file(ops["data"]["workflowSpecFile"], tmpLog)
0180 with open(ops["data"]["workflowInputFile"]) as workflow_input:
0181 yaml = YAML(typ="safe", pure=True)
0182 data = yaml.load(workflow_input)
0183 elif wf_lang == "snakemake":
0184 parser = Parser(ops["data"]["workflowSpecFile"], logger=tmpLog)
0185 nodes, root_in = parser.parse_nodes()
0186 data = dict()
0187
0188 s_id, t_nodes, nodes = workflow_utils.resolve_nodes(nodes, root_in, data, 0, set(), ops["data"]["outDS"], tmpLog)
0189 workflow_utils.set_workflow_outputs(nodes)
0190 id_node_map = workflow_utils.get_node_id_map(nodes)
0191 [node.resolve_params(ops["data"]["taskParams"], id_node_map) for node in nodes]
0192 dump_str = "the description was internally converted as follows\n" + workflow_utils.dump_nodes(nodes)
0193 tmpLog.info(dump_str)
0194 for node in nodes:
0195 s_check, o_check = node.verify()
0196 tmp_str = f"Verification failure in ID:{node.id} {o_check}"
0197 if not s_check:
0198 tmpLog.error(tmp_str)
0199 dump_str += tmp_str
0200 dump_str += "\n"
0201 is_fatal = True
0202 is_OK = False
0203 else:
0204 dump_str = "{} is not supported to describe the workflow"
0205 tmpLog.error(dump_str)
0206 is_fatal = True
0207 is_OK = False
0208
0209 if is_OK:
0210 (
0211 workflow_to_submit,
0212 dump_str_list,
0213 ) = workflow_utils.convert_nodes_to_workflow(nodes, workflow_name=workflow_name)
0214 try:
0215 if workflow_to_submit:
0216 if not test_mode:
0217 tmpLog.info("submit workflow")
0218 wm = ClientManager(host=get_rest_host())
0219 request_id = wm.submit(workflow_to_submit, username=user_name, use_dataset_name=False)
0220 try:
0221 request_id = int(request_id)
0222 except Exception:
0223
0224 is_fatal = True
0225 is_OK = False
0226 else:
0227 dump_str = "workflow is empty"
0228 tmpLog.error(dump_str)
0229 is_fatal = True
0230 is_OK = False
0231 except Exception as e:
0232 dump_str = f"failed to submit the workflow with {str(e)}"
0233 tmpLog.error(f"{dump_str} {traceback.format_exc()}")
0234 if dump_workflow:
0235 tmpLog.debug("\n" + "".join(dump_str_list))
0236 os.chdir(cur_dir)
0237 except Exception as e:
0238 is_OK = False
0239 is_fatal = True
0240 tmpLog.error(f"failed to run with {str(e)} {traceback.format_exc()}")
0241
0242 with tempfile.NamedTemporaryFile(delete=False, mode="w") as tmp_json:
0243 json.dump([is_OK, is_fatal, request_id, tmpLog.dumpToString()], tmp_json)
0244 print(tmp_json.name)
0245 sys.exit(0)
0246
0247
0248 if __name__ == "__main__":
0249 core_exec(*sys.argv[1:])