Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-20 07:58:58

0001 import json
0002 import os
0003 import os.path
0004 import threading
0005 from http.server import BaseHTTPRequestHandler, HTTPServer
0006 from json.decoder import JSONDecodeError
0007 from queue import Queue
0008 
0009 # try:
0010 #     from urllib.parse import parse_qsl
0011 # except ImportError:
0012 #     from cgi import parse_qsl
0013 from socketserver import ThreadingMixIn
0014 
0015 from pandaharvester.harvesterconfig import harvester_config
0016 from pandaharvester.harvestercore import core_utils
0017 from pandaharvester.harvestercore.db_proxy_pool import DBProxyPool as DBProxy
0018 from pandaharvester.harvestermessenger import shared_file_messenger
0019 
# logger
# Module-level logger named "http_server_messenger"; the same logger object is
# also handed to shared_file_messenger through its set_logger().
_logger = core_utils.setup_logger("http_server_messenger")
shared_file_messenger.set_logger(_logger)
0023 
0024 
def set_logger(master_logger):
    """Install *master_logger* as this module's logger.

    The module-level ``_logger`` is rebound to *master_logger*, and the same
    object is forwarded to ``shared_file_messenger.set_logger``.
    """
    global _logger
    # rebind first, then propagate, so both modules log through one object
    _logger = master_logger
    shared_file_messenger.set_logger(master_logger)
0029 
0030 
# Module-level SharedFileMessenger instance; HttpHandler reads the file-name
# attributes (e.g. jsonJobRequestFileName, heartbeatFile) from it.
messenger_inst = shared_file_messenger.SharedFileMessenger()
0032 
0033 
# handler for http front-end
class HttpHandler(BaseHTTPRequestHandler):
    """Serve POST requests by reading or writing files in a worker's access point.

    The request body must be a JSON object with the keys ``methodName``,
    ``workerID`` and ``data``. The worker is looked up in the database, the
    method name is mapped to a file under the worker's access point, and that
    file is either written with ``data`` or read back to the client.

    Status codes sent:
        200 -- the file was written or read
        400 -- unparsable body, missing key, or unknown workerID
        500 -- unexpected exception while handling the request
        501 -- unknown methodName
        503 -- target file already exists (write) or does not exist yet (read)
    """

    # methodName -> (name of the messenger_inst attribute holding the file name, operation type)
    # The attribute is resolved with getattr() per request, so only the
    # attribute for the requested method is ever looked up.
    _METHOD_MAP = {
        "requestJobs": ("jsonJobRequestFileName", "w"),
        "getJobs": ("jobSpecFileName", "r"),
        "requestEventRanges": ("jsonEventsRequestFileName", "w"),
        "getEventRanges": ("jsonEventsFeedFileName", "r"),
        "updateJobs": ("jsonAttrsFileName", "w"),
        "uploadJobReport": ("jsonJobReport", "w"),
        "uploadEventOutputDump": ("jsonOutputsFileName", "w"),
        "setPandaIDs": ("pandaIDsFile", "w"),
        "killWorker": ("killWorkerFile", "w"),
        "heartbeat": ("heartbeatFile", "w"),
    }

    # write methods that may overwrite an existing file
    _OVERWRITE_METHODS = ("heartbeat",)

    # keys every request must carry, in the order they are checked
    _REQUIRED_KEYS = ("methodName", "workerID", "data")

    def __init__(self, *args, **kwargs):
        """Create the DB proxy, then run the base-class constructor.

        The attributes are set first because BaseHTTPRequestHandler.__init__
        already handles the request.
        """
        self.dbProxy = DBProxy()
        self.tmpLog = None
        BaseHTTPRequestHandler.__init__(self, *args, **kwargs)

    def log_message(self, format, *args):
        """Do nothing, which suppresses the base class's console logging."""
        # suppress console logging
        pass

    def get_form(self):
        """Read ``Content-Length`` bytes from the request and parse them as JSON.

        Raises whatever ``int()``, ``rfile.read()`` or ``json.loads()`` raise,
        e.g. when the header is missing or the body is not valid JSON.
        """
        body = self.rfile.read(int(self.headers["Content-Length"]))
        return json.loads(body)

    def do_postprocessing(self, message):
        """Finish the headers and send *message* as the response body.

        *message* may be ``str`` or ``bytes``; text is encoded as UTF-8 because
        the socket writer only accepts bytes.
        """
        self.end_headers()
        if isinstance(message, str):
            message = message.encode("utf-8")
        self.wfile.write(message)

    @classmethod
    def _check_form(cls, form):
        """Return an error message for an invalid *form*, or None when it is usable."""
        # a JSON scalar or array cannot carry the mandatory keys
        if not isinstance(form, dict):
            return "corrupted json"
        for key in cls._REQUIRED_KEYS:
            if key not in form:
                return f"{key} is not given"
        return None

    def _write_file(self, filePath, methodName, data):
        """Dump *data* as JSON to *filePath*; return (status, message, content type)."""
        # check if file exists. Methods such as heartbeat however need to overwrite the file
        if os.path.exists(filePath) and methodName not in self._OVERWRITE_METHODS:
            return 503, "previous request is not yet processed", None
        with open(filePath, "w") as fileHandle:
            json.dump(data, fileHandle)
        return 200, "OK", None

    def _read_file(self, filePath):
        """Return (status, message, content type) with the contents of *filePath*.

        Valid JSON is re-serialized and reported as ``application/json``; any
        other content is returned verbatim as ``text/plain``.
        """
        if not os.path.exists(filePath):
            return 503, "previous request is not yet processed", None
        # read once so the fallback needs no second, unclosed file handle
        with open(filePath) as fileHandle:
            content = fileHandle.read()
        try:
            return 200, json.dumps(json.loads(content)), "application/json"
        except JSONDecodeError:
            return 200, content, "text/plain"

    def _handle_request(self, form):
        """Run the action for a validated *form*; return (status, message, content type)."""
        # get worker
        workerID = form["workerID"]
        workSpec = self.dbProxy.get_worker_with_id(workerID)
        if workSpec is None:
            return 400, f"workerID={workerID} not found in DB", None
        # chose file and operation for each action
        methodName = form["methodName"]
        # only strings can be method names; this also keeps unhashable values out of the dict lookup
        entry = self._METHOD_MAP.get(methodName) if isinstance(methodName, str) else None
        if entry is None:
            return 501, "method not implemented", None
        attrName, opType = entry
        filePath = os.path.join(workSpec.get_access_point(), getattr(messenger_inst, attrName))
        # take action
        if opType == "w":
            return self._write_file(filePath, methodName, form["data"])
        return self._read_file(filePath)

    def do_POST(self):
        """Handle one POST request and send exactly one response.

        The status code, body and optional Content-Type are worked out first
        and sent at the end, so the status line always precedes the headers.
        """
        # logger
        if self.tmpLog is None:
            self.tmpLog = core_utils.make_logger(_logger)
        form = None
        methodName = None
        contentType = None
        # parse the form data posted
        try:
            form = self.get_form()
        except Exception:
            # unreadable body, missing/invalid Content-Length, or invalid JSON
            statusCode, message = 400, "corrupted json"
        else:
            # check parameters
            problem = self._check_form(form)
            if problem is not None:
                statusCode, message = 400, problem
            else:
                methodName = form["methodName"]
                try:
                    statusCode, message, contentType = self._handle_request(form)
                except Exception:
                    statusCode = 500
                    message = core_utils.dump_error_message(_logger)
        if harvester_config.frontend.verbose:
            # log the parsed payload; None when the body could not be parsed
            dataStr = None if form is None else json.dumps(form)
            self.tmpLog.debug(f"ip={self.client_address[0]} - method={methodName} json={dataStr} msg={message}")
        # set the response
        self.send_response(statusCode)
        if contentType is not None:
            self.send_header("Content-Type", contentType)
        self.do_postprocessing(message)
