Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:39:00

0001 import datetime
0002 import gc
0003 import gzip
0004 import json
0005 import os
0006 import re
0007 import struct
0008 import sys
0009 import traceback
0010 import uuid
0011 import zlib
0012 from pathlib import Path
0013 from typing import Dict, List
0014 
0015 from pandacommon.pandalogger.LogWrapper import LogWrapper
0016 from pandacommon.pandalogger.PandaLogger import PandaLogger
0017 from pandacommon.pandautils.PandaUtils import naive_utcnow
0018 from werkzeug.datastructures import FileStorage
0019 
0020 from pandaserver.api.v1.common import (
0021     generate_response,
0022     get_dn,
0023     get_endpoint,
0024     has_production_role,
0025     request_validation,
0026 )
0027 from pandaserver.config import panda_config
0028 from pandaserver.jobdispatcher import Protocol
0029 from pandaserver.srvcore import CoreUtils
0030 from pandaserver.srvcore.panda_request import PandaRequest
0031 from pandaserver.taskbuffer.TaskBuffer import TaskBuffer
0032 from pandaserver.userinterface import Client
0033 
# Module-level logger; each handler wraps it in a LogWrapper with a per-call prefix
_logger = PandaLogger().getLogger("api_file_server")

# Skip registration for files with these suffixes (see upload_cache_file: such files
# are written to disk but not inserted into the PanDA database)
IGNORED_SUFFIX = [".out"]

# File size limits (bytes)
MB = 1024 * 1024
EVENT_PICKING_LIMIT = 10 * MB  # event-picking request files (not referenced in this section)
LOG_LIMIT = 100 * MB  # log files and IGNORED_SUFFIX uploads
CHECKPOINT_LIMIT = 500 * MB  # HPO checkpoint files
SANDBOX_NO_BUILD_LIMIT = 10 * MB  # sandboxes submitted with --noBuild/--libDS
SANDBOX_LIMIT = 768 * MB  # general "sources.*" sandboxes

# Error messages
ERROR_NOT_SECURE = "ERROR : no HTTPS"
ERROR_LIMITED_PROXY = "ERROR: rejected due to the usage of limited proxy"
ERROR_OVERWRITE = "ERROR: cannot overwrite file"
ERROR_WRITE = "ERROR: cannot write file"
ERROR_SIZE_LIMIT = "ERROR: upload failure. Exceeded size limit"

# Module-level TaskBuffer instance; set via init_task_buffer() before any DB access
global_task_buffer = None
0055 
0056 
def init_task_buffer(task_buffer: TaskBuffer) -> None:
    """
    Store the given TaskBuffer instance in the module-level global.

    Must be called once before any other function in this module that requires database access.
    """
    global global_task_buffer
    global_task_buffer = task_buffer
0063 
0064 
def _get_content_length(req: PandaRequest, tmp_logger: LogWrapper) -> int:
    """
    Get the content length of the request.

    Falls back to 0 when the header is missing or cannot be parsed as an integer,
    logging the reason in either case.

    Args:
        req(PandaRequest): internally generated request object containing the env variables
        tmp_logger(LogWrapper): logger object of the calling function.

    Returns:
        int: content length of the request, or 0 when unavailable.
    """
    content_length = 0
    try:
        content_length = int(req.headers_in["content-length"])
    except Exception:
        if "content-length" in req.headers_in:
            # header is present but not an integer
            tmp_logger.error(f"Cannot get content_length: {req.headers_in['content-length']}")
        else:
            # header is absent (the previous message contained a literal, never-interpolated placeholder)
            tmp_logger.error("No content-length in request headers")

    tmp_logger.debug(f"Size: {content_length}")
    return content_length
