Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-09 07:58:21

0001 #!/usr/bin/env python
0002 #
0003 # Licensed under the Apache License, Version 2.0 (the "License");
0004 # You may not use this file except in compliance with the License.
0005 # You may obtain a copy of the License at
0006 # http://www.apache.org/licenses/LICENSE-2.0
0007 #
0008 # Authors:
0009 # - Wen Guan, <wen.guan@cern.ch>, 2025
0010 
0011 """
0012 REST endpoints for workflow task management.
0013 
0014 POST /workflow_task
0015     Create a workflow task.
0016     Body: {"workflow": <workflow-dict>}
0017 
0018 PUT  /workflow_task/<request_id>/<transform_id>/<workload_id>/adjust
0019     Adjust worker resource parameters.
0020     Body: {"parameters": {"core_count": .., "memory_per_core": .., "site": ..}}
0021 
0022 PUT  /workflow_task/<request_id>/close
0023 PUT  /workflow_task/<request_id>/<workload_id>/close   (legacy)
0024     Close a workflow task.
0025     Body: {"parameters": <content-dict>}
0026 """
0027 
0028 from traceback import format_exc
0029 
0030 from flask import Blueprint
0031 
0032 from idds.common import exceptions
0033 from idds.common.constants import HTTP_STATUS_CODE
0034 from idds.common.utils import json_loads, setup_logging
0035 from idds.rest.v1.controller import IDDSController
0036 
0037 from idds.prompt.handlers.workflowtaskhandler import (  # type: ignore[import-untyped]
0038     create_workflow_task as _create_workflow_task,
0039     adjust_worker as _adjust_worker,
0040     close_workflow_task as _close_workflow_task,
0041 )
0042 
0043 
0044 setup_logging(__name__)
0045 
0046 
class WorkflowTaskCreate(IDDSController):
    """
    Controller behind ``POST /workflow_task``: create a workflow task.

    Request body (shape follows the spec's ``idds_create_workflow_task`` API):
    {
        "workflow": {
            "scope":          "<scope>",
            "name":           "<name>",
            "requester":      "<requester>",
            "username":       "<username>",
            "transform_tag":  "<tag>",
            "cloud":          "<cloud>",
            "campaign":       "<campaign>",
            "campaign_scope": "<campaign_scope>",
            "campaign_group": "<campaign_group>",
            "campaign_tag":   "<campaign_tag>",
            "content": {
                "run_id":          "<run_id>",
                "core_count":      <int>,
                "memory_per_core": <float>,
                "site":            "<site>",
                "panda_attributes": { ... },
                ...
            }
        }
    }

    Response data on success:
    {
        "request_id":    <int>,
        "transform_id":  <int>,
        "processing_id": <int>,
        "input_coll_id":  <int>,
        "output_coll_id": <int>,
        "workload_id":   <int|null>
    }
    """

    def post(self):
        """
        Decode the request body and create the workflow task.

        The response is always produced by ``generate_http_response``:

        * ``HTTP_STATUS_CODE.BadRequest`` when the body is empty, cannot be
          decoded, or carries no (or an empty) ``workflow`` entry;
        * ``HTTP_STATUS_CODE.InternalError`` when ``_create_workflow_task``
          raises;
        * ``HTTP_STATUS_CODE.OK`` with the six identifiers otherwise.
        """
        logger = self.get_logger()

        # --- Stage 1: decode and validate the request body -----------------
        try:
            raw = self.get_request().data
            # An empty payload stays falsy and is rejected just below.
            payload = json_loads(raw) if raw else raw
            if not payload:
                raise exceptions.BadRequest("Request body must not be empty")

            wf_spec = payload.get('workflow')
            if not wf_spec:
                raise exceptions.BadRequest("'workflow' key is required in the request body")
        except exceptions.BadRequest as err:
            # Validation failures raised above: report their own message.
            return self.generate_http_response(
                HTTP_STATUS_CODE.BadRequest,
                exc_cls=type(err).__name__,
                exc_msg=str(err),
            )
        except Exception as err:
            # Anything else at this stage (decoder errors, a payload without
            # ``.get``) is reported as an undecodable body.
            logger.error(err)
            logger.error(format_exc())
            return self.generate_http_response(
                HTTP_STATUS_CODE.BadRequest,
                exc_cls=exceptions.BadRequest.__name__,
                exc_msg="Cannot decode JSON parameter dictionary",
            )

        # --- Stage 2: delegate to the handler ------------------------------
        try:
            # The handler is expected to hand back exactly six identifiers;
            # an unpacking failure is treated like any other handler error.
            (request_id, transform_id, processing_id,
             input_coll_id, output_coll_id, workload_id) = _create_workflow_task(
                wf_spec, logger=logger)
        except exceptions.IDDSException as err:
            logger.error(err)
            logger.error(format_exc())
            return self.generate_http_response(
                HTTP_STATUS_CODE.InternalError,
                exc_cls=type(err).__name__,
                exc_msg=str(err),
            )
        except Exception as err:
            logger.error(err)
            logger.error(format_exc())
            return self.generate_http_response(
                HTTP_STATUS_CODE.InternalError,
                exc_cls=exceptions.CoreException.__name__,
                exc_msg=str(err),
            )

        # --- Stage 3: success response -------------------------------------
        created = {
            'request_id': request_id,
            'transform_id': transform_id,
            'processing_id': processing_id,
            'input_coll_id': input_coll_id,
            'output_coll_id': output_coll_id,
            'workload_id': workload_id,
        }
        return self.generate_http_response(HTTP_STATUS_CODE.OK, data=created)
