File indexing completed on 2026-04-20 07:58:57
0001 """
0002 Connection to the PanDA server
0003
0004 """
0005
0006 import ssl
0007
0008 try:
0009
0010 ssl.HAS_SNI = False
0011 except Exception:
0012 pass
0013 import datetime
0014 import json
0015 import os
0016 import sys
0017 import traceback
0018 import uuid
0019 import zlib
0020 from urllib.parse import urlparse
0021
0022
0023 import requests.packages.urllib3
0024
0025 try:
0026 requests.packages.urllib3.disable_warnings()
0027 except Exception:
0028 pass
0029
0030 from pandacommon.pandautils.net_utils import get_http_adapter_with_random_dns_resolution
0031
0032 from pandaharvester.harvesterconfig import harvester_config
0033 from pandaharvester.harvestercore import core_utils
0034 from pandaharvester.harvestermisc import idds_utils
0035
0036 from .base_communicator import BaseCommunicator
0037
0038
0039
0040 class PandaCommunicator(BaseCommunicator):
0041
    def __init__(self):
        """
        Set up authentication and endpoint configuration from harvester_config.pandacon.
        """
        BaseCommunicator.__init__(self)
        # verbose logging; useInspect is only honored when verbose is enabled
        self.useInspect = False
        if hasattr(harvester_config.pandacon, "verbose") and harvester_config.pandacon.verbose:
            self.verbose = True
            if hasattr(harvester_config.pandacon, "useInspect") and harvester_config.pandacon.useInspect is True:
                self.useInspect = True
        else:
            self.verbose = False
        # authentication type: "oidc" (token based) or "x509" (certificate based, default)
        if hasattr(harvester_config.pandacon, "auth_type"):
            self.auth_type = harvester_config.pandacon.auth_type
        else:
            self.auth_type = "x509"
        self.auth_token = None
        self.auth_token_last_update = None
        # client certificate/key and CA bundle for SSL; ca_cert=False disables server verification
        if hasattr(harvester_config.pandacon, "cert_file"):
            self.cert_file = harvester_config.pandacon.cert_file
        else:
            self.cert_file = None
        if hasattr(harvester_config.pandacon, "key_file"):
            self.key_file = harvester_config.pandacon.key_file
        else:
            self.key_file = None
        if hasattr(harvester_config.pandacon, "ca_cert"):
            self.ca_cert = harvester_config.pandacon.ca_cert
        else:
            self.ca_cert = False

        # base URL of the PanDA server REST API; fall back to deriving it from legacy pandaURLSSL
        if hasattr(harvester_config.pandacon, "server_api_url_ssl"):
            self.server_base_path_ssl = harvester_config.pandacon.server_api_url_ssl
        elif hasattr(harvester_config.pandacon, "pandaURLSSL"):
            parsed = urlparse(harvester_config.pandacon.pandaURLSSL)
            self.server_base_path_ssl = f"{parsed.scheme}://{parsed.netloc}/api/v1"
        else:
            # no server endpoint configured
            self.server_base_path_ssl = None

        # base URL of the cache (file upload) API; fall back to legacy pandaCacheURL_W
        if hasattr(harvester_config.pandacon, "cache_api_url_ssl"):
            self.cache_base_path_ssl = harvester_config.pandacon.cache_api_url_ssl
        elif hasattr(harvester_config.pandacon, "pandaCacheURL_W"):
            parsed = urlparse(harvester_config.pandacon.pandaCacheURL_W)
            self.cache_base_path_ssl = f"{parsed.scheme}://{parsed.netloc}/api/v1"
        else:
            # no cache endpoint configured
            self.cache_base_path_ssl = None

        # optional JSON file with per-host auth settings; best-effort load, errors ignored
        self.multihost_auth_config = {}
        if hasattr(harvester_config.pandacon, "multihost_auth_config") and harvester_config.pandacon.multihost_auth_config:
            try:
                with open(harvester_config.pandacon.multihost_auth_config) as f:
                    self.multihost_auth_config = json.load(f)
            except Exception:
                pass

        # cache of base URL -> hostname, filled lazily by get_host_specific_auth_config
        self.base_url_host_map = {}
        # fetch tokens eagerly; failures are tolerated (renewal retried on first request)
        try:
            self.renew_token()
        except Exception:
            pass
