Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-20 07:58:57

0001 """
0002 Connection to the PanDA server
0003 
0004 """
0005 
0006 import ssl
0007 
0008 try:
0009     # disable SNI for TLSV1_UNRECOGNIZED_NAME before importing requests
0010     ssl.HAS_SNI = False
0011 except Exception:
0012     pass
0013 import datetime
0014 import json
0015 import os
0016 import sys
0017 import traceback
0018 import uuid
0019 import zlib
0020 from urllib.parse import urlparse
0021 
0022 # TO BE REMOVED for python2.7
0023 import requests.packages.urllib3
0024 
0025 try:
0026     requests.packages.urllib3.disable_warnings()
0027 except Exception:
0028     pass
0029 
0030 from pandacommon.pandautils.net_utils import get_http_adapter_with_random_dns_resolution
0031 
0032 from pandaharvester.harvesterconfig import harvester_config
0033 from pandaharvester.harvestercore import core_utils
0034 from pandaharvester.harvestermisc import idds_utils
0035 
0036 from .base_communicator import BaseCommunicator
0037 
0038 
0039 # connection class
0040 class PandaCommunicator(BaseCommunicator):
0041     # constructor
0042     def __init__(self):
0043         BaseCommunicator.__init__(self)
0044         self.useInspect = False
0045         if hasattr(harvester_config.pandacon, "verbose") and harvester_config.pandacon.verbose:
0046             self.verbose = True
0047             if hasattr(harvester_config.pandacon, "useInspect") and harvester_config.pandacon.useInspect is True:
0048                 self.useInspect = True
0049         else:
0050             self.verbose = False
0051         if hasattr(harvester_config.pandacon, "auth_type"):
0052             self.auth_type = harvester_config.pandacon.auth_type
0053         else:
0054             self.auth_type = "x509"
0055         self.auth_token = None
0056         self.auth_token_last_update = None
0057         if hasattr(harvester_config.pandacon, "cert_file"):
0058             self.cert_file = harvester_config.pandacon.cert_file
0059         else:
0060             self.cert_file = None
0061         if hasattr(harvester_config.pandacon, "key_file"):
0062             self.key_file = harvester_config.pandacon.key_file
0063         else:
0064             self.key_file = None
0065         if hasattr(harvester_config.pandacon, "ca_cert"):
0066             self.ca_cert = harvester_config.pandacon.ca_cert
0067         else:
0068             self.ca_cert = False
0069 
0070         # Generate the base URL for the server. Priority for new URL directly configured in pandacon, otherwise we try to generate it from the old config
0071         if hasattr(harvester_config.pandacon, "server_api_url_ssl"):
0072             self.server_base_path_ssl = harvester_config.pandacon.server_api_url_ssl
0073         elif hasattr(harvester_config.pandacon, "pandaURLSSL"):
0074             parsed = urlparse(harvester_config.pandacon.pandaURLSSL)
0075             self.server_base_path_ssl = f"{parsed.scheme}://{parsed.netloc}/api/v1"
0076         else:
0077             # No configuration at all will lead to broken configuration
0078             self.server_base_path_ssl = None
0079 
0080         # Generate the base URL for the cache. Priority for new URL directly configured in pandacon, otherwise we try to generate it from the old config
0081         if hasattr(harvester_config.pandacon, "cache_api_url_ssl"):
0082             self.cache_base_path_ssl = harvester_config.pandacon.cache_api_url_ssl
0083         elif hasattr(harvester_config.pandacon, "pandaCacheURL_W"):
0084             parsed = urlparse(harvester_config.pandacon.pandaCacheURL_W)
0085             self.cache_base_path_ssl = f"{parsed.scheme}://{parsed.netloc}/api/v1"
0086         else:
0087             # No configuration at all will lead to broken configuration
0088             self.cache_base_path_ssl = None
0089 
0090         # multihost auth configuration
0091         self.multihost_auth_config = {}
0092         if hasattr(harvester_config.pandacon, "multihost_auth_config") and harvester_config.pandacon.multihost_auth_config:
0093             try:
0094                 with open(harvester_config.pandacon.multihost_auth_config) as f:
0095                     self.multihost_auth_config = json.load(f)
0096             except Exception:
0097                 pass
0098 
0099         # mapping between base URL and host
0100         self.base_url_host_map = {}
0101         # renew token
0102         try:
0103             self.renew_token()
0104         except Exception:
0105             pass
0106 
0107     # force token renewal
0108     def force_credential_renewal(self):
0109         """
0110         Unset timestamp to trigger token renewal
0111         """
0112         self.auth_token_last_update = None
0113 
0114     # renew token
0115     def renew_token(self):
0116         if self.auth_token_last_update is not None and core_utils.naive_utcnow() - self.auth_token_last_update < datetime.timedelta(minutes=10):
0117             return
0118         self.auth_token_last_update = core_utils.naive_utcnow()
0119         if hasattr(harvester_config.pandacon, "auth_token"):
0120             if harvester_config.pandacon.auth_token.startswith("file:"):
0121                 with open(harvester_config.pandacon.auth_token.split(":")[-1]) as f:
0122                     self.auth_token = f.read()
0123             else:
0124                 self.auth_token = harvester_config.pandacon.auth_token
0125         for config_map in self.multihost_auth_config.values():
0126             if "auth_token" in config_map:
0127                 if config_map["auth_token"].startswith("file:"):
0128                     with open(config_map["auth_token"].split(":")[-1]) as f:
0129                         config_map["auth_token_str"] = f.read()
0130                 else:
0131                     config_map["auth_token_str"] = config_map["auth_token"]
0132 
0133     # def get host-specific auth config for a base URL
0134     def get_host_specific_auth_config(self, base_url: str) -> tuple:
0135         """
0136         Get host-specific auth configuration for a base URL
0137 
0138         Args:
0139             base_url: base URL
0140 
0141         Returns:
0142             list: host-specific configuration: auth_type, cert_file, key_file, ca_cert, auth_token. Return the default configuration if the host is not in the multihost configuration.
0143         """
0144         # check if the base URL is already in the cache
0145         if base_url not in self.base_url_host_map:
0146             parsed_uri = urlparse(base_url)
0147             self.base_url_host_map[base_url] = parsed_uri.netloc
0148         host = self.base_url_host_map[base_url]
0149         if host in self.multihost_auth_config:
0150             return (
0151                 self.multihost_auth_config[host].get("auth_type", self.auth_type),
0152                 self.multihost_auth_config[host].get("cert_file", self.cert_file),
0153                 self.multihost_auth_config[host].get("key_file", self.key_file),
0154                 self.multihost_auth_config[host].get("ca_cert", self.ca_cert),
0155                 self.multihost_auth_config[host].get("auth_token_str", self.auth_token),
0156             )
0157         else:
0158             return self.auth_type, self.cert_file, self.key_file, self.ca_cert, self.auth_token
0159 
0160     def request_ssl(self, method: str, path: str, data: dict = None, files: dict = None, cert: tuple[str, str] = None, base_url: str = None):
0161         """
0162         Generic HTTPS request function for GET, POST, and file uploads.
0163 
0164         :param method: HTTP method ("GET", "POST", or "UPLOAD"). The method defines how the data is sent.
0165         :param path: URL path
0166         :param data: Query data for GET/POST.
0167         :param files: Files to upload (for "UPLOAD" method).
0168         :param cert: SSL certificate tuple (cert_file, key_file). Defaults to None (use system default).
0169         :param base_url: Base URL. Defaults to None (use the default base URL).
0170 
0171         :return: Tuple (status, response or message).
0172         """
0173 
0174         if method not in ("GET", "POST", "UPLOAD"):
0175             raise ValueError(f"Unsupported method: {method}")
0176 
0177         try:
0178             tmp_log = None
0179             if self.verbose:
0180                 tmp_log = self.make_logger(method_name=f"{method.lower()}_ssl")
0181                 tmp_exec = str(uuid.uuid4())
0182 
0183             if base_url is None:
0184                 # Most operations go to PanDA server, except for file uploads that go to PanDA cache
0185                 base_url = self.server_base_path_ssl if method != "UPLOAD" else self.cache_base_path_ssl
0186             url = f"{base_url}/{path}"
0187 
0188             # Get authentication config
0189             auth_type, cert_file, key_file, ca_cert, auth_token = self.get_host_specific_auth_config(base_url)
0190 
0191             if self.verbose:
0192                 tmp_log.debug(f"exec={tmp_exec} URL={url} data={data} files={files}")
0193 
0194             headers = {"Accept": "application/json", "Connection": "close"}
0195             if auth_type == "oidc":
0196                 self.renew_token()
0197                 cert = None
0198                 headers["Authorization"] = f"Bearer {self.auth_token}"
0199                 headers["Origin"] = harvester_config.pandacon.auth_origin
0200             else:
0201                 if cert is None:
0202                     cert = (cert_file, key_file)
0203 
0204             session = get_http_adapter_with_random_dns_resolution()
0205             sw = core_utils.get_stopwatch()
0206 
0207             # Determine request type
0208             if method == "GET":
0209                 # URL encoding
0210                 response = session.request(method, url, params=data, headers=headers, timeout=harvester_config.pandacon.timeout, verify=ca_cert, cert=cert)
0211             elif method == "POST":
0212                 # JSON encoding in body
0213                 headers["Content-Type"] = "application/json"
0214                 response = session.request(method, url, json=data, headers=headers, timeout=harvester_config.pandacon.timeout, verify=ca_cert, cert=cert)
0215             if method == "UPLOAD":
0216                 # Upload files
0217                 response = session.post(url, files=files, headers=headers, timeout=harvester_config.pandacon.timeout, verify=ca_cert, cert=cert)
0218 
0219             if self.verbose:
0220                 tmp_log.debug(f"exec={tmp_exec} code={response.status_code} {sw.get_elapsed_time()}. return={response.text}")
0221 
0222             if response.status_code == 200:
0223                 return True, response.json()
0224             else:
0225                 err_msg = f"StatusCode={response.status_code} {response.text}"
0226 
0227         except Exception:
0228             err_type, err_value = sys.exc_info()[:2]
0229             err_msg = f"failed to {method} with {err_type}:{err_value} "
0230             err_msg += traceback.format_exc()
0231 
0232         return False, err_msg
0233 
0234     # check server, this method is only used for testing the connectivity
0235     def check_panda(self):
0236         tmp_status, tmp_response = self.request_ssl("GET", "system/is_alive", {})
0237         return tmp_status, tmp_response
0238 
0239     # send heartbeat of harvester instance
0240     def is_alive(self, metadata_dictionary):
0241         tmp_log = self.make_logger(method_name="is_alive")
0242         tmp_log.debug("Start")
0243 
0244         # convert datetime
0245         for tmp_key, tmp_val in metadata_dictionary.items():
0246             if isinstance(tmp_val, datetime.datetime):
0247                 tmp_val = "datetime/" + tmp_val.strftime("%Y-%m-%d %H:%M:%S.%f")
0248                 metadata_dictionary[tmp_key] = tmp_val
0249 
0250         # send data
0251         data = {
0252             "harvester_id": harvester_config.master.harvester_id,
0253             "data": metadata_dictionary,
0254         }
0255         tmp_status, tmp_response = self.request_ssl("POST", "harvester/heartbeat", data)
0256 
0257         # Communication issue
0258         if tmp_status is False:
0259             tmp_str = core_utils.dump_error_message(tmp_log, tmp_response)
0260             tmp_log.debug(f"Done with {tmp_status} : {tmp_str}")
0261             return tmp_status, tmp_str
0262 
0263         # We see what PanDA server replied
0264         tmp_status = tmp_response["success"]
0265         tmp_str = ""
0266         if not tmp_status:
0267             tmp_str = tmp_response["message"]
0268 
0269         tmp_log.debug(f"Done with {tmp_status} : {tmp_str}")
0270         return tmp_status, tmp_str
0271 
0272     # get jobs
0273     def get_jobs(self, site_name, node_name, prod_source_label, computing_element, n_jobs, additional_criteria):
0274         tmp_log = self.make_logger(f"siteName={site_name}", method_name="get_jobs")
0275         tmp_log.debug(f"try to get {n_jobs} jobs")
0276 
0277         data = {
0278             "site_name": site_name,
0279             "node": node_name,
0280             "prod_source_label": prod_source_label,
0281             "computing_element": computing_element,
0282             "n_jobs": n_jobs,
0283             "scheduler_id": f"harvester-{harvester_config.master.harvester_id}",
0284         }
0285         if additional_criteria:
0286             for tmp_key, tmp_val in additional_criteria.items():
0287                 data[tmp_key] = tmp_val
0288 
0289         # Get the jobs from PanDA server and measure the time
0290         sw = core_utils.get_stopwatch()
0291         tmp_status, tmp_response = self.request_ssl("POST", "pilot/acquire_jobs", data)
0292         tmp_log.debug(f"get_jobs for {n_jobs} jobs {sw.get_elapsed_time()}")
0293 
0294         # Communication issue
0295         if tmp_status is False:
0296             err_string = core_utils.dump_error_message(tmp_log, tmp_response)
0297             return [], err_string
0298 
0299         # Parse the response
0300         err_string = "OK"
0301         try:
0302             tmp_dict = tmp_response["data"]
0303             tmp_log.debug(f"StatusCode={tmp_dict['StatusCode']}")
0304             if tmp_dict["StatusCode"] == 0:
0305                 tmp_log.debug(f"got {len(tmp_dict['jobs'])} jobs")
0306                 return tmp_dict["jobs"], err_string
0307 
0308             if "errorDialog" in tmp_dict:
0309                 err_string = tmp_dict["errorDialog"]
0310             else:
0311                 err_string = f"StatusCode={tmp_dict['StatusCode']}"
0312             return [], err_string
0313         except Exception:
0314             err_string = core_utils.dump_error_message(tmp_log, tmp_response["message"])
0315 
0316         return [], err_string
0317 
0318     # update jobs
    # update jobs
    def update_jobs(self, jobspec_list, id):
        """
        Report the status of a list of jobs to the PanDA server.

        First uploads checkpoint files and event-range updates for each job,
        then updates the jobs themselves in bulk chunks of 100.

        :param jobspec_list: list of job spec objects to update
        :param id: identifier used only for log tagging
        :return: list with one status dictionary per job spec (StatusCode/ErrorDiag style)
        """
        sw = core_utils.get_stopwatch()
        tmp_logger = self.make_logger(f"id={id}", method_name="update_jobs")
        tmp_logger.debug(f"update {len(jobspec_list)} jobs")
        ret_list = []

        # TODO: check these work OK after the API migration
        # upload checkpoints (only for jobs whose parameters carry a sourceURL)
        for jobSpec in jobspec_list:
            if jobSpec.outFiles:
                tmp_logger.debug(f"upload {len(jobSpec.outFiles)} checkpoint files for PandaID={jobSpec.PandaID}")
            for fileSpec in jobSpec.outFiles:
                if "sourceURL" in jobSpec.jobParams:
                    tmp_status = self.upload_checkpoint(jobSpec.jobParams["sourceURL"], jobSpec.taskID, jobSpec.PandaID, fileSpec.lfn, fileSpec.path)
                    if tmp_status:
                        fileSpec.status = "done"

        # TODO: check these work OK after the API migration
        # update events
        for jobSpec in jobspec_list:
            event_ranges, event_specs = jobSpec.to_event_data(max_events=10000)
            if event_ranges != []:
                tmp_logger.debug(f"update {len(event_specs)} events for PandaID={jobSpec.PandaID}")
                tmp_ret = self.update_event_ranges(event_ranges, tmp_logger)
                if tmp_ret["StatusCode"] == 0:
                    # Returns entries are paired positionally with event_specs
                    for event_spec, ret_value in zip(event_specs, tmp_ret["Returns"]):
                        if ret_value in [True, False] and event_spec.is_final_status():
                            event_spec.subStatus = "done"

        # update jobs in bulk
        n_lookup = 100
        i_lookup = 0
        while i_lookup < len(jobspec_list):

            # break down the jobspec_list into chunks of n_lookup
            job_list = []
            jobspec_shard = jobspec_list[i_lookup : i_lookup + n_lookup]
            for job_spec in jobspec_shard:

                # pre-fill job data
                # TODO: this function needs to be reviewed, since now we changed the names
                job_dict = job_spec.get_job_attributes_for_panda()

                # basic fields
                job_dict["job_id"] = job_spec.PandaID
                job_dict["site_name"] = job_spec.computingSite
                job_dict["job_status"] = job_spec.get_status()
                job_dict["job_sub_status"] = job_spec.subStatus
                job_dict["attempt_nr"] = job_spec.attemptNr

                # change cancelled/missed to failed to be accepted by panda server
                if job_dict["job_status"] in ["cancelled", "missed"]:
                    if job_spec.is_pilot_closed():
                        job_dict["job_sub_status"] = "pilot_closed"
                    else:
                        job_dict["job_sub_status"] = job_dict["job_status"]
                    job_dict["job_status"] = "failed"

                # only fill the timestamps when the pre-filled dict did not already provide them
                # NOTE(review): the guard checks the old camelCase keys while the new snake_case
                # keys are written — presumably intentional during the API migration; confirm
                if job_spec.startTime is not None and "startTime" not in job_dict:
                    job_dict["start_time"] = job_spec.startTime.strftime("%Y-%m-%d %H:%M:%S")
                if job_spec.endTime is not None and "endTime" not in job_dict:
                    job_dict["end_time"] = job_spec.endTime.strftime("%Y-%m-%d %H:%M:%S")

                if "core_count" not in job_dict and job_spec.nCore is not None:
                    job_dict["core_count"] = job_spec.nCore

                # for jobs in final status we upload metadata and the job output report
                if job_spec.is_final_status() and job_spec.status == job_spec.get_status():
                    if job_spec.metaData is not None:
                        job_dict["meta_data"] = json.dumps(job_spec.metaData)
                    if job_spec.outputFilesToReport is not None:
                        job_dict["job_output_report"] = job_spec.outputFilesToReport

                job_list.append(job_dict)

            # data dictionary to be sent to PanDA server
            data = {"job_list": job_list, "harvester_id": harvester_config.master.harvester_id}

            tmp_status, tmp_response = self.request_ssl("POST", "pilot/update_jobs_bulk", data)
            ret_maps = None

            # Communication issue
            error_message = ""
            if tmp_status is False:
                error_message = core_utils.dump_error_message(tmp_logger, tmp_response)
            else:
                try:
                    tmp_status, ret_message, ret_maps = tmp_response["success"], tmp_response["message"], tmp_response["data"]
                    if tmp_status is False:
                        tmp_logger.error(f"failed with {ret_message} {ret_maps}")
                        ret_maps = None
                        error_message = ret_message
                except Exception:
                    error_message = core_utils.dump_error_message(tmp_logger)

            # make a default map when the jobs could not be updated
            if ret_maps is None:
                ret_map = {"StatusCode": 999, "ErrorDiag": error_message}
                ret_maps = [ret_map] * len(jobspec_shard)

            # iterate the results, one ret_map per job in the shard
            for job_spec, ret_map, job_dict in zip(jobspec_shard, ret_maps, job_list):
                tmp_log = self.make_logger(f"id={id} PandaID={job_spec.PandaID}", method_name="update_jobs")
                tmp_log.debug(f"job_dict={job_dict}")

                try:
                    tmp_success = ret_map.get("success", False)
                except Exception:
                    # ret_map may not be a dict-like object
                    tmp_success = False
                (tmp_log.error if not tmp_success else tmp_log.debug)(f"Done with {ret_map}")

                # Get the status code and command from the API response
                job_ret_map = ret_map.get("data")
                job_message = ret_map.get("message")
                if not job_ret_map:
                    job_ret_map = {"StatusCode": 999, "ErrorDiag": job_message}

                ret_list.append(job_ret_map)

            i_lookup += n_lookup

        tmp_logger.debug(f"Done. Took {sw.get_elapsed_time()} seconds")
        return ret_list