0153 
0154 
class WorkflowTaskAdjust(IDDSController):
    """
    Adjust worker resource parameters for an existing workflow task.

    PUT /workflow_task/<request_id>/<transform_id>/<workload_id>/adjust

    Body (matching the spec's ``idds_adjust_worker`` API):
    {
        "parameters": {
            "core_count":      <int>,
            "memory_per_core": <float>,
            "site":            "<site>",
            "content":         { ... }
        }
    }

    A missing body, a missing ``parameters`` key and an explicit
    ``"parameters": null`` are all treated as an empty parameter dict.

    Returns: {"status": 0, "message": "adjusted successfully"}
    """

    def put(self, request_id, transform_id=None, workload_id=None):
        """
        Adjust worker parameters.

        :param request_id: request identifier taken from the URL; the literal
                           string ``'null'`` is mapped to ``None``.
        :param transform_id: transform identifier from the URL (``'null'`` -> ``None``).
        :param workload_id: workload identifier from the URL (``'null'`` -> ``None``).
        :returns: the response built by ``generate_http_response``:
                  ``HTTP_STATUS_CODE.BadRequest`` if the body cannot be decoded,
                  ``HTTP_STATUS_CODE.InternalError`` if ``_adjust_worker`` raises,
                  ``HTTP_STATUS_CODE.OK`` otherwise.
        """
        logger = self.get_logger()

        # URL segments arrive as strings; 'null' is the placeholder for "not set".
        if request_id == 'null':
            request_id = None
        if transform_id == 'null':
            transform_id = None
        if workload_id == 'null':
            workload_id = None

        try:
            body = {}
            if self.get_request().data:
                body = json_loads(self.get_request().data) or {}
            # dict.get() only uses its default when the key is absent, so an
            # explicit ``"parameters": null`` would yield None.  Normalise it
            # to {} the same way a null body is normalised just above.
            parameters = body.get('parameters') or {}
        except Exception as error:
            # Decoder failures and non-mapping bodies (no ``.get``) end up here.
            logger.error(error)
            logger.error(format_exc())
            return self.generate_http_response(
                HTTP_STATUS_CODE.BadRequest,
                exc_cls=exceptions.BadRequest.__name__,
                exc_msg="Cannot decode JSON parameter dictionary",
            )

        try:
            _adjust_worker(request_id, transform_id, workload_id, parameters, logger=logger)
        except exceptions.IDDSException as error:
            # Known iDDS errors are reported under their own class name.
            logger.error(error)
            logger.error(format_exc())
            return self.generate_http_response(
                HTTP_STATUS_CODE.InternalError,
                exc_cls=error.__class__.__name__,
                exc_msg=str(error),
            )
        except Exception as error:
            # Anything unexpected is reported as a generic CoreException.
            logger.error(error)
            logger.error(format_exc())
            return self.generate_http_response(
                HTTP_STATUS_CODE.InternalError,
                exc_cls=exceptions.CoreException.__name__,
                exc_msg=str(error),
            )

        return self.generate_http_response(
            HTTP_STATUS_CODE.OK,
            data={'status': 0, 'message': 'adjusted successfully'},
        )
