Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-09 07:58:18

0001 #!/usr/bin/env python
0002 #
0003 # Licensed under the Apache License, Version 2.0 (the "License");
0004 # You may not use this file except in compliance with the License.
0005 # You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
0007 #
0008 # Authors:
0009 # - Wen Guan, <wen.guan@cern.ch>, 2025
0010 
0011 
0012 import datetime
0013 
0014 from idds.common.utils import setup_logging
0015 from idds.common.constants import (
0016     RequestType,
0017     RequestStatus,
0018     RequestLocking,
0019     TransformType,
0020     TransformStatus,
0021     ProcessingType,
0022     ProcessingStatus,
0023     CollectionType,
0024     CollectionStatus,
0025     CollectionRelationType,
0026 )
0027 from idds.core import requests as core_requests
0028 from idds.core import transforms as core_transforms
0029 from idds.core import catalog as core_catalog
0030 from idds.core import processings as core_processings
0031 from idds.orm.base.session import transactional_session
0032 
0033 from .panda import PandaClient
0034 
0035 
0036 setup_logging(__name__)
0037 
0038 
def get_scope_name(run_id, site, panda_attributes=None):
    """
    Generate the scope, workflow name and task name for a run.

    :param run_id: identifier of the run being processed.
    :param site: site name; when None, falls back to panda_attributes['site'].
    :param panda_attributes: optional dict of PanDA settings (may contain 'site').
    :returns: tuple (scope, workflow_name, name).
    """
    if panda_attributes is None:
        panda_attributes = {}
    utc_now = datetime.datetime.now(datetime.timezone.utc)
    # Zero-padded YYYYMMDD: without padding, e.g. Nov 1 ("2025111") and
    # Jan 11 ("2025111") would produce colliding workflow/task names.
    date_str = utc_now.strftime("%Y%m%d")
    scope = f"EIC_{utc_now.year}"
    if site is None:
        site = panda_attributes.get("site", None)
    workflow_name = f"EIC_fastprocessing_{site}_{date_str}"
    name = f"{workflow_name}_{run_id}"
    return scope, workflow_name, name