0442 
0443     # get events
    # get events
    def get_event_ranges(self, data_map, scattered, base_path):
        """
        Acquire event ranges from the PanDA server for a set of jobs.

        :param data_map: dict keyed by PanDA ID; each value carries per-job request
                         parameters (taskID, jobsetID, nRanges, isHPO, sourceURL)
        :param scattered: if True, request scattered event ranges
        :param base_path: local directory used when downloading HPO checkpoints
        :return: (status, dict mapping PanDA ID -> list of event ranges). status is True
                 if at least one request succeeded.
        """
        ret_status = False
        ret_value = dict()

        # define the chunk size
        try:
            default_chunk_size = harvester_config.pandacon.getEventsChunkSize
        except Exception:
            default_chunk_size = 5120

        for panda_id, data in data_map.items():
            # job-specific logger
            tmp_log = self.make_logger(f"PandaID={panda_id}", method_name="get_event_ranges")

            is_hpo = data.get("isHPO", False)
            source_url = data.get("sourceURL")
            n_ranges = data.get("nRanges", 1)

            data_request = {"job_id": panda_id, "task_id": data.get("taskID"), "jobset_id": data.get("jobsetID")}

            if scattered:
                data_request["scattered"] = True

            tmp_log.debug(f"Start n_ranges={n_ranges}")

            while n_ranges > 0:
                # use a small chunk size to avoid timeout
                chunk_size = min(default_chunk_size, n_ranges)
                data_request["n_ranges"] = chunk_size
                tmp_status, tmp_response = self.request_ssl("POST", "event/acquire_event_ranges", data_request)

                # decrease the number of ranges
                n_ranges -= chunk_size

                # communication error: skip this chunk but keep trying the rest
                if tmp_status is False:
                    core_utils.dump_error_message(tmp_log, tmp_response)
                    continue

                # communication with PanDA server was OK
                try:
                    tmp_dict = tmp_response["data"]
                    if tmp_dict["StatusCode"] == 0:
                        ret_status = True
                        ret_value.setdefault(panda_id, [])

                        if not is_hpo:
                            ret_value[panda_id] += tmp_dict["eventRanges"]
                        else:
                            # for HPO jobs, enrich each event with its hyperparameter point
                            # (and checkpoint, when available) before accepting it
                            for event in tmp_dict["eventRanges"]:
                                # event range ID format: task-...-...-point-... (dash-separated)
                                event_id = event["eventRangeID"]
                                task_id = event_id.split("-")[0]
                                point_id = event_id.split("-")[3]

                                # get HP point
                                tmp_si, tmp_oi = idds_utils.get_hp_point(harvester_config.pandacon.iddsURL, task_id, point_id, tmp_log, self.verbose)
                                if tmp_si:
                                    event["hp_point"] = tmp_oi

                                    # get checkpoint
                                    if source_url:
                                        tmp_so, tmp_oo = self.download_checkpoint(source_url, task_id, panda_id, point_id, base_path)
                                        if tmp_so:
                                            event["checkpoint"] = tmp_oo
                                    ret_value[panda_id].append(event)
                                else:
                                    # event is dropped when the HP point cannot be fetched
                                    core_utils.dump_error_message(tmp_log, tmp_oi)
                        # got empty: server has no more ranges, stop asking for this job
                        if len(tmp_dict["eventRanges"]) == 0:
                            break
                except Exception:
                    core_utils.dump_error_message(tmp_log)
                    break

            tmp_log.debug(f"Done with {ret_value}")

        return ret_status, ret_value