0106
0107
0108 def force_credential_renewal(self):
0109 """
0110 Unset timestamp to trigger token renewal
0111 """
0112 self.auth_token_last_update = None
0113
0114
0115 def renew_token(self):
0116 if self.auth_token_last_update is not None and core_utils.naive_utcnow() - self.auth_token_last_update < datetime.timedelta(minutes=10):
0117 return
0118 self.auth_token_last_update = core_utils.naive_utcnow()
0119 if hasattr(harvester_config.pandacon, "auth_token"):
0120 if harvester_config.pandacon.auth_token.startswith("file:"):
0121 with open(harvester_config.pandacon.auth_token.split(":")[-1]) as f:
0122 self.auth_token = f.read()
0123 else:
0124 self.auth_token = harvester_config.pandacon.auth_token
0125 for config_map in self.multihost_auth_config.values():
0126 if "auth_token" in config_map:
0127 if config_map["auth_token"].startswith("file:"):
0128 with open(config_map["auth_token"].split(":")[-1]) as f:
0129 config_map["auth_token_str"] = f.read()
0130 else:
0131 config_map["auth_token_str"] = config_map["auth_token"]
0132
0133
0134 def get_host_specific_auth_config(self, base_url: str) -> tuple:
0135 """
0136 Get host-specific auth configuration for a base URL
0137
0138 Args:
0139 base_url: base URL
0140
0141 Returns:
0142 list: host-specific configuration: auth_type, cert_file, key_file, ca_cert, auth_token. Return the default configuration if the host is not in the multihost configuration.
0143 """
0144
0145 if base_url not in self.base_url_host_map:
0146 parsed_uri = urlparse(base_url)
0147 self.base_url_host_map[base_url] = parsed_uri.netloc
0148 host = self.base_url_host_map[base_url]
0149 if host in self.multihost_auth_config:
0150 return (
0151 self.multihost_auth_config[host].get("auth_type", self.auth_type),
0152 self.multihost_auth_config[host].get("cert_file", self.cert_file),
0153 self.multihost_auth_config[host].get("key_file", self.key_file),
0154 self.multihost_auth_config[host].get("ca_cert", self.ca_cert),
0155 self.multihost_auth_config[host].get("auth_token_str", self.auth_token),
0156 )
0157 else:
0158 return self.auth_type, self.cert_file, self.key_file, self.ca_cert, self.auth_token
0159
0160 def request_ssl(self, method: str, path: str, data: dict = None, files: dict = None, cert: tuple[str, str] = None, base_url: str = None):
0161 """
0162 Generic HTTPS request function for GET, POST, and file uploads.
0163
0164 :param method: HTTP method ("GET", "POST", or "UPLOAD"). The method defines how the data is sent.
0165 :param path: URL path
0166 :param data: Query data for GET/POST.
0167 :param files: Files to upload (for "UPLOAD" method).
0168 :param cert: SSL certificate tuple (cert_file, key_file). Defaults to None (use system default).
0169 :param base_url: Base URL. Defaults to None (use the default base URL).
0170
0171 :return: Tuple (status, response or message).
0172 """
0173
0174 if method not in ("GET", "POST", "UPLOAD"):
0175 raise ValueError(f"Unsupported method: {method}")
0176
0177 try:
0178 tmp_log = None
0179 if self.verbose:
0180 tmp_log = self.make_logger(method_name=f"{method.lower()}_ssl")
0181 tmp_exec = str(uuid.uuid4())
0182
0183 if base_url is None:
0184
0185 base_url = self.server_base_path_ssl if method != "UPLOAD" else self.cache_base_path_ssl
0186 url = f"{base_url}/{path}"
0187
0188
0189 auth_type, cert_file, key_file, ca_cert, auth_token = self.get_host_specific_auth_config(base_url)
0190
0191 if self.verbose:
0192 tmp_log.debug(f"exec={tmp_exec} URL={url} data={data} files={files}")
0193
0194 headers = {"Accept": "application/json", "Connection": "close"}
0195 if auth_type == "oidc":
0196 self.renew_token()
0197 cert = None
0198 headers["Authorization"] = f"Bearer {self.auth_token}"
0199 headers["Origin"] = harvester_config.pandacon.auth_origin
0200 else:
0201 if cert is None:
0202 cert = (cert_file, key_file)
0203
0204 session = get_http_adapter_with_random_dns_resolution()
0205 sw = core_utils.get_stopwatch()
0206
0207
0208 if method == "GET":
0209
0210 response = session.request(method, url, params=data, headers=headers, timeout=harvester_config.pandacon.timeout, verify=ca_cert, cert=cert)
0211 elif method == "POST":
0212
0213 headers["Content-Type"] = "application/json"
0214 response = session.request(method, url, json=data, headers=headers, timeout=harvester_config.pandacon.timeout, verify=ca_cert, cert=cert)
0215 if method == "UPLOAD":
0216
0217 response = session.post(url, files=files, headers=headers, timeout=harvester_config.pandacon.timeout, verify=ca_cert, cert=cert)
0218
0219 if self.verbose:
0220 tmp_log.debug(f"exec={tmp_exec} code={response.status_code} {sw.get_elapsed_time()}. return={response.text}")
0221
0222 if response.status_code == 200:
0223 return True, response.json()
0224 else:
0225 err_msg = f"StatusCode={response.status_code} {response.text}"
0226
0227 except Exception:
0228 err_type, err_value = sys.exc_info()[:2]
0229 err_msg = f"failed to {method} with {err_type}:{err_value} "
0230 err_msg += traceback.format_exc()
0231
0232 return False, err_msg
0233
0234
0235 def check_panda(self):
0236 tmp_status, tmp_response = self.request_ssl("GET", "system/is_alive", {})
0237 return tmp_status, tmp_response
0238
0239
0240 def is_alive(self, metadata_dictionary):
0241 tmp_log = self.make_logger(method_name="is_alive")
0242 tmp_log.debug("Start")
0243
0244
0245 for tmp_key, tmp_val in metadata_dictionary.items():
0246 if isinstance(tmp_val, datetime.datetime):
0247 tmp_val = "datetime/" + tmp_val.strftime("%Y-%m-%d %H:%M:%S.%f")
0248 metadata_dictionary[tmp_key] = tmp_val
0249
0250
0251 data = {
0252 "harvester_id": harvester_config.master.harvester_id,
0253 "data": metadata_dictionary,
0254 }
0255 tmp_status, tmp_response = self.request_ssl("POST", "harvester/heartbeat", data)
0256
0257
0258 if tmp_status is False:
0259 tmp_str = core_utils.dump_error_message(tmp_log, tmp_response)
0260 tmp_log.debug(f"Done with {tmp_status} : {tmp_str}")
0261 return tmp_status, tmp_str
0262
0263
0264 tmp_status = tmp_response["success"]
0265 tmp_str = ""
0266 if not tmp_status:
0267 tmp_str = tmp_response["message"]
0268
0269 tmp_log.debug(f"Done with {tmp_status} : {tmp_str}")
0270 return tmp_status, tmp_str
0271
0272
0273 def get_jobs(self, site_name, node_name, prod_source_label, computing_element, n_jobs, additional_criteria):
0274 tmp_log = self.make_logger(f"siteName={site_name}", method_name="get_jobs")
0275 tmp_log.debug(f"try to get {n_jobs} jobs")
0276
0277 data = {
0278 "site_name": site_name,
0279 "node": node_name,
0280 "prod_source_label": prod_source_label,
0281 "computing_element": computing_element,
0282 "n_jobs": n_jobs,
0283 "scheduler_id": f"harvester-{harvester_config.master.harvester_id}",
0284 }
0285 if additional_criteria:
0286 for tmp_key, tmp_val in additional_criteria.items():
0287 data[tmp_key] = tmp_val
0288
0289
0290 sw = core_utils.get_stopwatch()
0291 tmp_status, tmp_response = self.request_ssl("POST", "pilot/acquire_jobs", data)
0292 tmp_log.debug(f"get_jobs for {n_jobs} jobs {sw.get_elapsed_time()}")
0293
0294
0295 if tmp_status is False:
0296 err_string = core_utils.dump_error_message(tmp_log, tmp_response)
0297 return [], err_string
0298
0299
0300 err_string = "OK"
0301 try:
0302 tmp_dict = tmp_response["data"]
0303 tmp_log.debug(f"StatusCode={tmp_dict['StatusCode']}")
0304 if tmp_dict["StatusCode"] == 0:
0305 tmp_log.debug(f"got {len(tmp_dict['jobs'])} jobs")
0306 return tmp_dict["jobs"], err_string
0307
0308 if "errorDialog" in tmp_dict:
0309 err_string = tmp_dict["errorDialog"]
0310 else:
0311 err_string = f"StatusCode={tmp_dict['StatusCode']}"
0312 return [], err_string
0313 except Exception:
0314 err_string = core_utils.dump_error_message(tmp_log, tmp_response["message"])
0315
0316 return [], err_string
0317
0318
    def update_jobs(self, jobspec_list, id):
        """
        Propagate job updates to the PanDA server.

        Three phases: upload pending checkpoint files, push event-range updates,
        then report job attributes in chunks of 100 via pilot/update_jobs_bulk.

        :param jobspec_list: list of job specs to update
        :param id: identifier used only for logging
        :return: list of per-job result dictionaries in jobspec_list order
        """
        sw = core_utils.get_stopwatch()
        tmp_logger = self.make_logger(f"id={id}", method_name="update_jobs")
        tmp_logger.debug(f"update {len(jobspec_list)} jobs")
        ret_list = []

        # phase 1: upload checkpoint output files; successfully uploaded files are marked done
        for jobSpec in jobspec_list:
            if jobSpec.outFiles:
                tmp_logger.debug(f"upload {len(jobSpec.outFiles)} checkpoint files for PandaID={jobSpec.PandaID}")
                for fileSpec in jobSpec.outFiles:
                    if "sourceURL" in jobSpec.jobParams:
                        tmp_status = self.upload_checkpoint(jobSpec.jobParams["sourceURL"], jobSpec.taskID, jobSpec.PandaID, fileSpec.lfn, fileSpec.path)
                        if tmp_status:
                            fileSpec.status = "done"

        # phase 2: update event ranges; events acknowledged by the server are marked done
        for jobSpec in jobspec_list:
            event_ranges, event_specs = jobSpec.to_event_data(max_events=10000)
            if event_ranges != []:
                tmp_logger.debug(f"update {len(event_specs)} events for PandaID={jobSpec.PandaID}")
                tmp_ret = self.update_event_ranges(event_ranges, tmp_logger)
                if tmp_ret["StatusCode"] == 0:
                    for event_spec, ret_value in zip(event_specs, tmp_ret["Returns"]):
                        # only boolean returns count as server acknowledgements
                        if ret_value in [True, False] and event_spec.is_final_status():
                            event_spec.subStatus = "done"

        # phase 3: report job attributes in chunks to keep request sizes bounded
        n_lookup = 100
        i_lookup = 0
        while i_lookup < len(jobspec_list):
            job_list = []
            jobspec_shard = jobspec_list[i_lookup : i_lookup + n_lookup]
            for job_spec in jobspec_shard:
                # attributes harvested from the job plus identifier/status fields
                job_dict = job_spec.get_job_attributes_for_panda()

                job_dict["job_id"] = job_spec.PandaID
                job_dict["site_name"] = job_spec.computingSite
                job_dict["job_status"] = job_spec.get_status()
                job_dict["job_sub_status"] = job_spec.subStatus
                job_dict["attempt_nr"] = job_spec.attemptNr

                # cancelled/missed jobs are reported to PanDA as failed with a sub-status
                if job_dict["job_status"] in ["cancelled", "missed"]:
                    if job_spec.is_pilot_closed():
                        job_dict["job_sub_status"] = "pilot_closed"
                    else:
                        job_dict["job_sub_status"] = job_dict["job_status"]
                    job_dict["job_status"] = "failed"

                # timestamps only when not already supplied by the job attributes
                if job_spec.startTime is not None and "startTime" not in job_dict:
                    job_dict["start_time"] = job_spec.startTime.strftime("%Y-%m-%d %H:%M:%S")
                if job_spec.endTime is not None and "endTime" not in job_dict:
                    job_dict["end_time"] = job_spec.endTime.strftime("%Y-%m-%d %H:%M:%S")

                if "core_count" not in job_dict and job_spec.nCore is not None:
                    job_dict["core_count"] = job_spec.nCore

                # attach metadata and output report only for the final update
                if job_spec.is_final_status() and job_spec.status == job_spec.get_status():
                    if job_spec.metaData is not None:
                        job_dict["meta_data"] = json.dumps(job_spec.metaData)
                    if job_spec.outputFilesToReport is not None:
                        job_dict["job_output_report"] = job_spec.outputFilesToReport

                job_list.append(job_dict)

            data = {"job_list": job_list, "harvester_id": harvester_config.master.harvester_id}

            tmp_status, tmp_response = self.request_ssl("POST", "pilot/update_jobs_bulk", data)
            ret_maps = None

            error_message = ""
            if tmp_status is False:
                error_message = core_utils.dump_error_message(tmp_logger, tmp_response)
            else:
                try:
                    tmp_status, ret_message, ret_maps = tmp_response["success"], tmp_response["message"], tmp_response["data"]
                    if tmp_status is False:
                        tmp_logger.error(f"failed with {ret_message} {ret_maps}")
                        ret_maps = None
                        error_message = ret_message
                except Exception:
                    error_message = core_utils.dump_error_message(tmp_logger)

            # fabricate error results for the whole shard when the bulk update failed
            if ret_maps is None:
                ret_map = {"StatusCode": 999, "ErrorDiag": error_message}
                ret_maps = [ret_map] * len(jobspec_shard)

            # collect per-job results, normalized to the legacy {"StatusCode": ...} shape
            for job_spec, ret_map, job_dict in zip(jobspec_shard, ret_maps, job_list):
                tmp_log = self.make_logger(f"id={id} PandaID={job_spec.PandaID}", method_name="update_jobs")
                tmp_log.debug(f"job_dict={job_dict}")

                try:
                    tmp_success = ret_map.get("success", False)
                except Exception:
                    tmp_success = False
                (tmp_log.error if not tmp_success else tmp_log.debug)(f"Done with {ret_map}")

                job_ret_map = ret_map.get("data")
                job_message = ret_map.get("message")
                if not job_ret_map:
                    job_ret_map = {"StatusCode": 999, "ErrorDiag": job_message}

                ret_list.append(job_ret_map)

            i_lookup += n_lookup

        tmp_logger.debug(f"Done. Took {sw.get_elapsed_time()} seconds")
        return ret_list