0051 
0052 
def get_task_parameters(
    request_id,
    transform_id=None,
    name=None,
    num_workers=None,
    core_count=None,
    ram_count=None,
    site=None,
    run_id=None,
    panda_attributes=None,
):
    """
    Build the PanDA task parameter map for a fast-processing run.

    :param request_id: iDDS request id, stored as 'reqID' in the map.
    :param transform_id: iDDS transform id (currently unused; kept for
                         interface compatibility with callers).
    :param name: PanDA task name.
    :param num_workers: number of workers; defaults from panda_attributes.
    :param core_count: cores per job; defaults from panda_attributes.
    :param ram_count: RAM per core (MB); defaults from panda_attributes.
    :param site: PanDA site; defaults from panda_attributes.
    :param run_id: run identifier, passed to the wrapper executable.
    :param panda_attributes: optional dict of PanDA defaults.
    :returns: dict suitable for PandaClient.submit().
    """
    if panda_attributes is None:
        panda_attributes = {}
    vo = panda_attributes.get("vo", "wlcg")
    queue = panda_attributes.get("queue", None)
    if site is None:
        site = panda_attributes.get("site", None)
    cloud = panda_attributes.get("cloud", None)
    working_group = panda_attributes.get("working_group", None)
    priority = panda_attributes.get("priority", 900)
    if core_count is None:
        core_count = panda_attributes.get("core_count", 1)
    if ram_count is None:
        ram_count = panda_attributes.get("ram_count", 4000)
    if num_workers is None:
        num_workers = panda_attributes.get("num_workers", 1)
    max_walltime = panda_attributes.get("max_walltime", 3600)
    max_attempt = panda_attributes.get("max_attempt", 3)
    idle_timeout = panda_attributes.get("idle_timeout", 120)
    user_name = panda_attributes.get("user_name", "iDDS")
    task_type = panda_attributes.get("task_type", "iDDS")
    processing_type = panda_attributes.get("processing_type", None)

    task_param_map = {}
    task_param_map["vo"] = vo
    if queue:
        task_param_map["site"] = queue
    if site:
        task_param_map["PandaSite"] = site
    if cloud:
        task_param_map["cloud"] = cloud

    if working_group:
        task_param_map["workingGroup"] = working_group

    task_param_map["taskName"] = name
    task_param_map["userName"] = user_name
    task_param_map["taskPriority"] = priority
    task_param_map["architecture"] = ""
    # task_param_map['architecture'] = '@el9'
    task_param_map["transUses"] = ""
    task_param_map["transHome"] = None

    # The wrapper executable only needs the run id and an idle timeout.
    executable = f"--run_id {run_id} --idle_timeout {idle_timeout}"
    task_param_map["transPath"] = (
        "https://storage.googleapis.com/drp-us-central1-containers/run_prompt_wrapper"
    )

    task_param_map["processingType"] = processing_type
    task_param_map["prodSourceLabel"] = "managed"  # managed, test, ptest

    # task_param_map['noWaitParent'] = True
    task_param_map["taskType"] = task_type
    task_param_map["coreCount"] = core_count
    task_param_map["skipScout"] = True
    task_param_map["ramCount"] = ram_count
    # task_param_map['ramUnit'] = 'MB'
    task_param_map["ramUnit"] = "MBPerCoreFixed"

    # task_param_map['inputPreStaging'] = True
    task_param_map["prestagingRuleID"] = 123
    task_param_map["nChunksToWait"] = 1
    task_param_map["maxCpuCount"] = core_count
    task_param_map["maxWalltime"] = max_walltime
    task_param_map["maxFailure"] = max_attempt
    task_param_map["maxAttempt"] = max_attempt

    task_param_map["jobParameters"] = [
        {
            "type": "constant",
            "value": executable,  # noqa: E501
        },
    ]

    # A pseudo input "file" per run: the run id itself, with one file per job.
    in_files = [f"{run_id}"]
    task_param_map["nFiles"] = len(in_files)
    task_param_map["noInput"] = True
    task_param_map["pfnList"] = in_files
    task_param_map["nFilesPerJob"] = 1

    # Events are used to drive the number of workers (one event per job).
    task_param_map["nEvents"] = num_workers
    task_param_map["nEventsPerJob"] = 1

    task_param_map["reqID"] = request_id

    return task_param_map
0150 
0151 
def submit_task_to_panda(
    request_id,
    transform_id=None,
    name=None,
    num_workers=None,
    core_count=None,
    ram_count=None,
    site=None,
    run_id=None,
    panda_attributes=None,
    logger=None,
):
    """
    Submit a task to PanDA for processing.

    Builds the task parameter map with get_task_parameters() and submits it
    through PandaClient.

    :param request_id: iDDS request id.
    :param transform_id: iDDS transform id.
    :param name: PanDA task name.
    :param num_workers: number of workers for the task.
    :param core_count: cores per job.
    :param ram_count: RAM per core (MB).
    :param site: PanDA site.
    :param run_id: run identifier.
    :param panda_attributes: optional dict of PanDA defaults.
    :param logger: optional logger.
    :returns: the PanDA task id returned by the submission.
    """
    if panda_attributes is None:
        panda_attributes = {}

    # Submit the task to PanDA.
    panda_client = PandaClient()
    task_parameters = get_task_parameters(
        request_id,
        transform_id=transform_id,
        name=name,
        num_workers=num_workers,
        core_count=core_count,
        ram_count=ram_count,
        site=site,
        run_id=run_id,
        panda_attributes=panda_attributes,
    )
    task_id = panda_client.submit(
        task_parameters, logger=logger, log_prefix=f"[run_id={run_id}] "
    )
    if logger:
        logger.info(f"Submitted PanDA task {task_id} for run_id={run_id}")
    return task_id
0187 
0188 
def close_panda_task(task_id, logger=None):
    """
    Soft-close a PanDA task so no further job retries are scheduled.

    :param task_id: id of the PanDA task to close.
    :param logger: optional logger for reporting the outcome.
    """
    client = PandaClient()
    return_code = client.close(task_id, soft=True)
    if logger is not None:
        logger.info(f"Finished PanDA task {task_id} with return code: {return_code}")
