Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:39:00

0001 import datetime
0002 import threading
0003 import traceback
0004 
0005 from pandacommon.pandalogger.PandaLogger import PandaLogger
0006 from pandaserver.config import panda_config
0007 from pandaserver.configurator import aux
0008 
0009 _logger = PandaLogger().getLogger("carbon")
0010 
0011 
class CarbonEmissions(threading.Thread):
    """
    Thread that downloads per-region carbon-emission information (gCO2 per kWh)
    from the MONIT Grafana proxy and persists it through the PanDA task buffer.
    """

    def __init__(self, taskBuffer):
        """
        :param taskBuffer: PanDA task buffer used to write and aggregate the emissions.
        """
        threading.Thread.__init__(self)

        # Optional bearer token for authenticating against the Grafana proxy;
        # None when the configuration does not define it.
        self.bearer_token = getattr(panda_config, "CO2_BEARER_TOKEN", None)

        self.taskBuffer = taskBuffer

    def download_region_emissions(self):
        """
        Download the last two hours of region emission metrics.

        :return: list of dicts with keys "region", "timestamp" (datetime, converted
                 from the source's millisecond epoch) and "value" (gCO2 per kWh),
                 or None on any download/parse error.
        """
        # Elasticsearch multi-search body: a header line and a query line, each of
        # which must be a single line of valid JSON. Don't indent the query.
        # BUGFIX: the query line previously ended in "100}}" — one closing brace
        # more than was opened — which made the JSON malformed.
        query = """
{"search_type": "query_then_fetch","ignore_unavailable": true,"index": ["monit_prod_green-it_raw_regionmetric*"]}
{"query": {"range": {"metadata.timestamp": {"gte": "now-2h","lt": "now"}}}, "size": 100}
"""
        try:
            results = aux.query_grafana_proxy(query, self.bearer_token)
            if not results:
                _logger.error("download_region_emissions was not able to download data")
                return None

            status = results["responses"][0]["status"]
            if status != 200:
                _logger.error(f"download_region_emissions was not able to download data with status {status}")
                return None

            hits = results["responses"][0]["hits"]["hits"]  # That's how the json is structured...
            # Keep only the fields we persist: region, timestamp, emission value
            return [
                {
                    "region": entry["_source"]["data"]["region"],
                    # timestamps come in ms
                    "timestamp": datetime.datetime.fromtimestamp(entry["_source"]["metadata"]["timestamp"] / 1000),
                    "value": entry["_source"]["data"]["gCO2_perkWh"],
                }
                for entry in hits
            ]
        except Exception:
            # Best-effort download: log the full traceback and let the caller skip the cycle
            _logger.error(f"download_region_emissions excepted with {traceback.format_exc()}")
            return None

    def run(self):
        """Download emissions, store them in the DB and aggregate grid-level values."""
        results = self.download_region_emissions()
        if results:
            self.taskBuffer.carbon_write_region_emissions(results)

            # aggregate the emissions for the grid
            self.taskBuffer.carbon_aggregate_emissions()