0442
0443
    def get_event_ranges(self, data_map, scattered, base_path):
        """
        Acquire event ranges from the PanDA server for a set of jobs.

        :param data_map: {panda_id: request data} with optional keys isHPO,
                         sourceURL, nRanges, taskID, jobsetID
        :param scattered: when True, ask the server for scattered event ranges
        :param base_path: local directory used for downloaded HPO checkpoints
        :return: tuple of (status, {panda_id: [event range, ...]})
        """
        ret_status = False
        ret_value = dict()

        # chunk size for a single acquire request
        try:
            default_chunk_size = harvester_config.pandacon.getEventsChunkSize
        except Exception:
            default_chunk_size = 5120

        for panda_id, data in data_map.items():
            tmp_log = self.make_logger(f"PandaID={panda_id}", method_name="get_event_ranges")

            is_hpo = data.get("isHPO", False)
            source_url = data.get("sourceURL")
            n_ranges = data.get("nRanges", 1)

            data_request = {"job_id": panda_id, "task_id": data.get("taskID"), "jobset_id": data.get("jobsetID")}

            if scattered:
                data_request["scattered"] = True

            tmp_log.debug(f"Start n_ranges={n_ranges}")

            # request event ranges in chunks until the requested count is covered
            while n_ranges > 0:
                chunk_size = min(default_chunk_size, n_ranges)
                data_request["n_ranges"] = chunk_size
                tmp_status, tmp_response = self.request_ssl("POST", "event/acquire_event_ranges", data_request)

                n_ranges -= chunk_size

                # on communication error skip this chunk but keep trying the rest
                if tmp_status is False:
                    core_utils.dump_error_message(tmp_log, tmp_response)
                    continue

                try:
                    tmp_dict = tmp_response["data"]
                    if tmp_dict["StatusCode"] == 0:
                        ret_status = True
                        ret_value.setdefault(panda_id, [])

                        if not is_hpo:
                            ret_value[panda_id] += tmp_dict["eventRanges"]
                        else:
                            # HPO events: enrich each event with its hyperparameter
                            # point from iDDS and, when available, its checkpoint file
                            for event in tmp_dict["eventRanges"]:
                                event_id = event["eventRangeID"]
                                task_id = event_id.split("-")[0]
                                point_id = event_id.split("-")[3]

                                tmp_si, tmp_oi = idds_utils.get_hp_point(harvester_config.pandacon.iddsURL, task_id, point_id, tmp_log, self.verbose)
                                if tmp_si:
                                    event["hp_point"] = tmp_oi

                                    # best-effort checkpoint download
                                    if source_url:
                                        tmp_so, tmp_oo = self.download_checkpoint(source_url, task_id, panda_id, point_id, base_path)
                                        if tmp_so:
                                            event["checkpoint"] = tmp_oo
                                    ret_value[panda_id].append(event)
                                else:
                                    # drop events whose HP point cannot be fetched
                                    core_utils.dump_error_message(tmp_log, tmp_oi)

                        # the server ran out of event ranges; stop asking
                        if len(tmp_dict["eventRanges"]) == 0:
                            break
                except Exception:
                    core_utils.dump_error_message(tmp_log)
                    break

            tmp_log.debug(f"Done with {ret_value}")

        return ret_status, ret_value
