File indexing completed on 2026-04-10 08:39:00
0001 import datetime
0002 import gc
0003 import gzip
0004 import json
0005 import os
0006 import re
0007 import struct
0008 import sys
0009 import traceback
0010 import uuid
0011 import zlib
0012 from pathlib import Path
0013 from typing import Dict, List
0014
0015 from pandacommon.pandalogger.LogWrapper import LogWrapper
0016 from pandacommon.pandalogger.PandaLogger import PandaLogger
0017 from pandacommon.pandautils.PandaUtils import naive_utcnow
0018 from werkzeug.datastructures import FileStorage
0019
0020 from pandaserver.api.v1.common import (
0021 generate_response,
0022 get_dn,
0023 get_endpoint,
0024 has_production_role,
0025 request_validation,
0026 )
0027 from pandaserver.config import panda_config
0028 from pandaserver.jobdispatcher import Protocol
0029 from pandaserver.srvcore import CoreUtils
0030 from pandaserver.srvcore.panda_request import PandaRequest
0031 from pandaserver.taskbuffer.TaskBuffer import TaskBuffer
0032 from pandaserver.userinterface import Client
0033
0034 _logger = PandaLogger().getLogger("api_file_server")
0035
0036
# File-name suffixes treated as log-type uploads: they use LOG_LIMIT and are
# skipped when registering sandbox files in the database (see upload_cache_file)
IGNORED_SUFFIX = [".out"]


# Size limits, in bytes
MB = 1024 * 1024
EVENT_PICKING_LIMIT = 10 * MB  # max size of an uploaded run/event list
LOG_LIMIT = 100 * MB  # max size of an uploaded log file
CHECKPOINT_LIMIT = 500 * MB  # max size of an uploaded HPO checkpoint
SANDBOX_NO_BUILD_LIMIT = 10 * MB  # max sandbox size when submitted with --noBuild/--libDS
SANDBOX_LIMIT = 768 * MB  # max sandbox size otherwise ("sources.*" uploads)


# Canned error messages returned to clients
ERROR_NOT_SECURE = "ERROR : no HTTPS"
ERROR_LIMITED_PROXY = "ERROR: rejected due to the usage of limited proxy"
ERROR_OVERWRITE = "ERROR: cannot overwrite file"
ERROR_WRITE = "ERROR: cannot write file"
ERROR_SIZE_LIMIT = "ERROR: upload failure. Exceeded size limit"

# Module-level TaskBuffer instance; populated by init_task_buffer() and used
# by the functions that need database access
global_task_buffer = None
0055
0056
def init_task_buffer(task_buffer: TaskBuffer) -> None:
    """
    Set the module-level task buffer.

    Must be called once, before any other function in this module that requires
    database access (e.g. register_cache_file, validate_cache_file).
    """
    global global_task_buffer
    global_task_buffer = task_buffer
