Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-20 07:58:59

0001 """
0002 API described here: http://apfmon.lancs.ac.uk/help
0003 """
0004 
0005 import json
0006 import time
0007 import traceback
0008 
0009 import requests
0010 
0011 from pandaharvester import panda_pkg_info
0012 from pandaharvester.harvesterconfig import harvester_config
0013 from pandaharvester.harvestercore import core_utils
0014 from pandaharvester.harvestercore.work_spec import WorkSpec
0015 from pandaharvester.harvestermisc.info_utils import PandaQueuesDict
0016 
# Module-level logger named "apfmon"; the Apfmon methods build their per-call loggers from it.
_base_logger = core_utils.setup_logger("apfmon")
# Placeholder CE name used below when a queue or worker has no computing element.
NO_CE = "noCE"
0019 
0020 
def apfmon_active(method, *args, **kwargs):
    """
    Call ``method(*args, **kwargs)`` only when APFMon reporting is active.

    The owning object is taken from the bound method (``method.__self__``) or, for a plain
    function, from the first positional argument. Its ``_Apfmon__active`` attribute (the
    name-mangled form of ``self.__active`` set inside class ``Apfmon``) decides whether the
    call goes ahead.

    :param method: callable to invoke
    :param args: positional arguments forwarded to ``method``
    :param kwargs: keyword arguments forwarded to ``method``
    :return: the result of ``method``, or None when reporting is inactive or no owner is found
    """
    # The previous body read ``cls.__active`` although no ``cls`` exists in this scope,
    # so every call failed with NameError before reaching ``method``.
    owner = getattr(method, "__self__", None)
    if owner is None and args:
        owner = args[0]

    if not getattr(owner, "_Apfmon__active", False):
        return None
    return method(*args, **kwargs)
0026 
0027 
def clean_ce(ce):
    """
    Shorten a CE endpoint string.

    Keeps only the text before the first "." and, from that, only what follows the last "://".

    :param ce: CE endpoint string
    :return: shortened CE name
    """
    first_label = ce.partition(".")[0]
    return first_label.rpartition("://")[2]
