Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:39:07

0001 import gc
0002 import gzip
0003 import json
0004 import os
0005 import re
0006 import struct
0007 import sys
0008 import traceback
0009 import uuid
0010 import zlib
0011 from typing import Generator
0012 
0013 from pandacommon.pandalogger.LogWrapper import LogWrapper
0014 from pandacommon.pandalogger.PandaLogger import PandaLogger
0015 from pandacommon.pandautils.PandaUtils import naive_utcnow
0016 from werkzeug.datastructures import FileStorage
0017 
0018 from pandaserver.config import panda_config
0019 from pandaserver.jobdispatcher import Protocol
0020 from pandaserver.srvcore import CoreUtils
0021 from pandaserver.srvcore.panda_request import PandaRequest
0022 from pandaserver.userinterface import Client
0023 
# Module-level logger shared by the handlers in this file
_logger = PandaLogger().getLogger("Utils")

# File name suffixes that putFile does not register in the DB
IGNORED_SUFFIX = [".out"]

# File size limits in bytes, compared against the request's content-length header
MB = 1024 * 1024
EVENT_PICKING_LIMIT = 10 * MB  # used by putEventPickingRequest
LOG_LIMIT = 100 * MB  # used by uploadLog
CHECKPOINT_LIMIT = 500 * MB  # used by put_checkpoint
SANDBOX_NO_BUILD_LIMIT = 100 * MB  # used by putFile when the file name does not start with "sources."
SANDBOX_LIMIT = 768 * MB  # used by putFile when the file name starts with "sources."

# Error messages returned to the client
ERROR_NOT_SECURE = "ERROR : no HTTPS"
ERROR_LIMITED_PROXY = "ERROR: rejected due to the usage of limited proxy"
ERROR_OVERWRITE = "ERROR: cannot overwrite file"
ERROR_WRITE = "ERROR: cannot write file"
ERROR_SIZE_LIMIT = "ERROR: upload failure. Exceeded size limit"
0042 
0043 
def isAlive(panda_request: PandaRequest) -> str:
    """
    Report that the server is up.

    Basic health-check function, also used in SLS monitoring. The request object
    is accepted but never inspected.

    Args:
        panda_request (PandaRequest): PanDA request object (unused).

    Returns:
        str: always the literal "alive=yes".
    """
    status = "alive=yes"
    return status
0055 
0056 
def get_content_length(panda_request: PandaRequest, tmp_log: LogWrapper) -> int:
    """
    Get the content length of the request.

    The value is read from the "content-length" entry of ``panda_request.headers_in``.
    When the header is missing or cannot be converted to an integer, the problem is
    logged as an error and 0 is returned, so callers never see an exception.

    Args:
        panda_request (PandaRequest): PanDA request object.
        tmp_log (LogWrapper): logger object of the calling function.

    Returns:
        int: content length of the request, or 0 if it could not be determined.
    """
    content_length = 0
    try:
        content_length = int(panda_request.headers_in["content-length"])
    except Exception:
        # distinguish a malformed header from a missing one in the log
        if "content-length" in panda_request.headers_in:
            tmp_log.error(f"cannot get content_length: {panda_request.headers_in['content-length']}")
        else:
            # the original message contained an unformatted "{method_name}" placeholder
            tmp_log.error("no content_length in the request headers")

    tmp_log.debug(f"size {content_length}")
    return content_length