0063
0064
0065 def _get_content_length(req: PandaRequest, tmp_logger: LogWrapper) -> int:
0066 """
0067 Get the content length of the request.
0068
0069 Args:
0070 req(PandaRequest): internally generated request object containing the env variables
0071 tmp_logger(LogWrapper): logger object of the calling function.
0072
0073 Returns:
0074 int: content length of the request.
0075 """
0076 content_length = 0
0077 try:
0078 content_length = int(req.headers_in["content-length"])
0079 except Exception:
0080 if "content-length" in req.headers_in:
0081 tmp_logger.error(f"Cannot get content_length: {req.headers_in['content-length']}")
0082 else:
0083 tmp_logger.error("No content_length for {method_name}")
0084
0085 tmp_logger.debug(f"Size: {content_length}")
0086 return content_length
0087
0088
@request_validation(_logger, secure=True, production=True, request_method="POST")
def upload_jedi_log(req: PandaRequest, file: FileStorage) -> Dict:
    """
    Upload a JEDI log file

    Uploads a JEDI log file and returns the URL to the file. An existing log file for the task
    is overwritten. Requires a secure connection and production role.

    API details:
        HTTP Method: POST
        Path: /v1/file_server/upload_jedi_log

    Args:
        req(PandaRequest): internally generated request object containing the env variables
        file(FileStorage): werkzeug.FileStorage object to be uploaded.

    Returns:
        dict: The system response `{"success": success, "message": message, "data": data}`.
              When successful, the data field contains the URL to the file. Otherwise the message field indicates the issue.
    """
    tmp_logger = LogWrapper(_logger, f"upload_jedi_log <{file.filename}>")
    tmp_logger.debug(f"start {req.subprocess_env['SSL_CLIENT_S_DN']}")

    # Reject uploads that exceed the log size limit
    if _get_content_length(req, tmp_logger) > LOG_LIMIT:
        tmp_logger.error(ERROR_SIZE_LIMIT)
        tmp_logger.debug("Done")
        return generate_response(False, ERROR_SIZE_LIMIT)

    jedi_log_directory = "/jedilog"
    try:
        base_name = file.filename.split("/")[-1]
        destination = f"{panda_config.cache_dir}{jedi_log_directory}/{base_name}"

        # Replace any previous log file for this task
        if os.path.exists(destination):
            os.remove(destination)

        with open(destination, "wb") as destination_file:
            destination_file.write(file.read())
        tmp_logger.debug(f"written to {destination}")

        # Build the externally visible URL for the uploaded file
        protocol = "https" if panda_config.disableHTTP else "http"
        success, server = get_endpoint(protocol)
        if not success:
            error_message = f"cannot get endpoint: {server}"
            tmp_logger.error(error_message)
            tmp_logger.debug("Done")
            return generate_response(False, error_message)

        file_url = f"{protocol}://{server}/cache{jedi_log_directory}/{base_name}"
        tmp_logger.debug("Done")
        return generate_response(True, data=file_url)

    except Exception:
        error_type, error_value = sys.exc_info()[:2]
        error_message = f"failed to write log with {error_type.__name__}:{error_value}"
        tmp_logger.error(error_message)
        tmp_logger.debug("Done")
        return generate_response(False, error_message)
0159
0160
@request_validation(_logger, secure=True, production=True, request_method="POST")
def update_jedi_log(req: PandaRequest, file: FileStorage) -> Dict:
    """
    Update a JEDI log file

    Updates a JEDI log file, appending more content at the end of the file. Requires a secure connection and production role.

    API details:
        HTTP Method: POST
        Path: /v1/file_server/update_jedi_log

    Args:
        req(PandaRequest): internally generated request object containing the env variables
        file(FileStorage): werkzeug.FileStorage object with the zlib-compressed content to append.

    Returns:
        dict: The system response `{"success": success, "message": message, "data": data}`. When unsuccessful, the message field will indicate the issue.
    """

    tmp_logger = LogWrapper(_logger, f"update_jedi_log < {file.filename} >")
    tmp_logger.debug("Start")

    try:
        # The uploaded payload is zlib-compressed; decompression yields bytes
        new_content = zlib.decompress(file.read())

        # Only the base name is honored, so callers cannot escape the cache directory
        log_name = f"{panda_config.cache_dir}/{file.filename.split('/')[-1]}"

        # Append in binary mode. Fixed: the file was previously opened in text
        # mode ("a"), which raises TypeError when writing the bytes returned
        # by zlib.decompress
        with open(log_name, "ab") as file_object:
            file_object.write(new_content)

    except Exception:
        error_type, error_value, _ = sys.exc_info()
        tmp_logger.error(f"{error_type} {error_value}")
        return generate_response(False, f"ERROR: cannot update file with {error_type} {error_value}")

    tmp_logger.debug("Done")
    return generate_response(True)
0202
0203
@request_validation(_logger, request_method="GET")
def download_jedi_log(req: PandaRequest, log_name: str, offset: int = 0) -> str:
    """
    Download JEDI log file

    Downloads the JEDI log file, optionally starting at a given offset.

    API details:
        HTTP Method: GET
        Path: /v1/file_server/download_jedi_log

    Args:
        req(PandaRequest): internally generated request object containing the env variables
        log_name(string): log file name
        offset(int): offset in the file

    Returns:
        str: The content of the log file (prefixed with a single space), or just the prefix when the read fails.
    """
    tmp_logger = LogWrapper(_logger, f"download_jedi_log <{log_name}>")
    tmp_logger.debug(f"Start offset={offset}")

    # A single leading space is always returned, even when reading fails
    return_string = " "
    try:
        # Only the base name is honored, so callers cannot escape the cache directory
        full_log_name = f"{panda_config.cache_dir}/{log_name.split('/')[-1]}"

        with open(full_log_name, "r") as log_file:
            log_file.seek(int(offset))
            return_string += log_file.read()

    except Exception:
        error_type, error_value, _ = sys.exc_info()
        tmp_logger.error(f"Failed with: {error_type} {error_value}")

    tmp_logger.debug(f"Read {len(return_string)} bytes")
    tmp_logger.debug("Done")
    return return_string