0168 
0169 
# http front-end with a thread pool
class ThreadedHttpServer(ThreadingMixIn, HTTPServer):
    """HTTP server that hands accepted requests to a fixed number of daemon threads.

    ``process_request`` only enqueues the request; the threads started in
    ``serve_forever`` take requests off the queue and process them.
    """

    allow_reuse_address = True

    # override to make a thread pool
    def serve_forever(self, *args, **kwargs):
        """Create the request queue and worker threads, then run the base main loop."""
        pool_size = harvester_config.frontend.nThreads
        # queue for requests, bounded by the number of threads
        self.requests = Queue(pool_size)
        # make thread pool
        for _ in range(pool_size):
            worker = threading.Thread(target=self.thread_main_loop)
            worker.daemon = True
            worker.start()
        # original server main loop
        HTTPServer.serve_forever(self, *args, **kwargs)

    # main loop of the thread
    def thread_main_loop(self):
        """Process queued requests forever."""
        while True:
            # get a request from queue to process
            queued = self.requests.get()
            ThreadingMixIn.process_request_thread(self, *queued)

    # override to queue requests
    def process_request(self, request, client_address):
        """Enqueue the request instead of spawning a thread for it."""
        item = (request, client_address)
        self.requests.put(item)
0195 
0196 
# singleton launcher for http front-end
class FrontendLauncher(object):
    """Start the HTTP front-end at most once per process.

    Calling the class does not return a FrontendLauncher object: it returns
    the class-level ``instance``, which is the server thread when
    ``harvester_config.frontend.type`` is "simple" and the integer 1 otherwise.
    """

    instance = None
    lock = threading.Lock()

    # override __new__ to have a singleton
    def __new__(cls, *args, **kwargs):
        """Return the shared instance, creating it on the first call."""
        # already launched: no need to take the lock
        if cls.instance is not None:
            return cls.instance
        with cls.lock:
            # re-check under the lock; another thread may have launched meanwhile
            if cls.instance is None:
                if harvester_config.frontend.type != "simple":
                    cls.instance = 1
                else:
                    server = ThreadedHttpServer(("", harvester_config.frontend.portNumber), HttpHandler)
                    server_thread = threading.Thread(target=server.serve_forever)
                    server_thread.daemon = True
                    server_thread.start()
                    cls.instance = server_thread
        return cls.instance
0216 
0217 
# start frontend
# Runs at import time: FrontendLauncher.__new__ starts the HTTP server thread
# when harvester_config.frontend.type is "simple"; otherwise `frontend` is
# just the integer 1.
frontend = FrontendLauncher()
0220 
0221 
# messenger with http frontend
class HttpServerMessenger(shared_file_messenger.SharedFileMessenger):
    """SharedFileMessenger subclass used together with the HTTP front-end.

    The visible code adds nothing beyond delegating construction to the base class.
    """

    # constructor
    def __init__(self, **kwarg):
        """Forward all keyword arguments to SharedFileMessenger.__init__."""
        shared_file_messenger.SharedFileMessenger.__init__(self, **kwarg)