# File indexing completed on 2026-04-20 07:58:59
0001 """
0002 API described here: http://apfmon.lancs.ac.uk/help
0003 """
0004
0005 import json
0006 import time
0007 import traceback
0008
0009 import requests
0010
0011 from pandaharvester import panda_pkg_info
0012 from pandaharvester.harvesterconfig import harvester_config
0013 from pandaharvester.harvestercore import core_utils
0014 from pandaharvester.harvestercore.work_spec import WorkSpec
0015 from pandaharvester.harvestermisc.info_utils import PandaQueuesDict
0016
0017 _base_logger = core_utils.setup_logger("apfmon")
0018 NO_CE = "noCE"
0019
0020
def apfmon_active(method):
    """
    Decorator for Apfmon instance methods: invoke the wrapped method only when
    APFMon reporting is enabled on the instance, otherwise do nothing and
    return None.

    The previous implementation referenced an undefined name (``cls``) and
    therefore raised NameError on every call; it is rewritten as a proper
    decorator.
    """

    def active_method(self, *args, **kwargs):
        # "__active" is name-mangled inside the Apfmon class, hence the
        # explicit "_Apfmon__active" lookup; default to disabled when absent.
        if getattr(self, "_Apfmon__active", False):
            return method(self, *args, **kwargs)
        return None

    return active_method
0026
0027
def clean_ce(ce):
    """Return the short host name of a CE endpoint, dropping any scheme prefix and the domain suffix."""
    before_first_dot = ce.split(".", 1)[0]
    _, _, short_host = before_first_dot.rpartition("://")
    return short_host