0245
0246
@request_validation(_logger, secure=True, request_method="POST")
def upload_cache_file(req: PandaRequest, file: FileStorage) -> Dict:
    """
    Upload a cache file

    Uploads a file to the cache. When not touched, cache files are expired after some time.
    User caches will get registered in the PanDA database and will account towards user limits.
    PanDA log files will be stored in gzip format. Requires a secure connection.

    API details:
        HTTP Method: POST
        Path: /v1/file_server/upload_cache_file

    Args:
        req(PandaRequest): internally generated request object containing the env variables
        file(FileStorage): werkzeug.FileStorage object to be uploaded.

    Returns:
        dict: The system response `{"success": success, "message": message, "data": data}`. When unsuccessful, the message field will indicate the issue.
    """

    tmp_logger = LogWrapper(_logger, f"upload_cache_file-{naive_utcnow().isoformat('/')}")
    tmp_logger.debug(f"Start")

    # Explicit security checks in addition to the decorator's secure=True
    # NOTE(review): presumably kept for defense in depth — confirm whether still needed
    if not Protocol.isSecure(req):
        tmp_logger.error("No HTTPS. Triggering garbage collection...")
        gc.collect()
        tmp_logger.debug("Done")
        return generate_response(False, ERROR_NOT_SECURE)

    # Limited proxies are not allowed to upload files
    if "/CN=limited proxy" in req.subprocess_env["SSL_CLIENT_S_DN"]:
        tmp_logger.error("Limited proxy is used. Triggering garbage collection...")
        gc.collect()
        tmp_logger.debug("Done")
        return generate_response(False, ERROR_LIMITED_PROXY)

    # Resolve the PanDA user name from the client certificate DN
    user_name = CoreUtils.clean_user_id(req.subprocess_env["SSL_CLIENT_S_DN"])
    tmp_logger.debug(f"user_name={user_name} file_path={file.filename}")

    # Pick the size limit for the upload:
    # - log-type files (ignored suffixes): LOG_LIMIT
    # - sandboxes not named "sources.*" (submitted with --noBuild/--libDS): tighter limit
    # - regular "sources.*" sandboxes: full sandbox limit
    if any(file.filename.endswith(suffix) for suffix in IGNORED_SUFFIX):
        no_build = False
        size_limit = LOG_LIMIT

    elif not file.filename.startswith("sources."):
        no_build = True
        size_limit = SANDBOX_NO_BUILD_LIMIT

    else:
        no_build = False
        size_limit = SANDBOX_LIMIT

    content_length = _get_content_length(req, tmp_logger)

    # Reject the upload when it exceeds the applicable size limit
    if content_length > size_limit:
        error_message = f"{ERROR_SIZE_LIMIT} {content_length // MB} MB > {size_limit // MB} MB."
        if no_build:
            error_message += " Please submit the job without --noBuild/--libDS since those options impose a tighter size limit"
        else:
            error_message += " Please remove redundant files from your work area"
        tmp_logger.error(error_message)
        tmp_logger.debug("Triggering garbage collection...")
        gc.collect()
        tmp_logger.debug("Done")
        return generate_response(False, error_message)

    try:
        file_name = file.filename.split("/")[-1]
        full_path = f"{panda_config.cache_dir}/{file_name}"

        # Existing files are not overwritten unless the name ends in "__ow__"
        if os.path.exists(full_path) and file.filename.split(".")[-1] != "__ow__":
            # Touch the existing file so it does not expire
            os.utime(full_path, None)

            error_message = ERROR_OVERWRITE
            tmp_logger.debug(f"{ERROR_OVERWRITE} {file_name}")
            tmp_logger.debug("end")
            return generate_response(False, error_message)

        # Write the file, gzip-compressing it when the name matches one of the
        # configured compress_file_names regex patterns
        with open(full_path, "wb") as file_object:
            file_content = file.read()
            if hasattr(panda_config, "compress_file_names") and [
                True for patt in panda_config.compress_file_names.split(",") if re.search(patt, file_name) is not None
            ]:
                file_content = gzip.compress(file_content)
            file_object.write(file_content)

    except Exception:
        error_message = ERROR_WRITE
        tmp_logger.error(error_message)
        tmp_logger.debug("Triggering garbage collection...")
        gc.collect()
        tmp_logger.debug("Done")
        return generate_response(False, error_message)

    # Extract the CRC-32 from the last 8 bytes (gzip footer: CRC32 + size);
    # only meaningful when the content is actually gzip
    try:
        footer = file_content[-8:]
        checksum, _ = struct.unpack("II", footer)
        checksum = str(checksum)
        tmp_logger.debug(f"CRC from gzip Footer {checksum}")
    except Exception:
        # Not gzip content (or too short): proceed without a checksum
        checksum = None
        tmp_logger.debug(f"No CRC calculated {checksum}")

    file_size = len(file_content)

    tmp_logger.debug(f"Written dn={user_name}, file={full_path}, size={file_size // MB} MB, crc={checksum}")

    # Register the sandbox in the database, unless it is a log-type file
    if panda_config.record_sandbox_info:
        to_insert = True
        for patt in IGNORED_SUFFIX:
            if file.filename.endswith(patt):
                to_insert = False
                break
        if not to_insert:
            tmp_logger.debug("skipped to insert to DB")
        else:
            status_client, output_client = Client.register_cache_file(user_name, file.filename, file_size, checksum)
            if status_client != 0:
                error_message = f"ERROR : failed to register sandbox to DB with {status_client} {output_client}"
                tmp_logger.error(error_message)
                tmp_logger.debug("Done")
                return generate_response(False, error_message)

            success = output_client["success"]
            message = output_client["message"]
            if not success:
                error_message = f"ERROR : failed to register sandbox to DB with {message}"
                tmp_logger.error(error_message)
                tmp_logger.debug("Done")
                return generate_response(False, error_message)

            tmp_logger.debug(f"Registered file in database with: {output_client}")

    tmp_logger.debug("Triggering garbage collection...")
    gc.collect()
    tmp_logger.debug("Done")

    return generate_response(True)