0197 
0198 
@transactional_session
def create_workflow_task(message, panda_attributes=None, logger=None, session=None):
    """
    Create a workflow task in iDDS when receiving a 'run_imminent' message.

    Message format:
    {
      'msg_type': 'run_imminent',
      'run_id': 20250914185722,
      'created_at': datetime.datetime.utcnow(),
      'content': {
        'num_workers': 2,
        'num_cores_per_worker': 10,
        'num_ram_per_core': 4000.0,  # MB
        ...
      }
    }

    :param message: the 'run_imminent' message dict.
    :param panda_attributes: optional dict of PanDA defaults.
    :param logger: optional logger.
    :param session: database session (injected by @transactional_session).
    :returns: (request_id, transform_id, processing_id, input_coll_id,
               output_coll_id, workload_id)
    """
    if panda_attributes is None:
        panda_attributes = {}
    run_id = message.get("run_id")
    content = message.get("content", {})

    # Extract resource requirements from the message.
    num_workers = content.get("target_worker_count", 1)
    core_count = content.get("num_cores_per_worker", None)
    ram_count = content.get("num_ram_per_core", None)
    site = content.get("site", None)

    utc_now = datetime.datetime.now(datetime.timezone.utc)
    year = utc_now.year
    month = utc_now.month

    scope, workflow_name, name = get_scope_name(run_id, site, panda_attributes)
    # Reuse the existing request for this workflow if one was already created.
    reqs = core_requests.get_request_ids_by_name(
        scope=scope, name=workflow_name, exact_match=True, session=session
    )

    if reqs:
        request_id = reqs[workflow_name]
    else:
        workflow = {
            "scope": scope,
            "name": workflow_name,
            "requester": "iDDS",
            "request_type": RequestType.iWorkflow,
            "username": "EIC",
            "transform_tag": "EIC",
            "status": RequestStatus.Transforming,
            "locking": RequestLocking.Idle,
            "cloud": "US",
            "campaign": "EIC",
            "campaign_scope": f"EIC_{year}",
            "campaign_group": f"EIC_{year}_{month}",
            "campaign_tag": "reco",
        }
        request_id = core_requests.add_request(**workflow, session=session)

    transform = {
        "request_id": request_id,
        "workload_id": None,
        "transform_type": TransformType.iWork,
        "transform_tag": "EIC",
        "name": name,
        "status": TransformStatus.New,
        "substatus": TransformStatus.New,
    }
    transform_id = core_transforms.add_transform(**transform, session=session)

    # Input and output collections are identical except for the relation type.
    coll_common = {
        "request_id": request_id,
        "transform_id": transform_id,
        "workload_id": None,
        "scope": scope,
        "name": name,
        "coll_type": CollectionType.Dataset,
        "bytes": 0,
        "total_files": 0,
        "new_files": 0,
        "processed_files": 0,
        "processing_files": 0,
        "coll_metadata": None,
        "status": CollectionStatus.Closed,
    }
    input_coll_id = core_catalog.add_collection(
        relation_type=CollectionRelationType.Input, **coll_common, session=session
    )
    output_coll_id = core_catalog.add_collection(
        relation_type=CollectionRelationType.Output, **coll_common, session=session
    )

    # Submit the PanDA task; its task id becomes the iDDS workload id.
    workload_id = submit_task_to_panda(
        request_id,
        transform_id=transform_id,
        name=name,
        num_workers=num_workers,
        core_count=core_count,
        ram_count=ram_count,
        site=site,
        run_id=run_id,
        panda_attributes=panda_attributes,
        logger=logger,
    )

    processing = {
        "request_id": request_id,
        "transform_id": transform_id,
        "workload_id": workload_id,
        "status": ProcessingStatus.Submitting,
        "submitter": "panda",
        "site": site,
        "processing_type": ProcessingType.iWork,
    }
    processing_id = core_processings.add_processing(**processing, session=session)

    if logger:
        logger.info(
            f"Created workflow task: request_id={request_id}, transform_id={transform_id}, "
            f"processing_id={processing_id}, workload_id={workload_id}"
        )

    return request_id, transform_id, processing_id, input_coll_id, output_coll_id, workload_id
