Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-11 08:41:04

0001 #!/usr/bin/env python
0002 # Licensed under the Apache License, Version 2.0 (the "License");
0003 # you may not use this file except in compliance with the License.
0004 # You may obtain a copy of the License at
0005 # http://www.apache.org/licenses/LICENSE-2.0
0006 #
0007 # Authors:
0008 # - Wen Guan, wen.guan@cern.ch, 2018
0009 
0010 """
0011 Main classes to manage the messages between ES and harvester/ACT/Panda.
0012 """
0013 
0014 import json
0015 import logging
0016 import os
0017 import threading
0018 import time
0019 try:
0020     import Queue as queue  # noqa: N813
0021 except Exception:
0022     import queue  # Python 3
0023 
0024 from pilot.common import exception
0025 from pilot.common.pluginfactory import PluginFactory
0026 
0027 
0028 logger = logging.getLogger(__name__)
0029 
0030 
0031 """
0032 Communication response
0033 """
0034 
0035 
class CommunicationResponse(object):
    """
    Result of a communication request.

    Every instance exposes at least the attributes ``status``, ``content`` and
    ``exception`` (each defaulting to None). Any additional key found in the
    supplied attributes is set as an instance attribute as well.
    """

    def __init__(self, attrs=None):
        """
        Build the response from a dictionary or from a JSON document.

        :param attrs: dict of attributes, a JSON string encoding such a dict, or None/empty for all defaults.
        """
        if not attrs:
            attrs = {}
        elif not isinstance(attrs, dict):
            attrs = json.loads(attrs)

        # Work on a copy so that the caller's dictionary is never modified;
        # caller-supplied keys keep their position, defaults are appended.
        merged = dict(attrs)
        for key, default in (('status', None), ('content', None), ('exception', None)):
            merged.setdefault(key, default)

        for key, value in merged.items():
            setattr(self, key, value)

    def __str__(self):
        """
        Serialize the instance attributes to a JSON string.

        Falsy values are emitted unchanged, lists are emitted as lists of the
        string form of their items, everything else as its string form.

        :returns: JSON string.
        """
        json_str = {}
        for key, value in self.__dict__.items():
            if not value:
                json_str[key] = value
            elif isinstance(value, list):
                json_str[key] = [str(list_item) for list_item in value]
            else:
                json_str[key] = str(value)
        return json.dumps(json_str)
0064 
0065 
0066 """
0067 Communication request
0068 """
0069 
0070 
class CommunicationRequest(object):
    """
    Request exchanged between a client and the communication manager.

    The attributes given at construction time become instance attributes.
    Defaults depending on ``request_type`` are filled in, and ``abort`` is
    always initialised to False.
    """

    class RequestType(object):
        """Known values of the ``request_type`` attribute."""
        RequestJobs = 'request_jobs'
        UpdateJobs = 'update_jobs'
        RequestEvents = 'request_events'
        UpdateEvents = 'update_events'

    def __init__(self, attrs=None):
        """
        Build the request from a dictionary or from a JSON document.

        A missing or unrecognised ``request_type`` is accepted: such a request
        only receives the defaults shared by all request types.

        :param attrs: dict of attributes, a JSON string encoding such a dict, or None/empty for all defaults.
        """
        if not attrs:
            attrs = {}
        elif not isinstance(attrs, dict):
            attrs = json.loads(attrs)

        # Work on a copy so that the caller's dictionary is never modified.
        merged = dict(attrs)
        merged.setdefault('request_type', None)

        request_types = CommunicationRequest.RequestType
        # Defaults specific to a request type, as (key, value) pairs.
        type_defaults = {request_types.RequestJobs: (('num_jobs', 1),),
                         request_types.RequestEvents: (('num_event_ranges', 1),),
                         request_types.UpdateEvents: (('update_events', None),),
                         request_types.UpdateJobs: (('jobs', None),)}
        # Defaults shared by all request types.
        common_defaults = (('post_hook', None), ('response', None))

        specific_defaults = type_defaults.get(merged['request_type'], ())
        for key, default in tuple(specific_defaults) + common_defaults:
            merged.setdefault(key, default)

        for key, value in merged.items():
            setattr(self, key, value)
        # Set after copying the attributes, so a supplied 'abort' is overridden.
        self.abort = False

    def __str__(self):
        """
        Serialize the instance attributes to a JSON string.

        Falsy values are emitted unchanged, lists are emitted as lists of the
        string form of their items, everything else as its string form.

        :returns: JSON string.
        """
        json_str = {}
        for key, value in self.__dict__.items():
            if not value:
                json_str[key] = value
            elif isinstance(value, list):
                json_str[key] = [str(list_item) for list_item in value]
            else:
                json_str[key] = str(value)
        return json.dumps(json_str)