0521
0522
    def update_event_ranges(self, event_ranges, tmp_log):
        """
        Update event ranges on the PanDA server.

        HPO events carrying a "loss" first have the loss pushed to iDDS and the
        associated checkpoint cleared; events whose iDDS update fails are dropped
        from the request.

        :param event_ranges: list of {"eventRanges": [...], ...} items
        :param tmp_log: logger to use
        :return: result dictionary from the server; {"StatusCode": 999} on failure
        """
        tmp_log.debug("Start update_event_ranges")

        # pre-process HPO events: report losses to iDDS and clear checkpoints
        for item in event_ranges:
            new_event_ranges = []
            for event in item["eventRanges"]:
                # loss is set by the HPO worker; propagate to iDDS before the PanDA update
                if "loss" in event:
                    event_id = event["eventRangeID"]
                    task_id = event_id.split("-")[0]
                    point_id = event_id.split("-")[3]
                    tmp_si, tmp_oi = idds_utils.update_hp_point(harvester_config.pandacon.iddsURL, task_id, point_id, event["loss"], tmp_log, self.verbose)
                    if not tmp_si:
                        # drop this event from the request when iDDS cannot be updated
                        core_utils.dump_error_message(tmp_log, tmp_oi)
                        tmp_log.error(f"skip {event_id} since cannot update iDDS")
                        continue
                    else:
                        # best-effort checkpoint cleanup; failures are only logged
                        if "sourceURL" in item:
                            tmp_sc, tmp_oc = self.clear_checkpoint(item["sourceURL"], task_id, point_id)
                            if not tmp_sc:
                                core_utils.dump_error_message(tmp_log, tmp_oc)
                    # loss is internal bookkeeping and must not be sent to the server
                    del event["loss"]
                new_event_ranges.append(event)
            item["eventRanges"] = new_event_ranges

        # server expects the event ranges as a JSON string
        data = {"event_ranges": json.dumps(event_ranges), "version": 1}
        tmp_log.debug(f"data={data}")
        tmp_status, tmp_response = self.request_ssl("POST", "event/update_event_ranges", data)
        ret_map = None

        # communication failure
        if tmp_status is False:
            core_utils.dump_error_message(tmp_log, tmp_response)
            return {"StatusCode": 999}

        tmp_success = tmp_response.get("success", False)
        tmp_message = tmp_response.get("message")
        ret_map = tmp_response.get("data")

        # server-side failure
        if not tmp_success:
            core_utils.dump_error_message(tmp_log, tmp_message)
            return {"StatusCode": 999}

        if ret_map is None:
            ret_map = {"StatusCode": 999}

        tmp_log.debug(f"Done update_event_ranges with: {ret_map}")
        return ret_map