0087 
0088 
@request_validation(_logger, secure=True, production=True, request_method="POST")
def upload_jedi_log(req: PandaRequest, file: FileStorage) -> Dict:
    """
    Upload a JEDI log file

    Stores a JEDI log file in the cache directory and returns the URL pointing to it. An existing
    log file for the same task is replaced. Requires a secure connection and production role.

    API details:
        HTTP Method: POST
        Path: /v1/file_server/upload_jedi_log

    Args:
        req(PandaRequest): internally generated request object containing the env variables
        file(FileStorage): werkzeug.FileStorage object to be uploaded.

    Returns:
        dict: The system response `{"success": success, "message": message, "data": data}`.
              When successful, the data field will contain the URL to the file. Otherwise the message field will indicate the issue.
    """

    tmp_logger = LogWrapper(_logger, f"upload_jedi_log <{file.filename}>")
    tmp_logger.debug(f"start {req.subprocess_env['SSL_CLIENT_S_DN']}")

    # reject uploads above the log size limit
    if _get_content_length(req, tmp_logger) > LOG_LIMIT:
        tmp_logger.error(ERROR_SIZE_LIMIT)
        tmp_logger.debug("Done")
        return generate_response(False, ERROR_SIZE_LIMIT)

    jedi_log_directory = "/jedilog"
    try:
        base_name = file.filename.split("/")[-1]
        target_path = f"{panda_config.cache_dir}{jedi_log_directory}/{base_name}"

        # remove any previous version of the file
        if os.path.exists(target_path):
            os.remove(target_path)

        # dump the uploaded content to disk
        with open(target_path, "wb") as target:
            target.write(file.read())
        tmp_logger.debug(f"written to {target_path}")

        # build the URL with the appropriate protocol
        protocol = "https" if panda_config.disableHTTP else "http"
        success, server = get_endpoint(protocol)
        if not success:
            error_message = f"cannot get endpoint: {server}"
            tmp_logger.error(error_message)
            tmp_logger.debug("Done")
            return generate_response(False, error_message)

        file_url = f"{protocol}://{server}/cache{jedi_log_directory}/{base_name}"
        tmp_logger.debug("Done")
        return generate_response(True, data=file_url)

    except Exception:
        error_type, error_value = sys.exc_info()[:2]
        error_message = f"failed to write log with {error_type.__name__}:{error_value}"
        tmp_logger.error(error_message)
        tmp_logger.debug("Done")
        return generate_response(False, error_message)
0159 
0160 
@request_validation(_logger, secure=True, production=True, request_method="POST")
def update_jedi_log(req: PandaRequest, file: FileStorage) -> Dict:
    """
    Update a JEDI log file

    Updates a JEDI log file, appending more content at the end of the file. Requires a secure connection and production role.

    API details:
        HTTP Method: POST
        Path: /v1/file_server/update_jedi_log

    Args:
        req(PandaRequest): internally generated request object containing the env variables
        file(FileStorage): werkzeug.FileStorage object to be updated.

    Returns:
        dict: The system response `{"success": success, "message": message, "data": data}`. When unsuccessful, the message field will indicate the issue.
    """

    tmp_logger = LogWrapper(_logger, f"update_jedi_log < {file.filename} >")
    tmp_logger.debug("Start")

    # write to file
    try:
        # the payload arrives zlib-compressed; expand it to raw bytes
        new_content = zlib.decompress(file.read())

        # stdout name
        log_name = f"{panda_config.cache_dir}/{file.filename.split('/')[-1]}"

        # append in binary mode: zlib.decompress returns bytes, which cannot be written
        # to a text-mode ("a") file object and previously raised TypeError on every call
        with open(log_name, "ab") as file_object:
            file_object.write(new_content)

    except Exception:
        error_type, error_value, _ = sys.exc_info()
        tmp_logger.error(f"{error_type} {error_value}")
        return generate_response(False, f"ERROR: cannot update file with {error_type} {error_value}")

    tmp_logger.debug("Done")
    return generate_response(True)
0202 
0203 
@request_validation(_logger, request_method="GET")
def download_jedi_log(req: PandaRequest, log_name: str, offset: int = 0) -> str:
    """
    Download JEDI log file

    Downloads the JEDI log file, optionally starting from a given offset.

    API details:
        HTTP Method: GET
        Path: /v1/file_server/download_jedi_log

    Args:
        req(PandaRequest): internally generated request object containing the env variables
        log_name(string): log file name
        offset(int): offset in the file

    Returns:
        str: The content of the log file or an error message.
    """

    tmp_logger = LogWrapper(_logger, f"download_jedi_log <{log_name}>")
    tmp_logger.debug(f"Start offset={offset}")

    # start with a dummy character to avoid an Internal Server Error on empty replies
    return_string = " "
    try:
        # only the basename is used to locate the file in the cache directory
        full_log_name = f"{panda_config.cache_dir}/{log_name.split('/')[-1]}"

        # read the file starting at the requested offset
        with open(full_log_name, "r") as log_file:
            log_file.seek(int(offset))
            return_string += log_file.read()

    except Exception:
        error_type, error_value, _ = sys.exc_info()
        tmp_logger.error(f"Failed with: {error_type} {error_value}")

    tmp_logger.debug(f"Read {len(return_string)} bytes")
    tmp_logger.debug("Done")
    return return_string
