File indexing completed on 2026-04-11 08:41:03
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011 import json
0012 import threading
0013 import traceback
0014 from os import environ
0015
0016 from pilot.common import exception
0017 from pilot.util import https
0018 from pilot.util.config import config
0019 from ..communicationmanager import CommunicationResponse
0020 from .basecommunicator import BaseCommunicator
0021
0022 import logging
logger = logging.getLogger(__name__)

# Panda Communicator: talks to the PanDA server to fetch jobs and event ranges
# and to send job and event-range updates (see PandaCommunicator below).
0028
0029
class PandaCommunicator(BaseCommunicator):
    """
    Panda Communicator.

    Exchanges jobs and event ranges with the PanDA server through
    ``pilot.util.https.request``.

    Each operation family has its own lock (get jobs, get events, update events,
    update jobs). Calls of the same kind on one instance are serialised, while
    calls of different kinds may run concurrently.
    """

    # (getJob form field, request attribute that supplies it). Attributes that
    # are missing on the request object are simply not sent.
    _GET_JOBS_FIELDS = (('node', 'node'),
                        ('mem', 'mem'),
                        ('getProxyKey', 'getProxyKey'),
                        ('computingElement', 'queue'),
                        ('diskSpace', 'disk_space'),
                        ('siteName', 'site'),
                        ('prodSourceLabel', 'job_label'),
                        ('workingGroup', 'working_group'),
                        ('cpu', 'cpu'))

    def __init__(self, *args, **kwargs):
        """
        Initialize the communicator and its per-operation locks.

        All positional and keyword arguments are forwarded unchanged to
        ``BaseCommunicator.__init__``.
        """
        # Unpack the arguments. Passing ``args, kwargs`` as two plain positional
        # arguments would hand the base class a tuple and a dict instead of what
        # the caller actually supplied.
        super(PandaCommunicator, self).__init__(*args, **kwargs)
        self.get_jobs_lock = threading.Lock()
        self.get_events_lock = threading.Lock()
        self.update_events_lock = threading.Lock()
        self.update_jobs_lock = threading.Lock()

    # ------------------------------------------------------------------ helpers

    @staticmethod
    def _panda_url(endpoint):
        """
        Build the full URL of a PanDA server endpoint.

        The base URL is looked up on every call: ``PANDA_SERVER_URL`` from the
        environment if set, otherwise ``config.Pilot.pandaserver``.

        :param endpoint: endpoint name under ``/server/panda/`` (e.g. ``'getJob'``).
        :return: URL string.
        """
        base = environ.get('PANDA_SERVER_URL', config.Pilot.pandaserver)
        return '{pandaserver}/server/panda/{endpoint}'.format(pandaserver=base, endpoint=endpoint)

    @staticmethod
    def _status_is(status, expected):
        """
        Compare a server ``StatusCode`` with an expected integer code.

        The code is accepted either as a number or as its string form, so
        ``0`` and ``'0'`` both match ``expected=0``.
        """
        return status == expected or str(status) == str(expected)

    @staticmethod
    def _unknown_failure(log_msg, exc_msg, err):
        """
        Log the exception being handled and wrap it in a failed response.

        Must be called from inside an ``except`` block, because the traceback is
        taken from ``traceback.format_exc()``.

        :param log_msg: prefix of the error log line.
        :param exc_msg: prefix of the UnknownException message.
        :param err: the caught exception (logged via ``%s``).
        :return: CommunicationResponse with status -1 and an UnknownException.
        """
        tb = traceback.format_exc()
        logger.error("%s: %s, %s", log_msg, err, tb)
        return CommunicationResponse({'status': -1,
                                      'content': None,
                                      'exception': exception.UnknownException("%s: %s" % (exc_msg, tb))})

    @classmethod
    def _build_get_jobs_data(cls, req):
        """Translate the attributes present on ``req`` into getJob form data."""
        data = {'getProxyKey': 'False'}
        for field, attr in cls._GET_JOBS_FIELDS:
            if hasattr(req, attr):
                data[field] = getattr(req, attr)
        return data

    # --------------------------------------------------------------------- jobs

    def pre_check_get_jobs(self, req=None):
        """
        Precheck whether it's ok to send a request to get jobs.

        No-op here: always returns status 0.
        """
        return CommunicationResponse({'status': 0})

    def request_get_jobs(self, req):
        """
        Send a request to get jobs.

        No-op here: the server is contacted in ``get_jobs``. Always returns status 0.
        """
        return CommunicationResponse({'status': 0})

    def check_get_jobs_status(self, req=None):
        """
        Check whether jobs are prepared.

        No-op here: always returns status 0.
        """
        return CommunicationResponse({'status': 0})

    def get_jobs(self, req):
        """
        Get job definitions from the PanDA server.

        Sends up to ``req.num_jobs`` ``getJob`` requests while holding
        ``get_jobs_lock``. Each successful reply is appended to the result list.
        The loop stops at the first missing reply, the first non-zero status, or
        the first "no jobs in PanDA" answer.

        :param req: request object. ``num_jobs`` is required; the optional
            attributes listed in ``_GET_JOBS_FIELDS`` are forwarded when present.
        :return: CommunicationResponse.
            - If at least one job was received, ``status`` is 0 and ``content``
              is the list of replies, even if a later request failed.
            - Otherwise ``content`` is None and ``exception`` is a
              CommunicationFailure (server-side problem) or an UnknownException
              (no request made, or an unexpected error).
        """
        with self.get_jobs_lock:
            try:
                data = self._build_get_jobs_data(req)
                url = self._panda_url('getJob')
                jobs = []
                resp_attrs = None

                for _ in range(req.num_jobs):
                    logger.info("Getting jobs: %s", data)
                    res = https.request(url, data=data)
                    logger.info("Got jobs returns: %s", res)

                    if res is None:
                        resp_attrs = {'status': None,
                                      'content': None,
                                      'exception': exception.CommunicationFailure("Get job failed to get response from Panda.")}
                        break

                    status = res['StatusCode']
                    # errorDialog may be absent or None; treat that as "no message".
                    if self._status_is(status, 20) and 'no jobs in PanDA' in (res.get('errorDialog') or ''):
                        resp_attrs = {'status': status,
                                      'content': None,
                                      'exception': exception.CommunicationFailure("No jobs in panda")}
                        # Stop like the other failure branches instead of re-asking
                        # the server for the remaining num_jobs slots.
                        break
                    if not self._status_is(status, 0):
                        resp_attrs = {'status': status,
                                      'content': None,
                                      'exception': exception.CommunicationFailure("Get job from Panda returns a non-zero value: %s" % status)}
                        break

                    jobs.append(res)

                if jobs:
                    resp_attrs = {'status': 0, 'content': jobs, 'exception': None}
                elif not resp_attrs:
                    # Reached when num_jobs is 0: no request was made at all.
                    resp_attrs = {'status': -1, 'content': None, 'exception': exception.UnknownException("Failed to get jobs")}

                return CommunicationResponse(resp_attrs)
            except Exception as e:
                return self._unknown_failure("Failed to get jobs", "Failed to get jobs", e)

    # ------------------------------------------------------------------- events

    def pre_check_get_events(self, req=None):
        """
        Precheck whether it's ok to send a request to get events.

        No-op here: always returns status 0.
        """
        return CommunicationResponse({'status': 0})

    def request_get_events(self, req):
        """
        Send a request to get events.

        No-op here: the server is contacted in ``get_events``. Always returns status 0.
        """
        return CommunicationResponse({'status': 0})

    def check_get_events_status(self, req=None):
        """
        Check whether events are prepared.

        No-op here: always returns status 0.
        """
        return CommunicationResponse({'status': 0})

    def get_events(self, req):
        """
        Download event ranges from the PanDA server (``getEventRanges``).

        :param req: request object with ``jobid``, ``jobsetid``, ``taskid`` and
            ``num_ranges``. A falsy ``num_ranges`` is set to 1 on ``req`` itself.
        :return: CommunicationResponse.
            - On ``StatusCode`` 0, ``content`` is the reply's ``eventRanges``.
            - If there is no reply, status is -1 with a CommunicationFailure.
            - On a non-zero status, that status is returned with a CommunicationFailure.
            - On an unexpected error, status is -1 with an UnknownException.
        """
        with self.get_events_lock:
            try:
                if not req.num_ranges:
                    # Note: this default is written back onto the caller's request.
                    req.num_ranges = 1

                data = {'pandaID': req.jobid,
                        'jobsetID': req.jobsetid,
                        'taskID': req.taskid,
                        'nRanges': req.num_ranges}

                logger.info("Downloading new event ranges: %s", data)
                res = https.request(self._panda_url('getEventRanges'), data=data)
                logger.info("Downloaded event ranges: %s", res)

                if res is None:
                    resp_attrs = {'status': -1,
                                  'content': None,
                                  'exception': exception.CommunicationFailure("Get events from panda returns None as return value")}
                elif self._status_is(res['StatusCode'], 0):
                    resp_attrs = {'status': 0, 'content': res['eventRanges'], 'exception': None}
                else:
                    resp_attrs = {'status': res['StatusCode'],
                                  'content': None,
                                  'exception': exception.CommunicationFailure("Get events from panda returns non-zero value: %s" % res['StatusCode'])}

                return CommunicationResponse(resp_attrs)
            except Exception as e:
                return self._unknown_failure("Failed to download event ranges", "Failed to get events", e)

    def pre_check_update_events(self, req=None):
        """
        Precheck whether it's ok to update events.

        Acquires and immediately releases ``update_events_lock``. The call
        therefore blocks until any ``update_events`` already running on this
        instance has finished. Always returns status 0.
        """
        with self.update_events_lock:
            pass
        return CommunicationResponse({'status': 0})

    def update_events(self, req):
        """
        Send event-range updates to the PanDA server (``updateEventRanges``).

        :param req: request object whose ``update_events`` is sent as form data.
        :return: CommunicationResponse.
            - Status 0 with the raw server result as ``content``.
            - Status -1 with an UnknownException if an exception was raised.
        """
        with self.update_events_lock:
            try:
                logger.info("Updating events: %s", req)
                res = https.request(self._panda_url('updateEventRanges'), data=req.update_events)
                logger.info("Updated event ranges status: %s", res)
                # NOTE(review): the result is not inspected. A None result (no
                # reply) is still reported with status 0; callers must check content.
                return CommunicationResponse({'status': 0, 'content': res, 'exception': None})
            except Exception as e:
                return self._unknown_failure("Failed to update event ranges", "Failed to update events", e)

    # -------------------------------------------------------------- job updates

    def pre_check_update_jobs(self, req=None):
        """
        Precheck whether it's ok to update jobs.

        Acquires and immediately releases ``update_jobs_lock``. The call
        therefore blocks until any job update already running on this instance
        has finished. Always returns status 0.
        """
        with self.update_jobs_lock:
            pass
        return CommunicationResponse({'status': 0})

    def update_job(self, job):
        """
        Send one job update to the PanDA server (``updateJob``).

        Does not take ``update_jobs_lock`` itself; ``update_jobs`` holds it
        while calling this method.

        :param job: form data for the update, passed as ``data`` to https.request.
        :return: the result of ``https.request`` (which may be None), or -1 if an
            exception was raised while sending.
        """
        try:
            logger.info("Updating job: %s", job)
            res = https.request(self._panda_url('updateJob'), data=job)
            logger.info("Updated jobs status: %s", res)
            return res
        except Exception as e:
            logger.error("Failed to update jobs: %s, %s", e, traceback.format_exc())
            return -1

    def update_jobs(self, req):
        """
        Update jobs one at a time via ``update_job``, holding ``update_jobs_lock``.

        :param req: request object with a ``jobs`` iterable.
        :return: CommunicationResponse.
            - Status 0 whose ``content`` lists the per-job results of
              ``update_job`` in order; -1 entries mark jobs whose update raised.
            - Status -1 with an UnknownException if an exception escapes, e.g.
              when ``req.jobs`` is missing.
        """
        with self.update_jobs_lock:
            try:
                logger.info("Updating jobs: %s", req)
                res_list = [self.update_job(job) for job in req.jobs]
                return CommunicationResponse({'status': 0, 'content': res_list, 'exception': None})
            except Exception as e:
                return self._unknown_failure("Failed to update jobs", "Failed to update jobs", e)

    def update_jobs_old(self, req):
        """
        Update all jobs with a single bulk request (``updateJobsInBulk``).

        ``req.jobs`` is JSON-encoded into one ``jobList`` form field.

        :param req: request object with a JSON-serialisable ``jobs``.
        :return: CommunicationResponse.
            - Status 0 with the raw server result as ``content``.
            - Status -1 with an UnknownException if an exception was raised.
        """
        with self.update_jobs_lock:
            try:
                logger.info("Updating jobs: %s", req)
                data = {'jobList': json.dumps(req.jobs)}
                res = https.request(self._panda_url('updateJobsInBulk'), data=data)
                logger.info("Updated jobs status: %s", res)
                return CommunicationResponse({'status': 0, 'content': res, 'exception': None})
            except Exception as e:
                return self._unknown_failure("Failed to update jobs", "Failed to update jobs", e)