0402
0403
@request_validation(_logger, secure=True, request_method="POST")
def touch_cache_file(req: PandaRequest, file_name: str) -> Dict:
    """
    Touch file in the cache directory.

    Updates the modification time of a file in the cache directory, preventing it from being
    expired and deleted by the server clean-up processes. Requires a secure connection.

    API details:
        HTTP Method: POST
        Path: /v1/file_server/touch_cache_file

    Args:
        req(PandaRequest): internally generated request object containing the env variables
        file_name(string): name of the file to touch

    Returns:
        dict: The system response `{"success": success, "message": message, "data": data}`. When unsuccessful, the message field will indicate the issue.
    """
    tmp_logger = LogWrapper(_logger, f"touch_cache_file < {file_name} >")
    tmp_logger.debug(f"Start")

    # Only the base name is honored, so callers cannot escape the cache directory
    target = f"{panda_config.cache_dir}/{file_name.split('/')[-1]}"
    try:
        os.utime(target, None)
    except Exception:
        error_type, error_value = sys.exc_info()[:2]
        message = f"Failed to touch file with: {error_type} {error_value}"
        _logger.error(message)
        return generate_response(False, message)

    tmp_logger.debug(f"Done")
    return generate_response(True)
0435
0436
@request_validation(_logger, secure=True, request_method="POST")
def delete_cache_file(req: PandaRequest, file_name: str) -> Dict:
    """
    Delete cache file

    Deletes a file from the cache directory. Currently a dummy method that performs no
    actual deletion and always reports success. Requires a secure connection.

    API details:
        HTTP Method: POST
        Path: /v1/file_server/delete_cache_file

    Args:
        req(PandaRequest): internally generated request object containing the env variables
        file_name(string): file name to be deleted

    Returns:
        dict: The system response `{"success": success, "message": message, "data": data}`.
    """
    tmp_logger = LogWrapper(_logger, f"delete_cache_file <{file_name}>")
    tmp_logger.debug(f"Start")

    try:
        # Intentionally a no-op: the file is left in place for the clean-up processes
        return generate_response(True)
    except Exception:
        return generate_response(False)