0079 
0080 
0081 # upload file
def putFile(panda_request: PandaRequest, file: FileStorage) -> str:
    """
    Upload a file to the server.

    The request must use HTTPS and must not be authenticated with a limited proxy.
    The upload is rejected when the content-length header exceeds the limit that
    applies to the file name (SANDBOX_LIMIT for names starting with "sources.",
    SANDBOX_NO_BUILD_LIMIT otherwise). An existing file in the cache directory is
    not overwritten unless the uploaded file name ends with ".__ow__"; in that
    case the existing file is only touched. When panda_config.record_sandbox_info
    is set, the file is also registered through Client.register_cache_file,
    except for names ending with one of IGNORED_SUFFIX.

    Args:
        panda_request (PandaRequest): PanDA request object.
        file (FileStorage): werkzeug.FileStorage object to be uploaded.

    Returns:
        string: "True" if the upload was successful, otherwise an error message.
    """

    tmp_log = LogWrapper(_logger, f"putFile-{naive_utcnow().isoformat('/')}")
    tmp_log.debug("start")

    def _finish(result: str) -> str:
        # common epilogue of the failure paths: collect garbage, log the end, hand back the message
        tmp_log.debug("trigger garbage collection")
        gc.collect()
        tmp_log.debug("end")
        return result

    # check if using secure connection and the proxy is not limited
    if not Protocol.isSecure(panda_request):
        tmp_log.error("no HTTPS")
        return _finish(ERROR_NOT_SECURE)
    if "/CN=limited proxy" in panda_request.subprocess_env["SSL_CLIENT_S_DN"]:
        tmp_log.error("limited proxy is used")
        return _finish(ERROR_LIMITED_PROXY)

    # user name
    user_name = CoreUtils.clean_user_id(panda_request.subprocess_env["SSL_CLIENT_S_DN"])
    tmp_log.debug(f"user_name={user_name} file_path={file.filename}")

    # get file size limit: only "sources.*" uploads get the larger limit
    no_build = not file.filename.startswith("sources.")
    size_limit = SANDBOX_NO_BUILD_LIMIT if no_build else SANDBOX_LIMIT

    # get actual file size
    content_length = get_content_length(panda_request, tmp_log)

    # check if we are above the size limit
    if content_length > size_limit:
        error_message = f"{ERROR_SIZE_LIMIT} {content_length}>{size_limit}."
        if no_build:
            error_message += " Please submit the job without --noBuild/--libDS since those options impose a tighter size limit"
        else:
            error_message += " Please remove redundant files from your work area"
        tmp_log.error(error_message)
        return _finish(error_message)

    # write to file
    try:
        file_name = file.filename.split("/")[-1]
        full_path = f"{panda_config.cache_dir}/{file_name}"

        # avoid overwriting
        if os.path.exists(full_path) and file.filename.split(".")[-1] != "__ow__":
            # touch
            os.utime(full_path, None)
            # send error message
            tmp_log.debug(f"{ERROR_OVERWRITE} {file_name}")
            tmp_log.debug("end")
            return ERROR_OVERWRITE

        # Read (and optionally compress) the upload BEFORE opening the destination:
        # opening with "wb" creates/truncates the file, so a failed read would
        # otherwise leave an empty file that blocks later uploads with ERROR_OVERWRITE.
        file_content = file.read()
        if hasattr(panda_config, "compress_file_names") and any(
            re.search(patt, file_name) is not None for patt in panda_config.compress_file_names.split(",")
        ):
            file_content = gzip.compress(file_content)

        # write the file to the cache directory
        with open(full_path, "wb") as file_object:
            file_object.write(file_content)

    except Exception:
        tmp_log.error(ERROR_WRITE)
        return _finish(ERROR_WRITE)

    # calculate the checksum
    try:
        # decode the last 8 bytes as a gzip trailer (CRC32 followed by ISIZE);
        # both fields are little-endian in the gzip format, so fix the byte order
        # explicitly instead of relying on the host's native order
        footer = file_content[-8:]
        checksum, _ = struct.unpack("<II", footer)
        checksum = str(checksum)
        tmp_log.debug(f"CRC from gzip Footer {checksum}")
    except Exception:
        # fewer than 8 bytes available: record an empty checksum rather than computing one
        checksum = ""
        tmp_log.debug("No CRC calculated")

    # calculate the file size
    file_size = len(file_content)

    # log the full file information
    tmp_log.debug(f"written dn={user_name} file={full_path} size={file_size} crc={checksum}")

    # record the file information to DB
    if panda_config.record_sandbox_info:
        # ignore some suffixes, e.g. out
        if any(file.filename.endswith(patt) for patt in IGNORED_SUFFIX):
            tmp_log.debug("skipped to insert to DB")
        else:
            status_client, output_client = Client.register_cache_file(user_name, file.filename, file_size, checksum)
            if status_client != 0:
                error_message = f"ERROR : failed to register sandbox to DB with {status_client} {output_client}"
                tmp_log.error(error_message)
                tmp_log.debug("Done")
                return error_message

            success = output_client["success"]
            message = output_client["message"]
            if not success:
                error_message = f"ERROR : failed to register sandbox to DB with {message}"
                tmp_log.error(error_message)
                tmp_log.debug("Done")
                return error_message

            tmp_log.debug(f"Inserted sandbox to DB with {output_client}")

    tmp_log.debug("trigger garbage collection")
    gc.collect()
    tmp_log.debug("Done")

    return "True"