0336 
0337 
def create_harvester_worker(
    header,
    msg,
    task_id,
    harvester_publisher,
    timetolive=12 * 3600 * 1000,
    panda_attributes=None,
    logger=None,
):
    """
    Send a message to harvester to create workers for a run.

    Message format based on prompt.md:
    {
      'msg_type': 'adjuster_worker',
      'run_id': 20250914185722,
      'created_at': datetime.datetime.utcnow(),
      'content': {
        'num_workers': 2,
        'num_cores_per_worker': 10,
        'num_ram_per_core': 4000.0,
        'requested_at': <original message created_at>
      }
    }

    :param header: header of the incoming message (currently unused).
    :param msg: the incoming message dict ('run_id', 'created_at', 'content').
    :param task_id: PanDA task id associated with the run (used for logging).
    :param harvester_publisher: publisher used to send the message; when None,
                                no message is sent.
    :param timetolive: message TTL in milliseconds.
    :param panda_attributes: optional dict of PanDA defaults (may contain 'queue').
    :param logger: optional logger.
    """
    if panda_attributes is None:
        panda_attributes = {}
    content = msg.get("content", {})
    run_id = msg.get("run_id")
    panda_queue = panda_attributes.get("queue", None)
    created_at_original = msg.get("created_at")

    worker_msg = {
        "msg_type": "adjuster_worker",
        "run_id": run_id,
        "created_at": datetime.datetime.now(datetime.timezone.utc).isoformat(),
        "content": {
            "num_workers": content.get("num_workers", 1),
            "num_cores_per_worker": content.get("num_cores_per_worker", 1),
            "num_ram_per_core": content.get("num_ram_per_core", 1000),
            "panda_queue": panda_queue,
            "requested_at": created_at_original,
        },
    }

    msg_header = {
        "persistent": "true",
        "ttl": timetolive,
        "vo": "eic",
        "msg_type": "adjuster_worker",
        "run_id": str(run_id),
    }

    if harvester_publisher:
        harvester_publisher.publish(worker_msg, headers=msg_header)

        if logger:
            logger.info(
                f"Sent adjuster_worker message for run_id={run_id}, task_id={task_id}, num_workers={worker_msg['content']['num_workers']}"
            )
0396 
0397 
def stop_harvester_worker(
    header,
    msg,
    task_id,
    harvester_publisher,
    transformer_broadcaster,
    timetolive=12 * 3600 * 1000,
    panda_attributes=None,
    logger=None,
):
    """
    Stop harvester workers when receiving a 'run_end' message.

    Based on prompt.md:
    1. Close PanDA task to avoid retrying jobs
    2. Send worker adjuster message to stop creating new workers
    3. Broadcast stop message to all transformers

    Message formats:
    - adjuster_worker: Stop creating new workers
    - stop_transformer: Broadcast to all transformers to stop

    :param header: header of the incoming message (currently unused).
    :param msg: the incoming message dict ('run_id', 'created_at').
    :param task_id: PanDA task id for the run.
    :param harvester_publisher: publisher for the adjuster_worker message;
                                when None, no message is sent.
    :param transformer_broadcaster: broadcaster for the stop_transformer
                                    message; when None, no message is sent.
    :param timetolive: message TTL in milliseconds.
    :param panda_attributes: optional dict of PanDA defaults (may contain 'queue').
    :param logger: optional logger.
    """
    if panda_attributes is None:
        panda_attributes = {}
    # content = msg.get("content", {})
    run_id = msg.get("run_id")
    panda_queue = panda_attributes.get("queue", None)
    created_at_original = msg.get("created_at")

    # TODO: Close PanDA task
    # close_panda_task(task_id)

    # Send stop message to transformers (broadcast)
    stop_transformer_msg = {
        "msg_type": "stop_transformer",
        "run_id": run_id,
        "created_at": datetime.datetime.now(datetime.timezone.utc).isoformat(),
        "content": {"task_id": task_id, "requested_at": created_at_original},
    }

    stop_header = {
        "persistent": "true",
        "ttl": timetolive,
        "vo": "eic",
        "msg_type": "stop_transformer",
        "run_id": str(run_id),
    }

    if transformer_broadcaster:
        transformer_broadcaster.publish(stop_transformer_msg, headers=stop_header)
        if logger:
            logger.info(
                f"Sent stop_transformer broadcast for run_id={run_id}, task_id={task_id}"
            )

    # Send message to stop creating new workers
    stop_worker_msg = {
        "msg_type": "adjuster_worker",
        "run_id": run_id,
        "created_at": datetime.datetime.now(datetime.timezone.utc).isoformat(),
        "content": {
            "num_workers": 0,  # Stop creating new workers
            "num_cores_per_worker": 0,
            "num_ram_per_core": 0,
            "panda_queue": panda_queue,
            "requested_at": created_at_original,
        },
    }

    worker_header = {
        "persistent": "true",
        "ttl": timetolive,
        "vo": "eic",
        "msg_type": "adjuster_worker",
        "run_id": str(run_id),
    }

    if harvester_publisher:
        harvester_publisher.publish(stop_worker_msg, headers=worker_header)
        if logger:
            logger.info(
                f"Sent adjuster_worker (stop) message for run_id={run_id}, task_id={task_id}"
            )