0465
0466
@request_validation(_logger, secure=True, production=True, request_method="POST")
def register_cache_file(req: PandaRequest, user_name: str, file_name: str, file_size: int, checksum: str) -> Dict:
    """
    Register cache file

    Registers a file from the cache directory into the PanDA database, so that PanDA knows
    which server it is on. Requires a secure connection and production role.

    API details:
        HTTP Method: POST
        Path: /v1/file_server/register_cache_file

    Args:
        req(PandaRequest): internally generated request object containing the env variables
        user_name(string): user that uploaded the file
        file_name(string): file name
        file_size(int): file size
        checksum(string): checksum

    Returns:
        dict: The system response `{"success": success, "message": message, "data": data}`. When unsuccessful, the message field will indicate the issue.
    """
    tmp_logger = LogWrapper(_logger, f"register_cache_file {user_name} {file_name}")
    tmp_logger.debug("Start")

    # Prefer the configured sandbox host name; otherwise fall back to the requesting host
    host_name = getattr(panda_config, "sandboxHostname", None) or req.get_remote_host()

    message = global_task_buffer.insertSandboxFileInfo(user_name, host_name, file_name, file_size, checksum)

    tmp_logger.debug("Done")
    if message != "OK":
        return generate_response(False, message)
    return generate_response(True)
0506
0507
@request_validation(_logger, secure=True, request_method="POST")
def validate_cache_file(req: PandaRequest, file_size: int, checksum: int | str) -> Dict:
    """
    Validate cache file

    Validates a cache file owned by the caller by checking the file metadata that was
    registered in the database. Requires a secure connection.

    API details:
        HTTP Method: POST
        Path: /v1/file_server/validate_cache_file

    Args:
        req(PandaRequest): internally generated request object containing the env variables
        file_size(int): file size
        checksum(int): checksum

    Returns:
        dict: The system response `{"success": success, "message": message, "data": data}`. When successful the message will return the host and file name.
              When unsuccessful, the message field will indicate the issue.
    """
    user = get_dn(req)
    message = global_task_buffer.checkSandboxFile(user, file_size, checksum)

    # Any non-empty message that does not start with "FOUND" means validation failed
    success = not message or message.startswith("FOUND")
    return generate_response(success, message)
0537
0538
0539 def _get_checkpoint_filename(task_id: str, sub_id: str) -> Dict:
0540 """
0541 Get the checkpoint file name.
0542
0543 Args:
0544 task_id(string): task ID.
0545 sub_id(string): sub ID.
0546
0547 Returns:
0548 string: checkpoint file name.
0549 """
0550 return f"hpo_cp_{task_id}_{sub_id}"
0551
0552
@request_validation(_logger, secure=True, request_method="POST")
def upload_hpo_checkpoint(req: PandaRequest, file: FileStorage) -> Dict:
    """
    Upload a HPO checkpoint file

    Uploads a HPO checkpoint file to the server. The file name must follow the
    "<task_id>_<sub_id>" convention. Requires a secure connection.

    API details:
        HTTP Method: POST
        Path: /v1/file_server/upload_hpo_checkpoint

    Args:
        req(PandaRequest): internally generated request object containing the env variables
        file(FileStorage): werkzeug.FileStorage object to be uploaded.

    Returns:
        dict: The system response `{"success": success, "message": message, "data": data}`. When unsuccessful, the message field will indicate the issue.
    """
    tmp_logger = LogWrapper(_logger, f"upload_hpo_checkpoint <jediTaskID_subID={file.filename}>")
    tmp_logger.debug(f"Start {req.subprocess_env['SSL_CLIENT_S_DN']}")

    # The task and sub IDs are encoded in the base file name as "<task_id>_<sub_id>"
    try:
        task_id, sub_id = file.filename.split("/")[-1].split("_")
    except Exception:
        error_message = "Failed to extract task and sub IDs"
        tmp_logger.error(error_message)
        return generate_response(False, error_message)

    content_length = _get_content_length(req, tmp_logger)
    if not content_length:
        error_message = f"Cannot get content-length"
        tmp_logger.error(error_message)
        return generate_response(False, error_message)

    if content_length > CHECKPOINT_LIMIT:
        error_message = f"Exceeded size limit {content_length}>{CHECKPOINT_LIMIT}"
        tmp_logger.error(error_message)
        return generate_response(False, error_message)

    full_path = os.path.join(panda_config.cache_dir, _get_checkpoint_filename(task_id, sub_id))
    try:
        with open(full_path, "wb") as checkpoint_file:
            checkpoint_file.write(file.read())
    except Exception as exc:
        error_message = f"Cannot write file due to {str(exc)}"
        tmp_logger.error(error_message)
        return generate_response(False, error_message)

    success_message = f"Successfully placed at {full_path}"
    tmp_logger.debug(success_message)
    tmp_logger.debug("Done")
    return generate_response(True, message=success_message, data=full_path)
