Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:39:07

0001 import json
0002 import os.path
0003 import sys
0004 import tempfile
0005 import traceback
0006 
0007 import requests
0008 from idds.client.clientmanager import ClientManager
0009 from idds.common.utils import get_rest_host
0010 from pandacommon.pandalogger.LogWrapper import LogWrapper
0011 
0012 # import PandaLogger before idds modules not to change message levels of other modules
0013 from pandacommon.pandalogger.PandaLogger import PandaLogger
0014 from ruamel.yaml import YAML
0015 
0016 from pandaserver.srvcore.CoreUtils import clean_user_id, commands_get_status_output
0017 from pandaserver.srvcore.MailUtils import MailUtils
0018 from pandaserver.workflow import pcwl_utils, workflow_utils
0019 from pandaserver.workflow.snakeparser import Parser
0020 
0021 _logger = PandaLogger().getLogger("workflow_processor")
0022 
0023 
0024 SUPPORTED_WORKFLOW_LANGUAGES = ["cwl", "snakemake"]
0025 
0026 
0027 # process workflow
# process workflow
class WorkflowProcessor(object):
    """Process workflow request files and hand them to iDDS.

    The request file is JSON produced by the server front-end. The heavy
    work (sandbox download, workflow parsing, iDDS submission) is done by
    core_exec in a separate process, because core_exec calls os.chdir and
    running it in-process would change this process's working directory.
    """

    # constructor
    def __init__(self, task_buffer=None, log_stream=None):
        # task_buffer: TaskBuffer used to look up the user's email address
        #   for notifications; may be None (notification is then skipped)
        # log_stream: accepted for interface compatibility but unused;
        #   logging always goes through the module-level PandaLogger
        self.taskBuffer = task_buffer
        self.log = _logger

    # process a file
    def process(
        self,
        file_name,
        to_delete=False,
        test_mode=False,
        get_log=False,
        dump_workflow=False,
    ):
        """Process a single workflow request file.

        :param file_name: path of the JSON request file
        :param to_delete: delete the request file even when processing is
            neither successful nor fatally failed
        :param test_mode: parse/verify only; no submission, no notification
        :param get_log: return a result dict instead of None
        :param dump_workflow: let the child process dump the converted
            workflow into its debug log
        :return: when get_log is True, a dict with "status", "request_id"
            and "log"; otherwise None
        """
        is_fatal = False
        is_OK = False
        request_id = None
        dump_str = None
        # create the wrapper before the try block so the except clause can
        # always log, even when the failure happens before the request file
        # is parsed (the original code raised NameError in that case)
        tmpLog = LogWrapper(self.log)
        try:
            with open(file_name) as f:
                ops = json.load(f)
                user_name = clean_user_id(ops["userName"])
                base_platform = ops["data"].get("base_platform")
                # propagate the user name and optional base platform to every task
                for task_type in ops["data"]["taskParams"]:
                    ops["data"]["taskParams"][task_type]["userName"] = user_name
                    if base_platform:
                        ops["data"]["taskParams"][task_type]["basePlatform"] = base_platform
                log_token = f"< id=\"{user_name}\" test={test_mode} outDS={ops['data']['outDS']} >"
                tmpLog = LogWrapper(self.log, log_token)
                tmpLog.info(f"start {file_name}")
                sandbox_url = os.path.join(ops["data"]["sourceURL"], "cache", ops["data"]["sandbox"])
                # IO through json files
                ops_file = tempfile.NamedTemporaryFile(delete=False, mode="w")
                json.dump(ops, ops_file)
                ops_file.close()
                # execute main in another process to avoid chdir mess
                tmp_stat, tmp_out = commands_get_status_output(
                    f"python {__file__} {sandbox_url} '{log_token}' {dump_workflow} {ops_file.name} '{user_name}' {test_mode}"
                )
                if tmp_stat:
                    is_OK = False
                    tmpLog.error(f"main execution failed with {tmp_stat}:{tmp_out}")
                else:
                    # the child prints the path of its result file on the last
                    # line of stdout; earlier lines may be stray output
                    result_file_name = tmp_out.split("\n")[-1]
                    with open(result_file_name) as tmp_out_file:
                        is_OK, is_fatal, request_id, dump_str = json.load(tmp_out_file)
                    # fix: remove the result file itself; the original removed
                    # a path named by the child's entire stdout, which fails
                    # (silently) whenever the output has more than one line
                    try:
                        os.remove(result_file_name)
                    except Exception:
                        pass
                if is_OK:
                    tmpLog.info(f"is_OK={is_OK} request_id={request_id}")
                else:
                    tmpLog.info(f"is_OK={is_OK} is_fatal={is_fatal} request_id={request_id}")
                # the request file is consumed on success or on a fatal
                # (non-retryable) failure; transient failures keep it for retry
                if to_delete or (not test_mode and (is_OK or is_fatal)):
                    if dump_str:
                        dump_str = dump_str + tmpLog.dumpToString()
                    else:
                        dump_str = tmpLog.dumpToString()
                    tmpLog.debug(f"delete {file_name}")
                    try:
                        os.remove(file_name)
                    except Exception:
                        pass
                    # send notification
                    if not test_mode and self.taskBuffer is not None:
                        toAdder = self.taskBuffer.getEmailAddr(user_name)
                        if toAdder is None or toAdder.startswith("notsend"):
                            tmpLog.debug("skip to send notification since suppressed")
                        else:
                            # message
                            if is_OK:
                                mailSubject = f"PANDA Notification for Workflow {ops['data']['outDS']}"
                                mailBody = f"Hello,\n\nWorkflow:{ops['data']['outDS']} has been accepted with RequestID:{request_id}\n\n"
                            else:
                                mailSubject = f"PANDA WARNING for Workflow={ops['data']['outDS']}"
                                # fix: the original passed an unused extra arg to str.format
                                mailBody = f"Hello,\n\nWorkflow {ops['data']['outDS']} was not accepted\n\n"
                                mailBody += f"Reason : {dump_str}\n"
                            # send
                            tmpSM = MailUtils().send(toAdder, mailSubject, mailBody)
                            tmpLog.debug(f"sent message with {tmpSM}")
        except Exception as e:
            is_OK = False
            tmpLog.error(f"failed to run with {str(e)} {traceback.format_exc()}")
        if get_log:
            ret_val = {"status": is_OK, "request_id": request_id}
            if is_OK:
                ret_val["log"] = dump_str
            else:
                if dump_str is None:
                    ret_val["log"] = tmpLog.dumpToString()
                else:
                    ret_val["log"] = dump_str
            return ret_val
