File indexing completed on 2026-04-10 08:39:07
0001 import gc
0002 import gzip
0003 import json
0004 import os
0005 import re
0006 import struct
0007 import sys
0008 import traceback
0009 import uuid
0010 import zlib
0011 from typing import Generator
0012
0013 from pandacommon.pandalogger.LogWrapper import LogWrapper
0014 from pandacommon.pandalogger.PandaLogger import PandaLogger
0015 from pandacommon.pandautils.PandaUtils import naive_utcnow
0016 from werkzeug.datastructures import FileStorage
0017
0018 from pandaserver.config import panda_config
0019 from pandaserver.jobdispatcher import Protocol
0020 from pandaserver.srvcore import CoreUtils
0021 from pandaserver.srvcore.panda_request import PandaRequest
0022 from pandaserver.userinterface import Client
0023
# Module-level logger shared by the helpers in this file
_logger = PandaLogger().getLogger("Utils")

# File name suffixes that putFile does not register in the DB
IGNORED_SUFFIX = [".out"]


# Upload size limits in bytes
MB = 1024 * 1024
EVENT_PICKING_LIMIT = 10 * MB  # request size accepted by putEventPickingRequest
LOG_LIMIT = 100 * MB  # request size accepted by uploadLog
CHECKPOINT_LIMIT = 500 * MB  # request size accepted by put_checkpoint
SANDBOX_NO_BUILD_LIMIT = 100 * MB  # putFile uploads whose name does not start with "sources."
SANDBOX_LIMIT = 768 * MB  # putFile uploads whose name starts with "sources."


# Error messages returned to the client as the response body
ERROR_NOT_SECURE = "ERROR : no HTTPS"
ERROR_LIMITED_PROXY = "ERROR: rejected due to the usage of limited proxy"
ERROR_OVERWRITE = "ERROR: cannot overwrite file"
ERROR_WRITE = "ERROR: cannot write file"
ERROR_SIZE_LIMIT = "ERROR: upload failure. Exceeded size limit"
0043
def isAlive(panda_request: PandaRequest) -> str:
    """
    Health-check entry point, used in SLS monitoring. The request is not inspected.

    Args:
        panda_request (PandaRequest): PanDA request object (unused).

    Returns:
        str: always "alive=yes"
    """
    alive_message = "alive=yes"
    return alive_message
0055
0056
def get_content_length(panda_request: PandaRequest, tmp_log: LogWrapper) -> int:
    """
    Get the content length of the request.

    The value is read from the "content-length" entry of the request headers. A missing
    or non-integer header is logged as an error and reported as 0, so callers that compare
    the result against a size limit let such requests through.

    Args:
        panda_request (PandaRequest): PanDA request object.
        tmp_log (LogWrapper): logger object of the calling function.

    Returns:
        int: content length of the request, or 0 when it cannot be determined.
    """
    content_length = 0
    try:
        content_length = int(panda_request.headers_in["content-length"])
    except Exception:
        if "content-length" in panda_request.headers_in:
            # header present but not convertible to int
            tmp_log.error(f"cannot get content_length: {panda_request.headers_in['content-length']}")
        else:
            # the previous message contained an unformatted "{method_name}" placeholder
            tmp_log.error("no content_length in the request headers")

    tmp_log.debug(f"size {content_length}")
    return content_length
