File indexing completed on 2026-04-11 08:41:04
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010 """
0011 Main classes to manage the messages between ES and harvester/ACT/Panda.
0012 """
0013
0014 import json
0015 import logging
0016 import os
0017 import threading
0018 import time
0019 try:
0020 import Queue as queue
0021 except Exception:
0022 import queue
0023
0024 from pilot.common import exception
0025 from pilot.common.pluginfactory import PluginFactory
0026
0027
0028 logger = logging.getLogger(__name__)
0029
0030
"""
Communication response
"""


class CommunicationResponse(object):
    """
    Result of a communication operation.

    ``status``, ``content`` and ``exception`` are always present (None unless given);
    any other keys passed in ``attrs`` become attributes as well.
    """

    def __init__(self, attrs=None):
        """
        :param attrs: dict of attribute values, or a JSON string decoding to such a dict.
            A dict passed by the caller is copied, never modified.
        """
        if not attrs:
            attrs = {}
        elif isinstance(attrs, dict):
            # Copy so that filling in defaults below does not leak into the caller's dict.
            attrs = dict(attrs)
        else:
            attrs = json.loads(attrs)

        def_attrs = {'status': None, 'content': None, 'exception': None}
        for key, value in def_attrs.items():
            attrs.setdefault(key, value)

        for key, value in attrs.items():
            setattr(self, key, value)

    def __str__(self):
        """
        Serialize the attributes as a JSON object.

        Falsy values are emitted unchanged; list values become lists of ``str(item)``;
        any other truthy value is converted with ``str()``.
        """
        json_str = {}
        # Iterate over a snapshot so attribute changes during formatting cannot break the loop.
        for key, value in list(self.__dict__.items()):
            if not value:
                json_str[key] = value
            elif isinstance(value, list):
                json_str[key] = [str(item) for item in value]
            else:
                json_str[key] = str(value)
        return json.dumps(json_str)
0064
0065
"""
Communication request
"""


class CommunicationRequest(object):
    """
    A request queued by a client for the communication manager.

    Always carries ``post_hook``, ``response`` (None until answered) and ``abort``
    (False at construction), plus the defaults of its ``request_type`` and any other
    keys passed in ``attrs``.
    """

    class RequestType(object):
        """String identifiers of the supported request types."""
        RequestJobs = 'request_jobs'
        UpdateJobs = 'update_jobs'
        RequestEvents = 'request_events'
        UpdateEvents = 'update_events'

    def __init__(self, attrs=None):
        """
        :param attrs: dict of attribute values, or a JSON string decoding to such a dict.
            ``attrs['request_type']`` selects type-specific defaults; a missing or
            unknown type only gets the common defaults. A dict passed by the caller
            is copied, never modified.
        """
        if not attrs:
            attrs = {}
        elif isinstance(attrs, dict):
            # Copy so that filling in defaults below does not leak into the caller's dict.
            attrs = dict(attrs)
        else:
            attrs = json.loads(attrs)

        request_types = CommunicationRequest.RequestType
        type_defaults = {request_types.RequestJobs: {'num_jobs': 1},
                         request_types.RequestEvents: {'num_event_ranges': 1},
                         request_types.UpdateEvents: {'update_events': None},
                         request_types.UpdateJobs: {'jobs': None}}
        # Every request needs post_hook and response: the manager reads both.
        def_attrs = {'post_hook': None, 'response': None}
        def_attrs.update(type_defaults.get(attrs.get('request_type'), {}))

        for key, value in def_attrs.items():
            attrs.setdefault(key, value)

        for key, value in attrs.items():
            setattr(self, key, value)
        self.abort = False

    def __str__(self):
        """
        Serialize the attributes as a JSON object.

        Falsy values are emitted unchanged; list values become lists of ``str(item)``;
        any other truthy value is converted with ``str()``.
        """
        json_str = {}
        # Iterate over a snapshot so attribute changes during formatting cannot break the loop.
        for key, value in list(self.__dict__.items()):
            if not value:
                json_str[key] = value
            elif isinstance(value, list):
                json_str[key] = [str(item) for item in value]
            else:
                json_str[key] = str(value)
        return json.dumps(json_str)
0113
0114
"""
Communication manager thread
"""