0245 
0246 
@request_validation(_logger, secure=True, request_method="POST")
def upload_cache_file(req: PandaRequest, file: FileStorage) -> Dict:
    """
    Upload a cache file

    Uploads a file to the cache. When not touched, cache files are expired after some time.
    User caches will get registered in the PanDA database and will account towards user limits.
    PanDA log files will be stored in gzip format. Requires a secure connection.

    API details:
        HTTP Method: POST
        Path: /v1/file_server/upload_cache_file

    Args:
        req(PandaRequest): internally generated request object containing the env variables
        file(FileStorage): werkzeug.FileStorage object to be uploaded.

    Returns:
        dict: The system response `{"success": success, "message": message, "data": data}`. When unsuccessful, the message field will indicate the issue.
    """

    tmp_logger = LogWrapper(_logger, f"upload_cache_file-{naive_utcnow().isoformat('/')}")
    tmp_logger.debug(f"Start")

    # check if using secure connection and the proxy is not limited
    # we run these checks explicitly (on top of the decorator) so that gc.collect()
    # runs on the rejection paths
    if not Protocol.isSecure(req):
        tmp_logger.error("No HTTPS. Triggering garbage collection...")
        gc.collect()
        tmp_logger.debug("Done")
        return generate_response(False, ERROR_NOT_SECURE)

    if "/CN=limited proxy" in req.subprocess_env["SSL_CLIENT_S_DN"]:
        tmp_logger.error("Limited proxy is used. Triggering garbage collection...")
        gc.collect()
        tmp_logger.debug("Done")
        return generate_response(False, ERROR_LIMITED_PROXY)

    # user name, derived from the client certificate DN
    user_name = CoreUtils.clean_user_id(req.subprocess_env["SSL_CLIENT_S_DN"])
    tmp_logger.debug(f"user_name={user_name} file_path={file.filename}")

    # get file size limit
    # log file (IGNORED_SUFFIX, e.g. ".out"): uses the log limit and is not registered in the DB
    if any(file.filename.endswith(suffix) for suffix in IGNORED_SUFFIX):
        no_build = False
        size_limit = LOG_LIMIT
    # no build case: anything not named "sources.*" gets the tighter no-build limit
    elif not file.filename.startswith("sources."):
        no_build = True
        size_limit = SANDBOX_NO_BUILD_LIMIT
    # general sandbox limit
    else:
        no_build = False
        size_limit = SANDBOX_LIMIT

    # get actual file size
    content_length = _get_content_length(req, tmp_logger)

    # check if we are above the size limit
    if content_length > size_limit:
        error_message = f"{ERROR_SIZE_LIMIT} {content_length // MB} MB > {size_limit // MB} MB."
        if no_build:
            error_message += " Please submit the job without --noBuild/--libDS since those options impose a tighter size limit"
        else:
            error_message += " Please remove redundant files from your work area"
        tmp_logger.error(error_message)
        tmp_logger.debug("Triggering garbage collection...")
        gc.collect()
        tmp_logger.debug("Done")
        return generate_response(False, error_message)

    # write to file
    try:
        file_name = file.filename.split("/")[-1]
        full_path = f"{panda_config.cache_dir}/{file_name}"

        # avoid overwriting: an existing file is only refreshed (touched), unless the
        # upload name ends in ".__ow__" which explicitly permits overwriting
        if os.path.exists(full_path) and file.filename.split(".")[-1] != "__ow__":
            # touch
            os.utime(full_path, None)
            # send error message
            error_message = ERROR_OVERWRITE
            tmp_logger.debug(f"{ERROR_OVERWRITE} {file_name}")
            tmp_logger.debug("end")
            return generate_response(False, error_message)

        # write the file to the cache directory, gzip-compressing it when the name matches
        # one of the comma-separated regex patterns in panda_config.compress_file_names
        with open(full_path, "wb") as file_object:
            file_content = file.read()
            if hasattr(panda_config, "compress_file_names") and [
                True for patt in panda_config.compress_file_names.split(",") if re.search(patt, file_name) is not None
            ]:
                file_content = gzip.compress(file_content)
            file_object.write(file_content)

    except Exception:
        error_message = ERROR_WRITE
        tmp_logger.error(error_message)
        tmp_logger.debug("Triggering garbage collection...")
        gc.collect()
        tmp_logger.debug("Done")
        return generate_response(False, error_message)

    # calculate the checksum
    # file_content is always bound here: the except branch above returns on failure
    try:
        # decode Footer: the last 8 bytes of a gzip stream hold CRC32 and uncompressed size.
        # NOTE(review): "II" unpacks in native byte order while gzip footers are little-endian,
        # and for non-gzip uploads this just reads the file's last 8 bytes — the value is only
        # meaningful for gzip content on little-endian hosts; confirm downstream usage.
        footer = file_content[-8:]
        checksum, _ = struct.unpack("II", footer)
        checksum = str(checksum)
        tmp_logger.debug(f"CRC from gzip Footer {checksum}")
    except Exception:
        # use None to avoid delay for now
        checksum = None
        tmp_logger.debug(f"No CRC calculated {checksum}")

    # calculate the file size
    file_size = len(file_content)

    # log the full file information
    tmp_logger.debug(f"Written dn={user_name}, file={full_path}, size={file_size // MB} MB, crc={checksum}")

    # record the file information to DB
    if panda_config.record_sandbox_info:
        # ignore some suffixes, e.g. out
        to_insert = True
        for patt in IGNORED_SUFFIX:
            if file.filename.endswith(patt):
                to_insert = False
                break
        if not to_insert:
            tmp_logger.debug("skipped to insert to DB")
        else:
            status_client, output_client = Client.register_cache_file(user_name, file.filename, file_size, checksum)
            if status_client != 0:
                error_message = f"ERROR : failed to register sandbox to DB with {status_client} {output_client}"
                tmp_logger.error(error_message)
                tmp_logger.debug("Done")
                return generate_response(False, error_message)

            success = output_client["success"]
            message = output_client["message"]
            if not success:
                error_message = f"ERROR : failed to register sandbox to DB with {message}"
                tmp_logger.error(error_message)
                tmp_logger.debug("Done")
                return generate_response(False, error_message)

            tmp_logger.debug(f"Registered file in database with: {output_client}")

    tmp_logger.debug("Triggering garbage collection...")
    gc.collect()
    tmp_logger.debug("Done")

    return generate_response(True)