0079
0080
0081
def putFile(panda_request: PandaRequest, file: FileStorage) -> str:
    """
    Upload a file to the server.

    The request must be HTTPS and must not use a limited proxy. The size limit depends on
    the file name: names starting with "sources." get SANDBOX_LIMIT, everything else gets
    SANDBOX_NO_BUILD_LIMIT. The file is stored in panda_config.cache_dir under its base name,
    optionally gzip-compressed when the name matches one of the comma-separated patterns in
    panda_config.compress_file_names, and is registered in the DB when
    panda_config.record_sandbox_info is set.

    Args:
        panda_request (PandaRequest): PanDA request object.
        file (FileStorage): werkzeug.FileStorage object to be uploaded.

    Returns:
        string: "True" if the upload was successful, otherwise an error message.
    """

    tmp_log = LogWrapper(_logger, f"putFile-{naive_utcnow().isoformat('/')}")
    tmp_log.debug("start")

    # reject insecure requests and limited proxies before touching the payload
    if not Protocol.isSecure(panda_request):
        tmp_log.error("no HTTPS")
        tmp_log.debug("trigger garbage collection")
        gc.collect()
        tmp_log.debug("end")
        return ERROR_NOT_SECURE
    if "/CN=limited proxy" in panda_request.subprocess_env["SSL_CLIENT_S_DN"]:
        tmp_log.error("limited proxy is used")
        tmp_log.debug("trigger garbage collection")
        gc.collect()
        tmp_log.debug("end")
        return ERROR_LIMITED_PROXY

    # user name derived from the client certificate DN
    user_name = CoreUtils.clean_user_id(panda_request.subprocess_env["SSL_CLIENT_S_DN"])
    tmp_log.debug(f"user_name={user_name} file_path={file.filename}")

    # the size limit depends on whether the file name starts with "sources."
    no_build = not file.filename.startswith("sources.")
    size_limit = SANDBOX_NO_BUILD_LIMIT if no_build else SANDBOX_LIMIT

    # a missing or malformed content-length is reported as 0 by the helper
    content_length = get_content_length(panda_request, tmp_log)

    if content_length > size_limit:
        error_message = f"{ERROR_SIZE_LIMIT} {content_length}>{size_limit}."
        if no_build:
            error_message += " Please submit the job without --noBuild/--libDS since those options impose a tighter size limit"
        else:
            error_message += " Please remove redundant files from your work area"
        tmp_log.error(error_message)
        tmp_log.debug("trigger garbage collection")
        gc.collect()
        tmp_log.debug("end")
        return error_message

    try:
        # only the base name is used, so the file always lands directly in the cache directory
        file_name = file.filename.split("/")[-1]
        full_path = f"{panda_config.cache_dir}/{file_name}"

        # existing files are never overwritten unless the name ends with ".__ow__"
        if os.path.exists(full_path) and file.filename.split(".")[-1] != "__ow__":
            # refresh access/modification times of the existing file
            os.utime(full_path, None)
            tmp_log.debug(f"{ERROR_OVERWRITE} {file_name}")
            tmp_log.debug("end")
            return ERROR_OVERWRITE

        # read (and optionally compress) the payload BEFORE opening the destination, so that
        # a failure here does not leave an empty file that would block later uploads
        file_content = file.read()
        if hasattr(panda_config, "compress_file_names") and any(
            re.search(patt, file_name) is not None for patt in panda_config.compress_file_names.split(",")
        ):
            file_content = gzip.compress(file_content)

        with open(full_path, "wb") as file_object:
            file_object.write(file_content)

    except Exception:
        error_message = ERROR_WRITE
        # keep the client-facing message unchanged but record the cause in the server log
        tmp_log.error(f"{error_message} {traceback.format_exc()}")
        tmp_log.debug("trigger garbage collection")
        gc.collect()
        tmp_log.debug("end")
        return error_message

    try:
        # interpret the last 8 bytes as a gzip trailer: CRC32 followed by ISIZE, both
        # little-endian, hence the explicit "<" instead of native byte order.
        # NOTE: nothing verifies that the payload is gzip; for other content the value is meaningless
        footer = file_content[-8:]
        checksum, _ = struct.unpack("<II", footer)
        checksum = str(checksum)
        tmp_log.debug(f"CRC from gzip Footer {checksum}")
    except Exception:
        # e.g. payload shorter than 8 bytes
        checksum = ""
        tmp_log.debug("No CRC calculated")

    # size of what was written, i.e. after optional compression
    file_size = len(file_content)

    tmp_log.debug(f"written dn={user_name} file={full_path} size={file_size} crc={checksum}")

    if panda_config.record_sandbox_info:
        # files with an ignored suffix are stored but not registered
        if any(file.filename.endswith(patt) for patt in IGNORED_SUFFIX):
            tmp_log.debug("skipped to insert to DB")
        else:
            status_client, output_client = Client.register_cache_file(user_name, file.filename, file_size, checksum)
            if status_client != 0:
                error_message = f"ERROR : failed to register sandbox to DB with {status_client} {output_client}"
                tmp_log.error(error_message)
                tmp_log.debug("Done")
                return error_message

            success = output_client["success"]
            message = output_client["message"]
            if not success:
                error_message = f"ERROR : failed to register sandbox to DB with {message}"
                tmp_log.error(error_message)
                tmp_log.debug("Done")
                return error_message

            tmp_log.debug(f"Inserted sandbox to DB with {output_client}")

    tmp_log.debug("trigger garbage collection")
    gc.collect()
    tmp_log.debug("Done")

    return "True"
