Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-11 08:41:03

0001 #!/usr/bin/env python
0002 # Licensed under the Apache License, Version 2.0 (the "License");
0003 # you may not use this file except in compliance with the License.
0004 # You may obtain a copy of the License at
0005 # http://www.apache.org/licenses/LICENSE-2.0
0006 #
0007 # Authors:
0008 # - Wen Guan, wen.guan@cern.ch, 2018
0009 # - Paul Nilsson, paul.nilsson@cern.ch, 2020-21
0010 
0011 import json
0012 import threading
0013 import traceback
0014 from os import environ
0015 
0016 from pilot.common import exception
0017 from pilot.util import https
0018 from pilot.util.config import config
0019 from ..communicationmanager import CommunicationResponse
0020 from .basecommunicator import BaseCommunicator
0021 
0022 import logging
logger = logging.getLogger(name=__name__)

# Panda Communicator: communicator plugin that exchanges jobs and event ranges
# with the PanDA server.
0028 
0029 
class PandaCommunicator(BaseCommunicator):
    """
    Communicator plugin that exchanges jobs and event ranges with the PanDA server.

    Every server call goes to ``<server>/server/panda/<command>``, where ``<server>``
    is the PANDA_SERVER_URL environment variable when set, otherwise
    ``config.Pilot.pandaserver``. get_jobs, get_events, update_events and
    update_jobs/update_jobs_old each hold their own lock while they run.
    """

    def __init__(self, *args, **kwargs):
        # Forward the arguments unpacked. Previously the args tuple and the kwargs
        # dict were passed as two positional arguments, so keyword arguments never
        # reached BaseCommunicator as keywords.
        super(PandaCommunicator, self).__init__(*args, **kwargs)
        self.get_jobs_lock = threading.Lock()
        self.get_events_lock = threading.Lock()
        self.update_events_lock = threading.Lock()
        self.update_jobs_lock = threading.Lock()

    @staticmethod
    def _panda_url(command):
        """
        Build the PanDA server URL for a server command.

        :param command: server command name, e.g. 'getJob'.
        :return: full URL string.
        """
        # An explicit PANDA_SERVER_URL in the environment overrides the pilot config.
        url = environ.get('PANDA_SERVER_URL', config.Pilot.pandaserver)
        return '{pandaserver}/server/panda/{command}'.format(pandaserver=url, command=command)

    @staticmethod
    def _status_equals(status, code):
        """
        Compare a PanDA StatusCode with an integer code.

        Accepts both the int and its string form (e.g. 0 or '0'), matching the check
        get_events has always used.
        """
        return status == code or str(status) == str(code)

    @staticmethod
    def _error_response(log_message, exc_message, error):
        """
        Log the exception currently being handled and wrap it in a failed response.

        Must be called from inside an ``except`` clause so that
        ``traceback.format_exc()`` describes that exception.

        :param log_message: prefix of the error log line.
        :param exc_message: prefix of the UnknownException message.
        :param error: the caught exception (included in the log line).
        :return: CommunicationResponse with status -1, no content and an UnknownException.
        """
        trace = traceback.format_exc()
        logger.error("%s: %s, %s", log_message, error, trace)
        return CommunicationResponse({'status': -1,
                                      'content': None,
                                      'exception': exception.UnknownException("%s: %s" % (exc_message, trace))})

    def pre_check_get_jobs(self, req=None):
        """
        Precheck whether it's ok to send a request to get jobs.

        :return: CommunicationResponse with status 0 (always ok).
        """
        return CommunicationResponse({'status': 0})

    def request_get_jobs(self, req):
        """
        Send a request to get jobs.

        Nothing to do for PanDA; jobs are fetched synchronously in get_jobs.

        :return: CommunicationResponse with status 0.
        """
        return CommunicationResponse({'status': 0})

    def check_get_jobs_status(self, req=None):
        """
        Check whether jobs are prepared.

        :return: CommunicationResponse with status 0 (always ready).
        """
        return CommunicationResponse({'status': 0})

    def get_jobs(self, req):
        """
        Get job definitions from the PanDA server.

        One getJob request is sent per requested job (``req.num_jobs``). The payload
        is built from whichever of the attributes node, mem, getProxyKey, queue,
        disk_space, site, job_label, working_group and cpu exist on ``req``
        (getProxyKey defaults to 'False').

        :param req: job request; must provide ``num_jobs``.
        :return: CommunicationResponse. If at least one job was received, status is 0
                 and content is the list of job definitions. Otherwise content is None
                 and exception is a CommunicationFailure (no response, no jobs,
                 non-zero StatusCode) or an UnknownException (status -1).
        """
        with self.get_jobs_lock:
            try:
                jobs = []
                resp_attrs = None

                data = {'getProxyKey': 'False'}
                # PanDA request parameter -> attribute name on req
                kmap = {'node': 'node', 'mem': 'mem', 'getProxyKey': 'getProxyKey', 'computingElement': 'queue',
                        'diskSpace': 'disk_space', 'siteName': 'site', 'prodSourceLabel': 'job_label',
                        'workingGroup': 'working_group', 'cpu': 'cpu'}
                for key, value in kmap.items():
                    if hasattr(req, value):
                        data[key] = getattr(req, value)

                url = self._panda_url('getJob')
                for _ in range(req.num_jobs):
                    logger.info("Getting jobs: %s", data)
                    res = https.request(url, data=data)
                    logger.info("Got jobs returns: %s", res)

                    if res is None:
                        resp_attrs = {'status': None,
                                      'content': None,
                                      'exception': exception.CommunicationFailure("Get job failed to get response from Panda.")}
                        break

                    status = res['StatusCode']
                    if self._status_equals(status, 20) and 'no jobs in PanDA' in (res.get('errorDialog') or ''):
                        # No break: the remaining requests are still sent, and any job
                        # they return takes precedence over this failure below.
                        resp_attrs = {'status': status,
                                      'content': None,
                                      'exception': exception.CommunicationFailure("No jobs in panda")}
                    elif not self._status_equals(status, 0):
                        resp_attrs = {'status': status,
                                      'content': None,
                                      'exception': exception.CommunicationFailure("Get job from Panda returns a non-zero value: %s" % (status,))}
                        break
                    else:
                        jobs.append(res)

                if jobs:
                    resp_attrs = {'status': 0, 'content': jobs, 'exception': None}
                elif not resp_attrs:
                    resp_attrs = {'status': -1, 'content': None, 'exception': exception.UnknownException("Failed to get jobs")}

                return CommunicationResponse(resp_attrs)
            except Exception as e:  # Python 2/3
                return self._error_response("Failed to get jobs", "Failed to get jobs", e)

    def pre_check_get_events(self, req=None):
        """
        Precheck whether it's ok to send a request to get events.

        :return: CommunicationResponse with status 0 (always ok).
        """
        return CommunicationResponse({'status': 0})

    def request_get_events(self, req):
        """
        Send a request to get events.

        Nothing to do for PanDA; event ranges are fetched synchronously in get_events.

        :return: CommunicationResponse with status 0.
        """
        return CommunicationResponse({'status': 0})

    def check_get_events_status(self, req=None):
        """
        Check whether events are prepared.

        :return: CommunicationResponse with status 0 (always ready).
        """
        return CommunicationResponse({'status': 0})

    def get_events(self, req):
        """
        Download event ranges for a job from the PanDA server.

        If ``req.num_ranges`` is falsy it is set to 1 on ``req`` before the request.

        :param req: event request providing jobid, jobsetid, taskid and num_ranges.
        :return: CommunicationResponse. On success status is 0 and content is the
                 server's 'eventRanges' value. Otherwise content is None and exception
                 is a CommunicationFailure (no response: status -1; non-zero
                 StatusCode: that code) or an UnknownException (status -1).
        """
        with self.get_events_lock:
            try:
                if not req.num_ranges:
                    # ToBeFix num_ranges with corecount
                    req.num_ranges = 1

                data = {'pandaID': req.jobid,
                        'jobsetID': req.jobsetid,
                        'taskID': req.taskid,
                        'nRanges': req.num_ranges}

                logger.info("Downloading new event ranges: %s", data)
                res = https.request(self._panda_url('getEventRanges'), data=data)
                logger.info("Downloaded event ranges: %s", res)

                if res is None:
                    resp_attrs = {'status': -1,
                                  'content': None,
                                  'exception': exception.CommunicationFailure("Get events from panda returns None as return value")}
                elif self._status_equals(res['StatusCode'], 0):
                    resp_attrs = {'status': 0, 'content': res['eventRanges'], 'exception': None}
                else:
                    resp_attrs = {'status': res['StatusCode'],
                                  'content': None,
                                  'exception': exception.CommunicationFailure("Get events from panda returns non-zero value: %s" % (res['StatusCode'],))}

                return CommunicationResponse(resp_attrs)
            except Exception as e:  # Python 2/3
                return self._error_response("Failed to download event ranges", "Failed to get events", e)

    def pre_check_update_events(self, req=None):
        """
        Precheck whether it's ok to update events.

        :return: CommunicationResponse with status 0.
        """
        # Acquiring and immediately releasing the lock blocks until any update_events
        # call currently running in another thread has finished.
        with self.update_events_lock:
            pass
        return CommunicationResponse({'status': 0})

    def update_events(self, req):
        """
        Send event range status updates to the PanDA server.

        :param req: request whose ``update_events`` attribute is posted as the payload.
        :return: CommunicationResponse with status 0 and the raw server reply as
                 content, or status -1 with an UnknownException if an error is raised.
        """
        with self.update_events_lock:
            try:
                logger.info("Updating events: %s", req)
                res = https.request(self._panda_url('updateEventRanges'), data=req.update_events)

                logger.info("Updated event ranges status: %s", res)
                # NOTE(review): status is 0 even when res is None (no server response),
                # unlike get_events which treats None as a failure - confirm callers
                # inspect content.
                return CommunicationResponse({'status': 0, 'content': res, 'exception': None})
            except Exception as e:  # Python 2/3
                return self._error_response("Failed to update event ranges", "Failed to update events", e)

    def pre_check_update_jobs(self, req=None):
        """
        Precheck whether it's ok to update jobs.

        :return: CommunicationResponse with status 0.
        """
        # Acquiring and immediately releasing the lock blocks until any update_jobs
        # call currently running in another thread has finished.
        with self.update_jobs_lock:
            pass
        return CommunicationResponse({'status': 0})

    def update_job(self, job):
        """
        Send a single job update to the PanDA server.

        Does not take update_jobs_lock itself; update_jobs holds it while calling this.

        :param job: payload posted to the updateJob command.
        :return: the raw server reply, or -1 if an error was raised.
        """
        try:
            logger.info("Updating job: %s", job)
            res = https.request(self._panda_url('updateJob'), data=job)

            logger.info("Updated jobs status: %s", res)
            return res
        except Exception as e:  # Python 2/3
            logger.error("Failed to update jobs: %s, %s", e, traceback.format_exc())
            return -1

    def update_jobs(self, req):
        """
        Update each job in ``req.jobs`` on the PanDA server, one request per job.

        :param req: request whose ``jobs`` attribute is an iterable of job payloads.
        :return: CommunicationResponse with status 0 and the list of per-job replies
                 from update_job (-1 for a job whose update raised), or status -1 with
                 an UnknownException if an error is raised outside update_job.
        """
        with self.update_jobs_lock:
            try:
                logger.info("Updating jobs: %s", req)
                res_list = [self.update_job(job) for job in req.jobs]
                return CommunicationResponse({'status': 0, 'content': res_list, 'exception': None})
            except Exception as e:  # Python 2/3
                return self._error_response("Failed to update jobs", "Failed to update jobs", e)

    def update_jobs_old(self, req):
        """
        Update jobs with a single bulk request (updateJobsInBulk).

        :param req: request whose ``jobs`` attribute is JSON-serialised into 'jobList'.
        :return: CommunicationResponse with status 0 and the raw server reply as
                 content, or status -1 with an UnknownException if an error is raised.
        """
        with self.update_jobs_lock:
            try:
                logger.info("Updating jobs: %s", req)
                data = {'jobList': json.dumps(req.jobs)}
                res = https.request(self._panda_url('updateJobsInBulk'), data=data)

                logger.info("Updated jobs status: %s", res)
                return CommunicationResponse({'status': 0, 'content': res, 'exception': None})
            except Exception as e:  # Python 2/3
                return self._error_response("Failed to update jobs", "Failed to update jobs", e)