0222 
0223 
def putEventPickingRequest(
    panda_request: PandaRequest,
    runEventList="",
    eventPickDataType="",
    eventPickStreamName="",
    eventPickDS="",
    eventPickAmiTag="",
    userDatasetName="",
    lockedBy="",
    params="",
    inputFileList="",
    eventPickNumSites="",
    userTaskName="",
    ei_api="",
    giveGUID=None,
) -> str:
    """
    Store an event picking request as a file in the cache directory.

    The request is written to "evp.<uuid>" as "key=value" lines, followed by one
    "runEvent=<run>,<event>" line per accepted entry of the run/event list and a
    final "runEvtGuidMap=..." line. Entries of the run/event list must have two
    whitespace-separated items, or three when GUIDs are given; other lines are
    skipped.

    Args:
        panda_request (PandaRequest): PanDA request object.
        runEventList (str): run and event list, one entry per line.
        eventPickDataType (str): data type.
        eventPickStreamName (str): stream name.
        eventPickDS (str): dataset name.
        eventPickAmiTag (str): AMI tag.
        userDatasetName (str): user dataset name.
        lockedBy (str): locking agent.
        params (str): parameters.
        inputFileList (str): input file list.
        eventPickNumSites (str): number of sites.
        userTaskName (str): user task name.
        ei_api (str): event index API.
        giveGUID (str): "True" when each run/event entry carries a GUID as third item.

    Returns:
        string: "True" if the upload was successful, otherwise an error message.

    """
    if not Protocol.isSecure(panda_request):
        return ERROR_NOT_SECURE

    user_name = panda_request.subprocess_env["SSL_CLIENT_S_DN"]

    tmp_log = LogWrapper(_logger, f"putEventPickingRequest-{naive_utcnow().isoformat('/')}")
    tmp_log.debug(f"start for {user_name}")

    creation_time = naive_utcnow().strftime("%Y-%m-%d %H:%M:%S")

    # total size of the request, taken from the HTTP header
    try:
        content_length = int(panda_request.headers_in["content-length"])
    except Exception:
        error_message = "cannot get content-length from HTTP request."
        tmp_log.error(error_message)
        tmp_log.debug("end")
        return "ERROR : " + error_message
    tmp_log.debug(f"size {content_length}")

    if content_length > EVENT_PICKING_LIMIT:
        error_message = f"Run/event list is too large. Exceeded size limit {content_length}>{EVENT_PICKING_LIMIT}."
        tmp_log.error(error_message + " ")
        tmp_log.debug("end")
        return "ERROR : " + error_message

    # only the exact string "True" enables GUID mode
    with_guid = giveGUID == "True"
    # number of whitespace-separated items a valid run/event line must have
    expected_items = 3 if with_guid else 2

    try:
        # generate the filename
        file_name = f"{panda_config.cache_dir}/evp.{uuid.uuid4()}"
        tmp_log.debug(f"file: {file_name}")

        # header section of the request file, in the order it is written
        header_fields = (
            ("userName", user_name),
            ("creationTime", creation_time),
            ("eventPickDataType", eventPickDataType),
            ("eventPickStreamName", eventPickStreamName),
            ("eventPickDS", eventPickDS),
            ("eventPickAmiTag", eventPickAmiTag),
            ("eventPickNumSites", eventPickNumSites),
            ("userTaskName", userTaskName),
            ("userDatasetName", userDatasetName),
            ("lockedBy", lockedBy),
            ("params", params),
            ("inputFileList", inputFileList),
            ("ei_api", ei_api),
        )
        file_content = "".join(f"{key}={value}\n" for key, value in header_fields)

        with open(file_name, "w") as request_file:
            request_file.write(file_content)
            run_event_guid_map = {}
            for raw_line in runEventList.split("\n"):
                items = raw_line.split()
                if len(items) != expected_items:
                    continue
                run_number, event_number = items[:2]
                request_file.write(f"runEvent={run_number},{event_number}\n")
                if with_guid:
                    run_event_guid_map[(run_number, event_number)] = [items[2]]
            request_file.write(f"runEvtGuidMap={run_event_guid_map}\n")

    except Exception as e:
        error_message = f"cannot put request due to {e}"
        tmp_log.error(error_message + traceback.format_exc())
        return f"ERROR : {error_message}"

    tmp_log.debug("end")
    return "True"