0479 
0480 
def worker_handler(header, msg, task_id=None, handler_kwargs=None, logger=None):
    """
    Handle worker-related messages based on prompt.md specifications.

    Supported message types:
    - run_imminent: Create workflow task and start workers
    - run_end/run_stop: Stop workers and transformers
    - transformer_heartbeat: Track transformer health

    :param header: Message header (should contain 'run_id')
    :param msg: Message content with format:
                {
                  'msg_type': '<type>',
                  'run_id': 20250914185722,
                  'created_at': datetime,
                  'content': {...}
                }
    :param task_id: Task ID (may be None for run_imminent)
    :param handler_kwargs: Optional dictionary of handler keyword arguments
                           ('transformer_broadcaster', 'worker_publisher',
                           'timetolive', 'panda_attributes')
    :param logger: optional logger.
    :return: Dictionary with task_id if created
    """
    if handler_kwargs is None:
        handler_kwargs = {}
    ret = {}
    msg_type = msg.get("msg_type")
    run_id = msg.get("run_id")

    timetolive = 12 * 3600 * 1000
    panda_attributes = {}

    transformer_broadcaster = handler_kwargs.get("transformer_broadcaster", None)
    worker_publisher = handler_kwargs.get("worker_publisher", None)
    timetolive = handler_kwargs.get("timetolive", timetolive)
    panda_attributes = handler_kwargs.get("panda_attributes", panda_attributes)

    try:
        if msg_type == "run_imminent":
            # Create workflow task and workers; forward the logger so the
            # creation steps are traceable.
            request_id, transform_id, processing_id, input_coll_id, output_coll_id, workload_id = (
                create_workflow_task(msg, panda_attributes=panda_attributes, logger=logger)
            )
            task_id = workload_id
            create_harvester_worker(
                header,
                msg,
                task_id,
                worker_publisher,
                timetolive=timetolive,
                panda_attributes=panda_attributes,
                logger=logger,
            )
            ret["task_id"] = task_id
            if logger:
                logger.info(f"Handled run_imminent: run_id={run_id}, task_id={task_id}")

        elif msg_type in ["run_end", "run_stop", "end_run"]:
            # Close PanDA task
            # close_panda_task(task_id, logger=logger)
            # Stop workers and transformers
            stop_harvester_worker(
                header,
                msg,
                task_id,
                worker_publisher,
                transformer_broadcaster,
                timetolive=timetolive,
                panda_attributes=panda_attributes,
                logger=logger,
            )
            if logger:
                logger.info(f"Handled {msg_type}: run_id={run_id}, task_id={task_id}")

        elif msg_type == "transformer_heartbeat":
            transformer_id = msg.get("content", {}).get("id")
            hostname = msg.get("content", {}).get("hostname")
            if logger:
                logger.info(
                    f"Transformer heartbeat: run_id={run_id}, transformer_id={transformer_id}, hostname={hostname}"
                )

        else:
            if logger:
                logger.warning(
                    f"Unknown message type in worker_handler: {msg_type}, run_id={run_id}"
                )

    except Exception as ex:
        # Boundary handler: log and return partial result rather than crash
        # the message-processing loop.
        if logger:
            logger.error(
                f"Error in worker_handler for msg_type={msg_type}, run_id={run_id}: {ex}",
                exc_info=True,
            )

    return ret