0113 
0114 
0115 """
0116 Communication manager thread
0117 """
0118 
0119 
class CommunicationManager(threading.Thread, PluginFactory):
    """
    Thread that serves communication requests through a communicator plugin.

    Clients call get_jobs(), update_jobs(), get_event_ranges() or
    update_events(); each call queues a CommunicationRequest. The run() loop
    takes the requests from the queues, hands them to the plugin and stores
    the plugin's answer in the ``response`` attribute of the request.
    """

    def __init__(self, *args, **kwargs):
        """
        Create the queues and the stop signal; the thread is not started here.

        :param args: positional arguments, forwarded to PluginFactory and kept in self.args.
        :param kwargs: keyword arguments, forwarded to PluginFactory and kept in self.kwargs.
        """
        super(CommunicationManager, self).__init__()
        PluginFactory.__init__(self, *args, **kwargs)
        self.name = "CommunicationManager"
        self.post_get_jobs = None
        self.post_get_event_ranges_hook = None
        self.queues = {'request_get_jobs': queue.Queue(),  # Python 2/3
                       'update_jobs': queue.Queue(),
                       'request_get_events': queue.Queue(),
                       'update_events': queue.Queue(),
                       'processing_get_jobs': queue.Queue(),
                       'processing_update_jobs': queue.Queue(),
                       'processing_get_events': queue.Queue(),
                       'processing_update_events': queue.Queue()}
        # Maximum number of requests allowed in a queue (None: no limit);
        # consulted by can_process_request() for the 'next_queue' of a request.
        self.queue_limits = {'request_get_jobs': None,
                             'update_jobs': None,
                             'request_get_events': None,
                             'update_events': None,
                             'processing_get_jobs': 1,
                             'processing_update_jobs': 1,
                             'processing_get_events': 1,
                             'processing_update_events': 1}
        self.stop_event = threading.Event()
        self.args = args
        self.kwargs = kwargs

    def stop(self):
        """
        Set stop signal(main run process will clean queued requests to release waiting clients and then quit)
        """
        if not self.is_stop():
            logger.info("Stopping Communication Manager.")
            self.stop_event.set()

    def is_stop(self):
        """
        check whether the stop signal is set

        :returns: True if the stop signal is set, otherwise False
        """
        return self.stop_event.is_set()

    @staticmethod
    def _wait_for_response(req):
        """
        Block until the request got a response, then translate the response.

        :param req: CommunicationRequest that was put on one of the queues.
        :returns: content of the response, or None if the response status is False.
        :raise: the exception stored in the response, if any.
        """
        while req.response is None:
            time.sleep(1)
        if req.response.exception:
            raise req.response.exception
        if req.response.status is False:
            return None
        return req.response.content

    def _submit_request(self, queue_name, req_attrs):
        """
        Queue a new request and, unless it has a post hook, wait for its result.

        :param queue_name: key of self.queues receiving the request.
        :param req_attrs: dict of attributes for the CommunicationRequest.
        :returns: None for a request with a post hook (the call does not wait), otherwise the result of _wait_for_response().
        """
        req = CommunicationRequest(req_attrs)
        self.queues[queue_name].put(req)

        if req.post_hook:
            return None
        return self._wait_for_response(req)

    def get_jobs(self, njobs=1, post_hook=None, args=None):
        """
        Function can be called by client to send a get_job request and get a response with jobs.

        :param njobs: number of jobs to request.
        :param post_hook: optional callable; when given, the call returns None right after queuing the request.
        :param args: optional dict, or object with a __dict__, whose items are added to the request attributes.
        :returns: jobs(got from jobs servers); None if the manager is stopping or the response status is False.
        :raise: Exception caught when getting jobs
        """

        if self.is_stop():
            return None

        req_attrs = {}
        if args:
            req_attrs.update(args if isinstance(args, dict) else vars(args))

        # Set last so that these keys always win over the ones coming from args.
        req_attrs.update({'request_type': CommunicationRequest.RequestType.RequestJobs,
                          'num_jobs': njobs,
                          'post_hook': post_hook})

        return self._submit_request('request_get_jobs', req_attrs)

    def update_jobs(self, jobs, post_hook=None):
        """
        Function can be called by client to update jobs' status to server.

        :param jobs: jobs to be updated, passed to the plugin unchanged.
        :param post_hook: optional callable; when given, the call returns None right after queuing the request.
        :returns: status of updating jobs; None if the manager is stopping or the response status is False.
        :raise: Exception caught when updating jobs
        """

        if self.is_stop():
            return None

        req_attrs = {'request_type': CommunicationRequest.RequestType.UpdateJobs,
                     'jobs': jobs,
                     'post_hook': post_hook}

        return self._submit_request('update_jobs', req_attrs)

    def get_event_ranges(self, num_event_ranges=1, post_hook=None, job=None):
        """
        Function can be called by client to send a get_event_ranges request and get a response with event ranges.

        :param num_event_ranges: number of event ranges to request.
        :param post_hook: optional callable; when given, the call returns None right after queuing the request.
        :param job: mapping providing the keys 'PandaID', 'jobsetID' and 'taskID'.
        :returns: event ranges (got from jobs servers); None if the manager is stopping or the response status is False.
        :raise: CommunicationFailure if job is missing; Exception caught when getting event ranges
        """

        if self.is_stop():
            return None

        if not job:
            raise exception.CommunicationFailure("Get events failed because job info missing(job: %s)" % job)

        req_attrs = {'request_type': CommunicationRequest.RequestType.RequestEvents,
                     'num_event_ranges': num_event_ranges,
                     'post_hook': post_hook,
                     'jobid': job['PandaID'],
                     'jobsetid': job['jobsetID'],
                     'taskid': job['taskID'],
                     'num_ranges': num_event_ranges}

        return self._submit_request('request_get_events', req_attrs)

    def update_events(self, update_events, post_hook=None):
        """
        Function can be called by client to send a update_events request.

        :param update_events: event range updates, passed to the plugin unchanged.
        :param post_hook: optional callable; when given, the call returns None right after queuing the request.
        :returns: status of updating event ranges; None if the manager is stopping or the response status is False.
        :raise: Exception caught when updating event ranges
        """

        if self.is_stop():
            return None

        req_attrs = {'request_type': CommunicationRequest.RequestType.UpdateEvents,
                     'update_events': update_events,
                     'post_hook': post_hook}

        return self._submit_request('update_events', req_attrs)

    def get_plugin_confs(self):
        """
        Get different plugin for different communicator

        The plugin is selected with the environment variable COMMUNICATOR_PLUGIN
        ('act', 'harvestersf'; anything else selects the Panda communicator).
        The items of every positional constructor argument that is a dict or
        has a __dict__ are added to the configuration.

        :returns: dict with {'class': <plugin_class>} and other items
        """

        default_class = 'pilot.eventservice.communicationmanager.plugins.pandacommunicator.PandaCommunicator'
        plugin_classes = {'act': 'pilot.eventservice.communicationmanager.plugins.actcommunicator.ACTCommunicator',
                          'harvestersf': 'pilot.eventservice.communicationmanager.plugins.harvestersharefilecommunicator.HarvesterShareFileCommunicator'}

        plugin = os.environ.get('COMMUNICATOR_PLUGIN', None)
        plugin_confs = {'class': plugin_classes.get(plugin, default_class)}

        # self.args is the tuple of positional constructor arguments, on which
        # vars() cannot be called directly; a single object assigned to
        # self.args after construction is handled as well.
        extra_args = self.args
        if extra_args:
            if not isinstance(extra_args, (tuple, list)):
                extra_args = (extra_args,)
            for arg in extra_args:
                if isinstance(arg, dict):
                    plugin_confs.update(arg)
                elif hasattr(arg, '__dict__'):
                    plugin_confs.update(vars(arg))
        return plugin_confs

    def can_process_request(self, processor, process_type):
        """
        To check whether it is ready to process request in a type.
        For request such as HarvesterShareFileCommunicator, it should check whether there are processing requests to avoid overwriting files.

        :param processor: dict describing the handling of every process type, as built in run().
        :param process_type: name of the queue to check.
        :returns: True or False
        """

        if self.queues[process_type].empty():
            return False

        next_queue = processor[process_type]['next_queue']
        if next_queue is None or self.queue_limits[next_queue] is None:
            return True

        # Only go on while the follow-up queue still has room.
        return self.queues[next_queue].qsize() < self.queue_limits[next_queue]

    @staticmethod
    def _build_processor(communicator):
        """
        Describe how the requests of every queue are handled by the plugin.

        :param communicator: plugin instance providing the pre-check and handler methods.
        :returns: dict {queue name: {'pre_check', 'handler', 'next_queue', 'process_req_post_hook'}}.
        """
        return {'request_get_jobs': {'pre_check': communicator.pre_check_get_jobs,
                                     'handler': communicator.request_get_jobs,
                                     'next_queue': 'processing_get_jobs',
                                     'process_req_post_hook': False},
                'request_get_events': {'pre_check': communicator.pre_check_get_events,
                                       'handler': communicator.request_get_events,
                                       'next_queue': 'processing_get_events',
                                       'process_req_post_hook': False},
                'update_jobs': {'pre_check': communicator.pre_check_update_jobs,
                                'handler': communicator.update_jobs,
                                'next_queue': None,
                                'process_req_post_hook': True},
                'update_events': {'pre_check': communicator.pre_check_update_events,
                                  'handler': communicator.update_events,
                                  'next_queue': None,
                                  'process_req_post_hook': True},
                'processing_get_jobs': {'pre_check': communicator.check_get_jobs_status,
                                        'handler': communicator.get_jobs,
                                        'next_queue': None,
                                        'process_req_post_hook': True},
                'processing_get_events': {'pre_check': communicator.check_get_events_status,
                                          'handler': communicator.get_events,
                                          'next_queue': None,
                                          'process_req_post_hook': True}
                }

    def _abort_queued_requests(self, process_type):
        """
        Empty one queue, answering every request with a CommunicationFailure.

        :param process_type: name of the queue to empty.
        """
        while not self.queues[process_type].empty():
            req = self.queues[process_type].get()
            logger.info("Is going to stop, aborting request: %s", req)
            req.abort = True
            resp_attrs = {'status': None,
                          'content': None,
                          'exception': exception.CommunicationFailure("Communication manager is stopping, abort this request")}
            # Setting the response releases a client waiting for this request.
            req.response = CommunicationResponse(resp_attrs)

    def _process_next_request(self, processor, process_type):
        """
        Handle the next request of one queue, if the pre-check allows it.

        :param processor: dict as returned by _build_processor().
        :param process_type: name of the queue to serve.
        :returns: True if a request was taken from the queue, False if the pre-check status was not 0.
        """
        handling = processor[process_type]

        pre_check_resp = handling['pre_check']()
        if not pre_check_resp.status == 0:
            return False

        logger.info("Processing %s", process_type)

        req = self.queues[process_type].get()

        logger.info("Processing %s request: %s", process_type, req)
        res = handling['handler'](req)
        logger.info("Processing %s respone: %s", process_type, res)

        if res.status is False:
            # Failed: answer the request right away; the post hook is not called.
            req.response = res
        else:
            next_queue = handling['next_queue']
            if next_queue:
                # Not finished yet: the request goes on to its follow-up queue.
                self.queues[next_queue].put(req)
            else:
                req.response = res
            if handling['process_req_post_hook'] and req.post_hook:
                req.post_hook(res)
        return True

    def run(self):
        """
        Main loop to handle communication requests
        """

        confs = self.get_plugin_confs()
        logger.info("Communication plugin confs: %s", confs)
        communicator = self.get_plugin(confs)
        logger.info("Communication: %s", communicator)

        processor = self._build_processor(communicator)

        while True:
            has_req = False
            for process_type in processor:
                if self.is_stop():
                    self._abort_queued_requests(process_type)
                elif self.can_process_request(processor, process_type):
                    if self._process_next_request(processor, process_type):
                        has_req = True
            # Quit only after a pass in which no request was processed.
            if not has_req and self.is_stop():
                break
            time.sleep(1)
        logger.info("Communication manager stopped.")