0335 
0336 
0337 # upload lost file recovery request
def put_file_recovery_request(panda_request: PandaRequest, jediTaskID: str, dryRun: bool = None) -> str:
    """
    Store a lost file recovery request as a JSON file in the cache directory.

    The request is written to "recov.<uuid>" and contains the requester's DN,
    the creation time and the integer task ID; "dryRun" is added only when the
    flag is truthy.

    Args:
        panda_request (PandaRequest): PanDA request object.
        jediTaskID (string): task ID; must be convertible to int.
        dryRun (bool): dry run flag.

    Returns:
        string: String in json format with (boolean, message)
    """
    if not Protocol.isSecure(panda_request):
        return json.dumps((False, ERROR_NOT_SECURE))

    user_name = panda_request.subprocess_env["SSL_CLIENT_S_DN"]
    creation_time = naive_utcnow().strftime("%Y-%m-%d %H:%M:%S")

    tmp_log = LogWrapper(_logger, f"put_file_recovery_request < jediTaskID={jediTaskID}")
    tmp_log.debug(f"start user={user_name}")

    try:
        # a non-numeric task ID is rejected through the except branch below
        jedi_task_id = int(jediTaskID)

        # generate the filename
        file_name = f"{panda_config.cache_dir}/recov.{uuid.uuid4()}"
        tmp_log.debug(f"file={file_name}")

        # write the file content
        with open(file_name, "w") as request_file:
            payload = {
                "userName": user_name,
                "creationTime": creation_time,
                "jediTaskID": jedi_task_id,
            }
            if dryRun:
                payload["dryRun"] = True
            json.dump(payload, request_file)
    except Exception as exc:
        error_message = f"cannot put request due to {exc} "
        tmp_log.error(error_message + traceback.format_exc())
        return json.dumps((False, error_message))

    tmp_log.debug("done")
    return json.dumps((True, "request was accepted and will be processed in a few minutes"))