0402 
0403 
@request_validation(_logger, secure=True, request_method="POST")
def touch_cache_file(req: PandaRequest, file_name: str) -> Dict:
    """
    Touch file in the cache directory.

    Touches a file in the cache directory. It avoids the file to expire and being deleted by server clean up processes. Requires a secure connection.

    API details:
        HTTP Method: POST
        Path: /v1/file_server/touch_cache_file

    Args:
        req(PandaRequest): internally generated request object containing the env variables
        file_name(string): name of the file to touch

    Returns:
        dict: The system response `{"success": success, "message": message, "data": data}`. When unsuccessful, the message field will indicate the issue.
    """

    tmp_logger = LogWrapper(_logger, f"touch_cache_file < {file_name} >")
    tmp_logger.debug("Start")

    try:
        # update access/modification times; only the basename is used
        os.utime(f"{panda_config.cache_dir}/{file_name.split('/')[-1]}", None)
        tmp_logger.debug("Done")
        return generate_response(True)
    except Exception:
        error_type, error_value = sys.exc_info()[:2]
        message = f"Failed to touch file with: {error_type} {error_value}"
        # log through the wrapper (not the bare module logger) so the message
        # carries the per-call prefix, consistent with the sibling handlers
        tmp_logger.error(message)
        return generate_response(False, message)
0435 
0436 
@request_validation(_logger, secure=True, request_method="POST")
def delete_cache_file(req: PandaRequest, file_name: str) -> Dict:
    """
    Delete cache file

    Deletes a file from the cache directory. Currently a dummy method that always reports success. Requires a secure connection.

    API details:
        HTTP Method: POST
        Path: /v1/file_server/delete_cache_file

    Args:
        req(PandaRequest): internally generated request object containing the env variables
        file_name(string): file name to be deleted

    Returns:
        dict: The system response `{"success": success, "message": message, "data": data}`.
    """

    tmp_logger = LogWrapper(_logger, f"delete_cache_file <{file_name}>")
    tmp_logger.debug("Start")

    try:
        # deletion is intentionally disabled: the file may be reused for re-brokerage
        # os.remove('%s/%s' % (panda_config.cache_dir, file_name.split('/')[-1]))
        return generate_response(True)
    except Exception:
        return generate_response(False)