0222
0223
def putEventPickingRequest(
    panda_request: PandaRequest,
    runEventList="",
    eventPickDataType="",
    eventPickStreamName="",
    eventPickDS="",
    eventPickAmiTag="",
    userDatasetName="",
    lockedBy="",
    params="",
    inputFileList="",
    eventPickNumSites="",
    userTaskName="",
    ei_api="",
    giveGUID=None,
) -> str:
    """
    Upload event picking request to the server.

    The request is written as "key=value" lines to a new file named "evp.<uuid>" in
    panda_config.cache_dir, followed by one "runEvent=<run>,<event>" line per accepted entry
    of runEventList and a final "runEvtGuidMap=..." line.

    Args:
        panda_request (PandaRequest): PanDA request object.
        runEventList (str): run and event list, one whitespace-separated entry per line:
            "run event", or "run event guid" when giveGUID is set. Lines with a different
            number of fields are skipped.
        eventPickDataType (str): data type.
        eventPickStreamName (str): stream name.
        eventPickDS (str): dataset name.
        eventPickAmiTag (str): AMI tag.
        userDatasetName (str): user dataset name.
        lockedBy (str): locking agent.
        params (str): parameters.
        inputFileList (str): input file list.
        eventPickNumSites (str): number of sites.
        userTaskName (str): user task name.
        ei_api (str): event index API.
        giveGUID (str or bool): "True" or True when each entry carries a GUID.

    Returns:
        string: "True" if the upload was successful, otherwise an error message.

    """
    if not Protocol.isSecure(panda_request):
        return ERROR_NOT_SECURE

    user_name = panda_request.subprocess_env["SSL_CLIENT_S_DN"]

    tmp_log = LogWrapper(_logger, f"putEventPickingRequest-{naive_utcnow().isoformat('/')}")
    tmp_log.debug(f"start for {user_name}")

    creation_time = naive_utcnow().strftime("%Y-%m-%d %H:%M:%S")

    # unlike get_content_length, a missing/invalid content-length is a hard error here
    try:
        content_length = int(panda_request.headers_in["content-length"])
    except Exception:
        error_message = "cannot get content-length from HTTP request."
        tmp_log.error(f"{error_message}")
        tmp_log.debug("end")
        return "ERROR : " + error_message
    tmp_log.debug(f"size {content_length}")

    if content_length > EVENT_PICKING_LIMIT:
        error_message = f"Run/event list is too large. Exceeded size limit {content_length}>{EVENT_PICKING_LIMIT}."
        tmp_log.error(f"{error_message} ")
        tmp_log.debug("end")
        return "ERROR : " + error_message

    # accept both the string sent over HTTP and a real boolean, as put_workflow_request does;
    # previously a boolean True was silently treated as False
    giveGUID = giveGUID in ("True", True)
    # number of whitespace-separated fields a valid run/event line must have
    expected_fields = 3 if giveGUID else 2

    try:
        # unique file name in the cache directory
        file_name = f"{panda_config.cache_dir}/evp.{str(uuid.uuid4())}"
        tmp_log.debug(f"file: {file_name}")

        # header of the request file
        file_content = (
            f"userName={user_name}\n"
            f"creationTime={creation_time}\n"
            f"eventPickDataType={eventPickDataType}\n"
            f"eventPickStreamName={eventPickStreamName}\n"
            f"eventPickDS={eventPickDS}\n"
            f"eventPickAmiTag={eventPickAmiTag}\n"
            f"eventPickNumSites={eventPickNumSites}\n"
            f"userTaskName={userTaskName}\n"
            f"userDatasetName={userDatasetName}\n"
            f"lockedBy={lockedBy}\n"
            f"params={params}\n"
            f"inputFileList={inputFileList}\n"
            f"ei_api={ei_api}\n"
        )

        with open(file_name, "w") as file_object:
            file_object.write(file_content)
            run_event_guid_map = {}
            for tmp_line in runEventList.split("\n"):
                tmp_items = tmp_line.split()
                if len(tmp_items) != expected_fields:
                    continue
                run_event = tuple(tmp_items[:2])
                file_object.write("runEvent=%s,%s\n" % run_event)
                if giveGUID:
                    run_event_guid_map[run_event] = [tmp_items[2]]
            # the map is always written, as "{}" when no GUIDs were given
            file_object.write(f"runEvtGuidMap={str(run_event_guid_map)}\n")

    except Exception as e:
        error_message = f"cannot put request due to {str(e)}"
        tmp_log.error(error_message + traceback.format_exc())
        return f"ERROR : {error_message}"

    tmp_log.debug("end")
    return "True"