0383 
0384 
def put_workflow_request(panda_request: PandaRequest, data: str, check: bool = False, sync: bool = False) -> str:
    """
    Upload workflow request to the server.

    The request is written as JSON to "workflow.<uuid>" in the cache directory.
    When ``check`` or ``sync`` is set, the file is processed immediately with
    WorkflowProcessor, removed afterwards, and the processor's result is returned;
    ``check`` takes precedence over ``sync``. Otherwise the file is left in place
    and an acceptance message is returned.

    Args:
        panda_request (PandaRequest): PanDA request object.
        data (string): workflow request data as a JSON string.
        check (bool): check flag; the string "True" is also accepted.
        sync (bool): synchronous processing; the string "True" is also accepted.
    Returns:
        string: String in json format with (boolean, message)
    """

    if not Protocol.isSecure(panda_request):
        return json.dumps((False, ERROR_NOT_SECURE))

    user_name = panda_request.subprocess_env["SSL_CLIENT_S_DN"]
    creation_time = naive_utcnow().strftime("%Y-%m-%d %H:%M:%S")

    tmp_log = LogWrapper(_logger, "put_workflow_request")

    tmp_log.debug(f"start user={user_name} check={check}")

    # Normalise both flags independently to real booleans. The values may arrive
    # as strings, and a non-empty string such as "False" is truthy, so leaving
    # it untouched would trigger immediate processing by mistake.
    check = check in ("True", True)
    sync = sync in ("True", True)

    try:
        # generate the filename
        file_name = f"{panda_config.cache_dir}/workflow.{str(uuid.uuid4())}"
        tmp_log.debug(f"file={file_name}")

        # write
        with open(file_name, "w") as file_object:
            data_dict = {
                "userName": user_name,
                "creationTime": creation_time,
                "data": json.loads(data),
            }
            json.dump(data_dict, file_object)

        if sync or check:
            # imported here rather than at module level, as in the original code
            from pandaserver.taskbuffer.workflow_processor import WorkflowProcessor

            processor = WorkflowProcessor(log_stream=_logger)
            if check:
                ret = processor.process(file_name, True, True, True, True)
            else:
                ret = processor.process(file_name, True, False, True, False)
            # best-effort cleanup of the request file once it has been processed
            if os.path.exists(file_name):
                try:
                    os.remove(file_name)
                except Exception:
                    pass
            tmp_log.debug("done")
            return json.dumps((True, ret))

    except Exception as exc:
        error_message = f"cannot put request due to {str(exc)} "
        tmp_log.error(error_message + traceback.format_exc())
        return json.dumps((False, error_message))

    tmp_log.debug("done")
    return json.dumps((True, "request was accepted and will be processed in a few minutes"))
0449 
0450 
0451 # delete file
def deleteFile(panda_request: PandaRequest, file: FileStorage) -> str:
    """
    Acknowledge a request to delete a file from the cache directory.

    No file is removed: deletion is disabled because the file may be reused for
    re-brokerage. The function only checks that the request is secure.

    Args:
        panda_request (PandaRequest): PanDA request object.
        file: name of the file requested for deletion (unused).
            NOTE(review): annotated as FileStorage but previously documented as a string - confirm.

    Returns:
        string: "True" for a secure request, otherwise the "no HTTPS" error message.
    """
    if not Protocol.isSecure(panda_request):
        return ERROR_NOT_SECURE

    # The file is kept on purpose since it may be reused for re-brokerage.
    # The former try/except around this return could never reach its except branch.
    return "True"
0471 
0472 
0473 # touch file
def touchFile(panda_request: PandaRequest, filename: str) -> str:
    """
    Touch a file in the cache directory.

    Only the last path component of ``filename`` is used, so the touched file is
    always looked up directly under the cache directory.

    Args:
        panda_request (PandaRequest): PanDA request object.
        filename (string): file name to be touched

    Returns:
        string: "True" on success; "False" for an insecure request or on any failure
    """
    if not Protocol.isSecure(panda_request):
        return "False"

    try:
        base_name = filename.split("/")[-1]
        target_path = f"{panda_config.cache_dir}/{base_name}"
        # passing None sets access and modification times to the current time
        os.utime(target_path, None)
    except Exception as exc:
        _logger.error(f"touchFile : {type(exc)} {exc}")
        return "False"
    return "True"
0494 
0495 
0496 # get server name:port for SSL
def getServer(panda_request: PandaRequest) -> str:
    """
    Build the "host:port" string of the HTTPS endpoint from the server configuration.

    Args:
        panda_request (PandaRequest): PanDA request object (unused).

    Returns:
        string: String with server:port
    """
    host = panda_config.pserverhost
    port = panda_config.pserverport
    return f"{host}:{port}"