0465 
0466 
@request_validation(_logger, secure=True, production=True, request_method="POST")
def register_cache_file(req: PandaRequest, user_name: str, file_name: str, file_size: int, checksum: str) -> Dict:
    """
    Register cache file

    Registers a file from the cache directory into the PanDA database, so that PanDA knows which server hosts it. Requires a secure connection and production role.

    API details:
        HTTP Method: POST
        Path: /v1/file_server/register_cache_file

    Args:
        req(PandaRequest): internally generated request object containing the env variables
        user_name(string): user that uploaded the file
        file_name(string): file name
        file_size(int): file size
        checksum(string): checksum

    Returns:
        dict: The system response `{"success": success, "message": message, "data": data}`. When unsuccessful, the message field will indicate the issue.
    """

    tmp_logger = LogWrapper(_logger, f"register_cache_file {user_name} {file_name}")
    tmp_logger.debug("Start")

    # the files live on one particular server and are not reachable through the load-balanced
    # endpoint, so register either the preconfigured hostname or the caller's hostname
    host_name = getattr(panda_config, "sandboxHostname", None) or req.get_remote_host()

    message = global_task_buffer.insertSandboxFileInfo(user_name, host_name, file_name, file_size, checksum)
    tmp_logger.debug("Done")

    if message != "OK":
        return generate_response(False, message)
    return generate_response(True)
0506 
0507 
@request_validation(_logger, secure=True, request_method="POST")
def validate_cache_file(req: PandaRequest, file_size: int, checksum: int | str) -> Dict:
    """
    Validate cache file

    Validates a cache file owned by the caller by checking the file metadata that was registered in the database. Requires a secure connection.

    API details:
        HTTP Method: POST
        Path: /v1/file_server/validate_cache_file

    Args:
        req(PandaRequest): internally generated request object containing the env variables
        file_size(int): file size
        checksum(int): checksum

    Returns:
        dict: The system response `{"success": success, "message": message, "data": data}`. When successful the message will return the host and file name.
              When unsuccessful, the message field will indicate the issue.
    """
    user = get_dn(req)
    message = global_task_buffer.checkSandboxFile(user, file_size, checksum)

    # a message starting with "FOUND" (or an empty message) means the file was located;
    # anything else reports a failed lookup or an exception
    found = not message or message.startswith("FOUND")
    return generate_response(found, message)
0537 
0538 
0539 def _get_checkpoint_filename(task_id: str, sub_id: str) -> Dict:
0540     """
0541     Get the checkpoint file name.
0542 
0543     Args:
0544         task_id(string): task ID.
0545         sub_id(string): sub ID.
0546 
0547     Returns:
0548         string: checkpoint file name.
0549     """
0550     return f"hpo_cp_{task_id}_{sub_id}"
0551 
0552 
@request_validation(_logger, secure=True, request_method="POST")
def upload_hpo_checkpoint(req: PandaRequest, file: FileStorage) -> Dict:
    """
    Upload a HPO checkpoint file

    Uploads a HPO checkpoint file to the server. Requires a secure connection.

    API details:
        HTTP Method: POST
        Path: /v1/file_server/upload_hpo_checkpoint

    Args:
        req(PandaRequest): internally generated request object containing the env variables
        file(FileStorage): werkzeug.FileStorage object to be uploaded.

    Returns:
        dict: The system response `{"success": success, "message": message, "data": data}`. When unsuccessful, the message field will indicate the issue.
    """

    tmp_logger = LogWrapper(_logger, f"upload_hpo_checkpoint <jediTaskID_subID={file.filename}>")
    tmp_logger.debug(f"Start {req.subprocess_env['SSL_CLIENT_S_DN']}")

    # the basename is expected to look like "<task_id>_<sub_id>"
    try:
        task_id, sub_id = file.filename.split("/")[-1].split("_")
    except Exception:
        error_message = "Failed to extract task and sub IDs"
        tmp_logger.error(error_message)
        return generate_response(False, error_message)

    # a known, non-zero content length is required
    content_length = _get_content_length(req, tmp_logger)
    if not content_length:
        error_message = "Cannot get content-length"
        tmp_logger.error(error_message)
        return generate_response(False, error_message)

    # enforce the checkpoint size limit
    if content_length > CHECKPOINT_LIMIT:
        error_message = f"Exceeded size limit {content_length}>{CHECKPOINT_LIMIT}"
        tmp_logger.error(error_message)
        return generate_response(False, error_message)

    # store the checkpoint in the cache directory
    try:
        full_path = os.path.join(panda_config.cache_dir, _get_checkpoint_filename(task_id, sub_id))
        with open(full_path, "wb") as checkpoint_file:
            checkpoint_file.write(file.read())
    except Exception as exc:
        error_message = f"Cannot write file due to {str(exc)}"
        tmp_logger.error(error_message)
        return generate_response(False, error_message)

    success_message = f"Successfully placed at {full_path}"
    tmp_logger.debug(success_message)
    tmp_logger.debug("Done")
    return generate_response(True, message=success_message, data=full_path)