0030
0031
class Apfmon(object):
    """
    Client that reports harvester state (factory instance, labels = panda
    queue + CE, and workers) to the APF Mon monitoring service.
    API described at http://apfmon.lancs.ac.uk/help
    """

    def __init__(self, queue_config_mapper):
        """
        :param queue_config_mapper: harvester queue configuration mapper,
            used to resolve active queues and locally configured CEs.
        """
        # Every config read falls back to a default so that a missing
        # [apfmon] section disables/limits reporting instead of crashing.
        try:
            self.__active = harvester_config.apfmon.active
        except BaseException:
            self.__active = False

        # HTTP timeouts in seconds for the different call types
        try:
            self.__worker_timeout = harvester_config.apfmon.worker_timeout
        except BaseException:
            self.__worker_timeout = 0.5

        # BUG FIX: previously read apfmon.worker_timeout (copy-paste error)
        try:
            self.__worker_update_timeout = harvester_config.apfmon.worker_update_timeout
        except BaseException:
            self.__worker_update_timeout = 0.2

        # BUG FIX: previously read apfmon.worker_timeout (copy-paste error)
        try:
            self.__label_timeout = harvester_config.apfmon.label_timeout
        except BaseException:
            self.__label_timeout = 1

        try:
            self.harvester_id = harvester_config.master.harvester_id
        except BaseException:
            self.harvester_id = "DUMMY"

        try:
            self.base_url = harvester_config.apfmon.base_url
        except BaseException:
            self.base_url = "http://apfmon.lancs.ac.uk/api"

        self.queue_config_mapper = queue_config_mapper

    def create_factory(self):
        """
        Creates or updates a harvester instance to APF Mon. Should be done at startup of the instance.
        """

        start_time = time.time()
        tmp_log = core_utils.make_logger(_base_logger, f"harvester_id={self.harvester_id}", method_name="create_factory")

        if not self.__active:
            tmp_log.debug("APFMon reporting not enabled")
            return

        try:
            tmp_log.debug("start")

            url = f"{self.base_url}/factories/{self.harvester_id}"

            f = {"url": "url_to_logs", "email": "atlas-adc-harvester-central-support@cern.ch", "version": panda_pkg_info.release_version}
            payload = json.dumps(f)

            r = requests.put(url, data=payload, timeout=self.__label_timeout)
            tmp_log.debug(f"registration ended with {r.status_code} {r.text}")
            end_time = time.time()
            tmp_log.debug(f"done (took {end_time - start_time})")
        except BaseException:
            tmp_log.error(f"Excepted with: {traceback.format_exc()}")

    def create_labels(self):
        """
        Creates or updates a collection of labels (=panda queue+CE)
        """
        start_time = time.time()
        tmp_log = core_utils.make_logger(_base_logger, f"harvester_id={self.harvester_id}", method_name="create_labels")

        if not self.__active:
            tmp_log.debug("APFMon reporting not enabled")
            return

        try:
            tmp_log.debug("start")

            url = f"{self.base_url}/labels"

            all_sites = self.queue_config_mapper.get_active_queues().keys()
            panda_queues_dict = PandaQueuesDict()

            # upload labels in shards of 20 sites to keep payloads small
            for sites in core_utils.create_shards(all_sites, 20):
                labels = []
                for site in sites:
                    try:
                        site_info = panda_queues_dict.get(site, dict())
                        if not site_info:
                            tmp_log.warning(f"No site info for {site}")
                            continue

                        # a CE defined in the local queue config takes
                        # precedence over the queue information
                        try:
                            ce = self.queue_config_mapper.queueConfig[site].submitter["ceEndpoint"]
                            queues = [{"ce_endpoint": ce}]
                        except KeyError:
                            if site_info["queues"]:
                                queues = site_info["queues"]
                            else:
                                queues = [{"ce_endpoint": NO_CE}]

                        for queue in queues:
                            try:
                                ce = clean_ce(queue["ce_endpoint"])
                            except BaseException:
                                ce = ""

                            try:
                                ce_queue_id = queue["ce_queue_id"]
                            except KeyError:
                                ce_queue_id = 0

                            labels.append({"name": f"{site}-{ce}", "wmsqueue": site, "ce_queue_id": ce_queue_id, "factory": self.harvester_id})
                    except BaseException:
                        # one broken site must not abort the whole shard
                        tmp_log.error(f"Excepted for site {site} with: {traceback.format_exc()}")
                        continue

                payload = json.dumps(labels)

                r = requests.put(url, data=payload, timeout=self.__label_timeout)
                tmp_log.debug(f"label creation for {sites} ended with {r.status_code} {r.text}")

            end_time = time.time()
            tmp_log.debug(f"done (took {end_time - start_time})")
        except BaseException:
            tmp_log.error(f"Excepted with: {traceback.format_exc()}")

    def massage_label_data(self, data):
        """
        Aggregate numeric metrics from all concrete (job_type, resource_type,
        pilot_type) combinations into the catch-all data["ANY"]["ANY"]["ANY"]
        bucket. When nothing aggregates, the original "ANY" entry is restored.
        Returns the (possibly modified) data dict.
        """
        tmp_log = core_utils.make_logger(_base_logger, f"harvester_id={self.harvester_id}", method_name="massage_label_data")
        if not data:
            return data

        try:
            # remember the incoming ANY entry as a fallback
            any_backup = data.get("ANY", {})
            agg = {}

            # sum numeric metrics across all non-"ANY" combinations
            for job_type in data:
                if job_type == "ANY":
                    continue

                for resource_type in data[job_type]:
                    if resource_type == "ANY":
                        continue

                    for pilot_type in data[job_type][resource_type]:
                        if pilot_type == "ANY":
                            continue

                        metrics = data[job_type][resource_type][pilot_type]
                        for metric_key, metric_value in metrics.items():
                            if isinstance(metric_value, (int, float)):
                                agg.setdefault(metric_key, 0)
                                agg[metric_key] += metric_value

            if agg:
                # ensure the nested ANY/ANY/ANY structure exists, then merge
                if "ANY" not in data:
                    data["ANY"] = {}
                if "ANY" not in data["ANY"]:
                    data["ANY"]["ANY"] = {}
                if "ANY" not in data["ANY"]["ANY"]:
                    data["ANY"]["ANY"]["ANY"] = {}

                data["ANY"]["ANY"]["ANY"].update(agg)
            elif any_backup:
                data["ANY"] = any_backup

            tmp_log.debug(f"Massaged to data: {data}")

        except Exception:
            tmp_log.debug(f"Exception in data: {data}")

        return data

    def update_label(self, site, msg, data):
        """
        Updates a label (=panda queue+CE)

        :param site: panda queue name
        :param msg: status string to report
        :param data: metrics dict, massaged via massage_label_data before upload
        """
        start_time = time.time()
        tmp_log = core_utils.make_logger(_base_logger, f"harvester_id={self.harvester_id}", method_name="update_label")

        if not self.__active:
            tmp_log.debug("APFMon reporting not enabled")
            return

        try:
            tmp_log.debug("start")
            data = self.massage_label_data(data)

            panda_queues_dict = PandaQueuesDict()

            site_info = panda_queues_dict.get(site, dict())
            if not site_info:
                tmp_log.warning(f"No site info for {site}")
                return

            # a CE defined in the local queue config takes precedence over
            # the queue information
            try:
                ce = self.queue_config_mapper.queueConfig[site].submitter["ceEndpoint"]
                queues = [{"ce_endpoint": ce}]
            except KeyError:
                if site_info["queues"]:
                    queues = site_info["queues"]
                else:
                    queues = [{"ce_endpoint": NO_CE}]

            for queue in queues:
                try:
                    try:
                        ce = clean_ce(queue["ce_endpoint"])
                    except BaseException:
                        ce = ""

                    label_data = {"status": msg, "data": data}
                    label = f"{site}-{ce}"
                    label_id = f"{self.harvester_id}:{label}"
                    url = f"{self.base_url}/labels/{label_id}"

                    r = requests.post(url, data=json.dumps(label_data), timeout=self.__label_timeout)
                    tmp_log.debug(f"label update for {label} ended with {r.status_code} {r.text}")
                except BaseException:
                    tmp_log.error(f"Excepted for site {label} with: {traceback.format_exc()}")

            end_time = time.time()
            tmp_log.debug(f"done (took {end_time - start_time})")
        except BaseException:
            tmp_log.error(f"Excepted with: {traceback.format_exc()}")

    def create_workers(self, worker_spec_list):
        """
        Creates a worker

        :param worker_spec_list: list of WorkSpec objects to register in APFMon
        """
        start_time = time.time()
        tmp_log = core_utils.make_logger(_base_logger, f"harvester_id={self.harvester_id}", method_name="create_workers")

        if not self.__active:
            tmp_log.debug("APFMon reporting not enabled")
            return

        try:
            tmp_log.debug("start")

            url = f"{self.base_url}/jobs"

            # upload workers in shards of 20 to keep payloads small
            for worker_spec_shard in core_utils.create_shards(worker_spec_list, 20):
                apfmon_workers = []
                for worker_spec in worker_spec_shard:
                    batch_id = worker_spec.batchID
                    worker_id = worker_spec.workerID
                    # APFMon identifies a job by its batch id; skip workers
                    # that were never submitted to the batch system
                    if not batch_id:
                        tmp_log.debug(f"no batchID found for workerID {worker_id}... skipping")
                        continue
                    factory = self.harvester_id
                    computingsite = worker_spec.computingSite
                    try:
                        ce = clean_ce(worker_spec.computingElement)
                    except AttributeError:
                        tmp_log.debug(f"no CE found for workerID {worker_id} batchID {batch_id}")
                        ce = NO_CE

                    # extract log URLs from the work attributes when available
                    stdout_url = ""
                    stderr_url = ""
                    log_url = ""
                    jdl_url = ""

                    work_attribs = worker_spec.workAttributes
                    if work_attribs:
                        if "stdOut" in work_attribs:
                            stdout_url = work_attribs["stdOut"]

                        if "stdErr" in work_attribs:
                            stderr_url = work_attribs["stdErr"]
                        if "batchLog" in work_attribs:
                            log_url = work_attribs["batchLog"]
                        if "jdl" in work_attribs:
                            jdl_url = work_attribs["jdl"]

                    apfmon_worker = {
                        "cid": batch_id,
                        "factory": factory,
                        "harvesterid": self.harvester_id,
                        "workerid": worker_id,
                        "computingsite": computingsite,
                        "computingelement": ce,
                        "label": f"{computingsite}-{ce}",
                        "jdlurl": jdl_url,
                        "stdouturl": stdout_url,
                        "stderrurl": stderr_url,
                        "logurl": log_url,
                    }
                    tmp_log.debug(f"packed worker: {apfmon_worker}")
                    apfmon_workers.append(apfmon_worker)

                payload = json.dumps(apfmon_workers)

                try:
                    r = requests.put(url, data=payload, timeout=self.__worker_timeout)
                    tmp_log.debug(f"worker creation for {apfmon_workers} ended with {r.status_code} {r.text}")
                except BaseException:
                    # BUG FIX: the old message dropped the traceback because of
                    # a malformed str.format call with an unused second argument
                    tmp_log.debug(f"worker creation for {apfmon_workers} failed with {traceback.format_exc()}")

            end_time = time.time()
            tmp_log.debug(f"done (took {end_time - start_time})")
        except BaseException:
            tmp_log.error(f"Excepted with: {traceback.format_exc()}")

    def convert_status(self, harvester_status):
        """
        convert harvester status to APFMon status
        :param harvester_status: harvester worker status string
        :return: the apfmon status string, or None for an unmapped status
        """
        if harvester_status == "submitted":
            return "created"
        if harvester_status in ["running", "idle"]:
            return "running"
        if harvester_status in ["missed", "failed", "cancelled"]:
            return "fault"
        if harvester_status == "finished":
            return "done"

    def update_worker(self, worker_spec, worker_status):
        """
        Updates the state of a worker. This can also be done directly from the wrapper, assuming there is outbound
        connectivity on the worker node

        :param worker_spec: WorkSpec of the worker to update
        :param worker_status: harvester status, converted via convert_status
        """
        start_time = time.time()
        tmp_log = core_utils.make_logger(_base_logger, f"harvester_id={self.harvester_id}", method_name="update_worker")

        if not self.__active:
            tmp_log.debug("APFMon reporting not enabled")
            return

        try:
            tmp_log.debug("start")

            batch_id = worker_spec.batchID
            factory = self.harvester_id

            url = f"{self.base_url}/jobs/{factory}:{batch_id}"

            apfmon_status = self.convert_status(worker_status)
            apfmon_worker = {}
            apfmon_worker["state"] = apfmon_status

            # for final states attach the panda ids served by the worker
            if apfmon_status in ("fault", "done") and hasattr(worker_spec, "pandaid_list") and worker_spec.pandaid_list:
                apfmon_worker["ids"] = ",".join(str(x) for x in worker_spec.pandaid_list)

            tmp_log.debug(f"updating worker {batch_id}: {apfmon_worker}")

            r = requests.post(url, data=apfmon_worker, timeout=self.__worker_update_timeout)
            tmp_log.debug(f"worker update for {batch_id} ended with {r.status_code} {r.text}")

            end_time = time.time()
            tmp_log.debug(f"done (took {end_time - start_time})")
        except BaseException:
            tmp_log.error(f"Excepted with: {traceback.format_exc()}")