0507 
0508 
0509 # get server name:port for HTTP
def getServerHTTP(panda_request: PandaRequest) -> str:
    """
    Build the "host:port" string of the HTTP endpoint from the server configuration.

    Args:
        panda_request (PandaRequest): PanDA request object (unused).

    Returns:
        string: String with server:port
    """
    host = panda_config.pserverhosthttp
    port = panda_config.pserverporthttp
    return f"{host}:{port}"
0520 
0521 
def updateLog(panda_request: PandaRequest, file: FileStorage) -> str:
    """
    Update the log file, appending more content at the end of the file.

    The uploaded payload is zlib-decompressed and appended to the file of the
    same base name in the cache directory; the file is created if it does not exist.

    Args:
        panda_request (PandaRequest): PanDA request object.
        file (FileStorage): werkzeug.FileStorage object holding the zlib-compressed content to append.

    Returns:
        string: String with "True" or error message
    """
    tmp_log = LogWrapper(_logger, f"updateLog < {file.filename} >")
    tmp_log.debug("start")

    # write to file
    try:
        # expand; zlib.decompress returns bytes
        new_content = zlib.decompress(file.read())

        # stdout name
        log_name = f"{panda_config.cache_dir}/{file.filename.split('/')[-1]}"

        # append to file end; binary mode is required because writing bytes to a
        # text-mode handle raises TypeError, which made every update fail
        with open(log_name, "ab") as file_object:
            file_object.write(new_content)

    except Exception:
        error_type, error_value, _ = sys.exc_info()
        tmp_log.error(f"{error_type} {error_value}")
        return f"ERROR: cannot update file with {error_type} {error_value}"

    tmp_log.debug("end")
    return "True"
0554 
0555 
def fetchLog(panda_request: PandaRequest, logName: str, offset: int = 0) -> str:
    """
    Read a log file from the cache directory, starting at the given offset.

    Only the last path component of ``logName`` is used. Failures are logged and
    do not raise; in that case only the leading dummy character is returned.

    Args:
        panda_request (PandaRequest): PanDA request object.
        logName (string): log file name
        offset (int): offset in the file; converted with int() before seeking

    Returns:
        string: a single space followed by the log content read from the offset
    """
    tmp_log = LogWrapper(_logger, f"fetchLog <{logName}>")
    tmp_log.debug(f"start offset={offset}")

    log_text = ""
    try:
        # stdout name
        base_name = logName.split("/")[-1]
        full_log_name = f"{panda_config.cache_dir}/{base_name}"

        # read at offset of the file
        with open(full_log_name, "r") as log_file:
            log_file.seek(int(offset))
            log_text = log_file.read()

    except Exception as exc:
        tmp_log.error(f"{type(exc)} {exc}")

    # leading dummy char avoids an Internal Server Error on empty content
    return_string = " " + log_text

    tmp_log.debug(f"end read={len(return_string)}")
    return return_string
0587 
0588 
def getVomsAttr(panda_request: PandaRequest) -> str:
    """
    Get the VOMS attributes in sorted order.

    Scans the request environment and keeps only the variables whose name starts
    with "GRST_CRED_" (GRST: Grid Security Technology).

    Args:
        panda_request (PandaRequest): PanDA request object.

    Returns:
        string: one "<name> : <value>" line per matching variable, sorted
    """
    env = panda_request.subprocess_env

    # compact credentials only
    lines = [f"{name} : {env[name]}\n" for name in env if name.startswith("GRST_CRED_")]
    lines.sort()

    return "".join(lines)