0612 
0613 
@request_validation(_logger, secure=True, request_method="POST")
def delete_hpo_checkpoint(req: PandaRequest, task_id: str, sub_id: str) -> Dict:
    """
    Delete a HPO checkpoint file.

    Deletes a HPO checkpoint file from the server. Requires a secure connection.

    API details:
        HTTP Method: POST
        Path: /v1/file_server/delete_hpo_checkpoint

    Args:
        req(PandaRequest): internally generated request object containing the env variables
        task_id(string): JEDI task ID
        sub_id(string): sub ID.

    Returns:
        dict: The system response `{"success": success, "message": message, "data": data}`. When unsuccessful, the message field will indicate the issue.
    """

    tmp_logger = LogWrapper(_logger, f"delete_hpo_checkpoint <jediTaskID={task_id} ID={sub_id}>")
    tmp_logger.debug(f"Start {req.subprocess_env['SSL_CLIENT_S_DN']}")

    try:
        # remove the checkpoint from the cache directory
        checkpoint_path = os.path.join(panda_config.cache_dir, _get_checkpoint_filename(task_id, sub_id))
        os.remove(checkpoint_path)
    except Exception as exc:
        message = f"Failed to delete file due to {str(exc)}"
        tmp_logger.error(message)
        return generate_response(False, message=message)

    tmp_logger.debug("Done")
    return generate_response(True)
0647 
0648 
@request_validation(_logger, secure=True, request_method="POST")
def upload_file_recovery_request(
    req: PandaRequest,
    task_id: int = None,
    dry_run: bool = None,
    dataset: str = None,
    files: List[str] = None,
    no_child_retry: bool = False,
    resurrect_datasets: bool = False,
    force: bool = False,
    reproduce_parent: bool = False,
    reproduce_upto_nth_gen: int = 0,
) -> Dict:
    """
    Upload file recovery request

    Upload request to recover lost files. Either task_id or dataset needs to be specified. Requires a secure connection.

    API details:
        HTTP Method: POST
        Path: /v1/file_server/upload_file_recovery_request

    Args:
        req(PandaRequest): internally generated request object containing the env variables
        task_id(int, optional): JEDI task ID. Either task_id or dataset must be provided.
        dry_run(bool, optional): dry run flag.
        dataset(string, optional): the dataset name in which to recover files. Either task_id or dataset must be provided.
        files(list of str, optional): list of file names to recover.
        no_child_retry(bool, optional): flag to avoid retrying child tasks. Default is False.
        resurrect_datasets(bool, optional): Specifies whether to resurrect datasets when they were already deleted. Default is False.
        force(bool, optional): To force recovery even if there is no lost file. Default is False.
        reproduce_parent(bool, optional): Specifies whether to reproduce the parent task if the input files that originally generated the lost files have been deleted. Default: False.
        reproduce_upto_nth_gen(int, optional): Defines how many generations of parent tasks should be reproduced. Default 0, meaning no parent tasks are reproduced. When this is set to N>0, reproduce_parent is set to True automatically.

    Returns:
        dict: The system response `{"success": success, "message": message, "data": data}`. When unsuccessful, the message field will indicate the issue.
    """

    user_name = req.subprocess_env["SSL_CLIENT_S_DN"]
    creation_time = naive_utcnow().strftime("%Y-%m-%d %H:%M:%S")

    tmp_logger = LogWrapper(_logger, f"put_file_recovery_request < task_id={task_id} >")
    tmp_logger.debug(f"Start user={user_name}")

    try:
        # check that at least task_id or dataset is provided
        if not task_id and not dataset:
            error_message = "Either task_id or dataset must be provided"
            tmp_logger.error(error_message)
            return generate_response(False, error_message)

        # generate the filename
        # the request is dropped as a "recov.<uuid>" JSON file in the cache directory;
        # presumably a separate agent scans for these files and processes them — TODO confirm
        file_name = f"{panda_config.cache_dir}/recov.{str(uuid.uuid4())}"
        log_filename = f"jedilog/lost_file_recovery.{str(uuid.uuid4())}.log"
        tmp_logger.debug(f"file={file_name}")

        # check if the user has production manager role
        is_production_manager = has_production_role(req)

        # write the file content
        # NOTE(review): keys use legacy camelCase names; do not rename without checking
        # the consumer of these request files
        with open(file_name, "w") as file_object:
            data = {
                "userName": user_name,
                "creationTime": creation_time,
                "logFilename": log_filename,
                "isProductionManager": is_production_manager,
            }
            if task_id:
                data["jediTaskID"] = task_id
            if dry_run:
                data["dryRun"] = True
            if dataset:
                data["ds"] = dataset
            if files:
                data["files"] = ",".join(files)
            if no_child_retry:
                data["noChildRetry"] = True
            if resurrect_datasets:
                data["resurrectDS"] = True
            if force:
                data["force"] = True
            if reproduce_parent:
                data["reproduceParent"] = True
            # NOTE(review): the docstring says reproduce_parent is implied when this is >0,
            # but only the key is recorded here; presumably enforced downstream — confirm
            if reproduce_upto_nth_gen > 0:
                data["reproduceUptoNthGen"] = reproduce_upto_nth_gen

            json.dump(data, file_object)
    except Exception as exc:
        error_message = f"cannot put request due to {str(exc)} "
        tmp_logger.error(error_message + traceback.format_exc())
        return generate_response(False, error_message)

    # create an empty log file to be filled later
    Path(os.path.join(panda_config.cache_dir, log_filename)).touch()

    # return the URL depending on the protocol
    protocol = "https" if panda_config.disableHTTP else "http"
    _, server = get_endpoint(protocol)
    log_file_url = f"{protocol}://{server}/cache/{log_filename}"

    tmp_logger.debug("done")
    data = {"logFileURL": log_file_url}
    return generate_response(True, message="The request was accepted and will be processed in a few minutes", data=data)
