File indexing completed on 2026-04-09 07:58:18
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012 import datetime
0013
0014 from idds.common.utils import setup_logging
0015 from idds.common.constants import (
0016 RequestType,
0017 RequestStatus,
0018 RequestLocking,
0019 TransformType,
0020 TransformStatus,
0021 ProcessingType,
0022 ProcessingStatus,
0023 CollectionType,
0024 CollectionStatus,
0025 CollectionRelationType,
0026 )
0027 from idds.core import requests as core_requests
0028 from idds.core import transforms as core_transforms
0029 from idds.core import catalog as core_catalog
0030 from idds.core import processings as core_processings
0031 from idds.orm.base.session import transactional_session
0032
0033 from .panda import PandaClient
0034
0035
0036 setup_logging(__name__)
0037
0038
def get_scope_name(run_id, site, panda_attributes=None):
    """
    Generate the scope, workflow name and task name for a run.

    :param run_id: Run identifier, appended to the task name.
    :param site: Processing site; if None, falls back to panda_attributes['site'].
    :param panda_attributes: Optional dict of PanDA attributes (may supply 'site').
    :return: Tuple (scope, workflow_name, name).
    """
    # Avoid the shared-mutable-default pitfall: build a fresh dict per call.
    panda_attributes = panda_attributes or {}
    utc_now = datetime.datetime.now(datetime.timezone.utc)
    year = utc_now.year
    month = utc_now.month
    day = utc_now.day
    scope = f"EIC_{year}"
    if site is None:
        site = panda_attributes.get("site", None)
    # NOTE(review): month/day are not zero-padded, so e.g. Nov 2 and Jan 12 of
    # the same year both yield "...112..." — confirm whether padding is wanted
    # before changing, since existing names would no longer match.
    workflow_name = f"EIC_fastprocessing_{site}_{year}{month}{day}"
    name = f"EIC_fastprocessing_{site}_{year}{month}{day}_{run_id}"
    return scope, workflow_name, name
0051
0052
def get_task_parameters(
    request_id,
    transform_id=None,
    name=None,
    num_workers=None,
    core_count=None,
    ram_count=None,
    site=None,
    run_id=None,
    panda_attributes=None,
):
    """
    Build the PanDA task parameter map for a fast-processing run.

    Explicit arguments take precedence; anything left as None falls back to
    the corresponding key in ``panda_attributes`` (with a hard-coded default
    as a last resort).

    :param request_id: iDDS request id, stored as ``reqID`` in the map.
    :param transform_id: Currently unused; kept for interface compatibility.
    :param name: Task name (``taskName``).
    :param num_workers: Number of workers; mapped to ``nEvents``.
    :param core_count: Cores per job (``coreCount`` / ``maxCpuCount``).
    :param ram_count: RAM per core in MB (``ramCount``).
    :param site: PanDA site (``PandaSite``).
    :param run_id: Run identifier, embedded in the job executable and pfnList.
    :param panda_attributes: Optional dict of fallback attributes.
    :return: Dict of PanDA task parameters.
    """
    # Avoid the shared-mutable-default pitfall: build a fresh dict per call.
    panda_attributes = panda_attributes or {}

    vo = panda_attributes.get("vo", "wlcg")
    queue = panda_attributes.get("queue", None)
    if site is None:
        site = panda_attributes.get("site", None)
    cloud = panda_attributes.get("cloud", None)
    working_group = panda_attributes.get("working_group", None)
    priority = panda_attributes.get("priority", 900)
    if core_count is None:
        core_count = panda_attributes.get("core_count", 1)
    if ram_count is None:
        ram_count = panda_attributes.get("ram_count", 4000)
    if num_workers is None:
        num_workers = panda_attributes.get("num_workers", 1)
    max_walltime = panda_attributes.get("max_walltime", 3600)
    max_attempt = panda_attributes.get("max_attempt", 3)
    idle_timeout = panda_attributes.get("idle_timeout", 120)
    user_name = panda_attributes.get("user_name", "iDDS")
    task_type = panda_attributes.get("task_type", "iDDS")
    processing_type = panda_attributes.get("processing_type", None)

    task_param_map = {}
    task_param_map["vo"] = vo
    if queue:
        # NOTE: in PanDA task params, "site" carries the queue name while the
        # physical site goes into "PandaSite".
        task_param_map["site"] = queue
    if site:
        task_param_map["PandaSite"] = site
    if cloud:
        task_param_map["cloud"] = cloud

    if working_group:
        task_param_map["workingGroup"] = working_group

    task_param_map["taskName"] = name
    task_param_map["userName"] = user_name
    task_param_map["taskPriority"] = priority
    task_param_map["architecture"] = ""

    task_param_map["transUses"] = ""
    task_param_map["transHome"] = None

    # Command-line arguments handed to the wrapper transformation.
    executable = f"--run_id {run_id} --idle_timeout {idle_timeout}"
    task_param_map["transPath"] = (
        "https://storage.googleapis.com/drp-us-central1-containers/run_prompt_wrapper"
    )

    task_param_map["processingType"] = processing_type
    task_param_map["prodSourceLabel"] = "managed"

    task_param_map["taskType"] = task_type
    task_param_map["coreCount"] = core_count
    task_param_map["skipScout"] = True
    task_param_map["ramCount"] = ram_count
    # ramCount is interpreted as MB per core (fixed, no scout-based scaling).
    task_param_map["ramUnit"] = "MBPerCoreFixed"

    task_param_map["prestagingRuleID"] = 123
    task_param_map["nChunksToWait"] = 1
    task_param_map["maxCpuCount"] = core_count
    task_param_map["maxWalltime"] = max_walltime
    task_param_map["maxFailure"] = max_attempt
    task_param_map["maxAttempt"] = max_attempt

    task_param_map["jobParameters"] = [
        {
            "type": "constant",
            "value": executable,
        },
    ]

    # A pseudo input "file" named after the run id; noInput=True means no real
    # dataset lookup is performed.
    in_files = [f"{run_id}"]
    task_param_map["nFiles"] = len(in_files)
    task_param_map["noInput"] = True
    task_param_map["pfnList"] = in_files
    task_param_map["nFilesPerJob"] = 1

    # One "event" per worker: each worker maps to one job slot.
    task_param_map["nEvents"] = num_workers
    task_param_map["nEventsPerJob"] = 1

    task_param_map["reqID"] = request_id

    return task_param_map