0030 
0031 
0032 class Apfmon(object):
0033     def __init__(self, queue_config_mapper):
0034         try:
0035             self.__active = harvester_config.apfmon.active
0036         except BaseException:
0037             self.__active = False
0038 
0039         try:
0040             self.__worker_timeout = harvester_config.apfmon.worker_timeout
0041         except BaseException:
0042             self.__worker_timeout = 0.5
0043 
0044         try:
0045             self.__worker_update_timeout = harvester_config.apfmon.worker_timeout
0046         except BaseException:
0047             self.__worker_update_timeout = 0.2
0048 
0049         try:
0050             self.__label_timeout = harvester_config.apfmon.worker_timeout
0051         except BaseException:
0052             self.__label_timeout = 1
0053 
0054         # TODO: make proper exception handling and defaults
0055         try:
0056             self.harvester_id = harvester_config.master.harvester_id
0057         except BaseException:
0058             self.harvester_id = "DUMMY"
0059 
0060         try:
0061             self.base_url = harvester_config.apfmon.base_url
0062         except BaseException:
0063             self.base_url = "http://apfmon.lancs.ac.uk/api"
0064 
0065         self.queue_config_mapper = queue_config_mapper
0066 
0067     def create_factory(self):
0068         """
0069         Creates or updates a harvester instance to APF Mon. Should be done at startup of the instance.
0070         """
0071 
0072         start_time = time.time()
0073         tmp_log = core_utils.make_logger(_base_logger, f"harvester_id={self.harvester_id}", method_name="create_factory")
0074 
0075         if not self.__active:
0076             tmp_log.debug("APFMon reporting not enabled")
0077             return
0078 
0079         try:
0080             tmp_log.debug("start")
0081 
0082             url = f"{self.base_url}/factories/{self.harvester_id}"
0083 
0084             f = {"url": "url_to_logs", "email": "atlas-adc-harvester-central-support@cern.ch", "version": panda_pkg_info.release_version}
0085             payload = json.dumps(f)
0086 
0087             r = requests.put(url, data=payload, timeout=self.__label_timeout)
0088             tmp_log.debug(f"registration ended with {r.status_code} {r.text}")
0089             end_time = time.time()
0090             tmp_log.debug(f"done (took {end_time - start_time})")
0091         except BaseException:
0092             tmp_log.error(f"Excepted with: {traceback.format_exc()}")
0093 
0094     def create_labels(self):
0095         """
0096         Creates or updates a collection of labels (=panda queue+CE)
0097         """
0098         start_time = time.time()
0099         tmp_log = core_utils.make_logger(_base_logger, f"harvester_id={self.harvester_id}", method_name="create_labels")
0100 
0101         if not self.__active:
0102             tmp_log.debug("APFMon reporting not enabled")
0103             return
0104 
0105         try:
0106             tmp_log.debug("start")
0107 
0108             url = f"{self.base_url}/labels"
0109 
0110             # get the active queues from the config mapper
0111             all_sites = self.queue_config_mapper.get_active_queues().keys()
0112             panda_queues_dict = PandaQueuesDict()
0113 
0114             # publish the active queues to APF mon in shards
0115             for sites in core_utils.create_shards(all_sites, 20):
0116                 labels = []
0117                 for site in sites:
0118                     try:
0119                         site_info = panda_queues_dict.get(site, dict())
0120                         if not site_info:
0121                             tmp_log.warning(f"No site info for {site}")
0122                             continue
0123 
0124                         # when no CEs associated to a queue, e.g. P1, HPCs, etc. Try to see if there is something
0125                         # in local configuration, otherwise set it to a dummy value
0126                         try:
0127                             ce = self.queue_config_mapper.queueConfig[site].submitter["ceEndpoint"]
0128                             queues = [{"ce_endpoint": ce}]
0129                         except KeyError:
0130                             if site_info["queues"]:
0131                                 queues = site_info["queues"]
0132                             else:
0133                                 queues = [{"ce_endpoint": NO_CE}]
0134 
0135                         for queue in queues:
0136                             try:
0137                                 ce = clean_ce(queue["ce_endpoint"])
0138                             except BaseException:
0139                                 ce = ""
0140 
0141                             try:
0142                                 ce_queue_id = queue["ce_queue_id"]
0143                             except KeyError:
0144                                 ce_queue_id = 0
0145 
0146                             labels.append({"name": f"{site}-{ce}", "wmsqueue": site, "ce_queue_id": ce_queue_id, "factory": self.harvester_id})
0147                     except BaseException:
0148                         tmp_log.error(f"Excepted for site {site} with: {traceback.format_exc()}")
0149                         continue
0150 
0151                 payload = json.dumps(labels)
0152 
0153                 r = requests.put(url, data=payload, timeout=self.__label_timeout)
0154                 tmp_log.debug(f"label creation for {sites} ended with {r.status_code} {r.text}")
0155 
0156             end_time = time.time()
0157             tmp_log.debug(f"done (took {end_time - start_time})")
0158         except BaseException:
0159             tmp_log.error(f"Excepted with: {traceback.format_exc()}")
0160 
0161     def massage_label_data(self, data):
0162         tmp_log = core_utils.make_logger(_base_logger, f"harvester_id={self.harvester_id}", method_name="massage_label_data")
0163         if not data:
0164             return data
0165 
0166         try:
0167             # Preserve original ANY if it exists
0168             any_backup = data.get("ANY", {})
0169             agg = {}
0170 
0171             # Iterate through job_type, resource_type, pilot_type dimensions
0172             # and aggregate all metrics for non-ANY combinations
0173             for job_type in data:
0174                 if job_type == "ANY":
0175                     continue
0176 
0177                 for resource_type in data[job_type]:
0178                     if resource_type == "ANY":
0179                         continue
0180 
0181                     for pilot_type in data[job_type][resource_type]:
0182                         if pilot_type == "ANY":
0183                             continue
0184 
0185                         # Get all metrics for this combination
0186                         metrics = data[job_type][resource_type][pilot_type]
0187                         for metric_key, metric_value in metrics.items():
0188                             if isinstance(metric_value, (int, float)):
0189                                 agg.setdefault(metric_key, 0)
0190                                 agg[metric_key] += metric_value
0191 
0192             # Update the ANY entry with aggregated values
0193             if agg:
0194                 # Initialize ANY structure if needed
0195                 if "ANY" not in data:
0196                     data["ANY"] = {}
0197                 if "ANY" not in data["ANY"]:
0198                     data["ANY"]["ANY"] = {}
0199                 if "ANY" not in data["ANY"]["ANY"]:
0200                     data["ANY"]["ANY"]["ANY"] = {}
0201 
0202                 data["ANY"]["ANY"]["ANY"].update(agg)
0203             elif any_backup:
0204                 data["ANY"] = any_backup
0205 
0206             tmp_log.debug(f"Massaged to data: {data}")
0207 
0208         except Exception:
0209             tmp_log.debug(f"Exception in data: {data}")
0210 
0211         return data
0212 
0213     def update_label(self, site, msg, data):
0214         """
0215         Updates a label (=panda queue+CE)
0216         """
0217         start_time = time.time()
0218         tmp_log = core_utils.make_logger(_base_logger, f"harvester_id={self.harvester_id}", method_name="update_label")
0219 
0220         if not self.__active:
0221             tmp_log.debug("APFMon reporting not enabled")
0222             return
0223 
0224         try:
0225             tmp_log.debug("start")
0226             data = self.massage_label_data(data)
0227 
0228             # get the active queues from the config mapper
0229             all_sites = self.queue_config_mapper.get_active_queues().keys()
0230             panda_queues_dict = PandaQueuesDict()
0231 
0232             site_info = panda_queues_dict.get(site, dict())
0233             if not site_info:
0234                 tmp_log.warning(f"No site info for {site}")
0235                 return
0236 
0237             # when no CEs associated to a queue, e.g. P1, HPCs, etc. Try to see if there is something
0238             # in local configuration, otherwise set it to a dummy value
0239             try:
0240                 ce = self.queue_config_mapper.queueConfig[site].submitter["ceEndpoint"]
0241                 queues = [{"ce_endpoint": ce}]
0242             except KeyError:
0243                 if site_info["queues"]:
0244                     queues = site_info["queues"]
0245                 else:
0246                     queues = [{"ce_endpoint": NO_CE}]
0247 
0248             for queue in queues:
0249                 try:
0250                     try:
0251                         ce = clean_ce(queue["ce_endpoint"])
0252                     except BaseException:
0253                         ce = ""
0254 
0255                     label_data = {"status": msg, "data": data}
0256                     label = f"{site}-{ce}"
0257                     label_id = f"{self.harvester_id}:{label}"
0258                     url = f"{self.base_url}/labels/{label_id}"
0259 
0260                     r = requests.post(url, data=json.dumps(label_data), timeout=self.__label_timeout)
0261                     tmp_log.debug(f"label update for {label} ended with {r.status_code} {r.text}")
0262                 except BaseException:
0263                     tmp_log.error(f"Excepted for site {label} with: {traceback.format_exc()}")
0264 
0265             end_time = time.time()
0266             tmp_log.debug(f"done (took {end_time - start_time})")
0267         except BaseException:
0268             tmp_log.error(f"Excepted with: {traceback.format_exc()}")
0269 
0270     def create_workers(self, worker_spec_list):
0271         """
0272         Creates a worker
0273         """
0274         start_time = time.time()
0275         tmp_log = core_utils.make_logger(_base_logger, f"harvester_id={self.harvester_id}", method_name="create_workers")
0276 
0277         if not self.__active:
0278             tmp_log.debug("APFMon reporting not enabled")
0279             return
0280 
0281         try:
0282             tmp_log.debug("start")
0283 
0284             url = f"{self.base_url}/jobs"
0285 
0286             for worker_spec_shard in core_utils.create_shards(worker_spec_list, 20):
0287                 apfmon_workers = []
0288                 for worker_spec in worker_spec_shard:
0289                     batch_id = worker_spec.batchID
0290                     worker_id = worker_spec.workerID
0291                     if not batch_id:
0292                         tmp_log.debug(f"no batchID found for workerID {worker_id}... skipping")
0293                         continue
0294                     factory = self.harvester_id
0295                     computingsite = worker_spec.computingSite
0296                     try:
0297                         ce = clean_ce(worker_spec.computingElement)
0298                     except AttributeError:
0299                         tmp_log.debug(f"no CE found for workerID {worker_id} batchID {batch_id}")
0300                         ce = NO_CE
0301 
0302                     # extract the log URLs
0303                     stdout_url = ""
0304                     stderr_url = ""
0305                     log_url = ""
0306                     jdl_url = ""
0307 
0308                     work_attribs = worker_spec.workAttributes
0309                     if work_attribs:
0310                         if "stdOut" in work_attribs:
0311                             stdout_url = work_attribs["stdOut"]
0312                             # jdl_url = '{0}.jdl'.format(stdout_url[:-4])
0313                         if "stdErr" in work_attribs:
0314                             stderr_url = work_attribs["stdErr"]
0315                         if "batchLog" in work_attribs:
0316                             log_url = work_attribs["batchLog"]
0317                         if "jdl" in work_attribs:
0318                             jdl_url = work_attribs["jdl"]
0319 
0320                     apfmon_worker = {
0321                         "cid": batch_id,
0322                         "factory": factory,
0323                         "harvesterid": self.harvester_id,
0324                         "workerid": worker_id,
0325                         "computingsite": computingsite,
0326                         "computingelement": ce,
0327                         "label": f"{computingsite}-{ce}",
0328                         "jdlurl": jdl_url,
0329                         "stdouturl": stdout_url,
0330                         "stderrurl": stderr_url,
0331                         "logurl": log_url,
0332                     }
0333                     tmp_log.debug(f"packed worker: {apfmon_worker}")
0334                     apfmon_workers.append(apfmon_worker)
0335 
0336                 payload = json.dumps(apfmon_workers)
0337 
0338                 try:
0339                     r = requests.put(url, data=payload, timeout=self.__worker_timeout)
0340                     tmp_log.debug(f"worker creation for {apfmon_workers} ended with {r.status_code} {r.text}")
0341                 except BaseException:
0342                     tmp_log.debug("worker creation for {0} failed with".format(apfmon_workers, format(traceback.format_exc())))
0343 
0344             end_time = time.time()
0345             tmp_log.debug(f"done (took {end_time - start_time})")
0346         except BaseException:
0347             tmp_log.error(f"Excepted with: {traceback.format_exc()}")
0348 
0349     def convert_status(self, harvester_status):
0350         """
0351         convert harvester status to APFMon status
0352         :param harvester_status
0353         :return: list with apfmon_status. Usually it's just one status, except for exiting&done
0354         """
0355         if harvester_status == "submitted":
0356             return "created"
0357         if harvester_status in ["running", "idle"]:
0358             return "running"
0359         if harvester_status in ["missed", "failed", "cancelled"]:
0360             return "fault"
0361         if harvester_status == "finished":
0362             return "done"
0363 
0364     def update_worker(self, worker_spec, worker_status):
0365         """
0366         Updates the state of a worker. This can also be done directly from the wrapper, assuming there is outbound
0367         connectivity on the worker node
0368         """
0369         start_time = time.time()
0370         tmp_log = core_utils.make_logger(_base_logger, f"harvester_id={self.harvester_id}", method_name="update_worker")
0371 
0372         if not self.__active:
0373             tmp_log.debug("APFMon reporting not enabled")
0374             return
0375 
0376         try:
0377             tmp_log.debug("start")
0378 
0379             batch_id = worker_spec.batchID
0380             factory = self.harvester_id
0381 
0382             url = f"{self.base_url}/jobs/{factory}:{batch_id}"
0383 
0384             apfmon_status = self.convert_status(worker_status)
0385             apfmon_worker = {}
0386             apfmon_worker["state"] = apfmon_status
0387 
0388             # For final states include panda id's if available (push mode only)
0389             if apfmon_status in ("fault", "done") and hasattr(worker_spec, "pandaid_list") and worker_spec.pandaid_list:
0390                 apfmon_worker["ids"] = ",".join(str(x) for x in worker_spec.pandaid_list)
0391 
0392             tmp_log.debug(f"updating worker {batch_id}: {apfmon_worker}")
0393 
0394             r = requests.post(url, data=apfmon_worker, timeout=self.__worker_update_timeout)
0395             tmp_log.debug(f"worker update for {batch_id} ended with {r.status_code} {r.text}")
0396 
0397             end_time = time.time()
0398             tmp_log.debug(f"done (took {end_time - start_time})")
0399         except BaseException:
0400             tmp_log.error(f"Excepted with: {traceback.format_exc()}")
0401 
0402 
if __name__ == "__main__":
    # Quick manual tests: register the factory and its labels, create two workers and
    # walk them through a status update. Only does something when APFMon reporting is enabled.
    from pandaharvester.harvestercore.queue_config_mapper import QueueConfigMapper

    queue_config_mapper = QueueConfigMapper()

    apfmon = Apfmon(queue_config_mapper)
    apfmon.create_factory()
    apfmon.create_labels()

    worker_a = WorkSpec()
    worker_a.batchID = 1
    worker_a.computingSite = "CERN-PROD-DEV_UCORE"
    worker_a.computingElement = "bla1"
    worker_a.workAttributes = {
        "batchLog": "https://aipanda024.cern.ch/condor_logs/18-07-19_09/grid.9659.0.log",
        "stdErr": "https://aipanda024.cern.ch/condor_logs/18-07-19_09/grid.9659.0.err",
        "stdOut": "https://aipanda024.cern.ch/condor_logs/18-07-19_09/grid.9659.0.out",
    }
    worker_a.pandaid_list = [1234, 5678]

    worker_b = WorkSpec()
    worker_b.batchID = 2
    worker_b.computingSite = "CERN-PROD-DEV_UCORE"
    worker_b.computingElement = "bla2"
    worker_b.workAttributes = {
        "batchLog": "https://aipanda024.cern.ch/condor_logs/18-07-19_09/grid.9659.0.log",
        "stdErr": "https://aipanda024.cern.ch/condor_logs/18-07-19_09/grid.9659.0.err",
        "stdOut": "https://aipanda024.cern.ch/condor_logs/18-07-19_09/grid.9659.0.out",
    }

    workers = [worker_a, worker_b]

    apfmon.create_workers(workers)

    # Apfmon only provides update_worker(worker_spec, worker_status) for a single worker; the
    # earlier calls to a non-existent update_workers(workers) failed with AttributeError.
    worker_a.status = "running"
    worker_b.status = "running"
    for worker in workers:
        apfmon.update_worker(worker, worker.status)

    worker_a.status = "finished"
    worker_b.status = "failed"
    for worker in workers:
        apfmon.update_worker(worker, worker.status)