0521 
0522     # update events
    # update events
    def update_event_ranges(self, event_ranges, tmp_log):
        """
        Report event-range updates to the PanDA server.

        For HPO events carrying a "loss", the loss is first reported to iDDS (and the
        checkpoint cleared); events whose iDDS update fails are dropped from the upload.
        Note: event_ranges is modified in place.

        :param event_ranges: list of items, each with an "eventRanges" list
        :param tmp_log: already-tagged logger provided by the caller
        :return: status dictionary from the server, or {"StatusCode": 999} on failure
        """
        # We are already receiving a tagged logger, no need to do a new one
        tmp_log.debug("Start update_event_ranges")

        # loop over for HPO
        for item in event_ranges:
            new_event_ranges = []
            for event in item["eventRanges"]:

                # report loss to idds
                if "loss" in event:
                    # event range ID format: task-...-...-point-... (dash-separated)
                    event_id = event["eventRangeID"]
                    task_id = event_id.split("-")[0]
                    point_id = event_id.split("-")[3]
                    tmp_si, tmp_oi = idds_utils.update_hp_point(harvester_config.pandacon.iddsURL, task_id, point_id, event["loss"], tmp_log, self.verbose)
                    if not tmp_si:
                        core_utils.dump_error_message(tmp_log, tmp_oi)
                        tmp_log.error(f"skip {event_id} since cannot update iDDS")
                        continue
                    else:
                        # clear checkpoint (best effort: failure is only logged)
                        if "sourceURL" in item:
                            tmp_sc, tmp_oc = self.clear_checkpoint(item["sourceURL"], task_id, point_id)
                            if not tmp_sc:
                                core_utils.dump_error_message(tmp_log, tmp_oc)
                    # the loss is internal bookkeeping; strip it before sending to PanDA
                    del event["loss"]
                new_event_ranges.append(event)
            item["eventRanges"] = new_event_ranges

        # update in panda
        data = {"event_ranges": json.dumps(event_ranges), "version": 1}
        tmp_log.debug(f"data={data}")
        tmp_status, tmp_response = self.request_ssl("POST", "event/update_event_ranges", data)
        ret_map = None

        # Communication issue
        if tmp_status is False:
            core_utils.dump_error_message(tmp_log, tmp_response)
            return {"StatusCode": 999}

        # Parse the response
        tmp_success = tmp_response.get("success", False)
        tmp_message = tmp_response.get("message")
        ret_map = tmp_response.get("data")

        # Check the success flag
        if not tmp_success:
            core_utils.dump_error_message(tmp_log, tmp_message)
            return {"StatusCode": 999}

        if ret_map is None:
            ret_map = {"StatusCode": 999}

        tmp_log.debug(f"Done update_event_ranges with: {ret_map}")
        return ret_map