0753 
0754 
@request_validation(_logger, secure=True, request_method="POST")
def upload_workflow_request(req: PandaRequest, data: str, dry_run: bool = False, sync: bool = False) -> Dict:
    """
    Upload workflow request to the server.

    Uploads a workflow request to the server. The request can be processed synchronously or asynchronously. Requires a secure connection.

    API details:
        HTTP Method: POST
        Path: /v1/file_server/upload_workflow_request

    Args:
        req(PandaRequest): internally generated request object containing the env variables
        data(string): workflow request data (must be a valid JSON document)
        dry_run(bool): requests the workflow to be executed synchronously in dry_run mode
        sync(bool): requests the workflow to be processed synchronously
    Returns:
        dict: The system response `{"success": success, "message": message, "data": data}`. When unsuccessful, the message field will indicate the issue.
              When the request asked to process the workflow synchronously or with the check file, the data field will contain the response.

    """

    user_name = req.subprocess_env["SSL_CLIENT_S_DN"]
    creation_time = naive_utcnow().strftime("%Y-%m-%d %H:%M:%S")

    tmp_logger = LogWrapper(_logger, "upload_workflow_request")

    tmp_logger.debug(f"Start user={user_name} dry_run={dry_run}")

    file_name = None
    try:
        # Parse the payload BEFORE creating the file: opening the file first would
        # leave an empty workflow file in cache_dir when json.loads() fails, and the
        # asynchronous processor could later pick up that corrupt request
        data_dict = {
            "userName": user_name,
            "creationTime": creation_time,
            "data": json.loads(data),
        }

        # Generate a unique filename in the cache directory
        file_name = f"{panda_config.cache_dir}/workflow.{str(uuid.uuid4())}"
        tmp_logger.debug(f"file={file_name}")

        # Write out the workflow request
        with open(file_name, "w") as file_object:
            json.dump(data_dict, file_object)

        # Submitter requested synchronous processing
        if sync or dry_run:
            tmp_logger.debug("Starting synchronous processing of the workflow")
            # Imported lazily to avoid pulling in the processor for the common async path
            from pandaserver.taskbuffer.workflow_processor import WorkflowProcessor

            processor = WorkflowProcessor(log_stream=_logger)
            if dry_run:
                ret = processor.process(file_name, True, True, True, True)
            else:
                ret = processor.process(file_name, True, False, True, False)

            # Delete the file to prevent it being processed again
            if os.path.exists(file_name):
                try:
                    os.remove(file_name)
                except Exception:
                    # Best-effort deletion; a leftover file is logged by the processor side
                    pass
            tmp_logger.debug("Done")
            return generate_response(True, data=ret)

    except Exception as exc:
        # Best-effort cleanup so a partially written file is not processed later
        if file_name and os.path.exists(file_name):
            try:
                os.remove(file_name)
            except Exception:
                pass
        error_message = f"Cannot upload the workflow request: {str(exc)} "
        tmp_logger.error(error_message + traceback.format_exc())
        return generate_response(False, error_message)

    # Submitter did not request synchronous processing
    tmp_logger.debug("Done")
    return generate_response(True, message="The request was accepted and will be processed in a few minutes")