0578
0579
0580 def get_commands(self, n_commands):
0581 tmp_log = self.make_logger(method_name="get_commands")
0582 tmp_log.debug(f"Start retrieving {n_commands} commands")
0583
0584 data = {"harvester_id": harvester_config.master.harvester_id, "n_commands": n_commands}
0585 tmp_status, tmp_response = self.request_ssl("POST", "harvester/acquire_commands", data)
0586
0587
0588 if tmp_status is False:
0589 core_utils.dump_error_message(tmp_log, tmp_response)
0590 return []
0591
0592
0593 tmp_success = tmp_response.get("success", False)
0594 tmp_message = tmp_response.get("message")
0595
0596
0597 if not tmp_success:
0598 core_utils.dump_error_message(tmp_log, tmp_message)
0599 return []
0600
0601 commands = tmp_response.get("data", [])
0602 tmp_log.debug(f"Done.")
0603 return commands
0604
0605
0606 def ack_commands(self, command_ids):
0607 tmp_log = self.make_logger(method_name="ack_commands")
0608 tmp_log.debug(f"Start acknowledging {len(command_ids)} commands (command_ids={command_ids})")
0609
0610 data = {"command_ids": command_ids}
0611 tmp_status, tmp_response = self.request_ssl("POST", "harvester/acknowledge_commands", data)
0612
0613
0614 if tmp_status is False:
0615 core_utils.dump_error_message(tmp_log, tmp_response)
0616 return False
0617
0618
0619 tmp_success = tmp_response.get("success", False)
0620 tmp_message = tmp_response.get("message")
0621
0622
0623 if not tmp_success:
0624 core_utils.dump_error_message(tmp_log, tmp_message)
0625 return False
0626
0627 return True
0628
0629
0630 def get_proxy(self, voms_role, cert=None):
0631 ret_value = None
0632 ret_message = ""
0633
0634 tmp_log = self.make_logger(method_name="get_proxy")
0635 tmp_log.debug("Start")
0636
0637 data = {"role": voms_role}
0638 tmp_status, tmp_response = self.request_ssl("GET", "creds/get_proxy", data, cert)
0639
0640
0641 if tmp_status is False:
0642 core_utils.dump_error_message(tmp_log, tmp_response)
0643 return ret_value, tmp_response
0644
0645
0646 tmp_success = tmp_response.get("success", False)
0647 tmp_message = tmp_response.get("message")
0648 tmp_data = tmp_response.get("data")
0649
0650 if not tmp_success:
0651 core_utils.dump_error_message(tmp_log, tmp_message)
0652 return ret_value, tmp_message
0653
0654 ret_value = tmp_data["userProxy"]
0655 tmp_log.debug(f"Done with {ret_value}")
0656
0657 return ret_value, ret_message
0658
0659
0660 def get_token_key(self, client_name: str) -> tuple[bool, str]:
0661 """
0662 Get a token key
0663
0664 :param client_name: client name
0665
0666 :return: a tuple of (status, token key or error message)
0667 """
0668 ret_value = None
0669 ret_message = ""
0670
0671 tmp_log = self.make_logger(method_name="get_token_key")
0672 tmp_log.debug("Start")
0673
0674 data = {"client_name": client_name}
0675 tmp_status, tmp_response = self.request_ssl("GET", "creds/get_token_key", data)
0676
0677
0678 if tmp_status is False:
0679 core_utils.dump_error_message(tmp_log, tmp_response)
0680 return ret_value, ret_message
0681
0682
0683 tmp_success = tmp_response.get("success", False)
0684 tmp_message = tmp_response.get("message")
0685 tmp_data = tmp_response.get("data")
0686
0687 if not tmp_success:
0688 core_utils.dump_error_message(tmp_log, tmp_message)
0689 return ret_value, tmp_message
0690
0691 ret_value = tmp_data["tokenKey"]
0692 tmp_log.debug(f"Done with {ret_value}")
0693
0694 return ret_value, ret_message
0695
0696
0697 def get_resource_types(self):
0698 tmp_log = self.make_logger(method_name="get_resource_types")
0699 tmp_log.debug("Start retrieving resource types")
0700
0701 data = {}
0702 ret_message = ""
0703 ret_value = None
0704 tmp_status, tmp_response = self.request_ssl("GET", "metaconfig/get_resource_types", data)
0705 if tmp_status is False:
0706 core_utils.dump_error_message(tmp_log, tmp_response)
0707 return ret_value, ret_message
0708
0709 tmp_success = tmp_response.get("success", False)
0710 tmp_message = tmp_response.get("message")
0711 tmp_data = tmp_response.get("data")
0712
0713 if not tmp_success:
0714 ret_message = tmp_message
0715 core_utils.dump_error_message(tmp_log, ret_message)
0716 return ret_value, ret_message
0717
0718 tmp_log.debug(f"Resource types: {tmp_data}")
0719
0720 return tmp_data, ret_message
0721
0722
0723 def get_job_stats(self):
0724 tmp_log = self.make_logger(method_name="get_job_stats")
0725 tmp_log.debug("Start")
0726
0727 tmp_status, tmp_response = self.request_ssl("GET", "statistics/active_job_stats_by_site", {})
0728 stats = {}
0729 ret_message = "FAILED"
0730
0731
0732 if tmp_status is False:
0733 core_utils.dump_error_message(tmp_log, tmp_response)
0734 return stats, ret_message
0735
0736 tmp_success = tmp_response.get("success", False)
0737 tmp_message = tmp_response.get("message")
0738 stats = tmp_response.get("data")
0739
0740 if not tmp_success:
0741 ret_message = tmp_message
0742 core_utils.dump_error_message(tmp_log, ret_message)
0743 return stats, ret_message
0744
0745 return stats, "OK"
0746
0747
0748 def get_job_stats_new(self):
0749 tmp_log = self.make_logger(method_name="get_job_stats_new")
0750 tmp_log.debug("Start")
0751
0752 tmp_status, tmp_response = self.request_ssl("GET", "statistics/active_job_detailed_stats_by_site", {})
0753 stats = {}
0754 ret_message = "FAILED"
0755
0756
0757 if tmp_status is False:
0758 core_utils.dump_error_message(tmp_log, tmp_response)
0759 return stats, ret_message
0760
0761 tmp_success = tmp_response.get("success", False)
0762 tmp_message = tmp_response.get("message")
0763 stats = tmp_response.get("data")
0764
0765 if not tmp_success:
0766 ret_message = tmp_message
0767 core_utils.dump_error_message(tmp_log, ret_message)
0768 return stats, ret_message
0769
0770 return stats, "OK"
0771
0772
0773 def update_workers(self, workspec_list):
0774 tmp_log = self.make_logger(method_name="update_workers")
0775 tmp_log.debug("Start")
0776 data_list = []
0777 for workSpec in workspec_list:
0778 data_list.append(workSpec.convert_to_propagate())
0779
0780 data = {
0781 "harvester_id": harvester_config.master.harvester_id,
0782 "workers": data_list,
0783 }
0784
0785 tmp_log.debug(f"Update {len(data_list)} workers")
0786 tmp_status, tmp_response = self.request_ssl("POST", "harvester/update_workers", data)
0787
0788 ret_list = None
0789 ret_message = "OK"
0790
0791
0792 if tmp_status is False:
0793 ret_message = core_utils.dump_error_message(tmp_log, tmp_response)
0794 return ret_list, ret_message
0795
0796
0797 tmp_success = tmp_response.get("success", False)
0798 tmp_message = tmp_response.get("message")
0799 tmp_data = tmp_response.get("data")
0800
0801
0802 if not tmp_success:
0803 ret_message = core_utils.dump_error_message(tmp_log, tmp_message)
0804 return ret_list, ret_message
0805
0806 ret_list = tmp_data
0807 tmp_log.debug(f"Done with {ret_message}")
0808
0809 return ret_list, ret_message
0810
0811
0812 def update_worker_stats(self, site_name, stats):
0813 tmp_log = self.make_logger(method_name="update_worker_stats")
0814 tmp_log.debug("Start")
0815
0816 data = {
0817 "harvester_id": harvester_config.master.harvester_id,
0818 "panda_queue": site_name,
0819 "statistics": json.dumps(stats),
0820 }
0821 tmp_log.debug(f"update stats for {site_name}, stats: {stats}")
0822 tmp_status, tmp_response = self.request_ssl("POST", "harvester/report_worker_statistics", data)
0823
0824 ret_status = True
0825 ret_message = "OK"
0826
0827
0828 if tmp_status is False:
0829 ret_status = tmp_status
0830 ret_message = core_utils.dump_error_message(tmp_log, tmp_response)
0831 return ret_status, ret_message
0832
0833
0834 tmp_success = tmp_response.get("success", False)
0835 tmp_message = tmp_response.get("message")
0836
0837
0838
0839
0840 if not tmp_success:
0841 ret_status = tmp_success
0842 ret_message = core_utils.dump_error_message(tmp_log, tmp_message)
0843 return ret_status, ret_message
0844
0845 tmp_log.debug(f"Done with {ret_status}:{ret_message}")
0846
0847 return ret_status, ret_message
0848
0849
    def check_jobs(self, jobspec_list):
        """
        Check the status of jobs on the PanDA server.

        Jobs are looked up in chunks of 100. One result dictionary is returned
        per input job, in order; jobs without a server-side status get a
        {"StatusCode": 999, "ErrorDiag": ...} placeholder.

        :param jobspec_list: list of job specs to check
        :return: list of per-job result dictionaries
        """
        tmp_log = self.make_logger(method_name="check_jobs")
        tmp_log.debug("Start")

        ret_list = []

        # look up statuses in chunks to keep request sizes bounded
        n_lookup = 100
        i_lookup = 0

        while i_lookup < len(jobspec_list):
            job_ids = []
            for jobSpec in jobspec_list[i_lookup : i_lookup + n_lookup]:
                job_ids.append(jobSpec.PandaID)

            i_lookup += n_lookup

            data = {"job_ids": job_ids}
            tmp_status, tmp_response = self.request_ssl("GET", "job/get_status", data)

            err_string = "OK"
            job_statuses = []

            # communication failure: all jobs in this chunk get the error placeholder
            if tmp_status is False:
                err_string = core_utils.dump_error_message(tmp_log, tmp_response)
            else:
                tmp_success = tmp_response.get("success", False)
                tmp_message = tmp_response.get("message")

                # server-side failure; data may still be present and is used below
                if not tmp_success:
                    err_string = core_utils.dump_error_message(tmp_log, tmp_message)

                job_statuses = tmp_response.get("data")

            for idx, job_id in enumerate(job_ids):
                # pad with an error placeholder when the server returned fewer entries
                if not job_statuses or idx >= len(job_statuses):
                    ret_map = {"StatusCode": 999, "ErrorDiag": err_string}
                else:
                    ret_map = job_statuses[idx]
                    ret_map["StatusCode"] = 0
                    ret_map["ErrorDiag"] = err_string
                ret_list.append(ret_map)
                tmp_log.debug(f"Received {ret_map} for PandaID={job_id}")

        tmp_log.debug("Done")
        return ret_list