0222 
0223 
class WorkflowTaskClose(IDDSController):
    """
    Close a workflow task.

    PUT /workflow_task/<request_id>/close
    PUT /workflow_task/<request_id>/<workload_id>/close  (legacy)

    Body (matching the spec's ``idds_close_workflow_task`` API):
    {
        "parameters": {
            "request_id":   <int>,
            "transform_id": <int>,
            "workload_id":  <int>,
            ...
        }
    }

    A missing body, a missing ``parameters`` key and an explicit
    ``"parameters": null`` are all treated as an empty parameter dict.
    On the legacy route the URL's ``workload_id`` is copied into the
    parameters when the body does not already provide one.

    Returns: {"status": 0, "message": "closed successfully"}
    """

    def put(self, request_id, workload_id=None):
        """
        Close workflow task.

        :param request_id: request identifier taken from the URL; the literal
                           string ``'null'`` is mapped to ``None``.
        :param workload_id: workload identifier from the legacy URL form
                            (``'null'`` -> ``None``); ``None`` on the new route.
        :returns: the response built by ``generate_http_response``:
                  ``HTTP_STATUS_CODE.BadRequest`` if the body cannot be decoded,
                  ``HTTP_STATUS_CODE.InternalError`` if ``_close_workflow_task``
                  raises, ``HTTP_STATUS_CODE.OK`` otherwise.
        """
        logger = self.get_logger()

        # URL segments arrive as strings; 'null' is the placeholder for "not set".
        if request_id == 'null':
            request_id = None
        if workload_id == 'null':
            workload_id = None

        try:
            body = {}
            if self.get_request().data:
                body = json_loads(self.get_request().data) or {}
            # dict.get() only uses its default when the key is absent, so an
            # explicit ``"parameters": null`` would yield None and make the
            # ``.get`` below fail with a misleading "cannot decode" error.
            # Normalise it to {} like a null body is normalised just above.
            parameters = body.get('parameters') or {}
            # fall back: if workload_id given in URL, ensure it's in parameters
            if workload_id and not parameters.get('workload_id'):
                # Copy first so the decoded body itself is left untouched.
                parameters = dict(parameters)
                parameters['workload_id'] = workload_id
        except Exception as error:
            # Decoder failures and non-mapping bodies/parameters end up here.
            logger.error(error)
            logger.error(format_exc())
            return self.generate_http_response(
                HTTP_STATUS_CODE.BadRequest,
                exc_cls=exceptions.BadRequest.__name__,
                exc_msg="Cannot decode JSON parameter dictionary",
            )

        try:
            _close_workflow_task(request_id, parameters=parameters, logger=logger)
        except exceptions.IDDSException as error:
            # Known iDDS errors are reported under their own class name.
            logger.error(error)
            logger.error(format_exc())
            return self.generate_http_response(
                HTTP_STATUS_CODE.InternalError,
                exc_cls=error.__class__.__name__,
                exc_msg=str(error),
            )
        except Exception as error:
            # Anything unexpected is reported as a generic CoreException.
            logger.error(error)
            logger.error(format_exc())
            return self.generate_http_response(
                HTTP_STATUS_CODE.InternalError,
                exc_cls=exceptions.CoreException.__name__,
                exc_msg=str(error),
            )

        return self.generate_http_response(
            HTTP_STATUS_CODE.OK,
            data={'status': 0, 'message': 'closed successfully'},
        )
0294 
0295 
0296 # ---------------------------------------------------------------------------
0297 # Blueprint
0298 # ---------------------------------------------------------------------------
0299 
def get_blueprint():
    """
    Build the ``workflowtask`` Flask blueprint.

    Registers, in this order:

    * ``POST /workflow_task``                                              -> WorkflowTaskCreate
    * ``PUT  /workflow_task/<request_id>/<transform_id>/<workload_id>/adjust`` -> WorkflowTaskAdjust
    * ``PUT  /workflow_task/<request_id>/close``                           -> WorkflowTaskClose
    * ``PUT  /workflow_task/<request_id>/<workload_id>/close``             -> WorkflowTaskClose

    :returns: the configured ``Blueprint`` instance.
    """
    blueprint = Blueprint('workflowtask', __name__)

    # (controller class, endpoint name, HTTP verb, URL rules served by it)
    route_table = (
        (WorkflowTaskCreate, 'workflow_task_create', 'post', (
            '/workflow_task',
        )),
        (WorkflowTaskAdjust, 'workflow_task_adjust', 'put', (
            '/workflow_task/<request_id>/<transform_id>/<workload_id>/adjust',
        )),
        (WorkflowTaskClose, 'workflow_task_close', 'put', (
            '/workflow_task/<request_id>/close',
            '/workflow_task/<request_id>/<workload_id>/close',
        )),
    )

    for controller, endpoint, verb, url_rules in route_table:
        # One view function per controller, shared by all of its URL rules.
        view = controller.as_view(endpoint)
        for url_rule in url_rules:
            blueprint.add_url_rule(url_rule, view_func=view, methods=[verb])

    return blueprint