0121 
0122 
0123 # execute chdir in another process
# execute chdir in another process
def core_exec(sandbox_url, log_token, dump_workflow, ops_file, user_name, test_mode):
    """Download the sandbox, parse the workflow, and submit it to iDDS.

    Runs as a child process (see WorkflowProcessor.process) because it
    uses os.chdir. All arguments arrive as command-line strings; the
    boolean flags are the strings "True"/"False". The result
    [is_OK, is_fatal, request_id, log_dump] is written to a temporary
    JSON file whose path is printed to stdout, and the process always
    exits with status 0.

    :param sandbox_url: URL of the user sandbox tarball
    :param log_token: token prefixed to every log message
    :param dump_workflow: "True" to dump the converted workflow to the debug log
    :param ops_file: path of the single-use JSON request file; removed here
    :param user_name: cleaned user name passed to iDDS as the submitter
    :param test_mode: "True" to parse/verify without submitting
    """
    tmpLog = LogWrapper(_logger, log_token)
    is_OK = True
    is_fatal = False
    request_id = None
    # boolean flags are serialized as "True"/"False" on the command line
    dump_workflow = dump_workflow == "True"
    test_mode = test_mode == "True"
    try:
        with open(ops_file) as f:
            ops = json.load(f)
        # the request file is single-use; best-effort cleanup
        try:
            os.remove(ops_file)
        except Exception:
            pass
        # go to temp dir; remember where we came from so we can return
        cur_dir = os.getcwd()
        try:
            with tempfile.TemporaryDirectory() as tmp_dirname:
                os.chdir(tmp_dirname)
                # download sandbox
                tmpLog.info(f"downloading sandbox from {sandbox_url}")
                # NOTE(review): verify=False disables TLS certificate checks;
                # confirm the sandbox cache endpoint is trusted
                with requests.get(sandbox_url, allow_redirects=True, verify=False, stream=True) as r:
                    if r.status_code == 400:
                        # NOTE(review): the cache apparently reports a missing
                        # sandbox as 400 rather than 404 — confirm server behavior
                        tmpLog.error("not found")
                        is_fatal = True
                        is_OK = False
                    elif r.status_code != 200:
                        tmpLog.error(f"bad HTTP response {r.status_code}")
                        is_OK = False
                    # extract sandbox
                    if is_OK:
                        with open(ops["data"]["sandbox"], "wb") as fs:
                            for chunk in r.raw.stream(1024, decode_content=False):
                                if chunk:
                                    fs.write(chunk)
                        # the with block closed (and flushed) the file, so it is
                        # safe to extract now
                        tmp_stat, tmp_out = commands_get_status_output(f"tar xvfz {ops['data']['sandbox']}")
                        if tmp_stat != 0:
                            tmpLog.error(tmp_out)
                            dump_str = f"failed to extract {ops['data']['sandbox']}"
                            tmpLog.error(dump_str)
                            is_fatal = True
                            is_OK = False
                    # parse workflow files
                    if is_OK:
                        tmpLog.info("parse workflow")
                        workflow_name = None
                        if (wf_lang := ops["data"]["language"]) in SUPPORTED_WORKFLOW_LANGUAGES:
                            if wf_lang == "cwl":
                                workflow_name = ops["data"].get("workflow_name")
                                nodes, root_in = pcwl_utils.parse_workflow_file(ops["data"]["workflowSpecFile"], tmpLog)
                                with open(ops["data"]["workflowInputFile"]) as workflow_input:
                                    yaml = YAML(typ="safe", pure=True)
                                    data = yaml.load(workflow_input)
                            elif wf_lang == "snakemake":
                                parser = Parser(ops["data"]["workflowSpecFile"], logger=tmpLog)
                                nodes, root_in = parser.parse_nodes()
                                data = dict()
                            # resolve nodes
                            s_id, t_nodes, nodes = workflow_utils.resolve_nodes(nodes, root_in, data, 0, set(), ops["data"]["outDS"], tmpLog)
                            workflow_utils.set_workflow_outputs(nodes)
                            id_node_map = workflow_utils.get_node_id_map(nodes)
                            # plain loop instead of a side-effect list comprehension
                            for node in nodes:
                                node.resolve_params(ops["data"]["taskParams"], id_node_map)
                            dump_str = "the description was internally converted as follows\n" + workflow_utils.dump_nodes(nodes)
                            tmpLog.info(dump_str)
                            # verify every node; any failure is fatal
                            for node in nodes:
                                s_check, o_check = node.verify()
                                tmp_str = f"Verification failure in ID:{node.id} {o_check}"
                                if not s_check:
                                    tmpLog.error(tmp_str)
                                    dump_str += tmp_str
                                    dump_str += "\n"
                                    is_fatal = True
                                    is_OK = False
                        else:
                            # fix: the language name was never substituted into the
                            # message, so users saw a literal "{}"
                            dump_str = f"{wf_lang} is not supported to describe the workflow"
                            tmpLog.error(dump_str)
                            is_fatal = True
                            is_OK = False
                        # convert to workflow
                        if is_OK:
                            (
                                workflow_to_submit,
                                dump_str_list,
                            ) = workflow_utils.convert_nodes_to_workflow(nodes, workflow_name=workflow_name)
                            try:
                                if workflow_to_submit:
                                    if not test_mode:
                                        tmpLog.info("submit workflow")
                                        wm = ClientManager(host=get_rest_host())
                                        request_id = wm.submit(workflow_to_submit, username=user_name, use_dataset_name=False)
                                        try:
                                            request_id = int(request_id)
                                        except Exception:
                                            # wrong request_id
                                            is_fatal = True
                                            is_OK = False
                                else:
                                    dump_str = "workflow is empty"
                                    tmpLog.error(dump_str)
                                    is_fatal = True
                                    is_OK = False
                            except Exception as e:
                                dump_str = f"failed to submit the workflow with {str(e)}"
                                tmpLog.error(f"{dump_str} {traceback.format_exc()}")
                                # fix: a failed submission must not be reported as
                                # success; left non-fatal so the request can be retried
                                is_OK = False
                            if dump_workflow:
                                tmpLog.debug("\n" + "".join(dump_str_list))
        finally:
            # fix: restore the working directory even when an exception escapes
            # the temp-dir block; the original left the process in a deleted cwd
            os.chdir(cur_dir)
    except Exception as e:
        is_OK = False
        is_fatal = True
        tmpLog.error(f"failed to run with {str(e)} {traceback.format_exc()}")

    # report the result to the parent via a JSON file; the parent reads the
    # file path from the last line of this process's stdout
    with tempfile.NamedTemporaryFile(delete=False, mode="w") as tmp_json:
        json.dump([is_OK, is_fatal, request_id, tmpLog.dumpToString()], tmp_json)
        print(tmp_json.name)
    sys.exit(0)
0246 
0247 
if __name__ == "__main__":
    # invoked as a subprocess by WorkflowProcessor.process; forward the
    # positional command-line arguments straight to core_exec
    cli_args = sys.argv[1:]
    core_exec(*cli_args)