0335
0336
0337
def put_file_recovery_request(panda_request: PandaRequest, jediTaskID: str, dryRun: bool = None) -> str:
    """
    Upload lost file recovery request to the server.

    The request is stored as a JSON document in a new "recov.<uuid>" file in
    panda_config.cache_dir.

    Args:
        panda_request (PandaRequest): PanDA request object.
        jediTaskID (string): task ID; must be convertible to int.
        dryRun (bool): dry run flag; any truthy value adds "dryRun": true to the request.

    Returns:
        string: String in json format with (boolean, message)
    """
    if not Protocol.isSecure(panda_request):
        return json.dumps((False, ERROR_NOT_SECURE))

    user_name = panda_request.subprocess_env["SSL_CLIENT_S_DN"]
    creation_time = naive_utcnow().strftime("%Y-%m-%d %H:%M:%S")

    tmp_log = LogWrapper(_logger, f"put_file_recovery_request < jediTaskID={jediTaskID}")
    tmp_log.debug(f"start user={user_name}")

    try:
        # a non-numeric task ID is reported through the generic failure path below
        numeric_task_id = int(jediTaskID)

        request_path = f"{panda_config.cache_dir}/recov.{str(uuid.uuid4())}"
        tmp_log.debug(f"file={request_path}")

        payload = {
            "userName": user_name,
            "creationTime": creation_time,
            "jediTaskID": numeric_task_id,
        }
        if dryRun:
            payload["dryRun"] = True

        with open(request_path, "w") as request_file:
            json.dump(payload, request_file)
    except Exception as exc:
        error_message = f"cannot put request due to {str(exc)} "
        tmp_log.error(error_message + traceback.format_exc())
        return json.dumps((False, error_message))

    tmp_log.debug("done")
    return json.dumps((True, "request was accepted and will be processed in a few minutes"))
0383
0384
def put_workflow_request(panda_request: PandaRequest, data: str, check: bool = False, sync: bool = False) -> str:
    """
    Upload workflow request to the server.

    The request is stored as a JSON document in a new "workflow.<uuid>" file in
    panda_config.cache_dir. When check or sync is set, the file is processed immediately
    with WorkflowProcessor and then removed; otherwise it is left in place.

    Args:
        panda_request (PandaRequest): PanDA request object.
        data (string): workflow request data as a JSON string.
        check (bool): check flag; "True" or True enables it, anything else disables it.
        sync (bool): synchronous processing; "True" or True enables it, anything else disables it.
    Returns:
        string: String in json format with (boolean, message)
    """

    if not Protocol.isSecure(panda_request):
        return json.dumps((False, ERROR_NOT_SECURE))

    user_name = panda_request.subprocess_env["SSL_CLIENT_S_DN"]
    creation_time = naive_utcnow().strftime("%Y-%m-%d %H:%M:%S")

    tmp_log = LogWrapper(_logger, "put_workflow_request")

    tmp_log.debug(f"start user={user_name} check={check}")

    # normalise both flags independently. The previous if/elif never set a flag to False,
    # so a string such as "False" stayed truthy and triggered check/sync processing,
    # and sync was not normalised at all whenever check matched
    check = check in ("True", True)
    sync = sync in ("True", True)

    try:
        # parse first so that invalid JSON does not leave a truncated request file behind
        data_dict = {
            "userName": user_name,
            "creationTime": creation_time,
            "data": json.loads(data),
        }

        # unique file name in the cache directory
        file_name = f"{panda_config.cache_dir}/workflow.{str(uuid.uuid4())}"
        tmp_log.debug(f"file={file_name}")

        with open(file_name, "w") as file_object:
            json.dump(data_dict, file_object)

        if sync or check:
            # imported here rather than at module level, as in the original code
            from pandaserver.taskbuffer.workflow_processor import WorkflowProcessor

            processor = WorkflowProcessor(log_stream=_logger)
            # only the 3rd and 5th positional flags differ between check and sync mode;
            # their meaning is defined by WorkflowProcessor.process
            if check:
                ret = processor.process(file_name, True, True, True, True)
            else:
                ret = processor.process(file_name, True, False, True, False)
            # best-effort removal of the request file once it has been processed
            if os.path.exists(file_name):
                try:
                    os.remove(file_name)
                except Exception:
                    pass
            tmp_log.debug("done")
            return json.dumps((True, ret))

    except Exception as exc:
        error_message = f"cannot put request due to {str(exc)} "
        tmp_log.error(error_message + traceback.format_exc())
        return json.dumps((False, error_message))

    tmp_log.debug("done")
    return json.dumps((True, "request was accepted and will be processed in a few minutes"))