0609 
0610 
def getAttr(panda_request: PandaRequest, **kv: dict) -> str:
    """
    Dump the request parameters and all environment variables as text.

    The output has a "param" section listing the keyword arguments sorted by key,
    followed by an "env" section listing every environment variable as a sorted
    "<name> : <value>" line.

    Args:
        panda_request (PandaRequest): PanDA request object.
        kv (dict): dictionary with key-value pairs

    Returns:
        string: String with the attributes
    """
    # parameters first
    pieces = ["===== param =====\n"]
    pieces.extend(f"{name} = {kv[name]}\n" for name in sorted(kv))

    # then the environment variables, sorted as whole lines
    env = panda_request.subprocess_env
    pieces.append("\n====== env ======\n")
    pieces.extend(sorted(f"{name} : {env[name]}\n" for name in env))

    return "".join(pieces)
0639 
0640 
def uploadLog(panda_request: PandaRequest, file: FileStorage) -> str:
    """
    Upload a JEDI log file.

    The request must use HTTPS without a limited proxy and must not exceed
    LOG_LIMIT according to its content-length header. The file is stored under
    the "jedilog" sub-directory of the cache directory, replacing any existing
    file of the same base name.

    Args:
        panda_request (PandaRequest): PanDA request object.
        file (FileStorage): werkzeug.FileStorage object to be uploaded.

    Returns:
        string: URL of the stored file on success, otherwise an error message
    """

    if not Protocol.isSecure(panda_request):
        return ERROR_NOT_SECURE
    if "/CN=limited proxy" in panda_request.subprocess_env["SSL_CLIENT_S_DN"]:
        return ERROR_LIMITED_PROXY

    tmp_log = LogWrapper(_logger, f"uploadLog <{file.filename}>")
    tmp_log.debug(f"start {panda_request.subprocess_env['SSL_CLIENT_S_DN']}")

    # get file size; a missing or malformed header is logged and treated as 0
    content_length = 0
    try:
        content_length = int(panda_request.headers_in["content-length"])
    except Exception:
        if "content-length" not in panda_request.headers_in:
            tmp_log.error("no CL")
        else:
            tmp_log.error(f"cannot get CL : {panda_request.headers_in['content-length']}")
    tmp_log.debug(f"size {content_length}")

    # check against the size limit for logs
    if content_length > LOG_LIMIT:
        tmp_log.error(ERROR_SIZE_LIMIT)
        tmp_log.debug("end")
        return ERROR_SIZE_LIMIT

    jedi_log_directory = "/jedilog"
    try:
        file_base_name = file.filename.split("/")[-1]
        full_path = f"{panda_config.cache_dir}{jedi_log_directory}/{file_base_name}"

        # delete old file
        if os.path.exists(full_path):
            os.remove(full_path)

        # write the new file
        with open(full_path, "wb") as destination:
            uploaded_bytes = file.read()
            destination.write(uploaded_bytes)
        tmp_log.debug(f"written to {full_path}")

        # return the URL depending on the protocol
        if panda_config.disableHTTP:
            protocol, server = "https", getServer(None)
        else:
            protocol, server = "http", getServerHTTP(None)
        return_string = f"{protocol}://{server}/cache{jedi_log_directory}/{file_base_name}"

    except Exception as exc:
        error_message = f"failed to write log with {type(exc).__name__}:{exc}"
        tmp_log.error(error_message)
        tmp_log.debug("end")
        return error_message

    tmp_log.debug("end")
    return return_string
0711 
0712 
def create_shards(input_list: list, size: int) -> Generator:
    """
    Partitions input into shards of a given size for bulk operations.
    @author: Miguel Branco in DQ2 Site Services code

    Every shard except possibly the last holds exactly ``size`` elements; a
    trailing partial shard is yielded when elements remain. Empty input yields nothing.

    Args:
        input_list (list): list to be partitioned
        size (int): size of the shards

    Yields:
        list: the next shard, as a new list

    """
    bucket = []
    for item in input_list:
        bucket.append(item)
        if len(bucket) == size:
            yield bucket
            bucket = []

    # leftover elements that did not fill a complete shard
    if bucket:
        yield bucket