0902
0903
0904 def get_key_pair(self, public_key_name, private_key_name):
0905 tmp_log = self.make_logger(method_name="get_key_pair")
0906 tmp_log.debug(f"Start for {public_key_name}:{private_key_name}")
0907
0908 data = {
0909 "public_key_name": public_key_name,
0910 "private_key_name": private_key_name,
0911 }
0912
0913 tmp_status, tmp_response = self.request_ssl("GET", "creds/get_key_pair", data)
0914
0915 key_pair = None
0916 err_string = None
0917
0918
0919 if tmp_status is False:
0920 err_string = core_utils.dump_error_message(tmp_log, tmp_response)
0921 return key_pair, err_string
0922
0923
0924 tmp_success = tmp_response.get("success", False)
0925 tmp_message = tmp_response.get("message")
0926
0927
0928 if not tmp_success:
0929 err_string = core_utils.dump_error_message(tmp_log, tmp_message)
0930 return key_pair, err_string
0931
0932
0933 key_pair = tmp_response.get("data")
0934 tmp_log.debug(f"Got key_pair: {key_pair} err_string: {err_string}")
0935
0936 return key_pair, err_string
0937
0938
0939 def upload_file(self, file_name, file_object, offset, read_bytes):
0940 tmp_log = self.make_logger(method_name="upload_file")
0941 tmp_log.debug(f"Start for {file_name} {offset}:{read_bytes}")
0942 file_object.seek(offset)
0943 files = {"file": (file_name, zlib.compress(file_object.read(read_bytes)))}
0944
0945 tmp_status, tmp_response = self.request_ssl("UPLOAD", "file_server/upload_jedi_log", files=files)
0946
0947
0948 if tmp_status is False:
0949 err_string = core_utils.dump_error_message(tmp_log, tmp_response)
0950 return tmp_status, err_string
0951
0952
0953 tmp_success = tmp_response.get("success", False)
0954 tmp_message = tmp_response.get("message")
0955
0956 (tmp_log.error if not tmp_success else tmp_log.debug)(f"Done with {tmp_success}:{tmp_message}")
0957 return tmp_success, tmp_message
0958
0959
0960 def check_event_availability(self, jobspec):
0961 tmp_log = self.make_logger(f"PandaID={jobspec.PandaID}", method_name="check_event_availability")
0962 tmp_log.debug("Start")
0963
0964 data = {
0965 "job_id": jobspec.PandaID,
0966 "jobset_id": jobspec.jobsetID or jobspec.jobParams["jobsetID"],
0967 "task_id": jobspec.taskID,
0968 }
0969
0970 tmp_status, tmp_response = self.request_ssl("GET", "event/get_available_event_range_count", data)
0971
0972
0973 if tmp_status is False:
0974 ret_message = core_utils.dump_error_message(tmp_log, tmp_response)
0975 return tmp_status, ret_message
0976
0977
0978 tmp_success = tmp_response.get("success", False)
0979 tmp_message = tmp_response.get("message")
0980 n_event_ranges = tmp_response.get("data", None)
0981
0982
0983 if not tmp_success:
0984 err_string = core_utils.dump_error_message(tmp_log, tmp_message)
0985 return tmp_success, err_string
0986
0987 tmp_log.debug(f"Done with {n_event_ranges}")
0988 return tmp_success, n_event_ranges
0989
0990
0991 def send_dialog_messages(self, dialog_list):
0992 tmp_log = self.make_logger(method_name="send_dialog_messages")
0993 tmp_log.debug("Start")
0994
0995 data_list = []
0996 for diagSpec in dialog_list:
0997 data_list.append(diagSpec.convert_to_propagate())
0998
0999 data = {"harvester_id": harvester_config.master.harvester_id, "dialogs": data_list}
1000
1001 tmp_log.debug(f"Sending {len(data_list)} messages")
1002 tmp_status, tmp_response = self.request_ssl("POST", "harvester/add_dialogs", data)
1003
1004
1005 if tmp_status is False:
1006 ret_message = core_utils.dump_error_message(tmp_log, tmp_response)
1007 return tmp_status, ret_message
1008
1009
1010 tmp_success = tmp_response.get("success", False)
1011 tmp_message = tmp_response.get("message")
1012
1013 (tmp_log.error if not tmp_success else tmp_log.debug)(f"Done with {tmp_success}:{tmp_message}")
1014
1015 return tmp_success, tmp_message
1016
1017
1018 def update_service_metrics(self, service_metrics_list):
1019 tmp_log = self.make_logger(method_name="update_service_metrics")
1020 tmp_log.debug("Start")
1021
1022 data = {"harvester_id": harvester_config.master.harvester_id, "metrics": service_metrics_list}
1023
1024 tmp_log.debug("Updating metrics")
1025 tmp_status, tmp_response = self.request_ssl("POST", "harvester/update_service_metrics", data)
1026
1027
1028 if tmp_status is False:
1029 ret_message = core_utils.dump_error_message(tmp_log, tmp_response)
1030 return tmp_status, ret_message
1031
1032
1033 tmp_success = tmp_response.get("success", False)
1034 tmp_message = tmp_response.get("message")
1035
1036 (tmp_log.error if not tmp_success else tmp_log.debug)(f"Done with {tmp_success}:{tmp_message}")
1037
1038 return tmp_success, tmp_message
1039
1040
1041 def upload_checkpoint(self, base_url, task_id, panda_id, file_name, file_path):
1042 tmp_log = self.make_logger(f"taskID={task_id} pandaID={panda_id}", method_name="upload_checkpoint")
1043 tmp_log.debug(f"Start for {file_name}")
1044
1045 ret_status = False
1046
1047 try:
1048 files = {"file": (file_name, open(file_path).read())}
1049 tmp_status, tmp_response = self.request_ssl("UPLOAD", "file_server/upload_hpo_checkpoint", files=files, base_url=base_url)
1050
1051 if tmp_status is False:
1052 core_utils.dump_error_message(tmp_log, tmp_response)
1053 ret_status = tmp_status
1054 else:
1055 tmp_success = tmp_response.get("success", False)
1056 tmp_message = tmp_response.get("message")
1057 (tmp_log.error if not tmp_success else tmp_log.debug)(f"Done with {tmp_success}:{tmp_message}")
1058 ret_status = tmp_success
1059
1060 return ret_status
1061
1062 except Exception:
1063 core_utils.dump_error_message(tmp_log)
1064 return ret_status
1065
1066
1067 def download_checkpoint(self, base_url, task_id, panda_id, point_id, base_path):
1068 tmp_log = self.make_logger(f"taskID={task_id} pandaID={panda_id}", method_name="download_checkpoint")
1069 tmp_log.debug(f"Start for ID={point_id}")
1070 try:
1071
1072 path = f"cache/hpo_cp_{task_id}_{point_id}"
1073 tmp_status, tmp_response = self.request_ssl("POST", path, {}, base_url=base_url)
1074 file_name = None
1075
1076
1077 if tmp_status is False:
1078 core_utils.dump_error_message(tmp_log, tmp_response)
1079 return tmp_status, file_name
1080
1081
1082 file_name = os.path.join(base_path, str(uuid.uuid4()))
1083 with open(file_name, "w") as f:
1084 f.write(tmp_response.content)
1085
1086 tmp_log.debug(f"Downloaded {file_name}")
1087 return tmp_status, file_name
1088
1089 except Exception:
1090 core_utils.dump_error_message(tmp_log)
1091 return False, None
1092
1093
1094 def clear_checkpoint(self, base_url, task_id, point_id):
1095 tmp_log = self.make_logger(f"taskID={task_id} pointID={point_id}", method_name="clear_checkpoints")
1096 tmp_log.debug("Start")
1097
1098 data = {"task_id": task_id, "sub_id": point_id}
1099
1100 tmp_status, tmp_response = self.request_ssl("POST", "file_server/delete_hpo_checkpoint", data, base_url=base_url)
1101
1102
1103 if tmp_status is False:
1104 ret_message = core_utils.dump_error_message(tmp_log, tmp_response)
1105 return tmp_status, ret_message
1106
1107
1108 tmp_success = tmp_response.get("success", False)
1109 tmp_message = tmp_response.get("message")
1110
1111 (tmp_log.error if not tmp_success else tmp_log.debug)(f"Done with {tmp_success}:{tmp_message}")
1112
1113 return tmp_success, tmp_message
1114
1115
1116 def get_max_worker_id(self):
1117 tmp_log = self.make_logger(method_name="get_max_worker_id")
1118 tmp_log.debug("Start")
1119
1120 data = {"harvester_id": harvester_config.master.harvester_id}
1121
1122 tmp_status, tmp_response = self.request_ssl("GET", "harvester/get_current_worker_id", data)
1123
1124
1125 if tmp_status is False:
1126 core_utils.dump_error_message(tmp_log, tmp_response)
1127 return tmp_status, None
1128
1129
1130 tmp_success = tmp_response.get("success", False)
1131 tmp_message = tmp_response.get("message")
1132 tmp_data = tmp_response.get("data")
1133
1134
1135 ret_value = tmp_data if tmp_data is not None else tmp_message
1136
1137 (tmp_log.error if not tmp_success else tmp_log.debug)(f"Done with {tmp_success}:{tmp_message}")
1138 return tmp_success, ret_value
1139
1140
1141 def get_worker_stats_from_panda(self):
1142 tmp_log = self.make_logger(method_name="get_worker_stats_from_panda")
1143 tmp_log.debug("Start")
1144
1145 tmp_status, tmp_response = self.request_ssl("GET", "harvester/get_worker_statistics", {})
1146
1147
1148 if tmp_status is False:
1149 ret_message = core_utils.dump_error_message(tmp_log, tmp_response)
1150 return {}, ret_message
1151
1152
1153 tmp_success = tmp_response.get("success", False)
1154 tmp_message = tmp_response.get("message")
1155 stats = tmp_response.get("data", {})
1156
1157 if not tmp_success:
1158 ret_message = core_utils.dump_error_message(tmp_log, tmp_message)
1159 return {}, ret_message
1160
1161 tmp_log.debug(f"Done with {stats}")
1162 return stats, "OK"