File indexing completed on 2026-04-10 08:38:59
0001 import copy
0002
0003 from pandajedi.jedicore import JediException
0004 from pandajedi.jedirefine import RefinerUtils
0005
0006 try:
0007 import idds.common.constants
0008 import idds.common.utils
0009 from idds.client.client import Client as iDDS_Client
0010 except ImportError:
0011 pass
0012
0013
0014
def send_notification(taskBufferIF, ddmIF, taskSpec, tmpLog):
    """Send iDDS requests for a task's output datasets when configured.

    The task parameters are fetched through ``taskBufferIF`` and decoded
    from JSON.  Nothing is sent unless they contain an
    ``outputPostProcessing`` section whose ``system`` is ``"idds"``.  When
    that section's ``type`` is ``"active_learning"``, one request is added
    through the iDDS client for every entry of ``taskSpec.datasetSpecList``
    whose ``type`` is ``"output"``.

    Args:
        taskBufferIF: object providing ``getTaskParamsWithID_JEDI``.
        ddmIF: object providing ``extract_scope``, used to split a dataset
            name into scope and name.
        taskSpec: task specification; ``jediTaskID`` and
            ``datasetSpecList`` are read.
        tmpLog: logger; each request and the returned request ID are
            written with ``debug``.

    Raises:
        JediException.ExternalTempError: if the task parameters cannot be
            fetched or decoded, or if anything fails while building or
            submitting the iDDS requests.  The original exception is
            attached as the cause.
    """
    try:
        taskParam = taskBufferIF.getTaskParamsWithID_JEDI(taskSpec.jediTaskID)
        taskParamMap = RefinerUtils.decodeJSON(taskParam)
    except Exception as e:
        errStr = f"task param conversion from json failed with {e}"
        raise JediException.ExternalTempError(errStr) from e

    # Guard clauses: only tasks that explicitly request iDDS post-processing
    # go any further.
    if "outputPostProcessing" not in taskParamMap:
        return
    postProcessing = taskParamMap["outputPostProcessing"]
    if "system" not in postProcessing or postProcessing["system"] != "idds":
        return

    try:
        # Kept inside the try block on purpose: if the optional idds import
        # at module level failed, the resulting NameError is reported as an
        # ExternalTempError like any other iDDS failure.
        c = iDDS_Client(idds.common.utils.get_rest_host())
        if postProcessing["type"] == "active_learning":
            for datasetSpec in taskSpec.datasetSpecList:
                if datasetSpec.type != "output":
                    continue
                # Shallow copy so that adding workload_id does not modify the
                # decoded task parameters shared between iterations.
                data = copy.copy(postProcessing["data"])
                tmp_scope, tmp_name = ddmIF.extract_scope(datasetSpec.datasetName)
                data["workload_id"] = taskSpec.jediTaskID
                req = {
                    "scope": tmp_scope,
                    "name": tmp_name,
                    "requester": "panda",
                    "request_type": idds.common.constants.RequestType.ActiveLearning,
                    "transform_tag": idds.common.constants.RequestType.ActiveLearning.value,
                    "status": idds.common.constants.RequestStatus.New,
                    "priority": 0,
                    "lifetime": 30,
                    "request_metadata": data,
                }
                tmpLog.debug(f"req {req}")
                ret = c.add_request(**req)
                tmpLog.debug(f"got requestID={ret}")
    except Exception as e:
        errStr = f"iDDS failed with {e}"
        raise JediException.ExternalTempError(errStr) from e