0826 
0827 
@request_validation(_logger, secure=True, request_method="POST")
def upload_event_picking_request(
    req: PandaRequest,
    run_event_list: str = "",
    data_type: str = "",
    stream_name: str = "",
    dataset_name: str = "",
    ami_tag: str = "",
    user_dataset_name: str = "",
    locked_by: str = "",
    parameters: str = "",
    input_file_list: str = "",
    n_sites: str = "",
    user_task_name: str = "",
    ei_api: str = "",
    include_guids: bool = False,
) -> Dict:
    """
    Upload event picking request to the server.

    Uploads an event picking request to the server. Requires a secure connection.

    API details:
        HTTP Method: POST
        Path: /v1/file_server/upload_event_picking_request

    Args:
        req(PandaRequest): internally generated request object containing the env variables
        run_event_list(string): run and event list, one "run event [guid]" entry per line.
        data_type(string): data type.
        stream_name(string): stream name.
        dataset_name(string): dataset name.
        ami_tag(string): AMI tag.
        user_dataset_name(string): user dataset name.
        locked_by(string): locking agent.
        parameters(string): parameters.
        input_file_list(string): input file list.
        n_sites(string): number of sites.
        user_task_name(string): user task name.
        ei_api(string): event index API.
        include_guids(bool): flag to indicate if GUIDs are included with the run-event list

    Returns:
        dict: The system response `{"success": success, "message": message, "data": data}`. When unsuccessful, the message field will indicate the issue.

    """

    user_name = req.subprocess_env["SSL_CLIENT_S_DN"]

    tmp_logger = LogWrapper(_logger, f"upload_event_picking_request-{naive_utcnow().isoformat('/')}")
    tmp_logger.debug(f"Start for {user_name}")

    creation_time = naive_utcnow().strftime("%Y-%m-%d %H:%M:%S")

    # get total size and reject requests that are missing it or too large
    content_length = _get_content_length(req, tmp_logger)
    if not content_length:
        error_message = "Cannot get content-length from HTTP request."
        tmp_logger.error(f"{error_message}")
        return generate_response(False, f"ERROR: {error_message}")

    if content_length > EVENT_PICKING_LIMIT:
        error_message = f"Run/event list is too large. Exceeded size limit {content_length}>{EVENT_PICKING_LIMIT}."
        tmp_logger.error(f"{error_message} ")
        return generate_response(False, f"ERROR: {error_message}")

    file_name = None
    try:
        # generate a unique filename in the cache directory
        file_name = f"{panda_config.cache_dir}/evp.{str(uuid.uuid4())}"
        tmp_logger.debug(f"file: {file_name}")

        # header section with the request metadata, one key=value per line
        file_content = (
            f"userName={user_name}\n"
            f"creationTime={creation_time}\n"
            f"eventPickDataType={data_type}\n"
            f"eventPickStreamName={stream_name}\n"
            f"eventPickDS={dataset_name}\n"
            f"eventPickAmiTag={ami_tag}\n"
            f"eventPickNumSites={n_sites}\n"
            f"userTaskName={user_task_name}\n"
            f"userDatasetName={user_dataset_name}\n"
            f"lockedBy={locked_by}\n"
            f"params={parameters}\n"
            f"inputFileList={input_file_list}\n"
            f"ei_api={ei_api}\n"
        )

        with open(file_name, "w") as file_object:
            file_object.write(file_content)
            run_event_guid_map = {}

            # each valid line has "run event" or "run event guid" depending on include_guids
            valid_entry_length = 3 if include_guids else 2

            for tmp_line in run_event_list.split("\n"):
                tmp_items = tmp_line.split()

                # Skip invalid entries
                if len(tmp_items) != valid_entry_length:
                    continue

                file_object.write(f"runEvent={tmp_items[0]},{tmp_items[1]}\n")

                if include_guids:
                    run_event_guid_map[(tmp_items[0], tmp_items[1])] = [tmp_items[2]]

            file_object.write(f"runEvtGuidMap={str(run_event_guid_map)}\n")

    except Exception as e:
        # Best-effort cleanup: remove the partially written file so the event
        # picking daemon does not consume a truncated request
        if file_name and os.path.exists(file_name):
            try:
                os.remove(file_name)
            except Exception:
                pass
        error_message = f"Cannot upload the Event Picking request: {str(e)}"
        tmp_logger.error(error_message + traceback.format_exc())
        return generate_response(False, f"ERROR: {error_message}")

    tmp_logger.debug("Done")
    return generate_response(True)