File indexing completed on 2026-04-20 07:58:58
0001 import json
0002 import os
0003 import os.path
0004 import threading
0005 from http.server import BaseHTTPRequestHandler, HTTPServer
0006 from json.decoder import JSONDecodeError
0007 from queue import Queue
0008
0009
0010
0011
0012
0013 from socketserver import ThreadingMixIn
0014
0015 from pandaharvester.harvesterconfig import harvester_config
0016 from pandaharvester.harvestercore import core_utils
0017 from pandaharvester.harvestercore.db_proxy_pool import DBProxyPool as DBProxy
0018 from pandaharvester.harvestermessenger import shared_file_messenger
0019
0020
# Module-level logger for this messenger; the same logger object is handed to
# the shared_file_messenger module via its set_logger().
_logger = core_utils.setup_logger("http_server_messenger")
shared_file_messenger.set_logger(_logger)
0023
0024
def set_logger(master_logger):
    """Swap the module-level logger for ``master_logger``.

    The new logger is also forwarded to ``shared_file_messenger.set_logger``
    so both modules log through the same object.

    Args:
        master_logger: logger object to install.
    """
    global _logger
    _logger = master_logger
    # _logger and master_logger are the same object at this point
    shared_file_messenger.set_logger(_logger)
0029
0030
# Shared-file messenger instance; HttpHandler reads its file-name attributes
# (e.g. jobSpecFileName, heartbeatFile) to build paths under a worker's access point.
messenger_inst = shared_file_messenger.SharedFileMessenger()
0032
0033
0034
class HttpHandler(BaseHTTPRequestHandler):
    """Handle POST requests that read or write per-worker exchange files.

    The request body is a JSON object with the keys ``methodName``,
    ``workerID`` and ``data``. ``methodName`` selects a file under the
    worker's access point and whether the request writes ``data`` into it
    or reads its content back.
    """

    # methodName -> (messenger_inst attribute holding the file name, operation)
    # "w": dump form["data"] into the file, "r": return the file content.
    _METHOD_TABLE = {
        "requestJobs": ("jsonJobRequestFileName", "w"),
        "getJobs": ("jobSpecFileName", "r"),
        "requestEventRanges": ("jsonEventsRequestFileName", "w"),
        "getEventRanges": ("jsonEventsFeedFileName", "r"),
        "updateJobs": ("jsonAttrsFileName", "w"),
        "uploadJobReport": ("jsonJobReport", "w"),
        "uploadEventOutputDump": ("jsonOutputsFileName", "w"),
        "setPandaIDs": ("pandaIDsFile", "w"),
        "killWorker": ("killWorkerFile", "w"),
        "heartbeat": ("heartbeatFile", "w"),
    }

    def __init__(self, *args, **kwargs):
        """Create the DB proxy and logger slot, then run the base initializer.

        The attributes are assigned first because the base request handler
        processes the request from inside its own ``__init__``.
        """
        self.dbProxy = DBProxy()
        self.tmpLog = None
        BaseHTTPRequestHandler.__init__(self, *args, **kwargs)

    def log_message(self, format, *args):
        """Suppress the default per-request logging of the base class."""
        pass

    def get_form(self):
        """Read ``Content-Length`` bytes from the request and parse them as JSON.

        Returns:
            The decoded JSON value.

        Raises:
            TypeError/ValueError: if the Content-Length header is missing or
                not an integer, or the body is not valid JSON.
        """
        dataStr = self.rfile.read(int(self.headers["Content-Length"]))
        return json.loads(dataStr)

    def do_postprocessing(self, message):
        """Terminate the header section and send ``message`` as the body.

        Args:
            message: response body; ``str`` (or any non-bytes object) is
                encoded as UTF-8 because ``wfile`` only accepts bytes.
        """
        self.end_headers()
        if not isinstance(message, (bytes, bytearray)):
            message = str(message).encode("utf-8")
        self.wfile.write(message)

    def _process_form(self, form):
        """Validate ``form`` and perform the requested file operation.

        Args:
            form: decoded request body (a dict).

        Returns:
            Tuple ``(status, message, content_type)``; ``content_type`` is
            ``None`` when no Content-Type header should be sent.
        """
        # same check order and messages as before: methodName, workerID, data
        for key in ("methodName", "workerID", "data"):
            if key not in form:
                return 400, f"{key} is not given", None

        workerID = form["workerID"]
        workSpec = self.dbProxy.get_worker_with_id(workerID)
        if workSpec is None:
            return 400, f"workerID={workerID} not found in DB", None

        methodName = form["methodName"]
        # guard against unhashable JSON values (lists/objects) before the lookup
        entry = self._METHOD_TABLE.get(methodName) if isinstance(methodName, str) else None
        if entry is None:
            return 501, "method not implemented", None
        attrName, opType = entry
        filePath = os.path.join(workSpec.get_access_point(), getattr(messenger_inst, attrName))

        if opType == "w":
            # an existing file means the previous request has not been consumed;
            # heartbeat is the only method allowed to overwrite
            if os.path.exists(filePath) and methodName != "heartbeat":
                return 503, "previous request is not yet processed", None
            with open(filePath, "w") as fileHandle:
                json.dump(form["data"], fileHandle)
            return 200, "OK", None

        if not os.path.exists(filePath):
            return 503, "previous request is not yet processed", None
        # read once so the non-JSON fallback does not need to reopen the file
        with open(filePath) as fileHandle:
            content = fileHandle.read()
        try:
            return 200, json.dumps(json.loads(content)), "application/json"
        except JSONDecodeError:
            return 200, content, "text/plain"

    def do_POST(self):
        """Serve one POST request.

        Status codes: 200 on success, 400 for a malformed or incomplete
        request or an unknown worker, 501 for an unknown method, 503 when
        the target file is not in the expected state, 500 on any unexpected
        error.
        """
        if self.tmpLog is None:
            self.tmpLog = core_utils.make_logger(_logger)
        form = None
        methodName = None
        contentType = None

        try:
            form = self.get_form()
        except Exception:
            # unreadable body: still send a proper status line
            status, message = 400, "corrupted json"
        else:
            if not isinstance(form, dict):
                # valid JSON but not an object (null, number, list, ...)
                status, message = 400, "corrupted json"
            else:
                methodName = form.get("methodName")
                try:
                    status, message, contentType = self._process_form(form)
                except Exception:
                    status = 500
                    message = core_utils.dump_error_message(_logger)

        # the status line must precede any header
        self.send_response(status)
        if contentType is not None:
            self.send_header("Content-Type", contentType)
        if harvester_config.frontend.verbose:
            self.tmpLog.debug(f"ip={self.client_address[0]} - method={methodName} json={form} msg={message}")
        self.do_postprocessing(message)
        return