0449
0450
0451
def deleteFile(panda_request: PandaRequest, file: FileStorage) -> str:
    """
    Entry point for deleting a file from the cache directory.

    NOTE: as written, nothing is removed; a secure request is simply acknowledged.

    Args:
        panda_request (PandaRequest): PanDA request object.
        file (FileStorage): file to be deleted (currently unused).

    Returns:
        string: "True" for a secure request, otherwise the no-HTTPS error message
    """
    if Protocol.isSecure(panda_request):
        return "True"
    return ERROR_NOT_SECURE
0471
0472
0473
def touchFile(panda_request: PandaRequest, filename: str) -> str:
    """
    Touch a file in the cache directory, i.e. set its access and modification times to now.

    Args:
        panda_request (PandaRequest): PanDA request object.
        filename (string): file name to be touched; only the base name is used

    Returns:
        string: "True" on success, "False" for insecure requests or on any failure
    """
    if not Protocol.isSecure(panda_request):
        return "False"

    try:
        base_name = filename.split("/")[-1]
        os.utime(f"{panda_config.cache_dir}/{base_name}", None)
    except Exception as exc:
        _logger.error(f"touchFile : {type(exc)} {exc}")
        return "False"
    return "True"
0494
0495
0496
def getServer(panda_request: PandaRequest) -> str:
    """
    Build the HTTPS endpoint of the server from panda_config.

    Args:
        panda_request (PandaRequest): PanDA request object (unused).

    Returns:
        string: "<pserverhost>:<pserverport>"
    """
    host = panda_config.pserverhost
    port = panda_config.pserverport
    return "{}:{}".format(host, port)
0507
0508
0509
def getServerHTTP(panda_request: PandaRequest) -> str:
    """
    Build the HTTP endpoint of the server from panda_config.

    Args:
        panda_request (PandaRequest): PanDA request object (unused).

    Returns:
        string: "<pserverhosthttp>:<pserverporthttp>"
    """
    host = panda_config.pserverhosthttp
    port = panda_config.pserverporthttp
    return "{}:{}".format(host, port)
0520
0521
def updateLog(panda_request: PandaRequest, file: FileStorage) -> str:
    """
    Update the log file, appending more content at the end of the file.

    The uploaded payload is expected to be zlib-compressed; it is decompressed and appended
    to the file with the same base name in panda_config.cache_dir.

    Args:
        panda_request (PandaRequest): PanDA request object.
        file (FileStorage): werkzeug.FileStorage object to be updated.

    Returns:
        string: String with "True" or error message
    """
    tmp_log = LogWrapper(_logger, f"updateLog < {file.filename} >")
    tmp_log.debug("start")

    try:
        # zlib.decompress returns bytes
        new_content = zlib.decompress(file.read())

        # only the base name is used, so the file always lands directly in the cache directory
        log_name = f"{panda_config.cache_dir}/{file.filename.split('/')[-1]}"

        # append in binary mode: writing bytes to a text-mode file raises TypeError,
        # which made every call fail before
        with open(log_name, "ab") as file_object:
            file_object.write(new_content)

    except Exception as exc:
        tmp_log.error(f"{type(exc)} {exc}")
        return f"ERROR: cannot update file with {type(exc)} {exc}"

    tmp_log.debug("end")
    return "True"
0554
0555
def fetchLog(panda_request: PandaRequest, logName: str, offset: int = 0) -> str:
    """
    Read a log file from the cache directory, starting at the given offset.

    Args:
        panda_request (PandaRequest): PanDA request object.
        logName (string): log file name; only the base name is used
        offset (int): offset in the file to start reading from

    Returns:
        string: a single space followed by the log content; just the space if reading fails
    """
    tmp_log = LogWrapper(_logger, f"fetchLog <{logName}>")
    tmp_log.debug(f"start offset={offset}")

    # the result always starts with one space, even when nothing could be read
    return_string = " "
    try:
        base_name = logName.split("/")[-1]
        log_path = f"{panda_config.cache_dir}/{base_name}"

        with open(log_path, "r") as log_file:
            log_file.seek(int(offset))
            return_string = return_string + log_file.read()

    except Exception as exc:
        # failures are only logged; the caller still gets the (possibly empty) result
        tmp_log.error(f"{type(exc)} {exc}")

    tmp_log.debug(f"end read={len(return_string)}")
    return return_string