0612
0613
@request_validation(_logger, secure=True, request_method="POST")
def delete_hpo_checkpoint(req: PandaRequest, task_id: str, sub_id: str) -> Dict:
    """
    Delete a HPO checkpoint file.

    Deletes a HPO checkpoint file from the server. Requires a secure connection.

    API details:
        HTTP Method: POST
        Path: /v1/file_server/delete_hpo_checkpoint

    Args:
        req(PandaRequest): internally generated request object containing the env variables
        task_id(string): JEDI task ID
        sub_id(string): sub ID.

    Returns:
        dict: The system response `{"success": success, "message": message, "data": data}`. When unsuccessful, the message field will indicate the issue.
    """
    tmp_logger = LogWrapper(_logger, f"delete_hpo_checkpoint <jediTaskID={task_id} ID={sub_id}>")
    tmp_logger.debug(f"Start {req.subprocess_env['SSL_CLIENT_S_DN']}")

    try:
        os.remove(os.path.join(panda_config.cache_dir, _get_checkpoint_filename(task_id, sub_id)))
    except Exception as exc:
        message = f"Failed to delete file due to {str(exc)}"
        tmp_logger.error(message)
        return generate_response(False, message=message)

    tmp_logger.debug("Done")
    return generate_response(True)
0647
0648
@request_validation(_logger, secure=True, request_method="POST")
def upload_file_recovery_request(
    req: PandaRequest,
    task_id: int = None,
    dry_run: bool = None,
    dataset: str = None,
    files: List[str] = None,
    no_child_retry: bool = False,
    resurrect_datasets: bool = False,
    force: bool = False,
    reproduce_parent: bool = False,
    reproduce_upto_nth_gen: int = 0,
) -> Dict:
    """
    Upload file recovery request

    Upload request to recover lost files. Either task_id or dataset needs to be specified. Requires a secure connection.

    API details:
        HTTP Method: POST
        Path: /v1/file_server/upload_file_recovery_request

    Args:
        req(PandaRequest): internally generated request object containing the env variables
        task_id(int, optional): JEDI task ID. Either task_id or dataset must be provided.
        dry_run(bool, optional): dry run flag.
        dataset(string, optional): the dataset name in which to recover files. Either task_id or dataset must be provided.
        files(list of str, optional): list of file names to recover.
        no_child_retry(bool, optional): flag to avoid retrying child tasks. Default is False.
        resurrect_datasets(bool, optional): Specifies whether to resurrect datasets when they were already deleted. Default is False.
        force(bool, optional): To force recovery even if there is no lost file. Default is False.
        reproduce_parent(bool, optional): Specifies whether to reproduce the parent task if the input files that originally generated the lost files have been deleted. Default: False.
        reproduce_upto_nth_gen(int, optional): Defines how many generations of parent tasks should be reproduced. Default 0, meaning no parent tasks are reproduced. When this is set to N>0, reproduce_parent is set to True automatically.

    Returns:
        dict: The system response `{"success": success, "message": message, "data": data}`. When unsuccessful, the message field will indicate the issue.
    """

    user_name = req.subprocess_env["SSL_CLIENT_S_DN"]
    creation_time = naive_utcnow().strftime("%Y-%m-%d %H:%M:%S")

    tmp_logger = LogWrapper(_logger, f"put_file_recovery_request < task_id={task_id} >")
    tmp_logger.debug(f"Start user={user_name}")

    try:
        # A recovery target is mandatory: either a task or a dataset
        if not task_id and not dataset:
            error_message = "Either task_id or dataset must be provided"
            tmp_logger.error(error_message)
            return generate_response(False, error_message)

        # The request is written as a JSON file in the cache directory for
        # asynchronous processing; the log file URL is returned to the caller
        file_name = f"{panda_config.cache_dir}/recov.{str(uuid.uuid4())}"
        log_filename = f"jedilog/lost_file_recovery.{str(uuid.uuid4())}.log"
        tmp_logger.debug(f"file={file_name}")

        is_production_manager = has_production_role(req)

        # Serialize the request; optional fields are only included when set
        with open(file_name, "w") as file_object:
            data = {
                "userName": user_name,
                "creationTime": creation_time,
                "logFilename": log_filename,
                "isProductionManager": is_production_manager,
            }
            if task_id:
                data["jediTaskID"] = task_id
            if dry_run:
                data["dryRun"] = True
            if dataset:
                data["ds"] = dataset
            if files:
                data["files"] = ",".join(files)
            if no_child_retry:
                data["noChildRetry"] = True
            if resurrect_datasets:
                data["resurrectDS"] = True
            if force:
                data["force"] = True
            if reproduce_parent:
                data["reproduceParent"] = True
            # NOTE(review): the docstring says reproduce_parent is auto-set to True
            # when reproduce_upto_nth_gen > 0, but this code does not set
            # "reproduceParent" here — presumably handled downstream; confirm
            if reproduce_upto_nth_gen > 0:
                data["reproduceUptoNthGen"] = reproduce_upto_nth_gen

            json.dump(data, file_object)
    except Exception as exc:
        error_message = f"cannot put request due to {str(exc)} "
        tmp_logger.error(error_message + traceback.format_exc())
        return generate_response(False, error_message)

    # Create an empty log file in the cache area so the returned URL is valid immediately
    Path(os.path.join(panda_config.cache_dir, log_filename)).touch()

    # Build the externally visible URL for the recovery log
    protocol = "https" if panda_config.disableHTTP else "http"
    _, server = get_endpoint(protocol)
    log_file_url = f"{protocol}://{server}/cache/{log_filename}"

    tmp_logger.debug("done")
    data = {"logFileURL": log_file_url}
    return generate_response(True, message="The request was accepted and will be processed in a few minutes", data=data)
