Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:39:01

0001 import sys
0002 
0003 import polars as pl
0004 import requests
0005 from bs4 import BeautifulSoup
0006 from pandacommon.pandalogger import logger_utils
0007 from pandacommon.pandalogger.PandaLogger import PandaLogger
0008 from pandacommon.pandautils.thread_utils import GenericThread
0009 
0010 from pandaserver.config import panda_config
0011 from pandaserver.srvcore.CoreUtils import normalize_cpu_model
0012 
0013 main_logger = PandaLogger().getLogger("hs_scrapers")
0014 
0015 DEFAULT_URL_SL6 = "https://w3.hepix.org/benchmarking/sl6-x86_64-gcc44.html"
0016 DEFAULT_URL_SL7 = "https://w3.hepix.org/benchmarking/sl7-x86_64-gcc48.html"
0017 DEFAULT_URL_HS23 = "https://raw.githubusercontent.com/HEPiX-Forum/hepix-forum.github.io/master/_data/HS23scores.csv"
0018 
0019 
0020 def _parse_cores(val: str) -> tuple[int | None, int]:
0021     smt = 1 if ("HT on" in val or "SMT on" in val) else 0
0022     tokens = val.replace("(", " ").split()
0023     ncores = next((int(t) for t in tokens if t.isdigit()), None)
0024     return ncores, smt
0025 
0026 
0027 # ---------------------- HTML scrapers (SL6/SL7) ----------------------
0028 class BaseHS06Scraper:
0029     HEADERS = [
0030         "CPU",
0031         "HS06",
0032         "Clock speed (MHz)",
0033         "L2+L3 cache size (grand total, KB)",
0034         "Cores (runs)",
0035         "Memory (GB)",
0036         "Mainboard type",
0037         "Site",
0038     ]
0039 
0040     def __init__(self, task_buffer, url: str):
0041         self.url = url
0042         self.session = requests.Session()
0043         self.task_buffer = task_buffer
0044 
0045     def run(self) -> None:
0046         html = self._fetch_html(self.url)
0047         df = self._parse_html_to_polars(html)
0048         self._insert(df)
0049 
0050     def _insert_cpu_perf_rows(self, rows: list[dict], source_url: str) -> None:
0051         sql = (
0052             "INSERT INTO atlas_panda.cpu_benchmarks "
0053             "(cpu_type, cpu_type_normalized, smt_enabled, ncores, site, score_per_core, source) "
0054             "VALUES (:cpu_type, :cpu_type_normalized, :smt_enabled, :ncores, :site, :score_per_core, :source)"
0055         )
0056         for r in rows:
0057             r["source"] = source_url
0058             r["cpu_type_normalized"] = normalize_cpu_model(r["cpu_type"])
0059             status, res = self.task_buffer.querySQLS(sql, r)
0060 
0061     def _fetch_html(self, url: str) -> str:
0062         resp = self.session.get(url, timeout=30)
0063         resp.raise_for_status()
0064         return resp.text
0065 
0066     def _parse_html_to_polars(self, html: str) -> pl.DataFrame:
0067         soup = BeautifulSoup(html, "html.parser")
0068         h2 = soup.find("h2", {"id": "benchmark-results"})
0069         if not h2:
0070             raise RuntimeError("Benchmark results heading not found.")
0071         tables = h2.find_all_next("table", class_="striped")
0072         if not tables:
0073             raise RuntimeError("No benchmark results table found after the heading.")
0074         table = tables[0]
0075 
0076         # Build rows -> Polars
0077         rows = []
0078         for tr in table.find_all("tr")[1:]:
0079             cols = [td.get_text(strip=True) for td in tr.find_all("td")]
0080             if not any(cols):
0081                 continue
0082             rows.append(cols)
0083 
0084         # Use fixed headers for SL6; SL7 will override to read <th>
0085         df = pl.DataFrame(rows, schema=self.HEADERS)
0086         df = (
0087             df.select(
0088                 pl.col("CPU").alias("cpu_type"),
0089                 pl.col("HS06").cast(pl.Float64, strict=False).alias("score"),
0090                 pl.col("Cores (runs)").alias("cores"),
0091                 pl.col("Site").alias("site"),
0092             )
0093             .with_columns(
0094                 pl.col("cores").map_elements(lambda s: _parse_cores(s)[0], return_dtype=pl.Int64).alias("ncores"),
0095                 pl.col("cores").map_elements(lambda s: _parse_cores(s)[1], return_dtype=pl.Int64).alias("smt_enabled"),
0096             )
0097             .with_columns((pl.col("score") / pl.col("ncores")).alias("score_per_core"))
0098             .drop(["cores", "score"])
0099             .drop_nulls(["ncores", "score_per_core"])
0100         )
0101         return df
0102 
0103     def _insert(self, df: pl.DataFrame) -> None:
0104         self._insert_cpu_perf_rows(df.to_dicts(), self.url)
0105 
0106 
0107 class HS06ScraperSL6(BaseHS06Scraper):
0108     def __init__(self, task_buffer, url: str = DEFAULT_URL_SL6):
0109         super().__init__(task_buffer, url)
0110 
0111 
0112 class HS06ScraperSL7(BaseHS06Scraper):
0113     def __init__(self, task_buffer, url: str = DEFAULT_URL_SL7):
0114         super().__init__(task_buffer, url)
0115 
0116     # SL7 table provides <th> headers; reuse logic but rebuild schema from them.
0117     def _parse_html_to_polars(self, html: str) -> pl.DataFrame:
0118         soup = BeautifulSoup(html, "html.parser")
0119         h2 = soup.find("h2", {"id": "benchmark-results"})
0120         if not h2:
0121             raise RuntimeError("Benchmark results heading not found.")
0122         tables = h2.find_all_next("table", class_="striped")
0123         if not tables:
0124             raise RuntimeError("No benchmark results table found after the heading.")
0125         table = tables[0]
0126 
0127         headers = [th.get_text(strip=True) for th in table.find_all("th")]
0128         rows = []
0129         for tr in table.find_all("tr")[1:]:
0130             cols = [td.get_text(strip=True) for td in tr.find_all("td")]
0131             if not any(cols):
0132                 continue
0133             rows.append(cols)
0134 
0135         df = pl.DataFrame(rows, schema=headers)
0136         df = (
0137             df.select(
0138                 pl.col("CPU").alias("cpu_type"),
0139                 pl.col("HS06").cast(pl.Float64, strict=False).alias("score"),
0140                 pl.col("Cores (runs)").alias("cores"),
0141                 pl.col("Site").alias("site"),
0142             )
0143             .with_columns(
0144                 pl.col("cores").map_elements(lambda s: _parse_cores(s)[0], return_dtype=pl.Int64).alias("ncores"),
0145                 pl.col("cores").map_elements(lambda s: _parse_cores(s)[1], return_dtype=pl.Int64).alias("smt_enabled"),
0146             )
0147             .with_columns((pl.col("score") / pl.col("ncores")).alias("score_per_core"))
0148             .drop(["cores", "score"])
0149             .drop_nulls(["ncores", "score_per_core"])
0150         )
0151         return df
0152 
0153 
0154 # ---------------------- HS23 CSV ingestor (Polars) ----------------------
0155 class HS23Ingestor:
0156     def __init__(self, task_buffer, url: str = DEFAULT_URL_HS23):
0157         self.url = url
0158         self.logger = logger_utils.make_logger(main_logger, "HS23Ingestor")
0159         self.task_buffer = task_buffer
0160 
0161     def run(self) -> None:
0162         max_timestamp = self._select_max_timestamp(None)
0163         df = self._fetch()
0164         df = self._transform(df, max_timestamp)
0165         self._insert(df)
0166 
0167     def _fetch(self) -> pl.DataFrame:
0168         df = pl.read_csv(self.url, ignore_errors=True)
0169         return df
0170 
0171     def _transform(self, df: pl.DataFrame, max_timestamp) -> pl.DataFrame:
0172         out = (
0173             df.select(
0174                 pl.col("CPU").alias("cpu_type"),
0175                 pl.col("SMT enabled").alias("smt_enabled"),
0176                 pl.col("# Sockets").alias("sockets"),
0177                 pl.col("Cores/Socket").alias("cores_per_socket"),
0178                 pl.col("Ncores").alias("ncores"),
0179                 pl.col("Site").alias("site"),
0180                 pl.col("Score/Ncores").alias("score_per_core"),
0181                 pl.col("last_date").alias("timestamp"),
0182             )
0183             .with_columns(
0184                 pl.col("timestamp").str.strptime(pl.Datetime, "%Y-%m-%d %H:%M:%S", strict=False),
0185                 # normalize SMT to 0/1
0186                 pl.when(pl.col("smt_enabled").cast(pl.Utf8, strict=False).str.to_lowercase().is_in(["1", "true", "yes", "on"]))
0187                 .then(pl.lit(1))
0188                 .otherwise(
0189                     pl.when(pl.col("smt_enabled").cast(pl.Utf8, strict=False).str.to_lowercase().is_in(["0", "false", "no", "off"]))
0190                     .then(pl.lit(0))
0191                     .otherwise(pl.col("smt_enabled").cast(pl.Int64, strict=False).fill_null(0))
0192                 )
0193                 .alias("smt_enabled"),
0194                 pl.col("sockets").cast(pl.Int64, strict=False),
0195                 pl.col("cores_per_socket").cast(pl.Int64, strict=False),
0196                 pl.col("ncores").cast(pl.Int64, strict=False),
0197                 pl.col("score_per_core").cast(pl.Float64, strict=False),
0198             )
0199             .filter(pl.col("timestamp") > max_timestamp)
0200         )
0201         return out
0202 
0203     def _select_max_timestamp(self, df: pl.DataFrame) -> None:
0204         sql = "SELECT max(timestamp) FROM atlas_panda.cpu_benchmarks"
0205         status, res = self.task_buffer.querySQLS(sql, {})
0206         max_timestamp = res[0][0]
0207         if not max_timestamp:
0208             max_timestamp = pl.datetime(1970, 1, 1)
0209         return max_timestamp
0210 
0211     def _insert(self, df: pl.DataFrame) -> None:
0212         sql = (
0213             "INSERT INTO atlas_panda.cpu_benchmarks "
0214             "(cpu_type, cpu_type_normalized, smt_enabled, sockets, cores_per_socket, ncores, site, "
0215             "score_per_core, timestamp, source) "
0216             "VALUES (:cpu_type, :cpu_type_normalized, :smt_enabled, :sockets, :cores_per_socket, :ncores, "
0217             ":site, :score_per_core, :timestamp, :source)"
0218         )
0219         for row in df.to_dicts():
0220             row["source"] = self.url
0221             row["cpu_type_normalized"] = normalize_cpu_model(row["cpu_type"])
0222             _, _ = self.task_buffer.querySQLS(sql, row)
0223 
0224 
0225 def main(tbuf=None, **kwargs):
0226     requester_id = GenericThread().get_full_id(__name__, sys.modules[__name__].__file__)
0227 
0228     # instantiate TB
0229     if tbuf is None:
0230         from pandaserver.taskbuffer.TaskBuffer import taskBuffer
0231 
0232         taskBuffer.init(
0233             panda_config.dbhost,
0234             panda_config.dbpasswd,
0235             nDBConnection=1,
0236             useTimeout=True,
0237             requester=requester_id,
0238         )
0239     else:
0240         taskBuffer = tbuf
0241 
0242     try:
0243         main_logger.debug("Start")
0244         # We assume the SL6 and SL7 pages don't change and do not need to be updated.
0245         # In case you re-run the scripts, be aware that it will cause duplicate entries.
0246         # main_logger.debug("Starting HS06 SL6 scraper...")
0247         # HS06ScraperSL6(taskBuffer).run()
0248         # main_logger.debug("Starting HS06 SL7 scraper...")
0249         # HS06ScraperSL7(taskBuffer).run()
0250 
0251         main_logger.debug("Starting HS23 ingestor...")
0252         HS23Ingestor(taskBuffer).run()
0253         main_logger.debug("Done.")
0254     except Exception as e:
0255         main_logger.error(f"Error: {e}")
0256     finally:
0257         # stop the taskBuffer if it was created inside this script
0258         if tbuf is None:
0259             taskBuffer.cleanup(requester=requester_id)
0260 
0261 
0262 # ---------------------- main ----------------------
0263 if __name__ == "__main__":
0264     main()