0737 
def get_checkpoint_filename(task_id: str, sub_id: str) -> str:
    """
    Build the name of an HPO checkpoint file from its task ID and sub ID.

    Args:
        task_id (str): task ID.
        sub_id (str): sub ID.

    Returns:
        string: checkpoint file name in the form "hpo_cp_<task_id>_<sub_id>".
    """
    checkpoint_name = f"hpo_cp_{task_id}_{sub_id}"
    return checkpoint_name
0750 
0751 
def put_checkpoint(panda_request: PandaRequest, file: FileStorage) -> str:
    """
    Upload a HPO checkpoint file to the server.

    The base name of the uploaded file must have the form "<task_id>_<sub_id>".
    The request must use HTTPS and its content-length must not exceed
    CHECKPOINT_LIMIT. The content is written to the cache directory under the
    name given by get_checkpoint_filename.

    Args:
        panda_request (PandaRequest): PanDA request object.
        file (FileStorage): werkzeug.FileStorage object to be uploaded.

    Returns:
        string: json formatted string with status and message.
    """

    tmp_log = LogWrapper(_logger, f"put_checkpoint <jediTaskID_subID={file.filename}>")

    def _failure(error_message: str) -> str:
        # log the problem and build the JSON reply with a False status
        tmp_log.error(error_message)
        return json.dumps({"status": False, "message": error_message})

    if not Protocol.isSecure(panda_request):
        return _failure("insecure request")

    tmp_log.debug(f"start {panda_request.subprocess_env['SSL_CLIENT_S_DN']}")

    # extract task ID and sub ID; anything but exactly two "_"-separated parts is rejected
    try:
        base_name = file.filename.split("/")[-1]
        task_id, sub_id = base_name.split("_")
    except Exception:
        return _failure("failed to extract ID")

    # get the file size
    try:
        content_length = int(panda_request.headers_in["content-length"])
    except Exception as exc:
        return _failure(f"cannot get int(content-length) due to {exc}")
    tmp_log.debug(f"size {content_length}")

    # compare the size against the limit for checkpoints
    if content_length > CHECKPOINT_LIMIT:
        return _failure(f"exceeded size limit {content_length}>{CHECKPOINT_LIMIT}")

    # write the file to the cache directory
    try:
        full_path = os.path.join(panda_config.cache_dir, get_checkpoint_filename(task_id, sub_id))
        with open(full_path, "wb") as checkpoint_file:
            checkpoint_file.write(file.read())
    except Exception as exc:
        return _failure(f"cannot write file due to {exc}")

    success_message = f"successfully placed at {full_path}"
    tmp_log.debug(success_message)
    return json.dumps({"status": True, "message": success_message})
0814 
0815 
def delete_checkpoint(panda_request: PandaRequest, task_id: str, sub_id: str) -> str:
    """
    Delete a HPO checkpoint file from the server.

    The file name is derived from the two IDs with get_checkpoint_filename and
    looked up in the cache directory. A failed removal is reported in the
    returned status rather than raised.

    Args:
        panda_request (PandaRequest): PanDA request object.
        task_id (str): task ID.
        sub_id (str): sub ID.

    Returns:
        string: json formatted string with status and message.
    """

    tmp_log = LogWrapper(_logger, f"delete_checkpoint <jediTaskID={task_id} ID={sub_id}>")

    if not Protocol.isSecure(panda_request):
        tmp_log.error(ERROR_NOT_SECURE)
        return json.dumps({"status": False, "message": ERROR_NOT_SECURE})

    tmp_log.debug(f"start {panda_request.subprocess_env['SSL_CLIENT_S_DN']}")

    try:
        checkpoint_name = get_checkpoint_filename(task_id, sub_id)
        os.remove(os.path.join(panda_config.cache_dir, checkpoint_name))
        status, message = True, "done"
        tmp_log.debug(message)
    except Exception as exc:
        status, message = False, f"failed to delete file due to {exc}"
        tmp_log.error(message)

    return json.dumps({"status": status, "message": message})