class CommunicationManager(threading.Thread, PluginFactory):
    """
    Thread that brokers communication requests between clients and a communicator plugin.

    Clients call get_jobs(), update_jobs(), get_event_ranges() and update_events() from
    their own threads. Each call puts a CommunicationRequest on one of ``self.queues``.
    run() takes requests off the queues and passes them to the communicator plugin
    selected by get_plugin_confs(). It then stores the plugin's response on
    ``request.response`` and/or passes it to ``request.post_hook``.
    """

    def __init__(self, *args, **kwargs):
        """
        :param args: forwarded to PluginFactory. get_plugin_confs() also merges them into
            the plugin configuration (dicts, or objects with attributes such as an
            argparse namespace).
        :param kwargs: forwarded to PluginFactory.
        """
        super(CommunicationManager, self).__init__()
        PluginFactory.__init__(self, *args, **kwargs)
        # Thread.setName() is deprecated since Python 3.10; the name attribute is the
        # portable replacement.
        self.name = "CommunicationManager"
        self.post_get_jobs = None
        self.post_get_event_ranges_hook = None
        # Upper bound on a queue's size when run() moves a request into it
        # (see can_process_request); None means unbounded. The processing_* queues hold
        # requests already handed to the communicator, so at most one of each kind is in flight.
        self.queue_limits = {'request_get_jobs': None,
                             'update_jobs': None,
                             'request_get_events': None,
                             'update_events': None,
                             'processing_get_jobs': 1,
                             'processing_update_jobs': 1,
                             'processing_get_events': 1,
                             'processing_update_events': 1}
        self.queues = {name: queue.Queue() for name in self.queue_limits}
        self.stop_event = threading.Event()
        self.args = args
        self.kwargs = kwargs

    def stop(self):
        """
        Set the stop signal.

        run() then aborts every queued request, releasing the clients waiting on them,
        and quits.
        """
        if not self.is_stop():
            logger.info("Stopping Communication Manager.")
            self.stop_event.set()

    def is_stop(self):
        """
        Check whether the stop signal is set.

        :returns: True if the stop signal is set, otherwise False
        """
        return self.stop_event.is_set()

    def _wait_for_response(self, req):
        """
        Block until ``req`` has a response, then unpack it.

        Polls once per second. If the manager is stopped and its thread has already
        finished, nobody will ever answer ``req``. In that case it is aborted the same
        way run() aborts queued requests on shutdown, instead of waiting forever.

        :param req: CommunicationRequest already put on one of ``self.queues``.
        :returns: ``response.content``, or None when ``response.status`` is False.
        :raises: the exception stored in ``response.exception``, if any.
        """
        while req.response is None:
            # is_alive() is evaluated before re-reading req.response: once the thread is
            # dead the response can no longer change, so an answer set just before the
            # thread exited is never overwritten.
            if self.is_stop() and not self.is_alive() and req.response is None:
                req.abort = True
                resp_attrs = {'status': None,
                              'content': None,
                              'exception': exception.CommunicationFailure("Communication manager is stopping, abort this request")}
                req.response = CommunicationResponse(resp_attrs)
                break
            time.sleep(1)

        response = req.response
        if response.exception:
            raise response.exception
        if response.status is False:
            return None
        return response.content

    def _submit(self, queue_name, req_attrs):
        """
        Queue a request built from ``req_attrs`` on ``self.queues[queue_name]``.

        :returns: None immediately when the request has a post_hook. run() then reports
            through the hook, at the stages where it calls post hooks (see
            _build_processor). Otherwise returns the result of _wait_for_response().
        :raises: whatever _wait_for_response() raises.
        """
        req = CommunicationRequest(req_attrs)
        self.queues[queue_name].put(req)
        if req.post_hook:
            return None
        return self._wait_for_response(req)

    def get_jobs(self, njobs=1, post_hook=None, args=None):
        """
        Function can be called by client to send a get_job request and get a response with jobs.

        :param njobs: number of jobs to request.
        :param post_hook: optional callable. When given, this call returns None at once
            and run() later calls ``post_hook(response)`` after the jobs are fetched.
        :param args: optional dict or attribute object (e.g. argparse namespace) whose
            items are copied into the request. request_type, num_jobs and post_hook
            always override them.
        :returns: jobs (got from jobs servers), or None if stopping, if the response
            status is False, or if post_hook is given.
        :raise: Exception caught when getting jobs
        """
        if self.is_stop():
            return None

        req_attrs = {}
        if args:
            req_attrs.update(args if isinstance(args, dict) else vars(args))
        req_attrs.update({'request_type': CommunicationRequest.RequestType.RequestJobs,
                          'num_jobs': njobs,
                          'post_hook': post_hook})
        return self._submit('request_get_jobs', req_attrs)

    def update_jobs(self, jobs, post_hook=None):
        """
        Function can be called by client to update jobs' status to server.

        :param jobs: job updates to send to the communicator.
        :param post_hook: optional callable. When given, this call returns None at once
            and run() later calls ``post_hook(response)``.
        :returns: status of updating jobs (response content), or None if stopping, if
            the response status is False, or if post_hook is given.
        :raise: Exception caught when updating jobs
        """
        if self.is_stop():
            return None

        req_attrs = {'request_type': CommunicationRequest.RequestType.UpdateJobs,
                     'jobs': jobs,
                     'post_hook': post_hook}
        return self._submit('update_jobs', req_attrs)

    def get_event_ranges(self, num_event_ranges=1, post_hook=None, job=None):
        """
        Function can be called by client to send a get_event_ranges request and get a response with event ranges.

        :param num_event_ranges: number of event ranges to request.
        :param post_hook: optional callable. When given, this call returns None at once
            and run() later calls ``post_hook(response)`` after the ranges are fetched.
        :param job: job description; its 'PandaID', 'jobsetID' and 'taskID' entries
            are copied into the request.
        :returns: event ranges (got from jobs servers), or None if stopping, if the
            response status is False, or if post_hook is given.
        :raise: exception.CommunicationFailure if job is missing; otherwise the
            exception caught when getting event ranges
        """
        if self.is_stop():
            return None

        if not job:
            # (job,) so the message also formats correctly for tuple-typed values.
            raise exception.CommunicationFailure("Get events failed because job info missing(job: %s)" % (job,))

        req_attrs = {'request_type': CommunicationRequest.RequestType.RequestEvents,
                     'num_event_ranges': num_event_ranges,
                     'post_hook': post_hook,
                     'jobid': job['PandaID'],
                     'jobsetid': job['jobsetID'],
                     'taskid': job['taskID'],
                     'num_ranges': num_event_ranges}
        return self._submit('request_get_events', req_attrs)

    def update_events(self, update_events, post_hook=None):
        """
        Function can be called by client to send a update_events request.

        :param update_events: event range updates to send to the communicator.
        :param post_hook: optional callable. When given, this call returns None at once
            and run() later calls ``post_hook(response)``.
        :returns: status of updating event ranges (response content), or None if
            stopping, if the response status is False, or if post_hook is given.
        :raise: Exception caught when updating event ranges
        """
        if self.is_stop():
            return None

        req_attrs = {'request_type': CommunicationRequest.RequestType.UpdateEvents,
                     'update_events': update_events,
                     'post_hook': post_hook}
        return self._submit('update_events', req_attrs)

    def get_plugin_confs(self):
        """
        Get different plugin for different communicator.

        The COMMUNICATOR_PLUGIN environment variable selects the class:
        'act' -> ACTCommunicator, 'harvestersf' -> HarvesterShareFileCommunicator,
        anything else (including unset) -> PandaCommunicator. Items of the positional
        arguments given to __init__ are merged on top.

        :returns: dict with {'class': <plugin_class>} and other items
        """
        plugin_classes = {
            'act': 'pilot.eventservice.communicationmanager.plugins.actcommunicator.ACTCommunicator',
            'harvestersf': 'pilot.eventservice.communicationmanager.plugins.harvestersharefilecommunicator.HarvesterShareFileCommunicator',
        }
        default_class = 'pilot.eventservice.communicationmanager.plugins.pandacommunicator.PandaCommunicator'
        plugin = os.environ.get('COMMUNICATOR_PLUGIN', None)
        plugin_confs = {'class': plugin_classes.get(plugin, default_class)}

        if self.args:
            # __init__ stores the positional-argument tuple here, and vars() on a tuple
            # raises TypeError, so merge each element. A single namespace or dict
            # assigned directly to self.args is accepted as well.
            sources = self.args if isinstance(self.args, (list, tuple)) else (self.args,)
            for source in sources:
                if isinstance(source, dict):
                    plugin_confs.update(source)
                elif hasattr(source, '__dict__'):
                    plugin_confs.update(vars(source))
                elif source is not None:
                    logger.warning("Ignoring communicator argument without attributes: %r", source)
        return plugin_confs

    def can_process_request(self, processor, process_type):
        """
        To check whether it is ready to process request in a type.

        A request can be taken when its queue is non-empty and the queue it moves to
        next (if any) is below its limit in ``self.queue_limits``. For a communicator
        such as HarvesterShareFileCommunicator, this prevents starting a new request
        while one is still being processed, which would overwrite files.

        :param processor: dispatch table built by _build_processor().
        :param process_type: key of ``self.queues`` and ``processor``.
        :returns: True or False
        """
        if self.queues[process_type].empty():
            return False

        next_queue = processor[process_type]['next_queue']
        if next_queue is None:
            return True
        limit = self.queue_limits[next_queue]
        return limit is None or self.queues[next_queue].qsize() < limit

    @staticmethod
    def _build_processor(communicator):
        """
        Build run()'s dispatch table for ``communicator``.

        Each queue name maps to:
        - pre_check: callable whose response must have status 0 before a request is taken;
        - handler: callable applied to the request, returning a response with ``status``;
        - next_queue: queue the request moves to after a handler response whose status
          is not False, or None when that response is delivered to the client directly;
        - process_req_post_hook: whether the request's post_hook is called with the
          handler's response.
        """
        return {'request_get_jobs': {'pre_check': communicator.pre_check_get_jobs,
                                     'handler': communicator.request_get_jobs,
                                     'next_queue': 'processing_get_jobs',
                                     'process_req_post_hook': False},
                'request_get_events': {'pre_check': communicator.pre_check_get_events,
                                       'handler': communicator.request_get_events,
                                       'next_queue': 'processing_get_events',
                                       'process_req_post_hook': False},
                'update_jobs': {'pre_check': communicator.pre_check_update_jobs,
                                'handler': communicator.update_jobs,
                                'next_queue': None,
                                'process_req_post_hook': True},
                'update_events': {'pre_check': communicator.pre_check_update_events,
                                  'handler': communicator.update_events,
                                  'next_queue': None,
                                  'process_req_post_hook': True},
                'processing_get_jobs': {'pre_check': communicator.check_get_jobs_status,
                                        'handler': communicator.get_jobs,
                                        'next_queue': None,
                                        'process_req_post_hook': True},
                'processing_get_events': {'pre_check': communicator.check_get_events_status,
                                          'handler': communicator.get_events,
                                          'next_queue': None,
                                          'process_req_post_hook': True}
                }

    def _abort_queued_requests(self, process_type):
        """Drain the ``process_type`` queue, answering each request with an abort failure."""
        pending = self.queues[process_type]
        while not pending.empty():
            req = pending.get()
            logger.info("Is going to stop, aborting request: %s", req)
            req.abort = True
            resp_attrs = {'status': None,
                          'content': None,
                          'exception': exception.CommunicationFailure("Communication manager is stopping, abort this request")}
            req.response = CommunicationResponse(resp_attrs)

    def _process_next_request(self, processor, process_type):
        """
        Run the pre-check and, if it passes, handle one request of ``process_type``.

        :returns: True if a request was taken off the queue, False if the pre-check
            did not return status 0.
        """
        entry = processor[process_type]
        pre_check_resp = entry['pre_check']()
        if not pre_check_resp.status == 0:
            return False

        logger.info("Processing %s", process_type)
        req = self.queues[process_type].get()

        logger.info("Processing %s request: %s", process_type, req)
        try:
            res = entry['handler'](req)
        except Exception as exc:
            # A raising handler must not kill this thread: every client waiting on a
            # request would then block forever. Fail only this request instead.
            logger.exception("Failed to process %s request: %s", process_type, req)
            resp_attrs = {'status': False,
                          'content': None,
                          'exception': exception.CommunicationFailure("Failed to process %s request: %s" % (process_type, exc))}
            res = CommunicationResponse(resp_attrs)
        logger.info("Processing %s respone: %s", process_type, res)

        if res.status is False:
            req.response = res
        else:
            next_queue = entry['next_queue']
            if next_queue:
                self.queues[next_queue].put(req)
            else:
                req.response = res

        if entry['process_req_post_hook'] and req.post_hook:
            try:
                req.post_hook(res)
            except Exception:
                # Client callback errors are logged rather than allowed to stop the manager.
                logger.exception("post_hook of %s request failed: %s", process_type, req)
        return True

    def run(self):
        """
        Main loop to handle communication requests.

        Each pass visits every queue in the dispatch table. While stopping, queued
        requests are aborted; otherwise at most one ready request per queue is handled.
        When a pass handles nothing, the loop exits if stopping, or sleeps one second.
        """
        confs = self.get_plugin_confs()
        logger.info("Communication plugin confs: %s", confs)
        communicator = self.get_plugin(confs)
        logger.info("Communication: %s", communicator)

        processor = self._build_processor(communicator)

        while True:
            has_req = False
            for process_type in processor:
                if self.is_stop():
                    self._abort_queued_requests(process_type)
                elif self.can_process_request(processor, process_type):
                    if self._process_next_request(processor, process_type):
                        has_req = True
            if not has_req:
                if self.is_stop():
                    break
                time.sleep(1)
        logger.info("Communication manager stopped.")