File indexing completed on 2026-04-09 07:58:21
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011 """
0012 REST endpoints for workflow task management.
0013
0014 POST /workflow_task
0015 Create a workflow task.
0016 Body: {"workflow": <workflow-dict>}
0017
0018 PUT /workflow_task/<request_id>/<transform_id>/<workload_id>/adjust
0019 Adjust worker resource parameters.
0020 Body: {"parameters": {"core_count": .., "memory_per_core": .., "site": ..}}
0021
0022 PUT /workflow_task/<request_id>/close
0023 PUT /workflow_task/<request_id>/<workload_id>/close (legacy)
0024 Close a workflow task.
0025 Body: {"parameters": <content-dict>}
0026 """
0027
0028 from traceback import format_exc
0029
0030 from flask import Blueprint
0031
0032 from idds.common import exceptions
0033 from idds.common.constants import HTTP_STATUS_CODE
0034 from idds.common.utils import json_loads, setup_logging
0035 from idds.rest.v1.controller import IDDSController
0036
0037 from idds.prompt.handlers.workflowtaskhandler import (
0038 create_workflow_task as _create_workflow_task,
0039 adjust_worker as _adjust_worker,
0040 close_workflow_task as _close_workflow_task,
0041 )
0042
0043
0044 setup_logging(__name__)
0045
0046
0047 class WorkflowTaskCreate(IDDSController):
0048 """
0049 Create a workflow task.
0050
0051 POST /workflow_task
0052
0053 Body (new format matching the spec's ``idds_create_workflow_task`` API):
0054 {
0055 "workflow": {
0056 "scope": "<scope>",
0057 "name": "<name>",
0058 "requester": "<requester>",
0059 "username": "<username>",
0060 "transform_tag": "<tag>",
0061 "cloud": "<cloud>",
0062 "campaign": "<campaign>",
0063 "campaign_scope": "<campaign_scope>",
0064 "campaign_group": "<campaign_group>",
0065 "campaign_tag": "<campaign_tag>",
0066 "content": {
0067 "run_id": "<run_id>",
0068 "core_count": <int>,
0069 "memory_per_core": <float>,
0070 "site": "<site>",
0071 "panda_attributes": { ... },
0072 ...
0073 }
0074 }
0075 }
0076
0077 Returns:
0078 {
0079 "request_id": <int>,
0080 "transform_id": <int>,
0081 "processing_id": <int>,
0082 "input_coll_id": <int>,
0083 "output_coll_id": <int>,
0084 "workload_id": <int|null>
0085 }
0086 """
0087
0088 def post(self):
0089 """Create workflow task."""
0090 logger = self.get_logger()
0091 try:
0092 body = self.get_request().data and json_loads(self.get_request().data)
0093 if not body:
0094 raise exceptions.BadRequest("Request body must not be empty")
0095
0096 workflow = body.get('workflow')
0097 if not workflow:
0098 raise exceptions.BadRequest("'workflow' key is required in the request body")
0099
0100 except exceptions.BadRequest as error:
0101 return self.generate_http_response(
0102 HTTP_STATUS_CODE.BadRequest,
0103 exc_cls=error.__class__.__name__,
0104 exc_msg=str(error),
0105 )
0106 except Exception as error:
0107 logger.error(error)
0108 logger.error(format_exc())
0109 return self.generate_http_response(
0110 HTTP_STATUS_CODE.BadRequest,
0111 exc_cls=exceptions.BadRequest.__name__,
0112 exc_msg="Cannot decode JSON parameter dictionary",
0113 )
0114
0115 try:
0116 (
0117 request_id,
0118 transform_id,
0119 processing_id,
0120 input_coll_id,
0121 output_coll_id,
0122 workload_id,
0123 ) = _create_workflow_task(workflow, logger=logger)
0124
0125 except exceptions.IDDSException as error:
0126 logger.error(error)
0127 logger.error(format_exc())
0128 return self.generate_http_response(
0129 HTTP_STATUS_CODE.InternalError,
0130 exc_cls=error.__class__.__name__,
0131 exc_msg=str(error),
0132 )
0133 except Exception as error:
0134 logger.error(error)
0135 logger.error(format_exc())
0136 return self.generate_http_response(
0137 HTTP_STATUS_CODE.InternalError,
0138 exc_cls=exceptions.CoreException.__name__,
0139 exc_msg=str(error),
0140 )
0141
0142 return self.generate_http_response(
0143 HTTP_STATUS_CODE.OK,
0144 data={
0145 'request_id': request_id,
0146 'transform_id': transform_id,
0147 'processing_id': processing_id,
0148 'input_coll_id': input_coll_id,
0149 'output_coll_id': output_coll_id,
0150 'workload_id': workload_id,
0151 },
0152 )
0153
0154
0155 class WorkflowTaskAdjust(IDDSController):
0156 """
0157 Adjust worker resource parameters for an existing workflow task.
0158
0159 PUT /workflow_task/<request_id>/<transform_id>/<workload_id>/adjust
0160
0161 Body (matching the spec's ``idds_adjust_worker`` API):
0162 {
0163 "parameters": {
0164 "core_count": <int>,
0165 "memory_per_core": <float>,
0166 "site": "<site>",
0167 "content": { ... }
0168 }
0169 }
0170
0171 Returns: {"status": 0, "message": "adjusted successfully"}
0172 """
0173
0174 def put(self, request_id, transform_id=None, workload_id=None):
0175 """Adjust worker parameters."""
0176 logger = self.get_logger()
0177
0178 if request_id == 'null':
0179 request_id = None
0180 if transform_id == 'null':
0181 transform_id = None
0182 if workload_id == 'null':
0183 workload_id = None
0184
0185 try:
0186 body = {}
0187 if self.get_request().data:
0188 body = json_loads(self.get_request().data) or {}
0189 parameters = body.get('parameters', {})
0190 except Exception as error:
0191 logger.error(error)
0192 logger.error(format_exc())
0193 return self.generate_http_response(
0194 HTTP_STATUS_CODE.BadRequest,
0195 exc_cls=exceptions.BadRequest.__name__,
0196 exc_msg="Cannot decode JSON parameter dictionary",
0197 )
0198
0199 try:
0200 _adjust_worker(request_id, transform_id, workload_id, parameters, logger=logger)
0201 except exceptions.IDDSException as error:
0202 logger.error(error)
0203 logger.error(format_exc())
0204 return self.generate_http_response(
0205 HTTP_STATUS_CODE.InternalError,
0206 exc_cls=error.__class__.__name__,
0207 exc_msg=str(error),
0208 )
0209 except Exception as error:
0210 logger.error(error)
0211 logger.error(format_exc())
0212 return self.generate_http_response(
0213 HTTP_STATUS_CODE.InternalError,
0214 exc_cls=exceptions.CoreException.__name__,
0215 exc_msg=str(error),
0216 )
0217
0218 return self.generate_http_response(
0219 HTTP_STATUS_CODE.OK,
0220 data={'status': 0, 'message': 'adjusted successfully'},
0221 )
0222
0223
0224 class WorkflowTaskClose(IDDSController):
0225 """
0226 Close a workflow task.
0227
0228 PUT /workflow_task/<request_id>/close
0229 PUT /workflow_task/<request_id>/<workload_id>/close (legacy)
0230
0231 Body (matching the spec's ``idds_close_workflow_task`` API):
0232 {
0233 "parameters": {
0234 "request_id": <int>,
0235 "transform_id": <int>,
0236 "workload_id": <int>,
0237 ...
0238 }
0239 }
0240
0241 Returns: {"status": 0, "message": "closed successfully"}
0242 """
0243
0244 def put(self, request_id, workload_id=None):
0245 """Close workflow task."""
0246 logger = self.get_logger()
0247
0248 if request_id == 'null':
0249 request_id = None
0250 if workload_id == 'null':
0251 workload_id = None
0252
0253 try:
0254 body = {}
0255 if self.get_request().data:
0256 body = json_loads(self.get_request().data) or {}
0257 parameters = body.get('parameters', {})
0258
0259 if workload_id and not parameters.get('workload_id'):
0260 parameters = dict(parameters)
0261 parameters['workload_id'] = workload_id
0262 except Exception as error:
0263 logger.error(error)
0264 logger.error(format_exc())
0265 return self.generate_http_response(
0266 HTTP_STATUS_CODE.BadRequest,
0267 exc_cls=exceptions.BadRequest.__name__,
0268 exc_msg="Cannot decode JSON parameter dictionary",
0269 )
0270
0271 try:
0272 _close_workflow_task(request_id, parameters=parameters, logger=logger)
0273 except exceptions.IDDSException as error:
0274 logger.error(error)
0275 logger.error(format_exc())
0276 return self.generate_http_response(
0277 HTTP_STATUS_CODE.InternalError,
0278 exc_cls=error.__class__.__name__,
0279 exc_msg=str(error),
0280 )
0281 except Exception as error:
0282 logger.error(error)
0283 logger.error(format_exc())
0284 return self.generate_http_response(
0285 HTTP_STATUS_CODE.InternalError,
0286 exc_cls=exceptions.CoreException.__name__,
0287 exc_msg=str(error),
0288 )
0289
0290 return self.generate_http_response(
0291 HTTP_STATUS_CODE.OK,
0292 data={'status': 0, 'message': 'closed successfully'},
0293 )
0294
0295
0296
0297
0298
0299
0300 def get_blueprint():
0301 bp = Blueprint('workflowtask', __name__)
0302
0303 wft_create = WorkflowTaskCreate.as_view('workflow_task_create')
0304 bp.add_url_rule('/workflow_task', view_func=wft_create, methods=['post'])
0305
0306 wft_adjust = WorkflowTaskAdjust.as_view('workflow_task_adjust')
0307 bp.add_url_rule(
0308 '/workflow_task/<request_id>/<transform_id>/<workload_id>/adjust',
0309 view_func=wft_adjust,
0310 methods=['put'],
0311 )
0312
0313 wft_close = WorkflowTaskClose.as_view('workflow_task_close')
0314 bp.add_url_rule(
0315 '/workflow_task/<request_id>/close',
0316 view_func=wft_close,
0317 methods=['put'],
0318 )
0319 bp.add_url_rule(
0320 '/workflow_task/<request_id>/<workload_id>/close',
0321 view_func=wft_close,
0322 methods=['put'],
0323 )
0324
0325 return bp