0401
0402
if __name__ == "__main__":
    """
    Quick tests
    """
    from pandaharvester.harvestercore.queue_config_mapper import QueueConfigMapper

    queue_config_mapper = QueueConfigMapper()

    apfmon = Apfmon(queue_config_mapper)
    apfmon.create_factory()
    apfmon.create_labels()

    worker_a = WorkSpec()
    worker_a.batchID = 1
    worker_a.computingSite = "CERN-PROD-DEV_UCORE"
    worker_a.computingElement = "bla1"
    worker_a.workAttributes = {
        "batchLog": "https://aipanda024.cern.ch/condor_logs/18-07-19_09/grid.9659.0.log",
        "stdErr": "https://aipanda024.cern.ch/condor_logs/18-07-19_09/grid.9659.0.err",
        "stdOut": "https://aipanda024.cern.ch/condor_logs/18-07-19_09/grid.9659.0.out",
    }
    worker_a.pandaid_list = [1234, 5678]

    worker_b = WorkSpec()
    worker_b.batchID = 2
    worker_b.computingSite = "CERN-PROD-DEV_UCORE"
    worker_b.computingElement = "bla2"
    worker_b.workAttributes = {
        "batchLog": "https://aipanda024.cern.ch/condor_logs/18-07-19_09/grid.9659.0.log",
        "stdErr": "https://aipanda024.cern.ch/condor_logs/18-07-19_09/grid.9659.0.err",
        "stdOut": "https://aipanda024.cern.ch/condor_logs/18-07-19_09/grid.9659.0.out",
    }

    workers = [worker_a, worker_b]

    apfmon.create_workers(workers)

    # BUG FIX: Apfmon has no update_workers() method; update each worker
    # individually with update_worker(worker_spec, worker_status)
    worker_a.status = "running"
    worker_b.status = "running"
    for worker in workers:
        apfmon.update_worker(worker, worker.status)

    worker_a.status = "finished"
    worker_b.status = "failed"
    for worker in workers:
        apfmon.update_worker(worker, worker.status)