0150
0151
def submit_task_to_panda(
    request_id,
    transform_id=None,
    name=None,
    num_workers=None,
    core_count=None,
    ram_count=None,
    site=None,
    run_id=None,
    panda_attributes=None,
    logger=None,
):
    """
    Submit a task to PanDA for processing.

    Builds the task parameter map via :func:`get_task_parameters` and submits
    it through a fresh :class:`PandaClient`.

    :param request_id: iDDS request id.
    :param transform_id: iDDS transform id (forwarded, currently informational).
    :param name: Task name.
    :param num_workers: Number of workers requested for the task.
    :param core_count: Cores per job.
    :param ram_count: RAM per core in MB.
    :param site: PanDA site.
    :param run_id: Run identifier.
    :param panda_attributes: Optional dict of fallback PanDA attributes.
    :param logger: Optional logger.
    :return: The PanDA task id returned by the client.
    """
    # Avoid the shared-mutable-default pitfall: build a fresh dict per call.
    panda_attributes = panda_attributes or {}

    panda_client = PandaClient()
    task_parameters = get_task_parameters(
        request_id,
        transform_id=transform_id,
        name=name,
        num_workers=num_workers,
        core_count=core_count,
        ram_count=ram_count,
        site=site,
        run_id=run_id,
        panda_attributes=panda_attributes,
    )
    task_id = panda_client.submit(
        task_parameters, logger=logger, log_prefix=f"[run_id={run_id}] "
    )
    if logger:
        logger.info(f"Submitted PanDA task {task_id} for run_id={run_id}")
    return task_id
0187
0188
def close_panda_task(task_id, logger=None):
    """
    Softly close a PanDA task so no further job retries are scheduled.

    :param task_id: Identifier of the PanDA task to close.
    :param logger: Optional logger used to record the client's return code.
    """
    client = PandaClient()
    status = client.close(task_id, soft=True)
    if logger:
        logger.info(f"Finished PanDA task {task_id} with return code: {status}")