0587
0588
def getVomsAttr(panda_request: PandaRequest) -> str:
    """
    Collect the environment entries whose key starts with "GRST_CRED_", in sorted order.

    Args:
        panda_request (PandaRequest): PanDA request object.

    Returns:
        string: one "<key> : <value>" line per matching entry, sorted and concatenated
    """
    env = panda_request.subprocess_env
    credential_lines = [f"{name} : {env[name]}\n" for name in env if name.startswith("GRST_CRED_")]
    credential_lines.sort()
    return "".join(credential_lines)
0609
0610
def getAttr(panda_request: PandaRequest, **kv: dict) -> str:
    """
    Get all parameters and environment variables from the environment.

    Args:
        panda_request (PandaRequest): PanDA request object.
        kv (dict): dictionary with key-value pairs

    Returns:
        string: a "===== param =====" section with one "<key> = <value>" line per keyword
            argument sorted by key, then a "====== env ======" section with one
            "<key> : <value>" line per entry of the request environment, sorted
    """
    env = panda_request.subprocess_env

    # parameters passed with the request, ordered by key
    param_lines = [f"{tmp_key} = {kv[tmp_key]}\n" for tmp_key in sorted(kv)]

    # environment entries, sorted once as complete lines
    # (the list used to be sorted twice: in place and again with sorted())
    env_lines = sorted(f"{tmp_key} : {env[tmp_key]}\n" for tmp_key in env)

    return "".join(["===== param =====\n", *param_lines, "\n====== env ======\n", *env_lines])
0639
0640
def uploadLog(panda_request: PandaRequest, file: FileStorage) -> str:
    """
    Upload a JEDI log file

    The file is stored under "<panda_config.cache_dir>/jedilog/<base name>", replacing any
    existing file with the same name.

    Args:
        panda_request (PandaRequest): PanDA request object.
        file (FileStorage): werkzeug.FileStorage object to be uploaded.

    Returns:
        string: String with the URL to the file, or an error message
    """

    if not Protocol.isSecure(panda_request):
        return ERROR_NOT_SECURE
    if "/CN=limited proxy" in panda_request.subprocess_env["SSL_CLIENT_S_DN"]:
        return ERROR_LIMITED_PROXY

    tmp_log = LogWrapper(_logger, f"uploadLog <{file.filename}>")
    tmp_log.debug(f"start {panda_request.subprocess_env['SSL_CLIENT_S_DN']}")

    # shared helper, as used by putFile; a missing or malformed header is reported as 0
    content_length = get_content_length(panda_request, tmp_log)

    if content_length > LOG_LIMIT:
        error_message = ERROR_SIZE_LIMIT
        tmp_log.error(error_message)
        tmp_log.debug("end")
        return error_message

    jedi_log_directory = "/jedilog"
    try:
        # only the base name is used, so the file always lands in the jedilog directory
        file_base_name = file.filename.split("/")[-1]
        full_path = f"{panda_config.cache_dir}{jedi_log_directory}/{file_base_name}"

        # read the payload before removing the old file so that a failed read
        # does not destroy the previous log
        file_content = file.read()

        # replace any previous log with the same name
        if os.path.exists(full_path):
            os.remove(full_path)

        with open(full_path, "wb") as file_object:
            file_object.write(file_content)
        tmp_log.debug(f"written to {full_path}")

        # URL scheme and endpoint depend on whether plain HTTP is disabled
        if panda_config.disableHTTP:
            protocol = "https"
            server = getServer(None)
        else:
            protocol = "http"
            server = getServerHTTP(None)
        return_string = f"{protocol}://{server}/cache{jedi_log_directory}/{file_base_name}"

    except Exception:
        error_type, error_value = sys.exc_info()[:2]
        error_message = f"failed to write log with {error_type.__name__}:{error_value}"
        tmp_log.error(error_message)
        tmp_log.debug("end")
        return error_message

    tmp_log.debug("end")
    return return_string