0578 
0579     # get commands
0580     def get_commands(self, n_commands):
0581         tmp_log = self.make_logger(method_name="get_commands")
0582         tmp_log.debug(f"Start retrieving {n_commands} commands")
0583 
0584         data = {"harvester_id": harvester_config.master.harvester_id, "n_commands": n_commands}
0585         tmp_status, tmp_response = self.request_ssl("POST", "harvester/acquire_commands", data)
0586 
0587         # Communication issue
0588         if tmp_status is False:
0589             core_utils.dump_error_message(tmp_log, tmp_response)
0590             return []
0591 
0592         # Parse the response
0593         tmp_success = tmp_response.get("success", False)
0594         tmp_message = tmp_response.get("message")
0595 
0596         # Some issue on the server side
0597         if not tmp_success:
0598             core_utils.dump_error_message(tmp_log, tmp_message)
0599             return []
0600 
0601         commands = tmp_response.get("data", [])
0602         tmp_log.debug(f"Done.")
0603         return commands
0604 
0605     # send ACKs
0606     def ack_commands(self, command_ids):
0607         tmp_log = self.make_logger(method_name="ack_commands")
0608         tmp_log.debug(f"Start acknowledging {len(command_ids)} commands (command_ids={command_ids})")
0609 
0610         data = {"command_ids": command_ids}
0611         tmp_status, tmp_response = self.request_ssl("POST", "harvester/acknowledge_commands", data)
0612 
0613         # Communication issue
0614         if tmp_status is False:
0615             core_utils.dump_error_message(tmp_log, tmp_response)
0616             return False
0617 
0618         # Parse the response
0619         tmp_success = tmp_response.get("success", False)
0620         tmp_message = tmp_response.get("message")
0621 
0622         # Check the success flag
0623         if not tmp_success:
0624             core_utils.dump_error_message(tmp_log, tmp_message)
0625             return False
0626 
0627         return True
0628 
0629     # get proxy
0630     def get_proxy(self, voms_role, cert=None):
0631         ret_value = None
0632         ret_message = ""
0633 
0634         tmp_log = self.make_logger(method_name="get_proxy")
0635         tmp_log.debug("Start")
0636 
0637         data = {"role": voms_role}
0638         tmp_status, tmp_response = self.request_ssl("GET", "creds/get_proxy", data, cert)
0639 
0640         # Communication issue
0641         if tmp_status is False:
0642             core_utils.dump_error_message(tmp_log, tmp_response)
0643             return ret_value, tmp_response
0644 
0645         # Parse the response
0646         tmp_success = tmp_response.get("success", False)
0647         tmp_message = tmp_response.get("message")
0648         tmp_data = tmp_response.get("data")
0649 
0650         if not tmp_success:
0651             core_utils.dump_error_message(tmp_log, tmp_message)
0652             return ret_value, tmp_message
0653 
0654         ret_value = tmp_data["userProxy"]
0655         tmp_log.debug(f"Done with {ret_value}")
0656 
0657         return ret_value, ret_message
0658 
0659     # get token key
0660     def get_token_key(self, client_name: str) -> tuple[bool, str]:
0661         """
0662         Get a token key
0663 
0664         :param client_name: client name
0665 
0666         :return: a tuple of (status, token key or error message)
0667         """
0668         ret_value = None
0669         ret_message = ""
0670 
0671         tmp_log = self.make_logger(method_name="get_token_key")
0672         tmp_log.debug("Start")
0673 
0674         data = {"client_name": client_name}
0675         tmp_status, tmp_response = self.request_ssl("GET", "creds/get_token_key", data)
0676 
0677         # Communication issue
0678         if tmp_status is False:
0679             core_utils.dump_error_message(tmp_log, tmp_response)
0680             return ret_value, ret_message
0681 
0682         # Parse the response
0683         tmp_success = tmp_response.get("success", False)
0684         tmp_message = tmp_response.get("message")
0685         tmp_data = tmp_response.get("data")
0686 
0687         if not tmp_success:
0688             core_utils.dump_error_message(tmp_log, tmp_message)
0689             return ret_value, tmp_message
0690 
0691         ret_value = tmp_data["tokenKey"]
0692         tmp_log.debug(f"Done with {ret_value}")
0693 
0694         return ret_value, ret_message
0695 
0696     # get resource types
0697     def get_resource_types(self):
0698         tmp_log = self.make_logger(method_name="get_resource_types")
0699         tmp_log.debug("Start retrieving resource types")
0700 
0701         data = {}
0702         ret_message = ""
0703         ret_value = None
0704         tmp_status, tmp_response = self.request_ssl("GET", "metaconfig/get_resource_types", data)
0705         if tmp_status is False:
0706             core_utils.dump_error_message(tmp_log, tmp_response)
0707             return ret_value, ret_message
0708 
0709         tmp_success = tmp_response.get("success", False)
0710         tmp_message = tmp_response.get("message")
0711         tmp_data = tmp_response.get("data")
0712 
0713         if not tmp_success:
0714             ret_message = tmp_message
0715             core_utils.dump_error_message(tmp_log, ret_message)
0716             return ret_value, ret_message
0717 
0718         tmp_log.debug(f"Resource types: {tmp_data}")
0719 
0720         return tmp_data, ret_message
0721 
0722     # get job statistics
0723     def get_job_stats(self):
0724         tmp_log = self.make_logger(method_name="get_job_stats")
0725         tmp_log.debug("Start")
0726 
0727         tmp_status, tmp_response = self.request_ssl("GET", "statistics/active_job_stats_by_site", {})
0728         stats = {}
0729         ret_message = "FAILED"
0730 
0731         # Communication issue
0732         if tmp_status is False:
0733             core_utils.dump_error_message(tmp_log, tmp_response)
0734             return stats, ret_message
0735 
0736         tmp_success = tmp_response.get("success", False)
0737         tmp_message = tmp_response.get("message")
0738         stats = tmp_response.get("data")
0739 
0740         if not tmp_success:
0741             ret_message = tmp_message
0742             core_utils.dump_error_message(tmp_log, ret_message)
0743             return stats, ret_message
0744 
0745         return stats, "OK"
0746 
0747     # get job statistics: new function with prod_source_label, under testing and may replace the old one
0748     def get_job_stats_new(self):
0749         tmp_log = self.make_logger(method_name="get_job_stats_new")
0750         tmp_log.debug("Start")
0751 
0752         tmp_status, tmp_response = self.request_ssl("GET", "statistics/active_job_detailed_stats_by_site", {})
0753         stats = {}
0754         ret_message = "FAILED"
0755 
0756         # Communication issue
0757         if tmp_status is False:
0758             core_utils.dump_error_message(tmp_log, tmp_response)
0759             return stats, ret_message
0760 
0761         tmp_success = tmp_response.get("success", False)
0762         tmp_message = tmp_response.get("message")
0763         stats = tmp_response.get("data")
0764 
0765         if not tmp_success:
0766             ret_message = tmp_message
0767             core_utils.dump_error_message(tmp_log, ret_message)
0768             return stats, ret_message
0769 
0770         return stats, "OK"
0771 
0772     # update workers
0773     def update_workers(self, workspec_list):
0774         tmp_log = self.make_logger(method_name="update_workers")
0775         tmp_log.debug("Start")
0776         data_list = []
0777         for workSpec in workspec_list:
0778             data_list.append(workSpec.convert_to_propagate())
0779 
0780         data = {
0781             "harvester_id": harvester_config.master.harvester_id,
0782             "workers": data_list,
0783         }
0784 
0785         tmp_log.debug(f"Update {len(data_list)} workers")
0786         tmp_status, tmp_response = self.request_ssl("POST", "harvester/update_workers", data)
0787 
0788         ret_list = None
0789         ret_message = "OK"
0790 
0791         # Communication issue
0792         if tmp_status is False:
0793             ret_message = core_utils.dump_error_message(tmp_log, tmp_response)
0794             return ret_list, ret_message
0795 
0796         # Parse the response
0797         tmp_success = tmp_response.get("success", False)
0798         tmp_message = tmp_response.get("message")
0799         tmp_data = tmp_response.get("data")
0800 
0801         # Update was not done correctly
0802         if not tmp_success:
0803             ret_message = core_utils.dump_error_message(tmp_log, tmp_message)
0804             return ret_list, ret_message
0805 
0806         ret_list = tmp_data
0807         tmp_log.debug(f"Done with {ret_message}")
0808 
0809         return ret_list, ret_message
0810 
0811     # update worker stats
0812     def update_worker_stats(self, site_name, stats):
0813         tmp_log = self.make_logger(method_name="update_worker_stats")
0814         tmp_log.debug("Start")
0815 
0816         data = {
0817             "harvester_id": harvester_config.master.harvester_id,
0818             "panda_queue": site_name,
0819             "statistics": json.dumps(stats),
0820         }
0821         tmp_log.debug(f"update stats for {site_name}, stats: {stats}")
0822         tmp_status, tmp_response = self.request_ssl("POST", "harvester/report_worker_statistics", data)
0823 
0824         ret_status = True
0825         ret_message = "OK"
0826 
0827         # Communication issue
0828         if tmp_status is False:
0829             ret_status = tmp_status
0830             ret_message = core_utils.dump_error_message(tmp_log, tmp_response)
0831             return ret_status, ret_message
0832 
0833         # Parse the response
0834         tmp_success = tmp_response.get("success", False)
0835         tmp_message = tmp_response.get("message")
0836         # No need to parse the data field
0837         # tmp_data = tmp_response.get("data")
0838 
0839         # Update was not done correctly
0840         if not tmp_success:
0841             ret_status = tmp_success
0842             ret_message = core_utils.dump_error_message(tmp_log, tmp_message)
0843             return ret_status, ret_message
0844 
0845         tmp_log.debug(f"Done with {ret_status}:{ret_message}")
0846 
0847         return ret_status, ret_message
0848 
0849     # check jobs
0850     def check_jobs(self, jobspec_list):
0851         tmp_log = self.make_logger(method_name="check_jobs")
0852         tmp_log.debug("Start")
0853 
0854         ret_list = []
0855 
0856         # Chunk size for job lookup
0857         n_lookup = 100
0858         i_lookup = 0
0859 
0860         while i_lookup < len(jobspec_list):
0861             # Create job shards for lookup
0862             job_ids = []
0863             for jobSpec in jobspec_list[i_lookup : i_lookup + n_lookup]:
0864                 job_ids.append(jobSpec.PandaID)
0865 
0866             i_lookup += n_lookup
0867 
0868             data = {"job_ids": job_ids}
0869             tmp_status, tmp_response = self.request_ssl("GET", "job/get_status", data)
0870 
0871             err_string = "OK"
0872             job_statuses = []
0873 
0874             # Communication issue
0875             if tmp_status is False:
0876                 err_string = core_utils.dump_error_message(tmp_log, tmp_response)
0877             else:
0878                 # Parse the response
0879                 tmp_success = tmp_response.get("success", False)
0880                 tmp_message = tmp_response.get("message")
0881 
0882                 # Exception or rejection from PanDA server
0883                 if not tmp_success:
0884                     err_string = core_utils.dump_error_message(tmp_log, tmp_message)
0885 
0886                 # No need to parse the data field
0887                 job_statuses = tmp_response.get("data")
0888 
0889             for idx, job_id in enumerate(job_ids):
0890                 # We requested more jobs than we got back
0891                 if not job_statuses or idx >= len(job_statuses):
0892                     ret_map = {"StatusCode": 999, "ErrorDiag": err_string}
0893                 else:
0894                     ret_map = job_statuses[idx]
0895                     ret_map["StatusCode"] = 0
0896                     ret_map["ErrorDiag"] = err_string
0897                 ret_list.append(ret_map)
0898                 tmp_log.debug(f"Received {ret_map} for PandaID={job_id}")
0899 
0900         tmp_log.debug("Done")
0901         return ret_list
0902 
0903     # get key pair
0904     def get_key_pair(self, public_key_name, private_key_name):
0905         tmp_log = self.make_logger(method_name="get_key_pair")
0906         tmp_log.debug(f"Start for {public_key_name}:{private_key_name}")
0907 
0908         data = {
0909             "public_key_name": public_key_name,
0910             "private_key_name": private_key_name,
0911         }
0912 
0913         tmp_status, tmp_response = self.request_ssl("GET", "creds/get_key_pair", data)
0914 
0915         key_pair = None
0916         err_string = None
0917 
0918         # Communication issue
0919         if tmp_status is False:
0920             err_string = core_utils.dump_error_message(tmp_log, tmp_response)
0921             return key_pair, err_string
0922 
0923         # Parse the response
0924         tmp_success = tmp_response.get("success", False)
0925         tmp_message = tmp_response.get("message")
0926 
0927         # Issue/condition on server side
0928         if not tmp_success:
0929             err_string = core_utils.dump_error_message(tmp_log, tmp_message)
0930             return key_pair, err_string
0931 
0932         # Set the key-pair
0933         key_pair = tmp_response.get("data")
0934         tmp_log.debug(f"Got key_pair: {key_pair} err_string: {err_string}")
0935 
0936         return key_pair, err_string
0937 
0938     # upload file
0939     def upload_file(self, file_name, file_object, offset, read_bytes):
0940         tmp_log = self.make_logger(method_name="upload_file")
0941         tmp_log.debug(f"Start for {file_name} {offset}:{read_bytes}")
0942         file_object.seek(offset)
0943         files = {"file": (file_name, zlib.compress(file_object.read(read_bytes)))}
0944 
0945         tmp_status, tmp_response = self.request_ssl("UPLOAD", "file_server/upload_jedi_log", files=files)
0946 
0947         # Communication issue
0948         if tmp_status is False:
0949             err_string = core_utils.dump_error_message(tmp_log, tmp_response)
0950             return tmp_status, err_string
0951 
0952         # Parse the response
0953         tmp_success = tmp_response.get("success", False)
0954         tmp_message = tmp_response.get("message")
0955 
0956         (tmp_log.error if not tmp_success else tmp_log.debug)(f"Done with {tmp_success}:{tmp_message}")
0957         return tmp_success, tmp_message
0958 
0959     # check event availability
0960     def check_event_availability(self, jobspec):
0961         tmp_log = self.make_logger(f"PandaID={jobspec.PandaID}", method_name="check_event_availability")
0962         tmp_log.debug("Start")
0963 
0964         data = {
0965             "job_id": jobspec.PandaID,
0966             "jobset_id": jobspec.jobsetID or jobspec.jobParams["jobsetID"],
0967             "task_id": jobspec.taskID,
0968         }
0969 
0970         tmp_status, tmp_response = self.request_ssl("GET", "event/get_available_event_range_count", data)
0971 
0972         # Communication issue
0973         if tmp_status is False:
0974             ret_message = core_utils.dump_error_message(tmp_log, tmp_response)
0975             return tmp_status, ret_message
0976 
0977         # Parse the response
0978         tmp_success = tmp_response.get("success", False)
0979         tmp_message = tmp_response.get("message")
0980         n_event_ranges = tmp_response.get("data", None)
0981 
0982         # Issue/condition on server side
0983         if not tmp_success:
0984             err_string = core_utils.dump_error_message(tmp_log, tmp_message)
0985             return tmp_success, err_string
0986 
0987         tmp_log.debug(f"Done with {n_event_ranges}")
0988         return tmp_success, n_event_ranges
0989 
0990     # send dialog messages
0991     def send_dialog_messages(self, dialog_list):
0992         tmp_log = self.make_logger(method_name="send_dialog_messages")
0993         tmp_log.debug("Start")
0994 
0995         data_list = []
0996         for diagSpec in dialog_list:
0997             data_list.append(diagSpec.convert_to_propagate())
0998 
0999         data = {"harvester_id": harvester_config.master.harvester_id, "dialogs": data_list}
1000 
1001         tmp_log.debug(f"Sending {len(data_list)} messages")
1002         tmp_status, tmp_response = self.request_ssl("POST", "harvester/add_dialogs", data)
1003 
1004         # Communication issue
1005         if tmp_status is False:
1006             ret_message = core_utils.dump_error_message(tmp_log, tmp_response)
1007             return tmp_status, ret_message
1008 
1009         # Parse the response
1010         tmp_success = tmp_response.get("success", False)
1011         tmp_message = tmp_response.get("message")
1012 
1013         (tmp_log.error if not tmp_success else tmp_log.debug)(f"Done with {tmp_success}:{tmp_message}")
1014 
1015         return tmp_success, tmp_message
1016 
1017     # update service metrics
1018     def update_service_metrics(self, service_metrics_list):
1019         tmp_log = self.make_logger(method_name="update_service_metrics")
1020         tmp_log.debug("Start")
1021 
1022         data = {"harvester_id": harvester_config.master.harvester_id, "metrics": service_metrics_list}
1023 
1024         tmp_log.debug("Updating metrics")
1025         tmp_status, tmp_response = self.request_ssl("POST", "harvester/update_service_metrics", data)
1026 
1027         # Communication issue
1028         if tmp_status is False:
1029             ret_message = core_utils.dump_error_message(tmp_log, tmp_response)
1030             return tmp_status, ret_message
1031 
1032         # Parse the response
1033         tmp_success = tmp_response.get("success", False)
1034         tmp_message = tmp_response.get("message")
1035 
1036         (tmp_log.error if not tmp_success else tmp_log.debug)(f"Done with {tmp_success}:{tmp_message}")
1037 
1038         return tmp_success, tmp_message
1039 
1040     # upload checkpoint
1041     def upload_checkpoint(self, base_url, task_id, panda_id, file_name, file_path):
1042         tmp_log = self.make_logger(f"taskID={task_id} pandaID={panda_id}", method_name="upload_checkpoint")
1043         tmp_log.debug(f"Start for {file_name}")
1044 
1045         ret_status = False
1046 
1047         try:
1048             files = {"file": (file_name, open(file_path).read())}
1049             tmp_status, tmp_response = self.request_ssl("UPLOAD", "file_server/upload_hpo_checkpoint", files=files, base_url=base_url)
1050 
1051             if tmp_status is False:  # Communication issue
1052                 core_utils.dump_error_message(tmp_log, tmp_response)
1053                 ret_status = tmp_status
1054             else:  # Parse the response
1055                 tmp_success = tmp_response.get("success", False)
1056                 tmp_message = tmp_response.get("message")
1057                 (tmp_log.error if not tmp_success else tmp_log.debug)(f"Done with {tmp_success}:{tmp_message}")
1058                 ret_status = tmp_success
1059 
1060             return ret_status
1061 
1062         except Exception:
1063             core_utils.dump_error_message(tmp_log)
1064             return ret_status
1065 
1066     # download checkpoint
1067     def download_checkpoint(self, base_url, task_id, panda_id, point_id, base_path):
1068         tmp_log = self.make_logger(f"taskID={task_id} pandaID={panda_id}", method_name="download_checkpoint")
1069         tmp_log.debug(f"Start for ID={point_id}")
1070         try:
1071             # This method doesn't go through the API. The file is downloaded directly from Apache file server
1072             path = f"cache/hpo_cp_{task_id}_{point_id}"
1073             tmp_status, tmp_response = self.request_ssl("POST", path, {}, base_url=base_url)
1074             file_name = None
1075 
1076             # Communication issue
1077             if tmp_status is False:
1078                 core_utils.dump_error_message(tmp_log, tmp_response)
1079                 return tmp_status, file_name
1080 
1081             # Generate a random file name
1082             file_name = os.path.join(base_path, str(uuid.uuid4()))
1083             with open(file_name, "w") as f:
1084                 f.write(tmp_response.content)
1085 
1086             tmp_log.debug(f"Downloaded {file_name}")
1087             return tmp_status, file_name
1088 
1089         except Exception:
1090             core_utils.dump_error_message(tmp_log)
1091             return False, None
1092 
1093     # clear checkpoint
1094     def clear_checkpoint(self, base_url, task_id, point_id):
1095         tmp_log = self.make_logger(f"taskID={task_id} pointID={point_id}", method_name="clear_checkpoints")
1096         tmp_log.debug("Start")
1097 
1098         data = {"task_id": task_id, "sub_id": point_id}
1099 
1100         tmp_status, tmp_response = self.request_ssl("POST", "file_server/delete_hpo_checkpoint", data, base_url=base_url)
1101 
1102         # Communication issue
1103         if tmp_status is False:
1104             ret_message = core_utils.dump_error_message(tmp_log, tmp_response)
1105             return tmp_status, ret_message
1106 
1107         # Parse the response
1108         tmp_success = tmp_response.get("success", False)
1109         tmp_message = tmp_response.get("message")
1110 
1111         (tmp_log.error if not tmp_success else tmp_log.debug)(f"Done with {tmp_success}:{tmp_message}")
1112 
1113         return tmp_success, tmp_message
1114 
1115     # get the current/max worker id
1116     def get_max_worker_id(self):
1117         tmp_log = self.make_logger(method_name="get_max_worker_id")
1118         tmp_log.debug("Start")
1119 
1120         data = {"harvester_id": harvester_config.master.harvester_id}
1121 
1122         tmp_status, tmp_response = self.request_ssl("GET", "harvester/get_current_worker_id", data)
1123 
1124         # Communication issue
1125         if tmp_status is False:
1126             core_utils.dump_error_message(tmp_log, tmp_response)
1127             return tmp_status, None
1128 
1129         # Parse the response
1130         tmp_success = tmp_response.get("success", False)
1131         tmp_message = tmp_response.get("message")
1132         tmp_data = tmp_response.get("data")
1133 
1134         # If there was a max worker id, return it. Otherwise return the error message
1135         ret_value = tmp_data if tmp_data is not None else tmp_message
1136 
1137         (tmp_log.error if not tmp_success else tmp_log.debug)(f"Done with {tmp_success}:{tmp_message}")
1138         return tmp_success, ret_value
1139 
1140     # get worker stats from PanDA
1141     def get_worker_stats_from_panda(self):
1142         tmp_log = self.make_logger(method_name="get_worker_stats_from_panda")
1143         tmp_log.debug("Start")
1144 
1145         tmp_status, tmp_response = self.request_ssl("GET", "harvester/get_worker_statistics", {})
1146 
1147         # Communication issue
1148         if tmp_status is False:
1149             ret_message = core_utils.dump_error_message(tmp_log, tmp_response)
1150             return {}, ret_message
1151 
1152         # Parse the response
1153         tmp_success = tmp_response.get("success", False)
1154         tmp_message = tmp_response.get("message")
1155         stats = tmp_response.get("data", {})
1156 
1157         if not tmp_success:
1158             ret_message = core_utils.dump_error_message(tmp_log, tmp_message)
1159             return {}, ret_message
1160 
1161         tmp_log.debug(f"Done with {stats}")
1162         return stats, "OK"