0753
0754
@request_validation(_logger, secure=True, request_method="POST")
def upload_workflow_request(req: PandaRequest, data: str, dry_run: bool = False, sync: bool = False) -> Dict:
    """
    Upload workflow request to the server.

    Uploads a workflow request to the server. The request can be processed synchronously or asynchronously. Requires a secure connection.

    API details:
        HTTP Method: POST
        Path: /v1/file_server/upload_workflow_request

    Args:
        req(PandaRequest): internally generated request object containing the env variables
        data(string): workflow request data as a JSON string
        dry_run(bool): requests the workflow to be executed synchronously in dry_run mode
        sync(bool): requests the workflow to be processed synchronously
    Returns:
        dict: The system response `{"success": success, "message": message, "data": data}`. When unsuccessful, the message field will indicate the issue.
              When the workflow was processed synchronously, the data field contains the processing result.
    """
    user_name = req.subprocess_env["SSL_CLIENT_S_DN"]
    creation_time = naive_utcnow().strftime("%Y-%m-%d %H:%M:%S")

    tmp_logger = LogWrapper(_logger, "upload_workflow_request")
    tmp_logger.debug(f"Start user={user_name} dry_run={dry_run}")

    try:
        # Persist the request in the cache directory for the workflow processor
        file_name = f"{panda_config.cache_dir}/workflow.{str(uuid.uuid4())}"
        tmp_logger.debug(f"file={file_name}")

        request_payload = {
            "userName": user_name,
            "creationTime": creation_time,
            "data": json.loads(data),
        }
        with open(file_name, "w") as file_object:
            json.dump(request_payload, file_object)

        # Synchronous path: run the workflow processor immediately and clean up
        # the request file afterwards
        if sync or dry_run:
            tmp_logger.debug("Starting synchronous processing of the workflow")
            from pandaserver.taskbuffer.workflow_processor import WorkflowProcessor

            processor = WorkflowProcessor(log_stream=_logger)
            if dry_run:
                ret = processor.process(file_name, True, True, True, True)
            else:
                ret = processor.process(file_name, True, False, True, False)

            if os.path.exists(file_name):
                try:
                    os.remove(file_name)
                except Exception:
                    pass
            tmp_logger.debug("Done")
            return generate_response(True, data=ret)

    except Exception as exc:
        error_message = f"Cannot upload the workflow request: {str(exc)} "
        tmp_logger.error(error_message + traceback.format_exc())
        return generate_response(False, error_message)

    # Asynchronous path: the request file is picked up and processed later
    tmp_logger.debug("Done")
    return generate_response(True, message="The request was accepted and will be processed in a few minutes")