0197
0198
@transactional_session
def create_workflow_task(message, panda_attributes=None, logger=None, session=None):
    """
    Create a workflow task in iDDS when receiving a 'run_imminent' message.

    Message format:
        {
            'msg_type': 'run_imminent',
            'run_id': 20250914185722,
            'created_at': datetime.datetime.utcnow(),
            'content': {
                'num_workers': 2,
                'num_cores_per_worker': 10,
                'num_ram_per_core': 4000.0,  # MB
                ...
            }
        }

    Reuses the per-site/per-day request if one already exists, then creates a
    transform, input/output collections, submits the PanDA task and records a
    processing row — all within one transactional session.

    :param message: Incoming 'run_imminent' message dict.
    :param panda_attributes: Optional dict of PanDA submission attributes.
    :param logger: Optional logger.
    :param session: DB session injected by @transactional_session.
    :return: Tuple (request_id, transform_id, processing_id,
             input_coll_id, output_coll_id, workload_id).
    """
    # Avoid the shared-mutable-default pitfall: build a fresh dict per call.
    panda_attributes = panda_attributes or {}

    run_id = message.get("run_id")
    content = message.get("content", {})

    # NOTE(review): the documented message uses 'num_workers' but the code
    # reads 'target_worker_count' — confirm which key the producer sends.
    num_workers = content.get("target_worker_count", 1)
    core_count = content.get("num_cores_per_worker", None)
    ram_count = content.get("num_ram_per_core", None)
    site = content.get("site", None)

    utc_now = datetime.datetime.now(datetime.timezone.utc)
    year = utc_now.year
    month = utc_now.month

    scope, workflow_name, name = get_scope_name(run_id, site, panda_attributes)
    # One request per workflow name (site + day); look it up before creating.
    reqs = core_requests.get_request_ids_by_name(
        scope=scope, name=workflow_name, exact_match=True, session=session
    )

    if reqs:
        request_id = reqs[workflow_name]
    else:
        workflow = {
            "scope": scope,
            "name": workflow_name,
            "requester": "iDDS",
            "request_type": RequestType.iWorkflow,
            "username": "EIC",
            "transform_tag": "EIC",
            "status": RequestStatus.Transforming,
            "locking": RequestLocking.Idle,
            "cloud": "US",
            "campaign": "EIC",
            "campaign_scope": f"EIC_{year}",
            "campaign_group": f"EIC_{year}_{month}",
            "campaign_tag": "reco",
        }
        request_id = core_requests.add_request(**workflow, session=session)

    transform = {
        "request_id": request_id,
        "workload_id": None,
        "transform_type": TransformType.iWork,
        "transform_tag": "EIC",
        "name": name,
        "status": TransformStatus.New,
        "substatus": TransformStatus.New,
    }
    transform_id = core_transforms.add_transform(**transform, session=session)

    # Input/output collections are bookkeeping placeholders (no real files),
    # hence zero counters and Closed status from the start.
    input_coll = {
        "request_id": request_id,
        "transform_id": transform_id,
        "workload_id": None,
        "scope": scope,
        "name": name,
        "coll_type": CollectionType.Dataset,
        "relation_type": CollectionRelationType.Input,
        "bytes": 0,
        "total_files": 0,
        "new_files": 0,
        "processed_files": 0,
        "processing_files": 0,
        "coll_metadata": None,
        "status": CollectionStatus.Closed,
    }
    output_coll = {
        "request_id": request_id,
        "transform_id": transform_id,
        "workload_id": None,
        "scope": scope,
        "name": name,
        "coll_type": CollectionType.Dataset,
        "relation_type": CollectionRelationType.Output,
        "bytes": 0,
        "total_files": 0,
        "new_files": 0,
        "processed_files": 0,
        "processing_files": 0,
        "coll_metadata": None,
        "status": CollectionStatus.Closed,
    }
    input_coll_id = core_catalog.add_collection(**input_coll, session=session)
    output_coll_id = core_catalog.add_collection(**output_coll, session=session)

    # The PanDA task id doubles as the iDDS workload id.
    workload_id = submit_task_to_panda(
        request_id,
        transform_id=transform_id,
        name=name,
        num_workers=num_workers,
        core_count=core_count,
        ram_count=ram_count,
        site=site,
        run_id=run_id,
        panda_attributes=panda_attributes,
        logger=logger,
    )

    processing = {
        "request_id": request_id,
        "transform_id": transform_id,
        "workload_id": workload_id,
        "status": ProcessingStatus.Submitting,
        "submitter": "panda",
        "site": site,
        "processing_type": ProcessingType.iWork,
    }
    processing_id = core_processings.add_processing(**processing, session=session)

    if logger:
        logger.info(
            f"Created workflow task: request_id={request_id}, transform_id={transform_id}, "
            f"processing_id={processing_id}, workload_id={workload_id}"
        )

    return request_id, transform_id, processing_id, input_coll_id, output_coll_id, workload_id