0711
0712
def create_shards(input_list: list, size: int) -> Generator:
    """
    Split the input into consecutive shards of at most `size` elements for bulk operations.
    @author: Miguel Branco in DQ2 Site Services code

    Args:
        input_list (list): iterable to be partitioned
        size (int): number of elements per shard

    Yields:
        list: the next shard; the last one may be shorter. Empty input yields nothing.
    """
    current = []
    for item in input_list:
        current.append(item)
        if len(current) == size:
            yield current
            # start a new list so that shards already handed out are not modified
            current = []

    # leftover elements that did not fill a complete shard
    if current:
        yield current
0736
0737
def get_checkpoint_filename(task_id: str, sub_id: str) -> str:
    """
    Build the name of a checkpoint file from its task and sub IDs.

    Args:
        task_id (str): task ID.
        sub_id (str): sub ID.

    Returns:
        string: checkpoint file name in the form "hpo_cp_<task_id>_<sub_id>".
    """
    return "hpo_cp_{}_{}".format(task_id, sub_id)
0750
0751
def put_checkpoint(panda_request: PandaRequest, file: FileStorage) -> str:
    """
    Upload a HPO checkpoint file to the server.

    The base name of the uploaded file must have the form "<task_id>_<sub_id>"; the content
    is stored in panda_config.cache_dir under the name given by get_checkpoint_filename.

    Args:
        panda_request (PandaRequest): PanDA request object.
        file (FileStorage): werkzeug.FileStorage object to be uploaded.

    Returns:
        string: json formatted string with status and message.
    """

    tmp_log = LogWrapper(_logger, f"put_checkpoint <jediTaskID_subID={file.filename}>")

    def _failure(message: str) -> str:
        # log the problem and build the JSON response for a rejected upload
        tmp_log.error(message)
        return json.dumps({"status": False, "message": message})

    if not Protocol.isSecure(panda_request):
        return _failure("insecure request")

    tmp_log.debug(f"start {panda_request.subprocess_env['SSL_CLIENT_S_DN']}")

    # the base name must split into exactly two parts on "_"
    try:
        base_name = file.filename.split("/")[-1]
        task_id, sub_id = base_name.split("_")
    except Exception:
        return _failure("failed to extract ID")

    # a missing or malformed content-length is a hard error here
    try:
        content_length = int(panda_request.headers_in["content-length"])
    except Exception as exc:
        return _failure(f"cannot get int(content-length) due to {str(exc)}")
    tmp_log.debug(f"size {content_length}")

    if content_length > CHECKPOINT_LIMIT:
        return _failure(f"exceeded size limit {content_length}>{CHECKPOINT_LIMIT}")

    try:
        full_path = os.path.join(panda_config.cache_dir, get_checkpoint_filename(task_id, sub_id))
        with open(full_path, "wb") as checkpoint_file:
            checkpoint_file.write(file.read())
    except Exception as exc:
        return _failure(f"cannot write file due to {str(exc)}")

    success_message = f"successfully placed at {full_path}"
    tmp_log.debug(success_message)
    return json.dumps({"status": True, "message": success_message})
0814
0815
def delete_checkpoint(panda_request: PandaRequest, task_id: str, sub_id: str) -> str:
    """
    Delete a HPO checkpoint file from the server.

    The file removed is the one named by get_checkpoint_filename(task_id, sub_id)
    in panda_config.cache_dir.

    Args:
        panda_request (PandaRequest): PanDA request object.
        task_id (str): task ID.
        sub_id (str): sub ID.

    Returns:
        string: json formatted string with status and message.
    """

    tmp_log = LogWrapper(_logger, f"delete_checkpoint <jediTaskID={task_id} ID={sub_id}>")

    if not Protocol.isSecure(panda_request):
        tmp_log.error(ERROR_NOT_SECURE)
        return json.dumps({"status": False, "message": ERROR_NOT_SECURE})

    tmp_log.debug(f"start {panda_request.subprocess_env['SSL_CLIENT_S_DN']}")

    try:
        checkpoint_path = os.path.join(panda_config.cache_dir, get_checkpoint_filename(task_id, sub_id))
        os.remove(checkpoint_path)
        outcome, message = True, "done"
        tmp_log.debug(message)
    except Exception as exc:
        # includes the case where the checkpoint file does not exist
        outcome, message = False, f"failed to delete file due to {str(exc)}"
        tmp_log.error(message)

    return json.dumps({"status": outcome, "message": message})
0848 return json.dumps({"status": status, "message": message})