0826
0827
@request_validation(_logger, secure=True, request_method="POST")
def upload_event_picking_request(
    req: PandaRequest,
    run_event_list: str = "",
    data_type: str = "",
    stream_name: str = "",
    dataset_name: str = "",
    ami_tag: str = "",
    user_dataset_name: str = "",
    locked_by: str = "",
    parameters: str = "",
    input_file_list: str = "",
    n_sites: str = "",
    user_task_name: str = "",
    ei_api: str = "",
    include_guids: bool = False,
) -> Dict:
    """
    Upload event picking request to the server.

    Uploads an event picking request to the server. The request is written as a plain-text
    key=value file in the cache directory for asynchronous processing. Requires a secure connection.

    API details:
        HTTP Method: POST
        Path: /v1/file_server/upload_event_picking_request

    Args:
        req(PandaRequest): internally generated request object containing the env variables
        run_event_list(string): run and event list, one whitespace-separated entry per line.
        data_type(string): data type.
        stream_name(string): stream name.
        dataset_name(string): dataset name.
        ami_tag(string): AMI tag.
        user_dataset_name(string): user dataset name.
        locked_by(string): locking agent.
        parameters(string): parameters.
        input_file_list(string): input file list.
        n_sites(string): number of sites.
        user_task_name(string): user task name.
        ei_api(string): event index API.
        include_guids(bool): flag to indicate if GUIDs are included with the run-event list

    Returns:
        dict: The system response `{"success": success, "message": message, "data": data}`. When unsuccessful, the message field will indicate the issue.
    """

    user_name = req.subprocess_env["SSL_CLIENT_S_DN"]

    tmp_logger = LogWrapper(_logger, f"upload_event_picking_request-{naive_utcnow().isoformat('/')}")
    tmp_logger.debug(f"Start for {user_name}")

    creation_time = naive_utcnow().strftime("%Y-%m-%d %H:%M:%S")

    # The run/event list arrives in the request body, so validate its size first
    content_length = _get_content_length(req, tmp_logger)
    if not content_length:
        error_message = "Cannot get content-length from HTTP request."
        tmp_logger.error(f"{error_message}")
        return generate_response(False, f"ERROR: {error_message}")

    if content_length > EVENT_PICKING_LIMIT:
        error_message = f"Run/event list is too large. Exceeded size limit {content_length}>{EVENT_PICKING_LIMIT}."
        tmp_logger.error(f"{error_message} ")
        return generate_response(False, f"ERROR: {error_message}")

    try:
        # The request file is picked up later by the event picking daemon
        file_name = f"{panda_config.cache_dir}/evp.{str(uuid.uuid4())}"
        tmp_logger.debug(f"file: {file_name}")

        # Header section: one key=value pair per line
        file_content = (
            f"userName={user_name}\n"
            f"creationTime={creation_time}\n"
            f"eventPickDataType={data_type}\n"
            f"eventPickStreamName={stream_name}\n"
            f"eventPickDS={dataset_name}\n"
            f"eventPickAmiTag={ami_tag}\n"
            f"eventPickNumSites={n_sites}\n"
            f"userTaskName={user_task_name}\n"
            f"userDatasetName={user_dataset_name}\n"
            f"lockedBy={locked_by}\n"
            f"params={parameters}\n"
            f"inputFileList={input_file_list}\n"
            f"ei_api={ei_api}\n"
        )

        with open(file_name, "w") as file_object:
            file_object.write(file_content)
            run_event_guid_map = {}

            # Each valid entry has "run event" (2 fields) or "run event guid" (3 fields)
            valid_entry_length = 3 if include_guids else 2

            for tmp_line in run_event_list.split("\n"):
                tmp_items = tmp_line.split()

                # Skip malformed lines silently
                if len(tmp_items) != valid_entry_length:
                    continue

                file_object.write(f"runEvent={tmp_items[0]},{tmp_items[1]}\n")

                if include_guids:
                    run_event_guid_map[(tmp_items[0], tmp_items[1])] = [tmp_items[2]]

            # The (run, event) -> [guid] map is always appended, even when empty
            file_object.write(f"runEvtGuidMap={str(run_event_guid_map)}\n")

    except Exception as e:
        error_message = f"Cannot upload the Event Picking request: {str(e)}"
        tmp_logger.error(error_message + traceback.format_exc())
        return generate_response(False, f"ERROR: {error_message}")

    tmp_logger.debug("Done")
    return generate_response(True)