0336
0337
def create_harvester_worker(
    header,
    msg,
    task_id,
    harvester_publisher,
    timetolive=12 * 3600 * 1000,
    panda_attributes=None,
    logger=None,
):
    """
    Send an 'adjuster_worker' message to harvester to create workers.

    Published message format:
        {
            'msg_type': 'adjuster_worker',
            'run_id': 20250914185722,
            'created_at': <ISO timestamp>,
            'content': {
                'num_workers': 2,
                'num_cores_per_worker': 10,
                'num_ram_per_core': 4000.0,
                'panda_queue': <queue or None>,
                'requested_at': <original message created_at>
            }
        }

    :param header: Incoming message header (currently unused).
    :param msg: Incoming message dict ('run_id', 'created_at', 'content').
    :param task_id: PanDA task id (logging only).
    :param harvester_publisher: Publisher with a publish(msg, headers=...) method;
                                if falsy, nothing is published.
    :param timetolive: Message TTL in milliseconds (default 12 hours).
    :param panda_attributes: Optional dict; may supply 'queue'.
    :param logger: Optional logger.
    """
    # Avoid the shared-mutable-default pitfall: build a fresh dict per call.
    panda_attributes = panda_attributes or {}

    content = msg.get("content", {})
    run_id = msg.get("run_id")
    panda_queue = panda_attributes.get("queue", None)
    created_at_original = msg.get("created_at")

    worker_msg = {
        "msg_type": "adjuster_worker",
        "run_id": run_id,
        "created_at": datetime.datetime.now(datetime.timezone.utc).isoformat(),
        "content": {
            "num_workers": content.get("num_workers", 1),
            "num_cores_per_worker": content.get("num_cores_per_worker", 1),
            "num_ram_per_core": content.get("num_ram_per_core", 1000),
            "panda_queue": panda_queue,
            # Preserve the producer's timestamp so latency can be measured.
            "requested_at": created_at_original,
        },
    }

    msg_header = {
        "persistent": "true",
        "ttl": timetolive,
        "vo": "eic",
        "msg_type": "adjuster_worker",
        "run_id": str(run_id),
    }

    if harvester_publisher:
        harvester_publisher.publish(worker_msg, headers=msg_header)

    if logger:
        logger.info(
            f"Sent adjuster_worker message for run_id={run_id}, task_id={task_id}, num_workers={worker_msg['content']['num_workers']}"
        )
0396
0397
def stop_harvester_worker(
    header,
    msg,
    task_id,
    harvester_publisher,
    transformer_broadcaster,
    timetolive=12 * 3600 * 1000,
    panda_attributes=None,
    logger=None,
):
    """
    Stop harvester workers when receiving a 'run_end' message.

    Performs two actions:
        1. Broadcast a 'stop_transformer' message to all transformers.
        2. Send an 'adjuster_worker' message with zero workers so no new
           workers are created.

    NOTE(review): the original design also closed the PanDA task here
    (see close_panda_task) to prevent job retries, but this function no
    longer does so — confirm the task is closed elsewhere.

    :param header: Incoming message header (currently unused).
    :param msg: Incoming message dict ('run_id', 'created_at').
    :param task_id: PanDA task id included in the stop broadcast.
    :param harvester_publisher: Publisher for the adjuster_worker message;
                                skipped if falsy.
    :param transformer_broadcaster: Publisher for the stop_transformer
                                    broadcast; skipped if falsy.
    :param timetolive: Message TTL in milliseconds (default 12 hours).
    :param panda_attributes: Optional dict; may supply 'queue'.
    :param logger: Optional logger.
    """
    # Avoid the shared-mutable-default pitfall: build a fresh dict per call.
    panda_attributes = panda_attributes or {}

    run_id = msg.get("run_id")
    panda_queue = panda_attributes.get("queue", None)
    created_at_original = msg.get("created_at")

    stop_transformer_msg = {
        "msg_type": "stop_transformer",
        "run_id": run_id,
        "created_at": datetime.datetime.now(datetime.timezone.utc).isoformat(),
        "content": {"task_id": task_id, "requested_at": created_at_original},
    }

    stop_header = {
        "persistent": "true",
        "ttl": timetolive,
        "vo": "eic",
        "msg_type": "stop_transformer",
        "run_id": str(run_id),
    }

    if transformer_broadcaster:
        transformer_broadcaster.publish(stop_transformer_msg, headers=stop_header)
        if logger:
            logger.info(
                f"Sent stop_transformer broadcast for run_id={run_id}, task_id={task_id}"
            )

    # Zeroed resource request tells the adjuster to stop creating workers.
    stop_worker_msg = {
        "msg_type": "adjuster_worker",
        "run_id": run_id,
        "created_at": datetime.datetime.now(datetime.timezone.utc).isoformat(),
        "content": {
            "num_workers": 0,
            "num_cores_per_worker": 0,
            "num_ram_per_core": 0,
            "panda_queue": panda_queue,
            "requested_at": created_at_original,
        },
    }

    worker_header = {
        "persistent": "true",
        "ttl": timetolive,
        "vo": "eic",
        "msg_type": "adjuster_worker",
        "run_id": str(run_id),
    }

    if harvester_publisher:
        harvester_publisher.publish(stop_worker_msg, headers=worker_header)
        if logger:
            logger.info(
                f"Sent adjuster_worker (stop) message for run_id={run_id}, task_id={task_id}"
            )