0168
0169
0170
class ThreadedHttpServer(ThreadingMixIn, HTTPServer):
    """HTTP server that hands accepted requests to a fixed set of worker threads.

    Accepted requests are put on a bounded queue instead of being handled
    in a newly spawned thread; the worker threads started in
    ``serve_forever`` take them from the queue.
    """

    allow_reuse_address = True

    def serve_forever(self, *args, **kwargs):
        """Set up the queue and workers, then run the base accept loop.

        Both the queue capacity and the number of worker threads come from
        ``harvester_config.frontend.nThreads``.
        """
        pool_size = harvester_config.frontend.nThreads
        self.requests = Queue(pool_size)
        for _ in range(pool_size):
            worker = threading.Thread(target=self.thread_main_loop, daemon=True)
            worker.start()
        HTTPServer.serve_forever(self, *args, **kwargs)

    def thread_main_loop(self):
        """Worker loop: take queued requests and process them, forever."""
        while True:
            request, client_address = self.requests.get()
            ThreadingMixIn.process_request_thread(self, request, client_address)

    def process_request(self, request, client_address):
        """Queue the request for a worker thread instead of handling it here."""
        queued_item = (request, client_address)
        self.requests.put(queued_item)
0195
0196
0197
class FrontendLauncher:
    """Launch the HTTP frontend at most once per process.

    ``__new__`` does not return a ``FrontendLauncher`` object. It returns
    the class-level ``instance``: the thread running the server when
    ``harvester_config.frontend.type`` is ``"simple"``, otherwise ``1``.
    """

    instance = None
    lock = threading.Lock()

    def __new__(cls, *args, **kwargs):
        """Return the cached launch result, creating it on first use."""
        # already launched: nothing to do
        if cls.instance is not None:
            return cls.instance
        with cls.lock:
            # re-check under the lock; another thread may have launched meanwhile
            if cls.instance is None:
                if harvester_config.frontend.type != "simple":
                    cls.instance = 1
                else:
                    server = ThreadedHttpServer(("", harvester_config.frontend.portNumber), HttpHandler)
                    server_thread = threading.Thread(target=server.serve_forever, daemon=True)
                    server_thread.start()
                    cls.instance = server_thread
        return cls.instance
0216
0217
0218
# Evaluated at import time: calling FrontendLauncher() runs its one-time launch
# logic, so `frontend` holds whatever FrontendLauncher.__new__ returns.
frontend = FrontendLauncher()
0220
0221
0222
class HttpServerMessenger(shared_file_messenger.SharedFileMessenger):
    """Messenger that reuses ``SharedFileMessenger`` without adding behavior of its own."""

    def __init__(self, **kwarg):
        """Pass all keyword arguments straight to the base class initializer."""
        super().__init__(**kwarg)