0479
0480
def worker_handler(header, msg, task_id=None, handler_kwargs=None, logger=None):
    """
    Dispatch worker-related messages by 'msg_type'.

    Supported message types:
        - run_imminent: Create workflow task and start workers.
        - run_end/run_stop/end_run: Stop workers and transformers.
        - transformer_heartbeat: Log transformer health.

    Exceptions are caught and logged; the handler never raises, returning
    whatever was accumulated in ``ret`` so far.

    :param header: Message header (should contain 'run_id').
    :param msg: Message dict:
        {
            'msg_type': '<type>',
            'run_id': 20250914185722,
            'created_at': datetime,
            'content': {...}
        }
    :param task_id: Task ID (may be None for run_imminent).
    :param handler_kwargs: Optional dict with 'transformer_broadcaster',
        'worker_publisher', 'timetolive', 'panda_attributes'.
    :param logger: Optional logger.
    :return: Dict containing 'task_id' when a task was created, else empty.
    """
    # Avoid the shared-mutable-default pitfall: build a fresh dict per call.
    handler_kwargs = handler_kwargs or {}

    ret = {}
    msg_type = msg.get("msg_type")
    run_id = msg.get("run_id")

    # Defaults used when handler_kwargs does not supply a value.
    timetolive = 12 * 3600 * 1000
    panda_attributes = {}

    transformer_broadcaster = handler_kwargs.get("transformer_broadcaster", None)
    worker_publisher = handler_kwargs.get("worker_publisher", None)
    timetolive = handler_kwargs.get("timetolive", timetolive)
    panda_attributes = handler_kwargs.get("panda_attributes", panda_attributes)

    try:
        if msg_type == "run_imminent":
            # The PanDA workload id becomes the task id for follow-up messages.
            request_id, transform_id, processing_id, input_coll_id, output_coll_id, workload_id = (
                # Pass the logger through so the submission path is logged too.
                create_workflow_task(msg, panda_attributes=panda_attributes, logger=logger)
            )
            task_id = workload_id
            create_harvester_worker(
                header,
                msg,
                task_id,
                worker_publisher,
                timetolive=timetolive,
                panda_attributes=panda_attributes,
                logger=logger,
            )
            ret["task_id"] = task_id
            if logger:
                logger.info(f"Handled run_imminent: run_id={run_id}, task_id={task_id}")

        elif msg_type in ["run_end", "run_stop", "end_run"]:
            stop_harvester_worker(
                header,
                msg,
                task_id,
                worker_publisher,
                transformer_broadcaster,
                timetolive=timetolive,
                panda_attributes=panda_attributes,
                logger=logger,
            )
            if logger:
                logger.info(f"Handled {msg_type}: run_id={run_id}, task_id={task_id}")

        elif msg_type == "transformer_heartbeat":
            transformer_id = msg.get("content", {}).get("id")
            hostname = msg.get("content", {}).get("hostname")
            if logger:
                logger.info(
                    f"Transformer heartbeat: run_id={run_id}, transformer_id={transformer_id}, hostname={hostname}"
                )

        else:
            if logger:
                logger.warning(
                    f"Unknown message type in worker_handler: {msg_type}, run_id={run_id}"
                )

    except Exception as ex:
        # Boundary handler: log and swallow so one bad message cannot kill
        # the consumer loop.
        if logger:
            logger.error(
                f"Error in worker_handler for msg_type={msg_type}, run_id={run_id}: {ex}",
                exc_info=True,
            )

    return ret