Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-09 08:38:46

0001 """
0002 client methods
0003 """
0004 
0005 import gzip
0006 import json
0007 import os
0008 import pickle
0009 import socket
0010 import sys
0011 import tempfile
0012 
0013 import requests
0014 from pandacommon.pandautils.net_utils import replace_hostname_in_url_randomly
0015 
0016 from pandaserver.api.v1.http_client import HttpClient as HttpClientV1
0017 from pandaserver.api.v1.http_client import api_url_ssl as api_url_ssl_v1
0018 from pandaserver.taskbuffer.JobUtils import dump_jobs_json
0019 
# PanDA server configuration
# Base URLs of the PanDA server; the defaults can be overridden through the
# PANDA_URL (plain HTTP) and PANDA_URL_SSL (HTTPS) environment variables
baseURL = os.environ.get("PANDA_URL", "http://pandaserver.cern.ch:25080/server/panda")
baseURLSSL = os.environ.get("PANDA_URL_SSL", "https://pandaserver.cern.ch:25443/server/panda")

# CA certificate directory used for host verification when X509_CERT_DIR is not set or does not exist
DEFAULT_CERT_PATH = "/etc/grid-security/certificates"

# exit code returned as the status of a failed request
EC_Failed = 255
0028 
0029 
def is_https(url):
    """Tell whether *url* starts with the "https://" scheme prefix."""
    scheme_prefix = "https://"
    uses_tls = url.startswith(scheme_prefix)
    return uses_tls
0033 
0034 
def pickle_dumps(obj):
    """Serialize *obj* with pickle protocol 0 and return the resulting bytes."""
    text_protocol = 0
    return pickle.dumps(obj, text_protocol)
0038 
0039 
def pickle_loads(obj_string):
    """
    Deserialize a pickled payload given either as text or as bytes

    args:
        obj_string: the pickled data; a str is encoded to bytes first, anything else is passed to pickle unchanged
    returns:
        the unpickled object
    raises:
        the exception of the actual unpickling attempt (e.g. pickle.UnpicklingError for malformed data).
        It is no longer replaced by the TypeError of a second attempt on the raw str.
    """
    # NOTE(security): pickle can execute arbitrary code while loading, so this must only be fed with
    # data coming from a trusted server
    if isinstance(obj_string, str):
        obj_string = obj_string.encode()
    return pickle.loads(obj_string)
0045 
0046 
class HttpClient:
    """
    Small wrapper around requests used to talk to the PanDA server.

    The constructor reads its configuration from the environment:
        PANDA_VERIFY_HOST: "off" disables the verification of the host certificate
        X509_USER_PROXY: path of the grid proxy used as client certificate and key
        PANDA_AUTH: "oidc" switches to token authentication
        PANDA_AUTH_VO: value sent in the Origin header when OIDC is used
        PANDA_AUTH_ID_TOKEN: the ID token, or "file:<path>" to read the token from a file

    get, post and post_files return a tuple (status, output): status is 0 and output the
    response text on success, status is 255 and output the error string when requests fails.
    """

    # timeout handed to every requests call
    _TIMEOUT = 600

    def __init__(self):
        # verification of the host certificate is on unless explicitly switched off
        self.verifyHost = os.environ.get("PANDA_VERIFY_HOST") != "off"

        # request a compressed response
        self.compress = True

        # SSL cert/key: the proxy file serves as both, so it is looked up only once
        # (a second lookup would repeat the "no proxy" message)
        proxy_path = self._x509()
        self.ssl_certificate = proxy_path
        self.ssl_key = proxy_path

        self.use_json = False

        # OIDC
        self.oidc = os.getenv("PANDA_AUTH") == "oidc"
        self.auth_vo = os.getenv("PANDA_AUTH_VO") if self.oidc else None
        self.id_token = os.getenv("PANDA_AUTH_ID_TOKEN") if self.oidc else None
        # the token may be given indirectly as "file:<path>"
        if self.id_token and self.id_token.startswith("file:"):
            with open(self.id_token[5:], "r") as f:
                self.id_token = f.read().strip()

    def _x509(self):
        """Return the path of a readable grid proxy, or "" (after printing a message) if there is none."""
        # retrieve the X509_USER_PROXY from the environment variables and check if it is readable
        try:
            if "X509_USER_PROXY" in os.environ and os.access(os.environ["X509_USER_PROXY"], os.R_OK):
                return os.environ["X509_USER_PROXY"]
        except Exception:
            # best effort: fall through to the default location
            pass

        # look for the default place
        x509 = f"/tmp/x509up_u{os.getuid()}"
        if os.access(x509, os.R_OK):
            return x509

        # no valid proxy certificate
        print("No valid grid proxy certificate found")
        return ""

    def _prepare_url(self, url):
        """Return (url to contact, whether it is HTTPS); the hostname is replaced unless PANDA_BEHIND_REAL_LB is set."""
        use_https = is_https(url)
        if "PANDA_BEHIND_REAL_LB" in os.environ:
            modified_url = url
        else:
            modified_url = replace_hostname_in_url_randomly(url)
        return modified_url, use_https

    def _prepare_headers(self):
        """Prepare headers based on authentication and JSON settings."""
        headers = {}

        if self.oidc:
            headers["Authorization"] = f"Bearer {self.id_token}"
            headers["Origin"] = self.auth_vo

        if self.use_json:
            headers["Accept"] = "application/json"

        return headers

    def _prepare_ssl(self, use_https):
        """Return the (cert, verify) pair to hand to requests for the given HTTPS usage."""
        cert = None  # no certificate by default when no HTTPS or using oidc headers
        verify = True  # validate against default system CA certificates

        if use_https:
            # oidc tokens are added to the headers, we don't need to provide a certificate
            if not self.oidc:
                cert = (self.ssl_certificate, self.ssl_key)

            # the host verification has been disabled in the configuration
            if not self.verifyHost:
                verify = False
            # there is a path to the CA certificate folder and it exists
            elif "X509_CERT_DIR" in os.environ and os.path.exists(os.environ["X509_CERT_DIR"]):
                verify = os.environ["X509_CERT_DIR"]
            # the CA certificate folder is available in the standard location
            elif os.path.exists(DEFAULT_CERT_PATH):
                verify = DEFAULT_CERT_PATH

        return cert, verify

    def get(self, url, data):
        """Send *data* as query parameters of a GET request to *url*."""
        url, use_https = self._prepare_url(url)
        headers = self._prepare_headers()
        cert, verify = self._prepare_ssl(use_https)

        try:
            response = requests.get(url, headers=headers, params=data, timeout=self._TIMEOUT, cert=cert, verify=verify)
            response.raise_for_status()
            return 0, response.text
        except requests.RequestException as e:
            return 255, str(e)

    def post(self, url, data):
        """Send *data* as the body of a POST request to *url*."""
        url, use_https = self._prepare_url(url)
        headers = self._prepare_headers()
        cert, verify = self._prepare_ssl(use_https)

        try:
            response = requests.post(url, headers=headers, data=data, timeout=self._TIMEOUT, cert=cert, verify=verify)
            response.raise_for_status()
            return 0, response.text
        except requests.RequestException as e:
            return 255, str(e)

    def post_files(self, url, data):
        """
        Upload files with a POST request to *url*.

        Each value of *data* is either the path of the file to upload or a pair
        (destination name, path of the file). Every file opened here is closed again
        before returning. Errors while opening a file are not caught.
        """
        url, use_https = self._prepare_url(url)
        headers = self._prepare_headers()
        cert, verify = self._prepare_ssl(use_https)

        files = {}
        try:
            for key, value in data.items():
                if isinstance(value, str):
                    # we got a file to upload without specifying the destination name
                    files[key] = open(value, "rb")
                else:
                    # we got a file to upload which specifies the destination name
                    files[key] = (value[0], open(value[1], "rb"))
            response = requests.post(url, headers=headers, files=files, timeout=self._TIMEOUT, cert=cert, verify=verify)
            response.raise_for_status()
            return 0, response.text
        except requests.RequestException as e:
            return 255, str(e)
        finally:
            for entry in files.values():
                file_handler = entry[1] if isinstance(entry, tuple) else entry
                file_handler.close()
0183 
0184 
0185 """
0186 Client API
0187 """
0188 
0189 
def submit_jobs(jobs):
    """
    Send a list of jobs to the PanDA server for submission

    args:
        jobs: the list of JobSpecs; creationHost of each one is set to the local host name
    returns:
        status code
              0: communication succeeded to the panda server
              255: communication failure
        return code
              True: request is processed
              False: not processed
    """
    # stamp every job with the fully qualified name of the submitting host
    submitter_host = socket.getfqdn()
    for job_spec in jobs:
        job_spec.creationHost = submitter_host

    # the jobs travel in their JSON representation
    payload = {"jobs": dump_jobs_json(jobs)}

    client = HttpClientV1()
    endpoint = f"{api_url_ssl_v1}/job/submit"

    rc, response = client.post(endpoint, payload)
    return rc, response
0219 
0220 
def get_job_status(job_ids):
    """
    Look up the status of the given jobs

    args:
        job_ids: the list of PandaIDs
    returns:
        status code
              0: communication succeeded to the panda server
              255: communication failure
        the list of JobSpecs (or Nones for non-existing PandaIDs)
    """
    client = HttpClientV1()
    endpoint = f"{api_url_ssl_v1}/pilot/get_job_status"
    rc, response = client.post(endpoint, {"job_ids": job_ids})
    return rc, response
0239 
0240 
def kill_jobs(
    job_ids,
    code=None,
    keep_unmerged=False,
    job_sub_status=None,
):
    """
    Request the termination of jobs. Normal users can kill only their own jobs.
    People with production VOMS role can kill any jobs.
    Running jobs are killed when next heartbeat comes from the pilot.
    Set code=9 if running jobs need to be killed immediately.

       args:
           job_ids: the list of PandaIDs
           code: specify why the jobs are killed (only sent when it is truthy)
                 2: expire
                 3: aborted
                 4: expire in waiting
                 7: retry by server
                 8: re-brokerage
                 9: force kill
                 10: fast re-brokerage on overloaded PQs
                 50: kill by JEDI
                 91: kill user jobs with prod role
           keep_unmerged: set True not to cancel unmerged jobs when pmerge is killed.
           job_sub_status: set job sub status if any
       returns:
           status code
                 0: communication succeeded to the panda server
                 255: communication failure
           the output of the kill request as returned by the HTTP client
    """
    client = HttpClientV1()
    endpoint = f"{api_url_ssl_v1}/job/kill"

    payload = {"job_ids": job_ids}
    if code:
        payload["code"] = code

    # optional flags; the comparison with True is kept on purpose so that only values equal to True enable the flag
    option_candidates = (
        (keep_unmerged == True, "keepUnmerged"),
        (job_sub_status, f"jobSubStatus={job_sub_status}"),
    )
    selected_options = [text for enabled, text in option_candidates if enabled]
    if selected_options:
        payload["kill_options"] = selected_options

    rc, response = client.post(endpoint, payload)
    return rc, response
0292 
0293 
def reassign_jobs(job_ids):
    """
    Ask the server to reassign jobs.

    args:
        job_ids: the list of IDs sent to the server under the "job_ids" key
    returns:
        status code
              0: communication succeeded to the panda server
              255: communication failure
        return code
              True: request is processed
              False: not processed

    """
    client = HttpClientV1()
    endpoint = f"{api_url_ssl_v1}/job/reassign"
    rc, response = client.post(endpoint, {"job_ids": job_ids})
    return rc, response
0315 
0316 
def job_stats_by_cloud(job_type=None):
    """
    Get job statistics by cloud. Used by panglia monitor in TRIUMF

    args:
        job_type: string with the type of jobs to consider
            None: no type filter is sent to the server
            analysis: analysis jobs
            production: production jobs
    returns:
        status code
              0: communication succeeded to the panda server
              255: communication failure or invalid job_type
        map of the number jobs per job status in each cloud. When the request fails or the
        server does not report success, the status and the raw output are returned instead.

    """
    if job_type not in [None, "analysis", "production"]:
        print("Invalid job type, must be one of [None, 'analysis', 'production'']")
        return EC_Failed, "Invalid job type"

    http_client = HttpClientV1()
    url = f"{api_url_ssl_v1}/statistics/job_stats_by_cloud"

    data = {"type": job_type} if job_type else {}

    status, output = http_client.get(url, data)
    if status != 0 or not output.get("success"):
        print(f"Failed to retrieve job_stats_by_cloud for {job_type}. Status: {status}, Output: {output}")
        return status, output

    # The keys of a mapping are unique, so every cloud shows up exactly once and nothing
    # needs to be merged. Each per-status map is shallow-copied so the response is not shared.
    return 0, {cloud: dict(values) for cloud, values in output["data"].items()}
0360 
0361 
0362 # alias the old name to the new function for backwards compatibility
def getJobStatistics(sourcetype=None):
    """Legacy name kept for backwards compatibility; forwards *sourcetype* to job_stats_by_cloud."""
    job_type = sourcetype
    return job_stats_by_cloud(job_type)
0365 
0366 
def production_job_stats_by_cloud_and_processing_type():
    """
    Get job statistics by cloud and processing type. Used by panglia monitor in TRIUMF

    returns:
        status code
              0: communication succeeded to the panda server
              255: communication failure
        map of cloud -> processing type -> number of jobs per job status. When the request
        fails or the server does not report success, the status and the raw output are
        returned instead.

    """

    http_client = HttpClientV1()
    url = f"{api_url_ssl_v1}/statistics/production_job_stats_by_cloud_and_processing_type"
    status, output = http_client.get(url, {})

    if status != 0 or not output.get("success"):
        print(f"Failed to retrieve production_job_stats_by_cloud_and_processing_type. Status: {status}, Output: {output}")
        return status, output

    # The keys of a mapping are unique, so each cloud and each processing type is seen once
    # and there is nothing to merge. The nested maps are copied so the response is not shared.
    aggregated = {
        cloud: {processing_type: dict(status_map) for processing_type, status_map in dict(processing_type_map).items()}
        for cloud, processing_type_map in output["data"].items()
    }
    return 0, aggregated
0410 
0411 
0412 # alias the old name to the new function for backwards compatibility
def getJobStatisticsForBamboo(useMorePG=False):
    """Legacy name kept for backwards compatibility; *useMorePG* is accepted but not used."""
    legacy_result = production_job_stats_by_cloud_and_processing_type()
    return legacy_result
0415 
0416 
def job_stats_by_site_and_resource_type(time_window=None):
    """
    Fetch the job statistics per site and resource. Used by panglia monitor in TRIUMF

    args:
       time_window: to count number of jobs that finish/failed/cancelled for last N minutes. 12*60 by default
    returns:
        status code
              0: communication succeeded to the panda server
              255: communication failure
        map of the number jobs per job status in each site and resource

    """
    client = HttpClientV1()
    endpoint = f"{api_url_ssl_v1}/statistics/job_stats_by_site_and_resource_type"

    # the window is only sent when the caller provided one
    params = {"time_window": time_window} if time_window else {}

    status, output = client.get(endpoint, params)

    if status == 0 and output.get("success"):
        return 0, output.get("data")

    print(f"Failed to retrieve job_stats_by_site_and_resource_type. Status: {status}, Output: {output}")
    return status, output
0444 
0445 
0446 # alias the old name to the new function for backwards compatibility
def getJobStatisticsPerSiteResource(timeWindow=None):
    """Legacy name kept for backwards compatibility; forwards *timeWindow* to job_stats_by_site_and_resource_type."""
    time_window = timeWindow
    return job_stats_by_site_and_resource_type(time_window)
0449 
0450 
def job_stats_by_site_share_and_resource_type(time_window=None):
    """
    Fetch the job statistics per site, label, and resource

    args:
       time_window: to count number of jobs that finish/failed/cancelled for last N minutes. 12*60 by default
    returns:
        status code
              0: communication succeeded to the panda server
              255: communication failure
        map of the number jobs per job status in each site and resource

    """
    client = HttpClientV1()
    endpoint = f"{api_url_ssl_v1}/statistics/job_stats_by_site_share_and_resource_type"

    # the window is only sent when the caller provided one
    params = {"time_window": time_window} if time_window else {}

    status, output = client.get(endpoint, params)

    if status == 0 and output.get("success"):
        return 0, output.get("data")

    print(f"Failed to retrieve job_stats_by_site_share_and_resource_type. Status: {status}, Output: {output}")
    return status, output
0478 
0479 
0480 # alias the old name to the new function for backwards compatibility
def get_job_statistics_per_site_label_resource(time_window=None):
    """Legacy name kept for backwards compatibility; forwards to job_stats_by_site_share_and_resource_type."""
    stats_result = job_stats_by_site_share_and_resource_type(time_window)
    return stats_result
0483 
0484 
def get_site_specs(site_type=None):
    """
    Fetch the list of site specifications. Used by panglia monitor in TRIUMF

    args:
        site_type: type of sites
            all: all sites
            None: defaults to analysis sites
            analysis: analysis sites
            production: production sites
            unified: unified sites
    returns:
        status code
              0: communication succeeded to the panda server
              255: communication failure or invalid site_type
        map of site and attributes

    """
    client = HttpClientV1()
    endpoint = f"{api_url_ssl_v1}/metaconfig/get_site_specs"

    accepted_types = (None, "all", "analysis", "production", "unified")
    if site_type not in accepted_types:
        return EC_Failed, "Invalid site type"

    # the type is only sent when the caller provided one
    params = {"type": site_type} if site_type else {}

    status, output = client.get(endpoint, params)

    if status == 0 and output.get("success"):
        return 0, output.get("data")

    print(f"Failed to retrieve get_site_specs. Status: {status}, Output: {output}")
    return status, output
0519 
0520 
0521 # alias the old name to the new function for backwards compatibility
def getSiteSpecs(siteType=None):
    """Legacy name kept for backwards compatibility; forwards *siteType* to get_site_specs."""
    site_type = siteType
    return get_site_specs(site_type)
0524 
0525 
def register_cache_file(user_name: str, file_name: str, file_size: int, checksum: str):
    """
    Register the details of an input sandbox that is being stored in PanDA cache

    args:
        user_name: the name of the user
        file_name: the file name
        file_size: the file size
        checksum: md5sum of the file (converted to a string before sending)
    returns:
        status code
              0: communication succeeded to the panda server
              else: communication failure

    """
    client = HttpClientV1()
    endpoint = f"{api_url_ssl_v1}/file_server/register_cache_file"

    payload = dict(
        user_name=user_name,
        file_name=file_name,
        file_size=file_size,
        checksum=str(checksum),
    )
    return client.post(endpoint, payload)
0552 
0553 
def put_file(file):
    """
    Send an input sandbox to PanDA cache

    args:
        file: the file name
    returns:
        status code
              0: communication succeeded to the panda server
              else: communication failure

    """
    client = HttpClientV1()
    endpoint = f"{api_url_ssl_v1}/file_server/upload_cache_file"
    upload = {"file": file}
    return client.post_files(endpoint, upload)
0571 
0572 
def touch_file(source_url, file_name):
    """
    Post a touch request for a cache file to a concrete server instance

    args:
        source_url: base URL of the instance to contact
        file_name: the file name sent to the server
    returns:
        the return value of the HTTP client's post call
    """
    client = HttpClientV1()
    # The request does not go through api_url_ssl_v1: the endpoint is built on top of
    # source_url so that the concrete instance provided is addressed directly
    endpoint = f"{source_url}/api/v1/file_server/touch_cache_file"
    return client.post(endpoint, {"file_name": file_name})
0580 
0581 
def insertTaskParams(taskParams):
    """
    Register the parameters of a new task

    args:
        taskParams: a dictionary of task parameters; it is sent JSON-encoded
    returns:
        status code
              0: communication succeeded to the panda server
              255: communication failure, or the response could not be unpickled
        tuple of return code and JediTaskID
              True: request is processed
              False: not processed
    """
    serialized = json.dumps(taskParams)

    client = HttpClient()
    endpoint = f"{baseURLSSL}/insertTaskParams"
    rc, raw = client.post(endpoint, {"taskParams": serialized})

    # the response is a pickled object; on failure hand back the raw text plus the error
    try:
        decoded = pickle_loads(raw)
    except Exception as exc:
        failure = f"ERROR insertTaskParams : {type(exc)} {exc}"
        return EC_Failed, f"{raw}\n{failure}"
    return rc, decoded
0611 
0612 
def kill_task(task_id, broadcast=False):
    """
    Request the termination of a task

    args:
        task_id: task ID of the task to be killed
        broadcast: True to push the message to the pilot subscribing the MB
    returns:
        status code
              0: communication succeeded to the panda server
              255: communication failure
        tuple of return code and diagnostic message
              0: request is registered
              1: server error
              2: task not found
              3: permission denied
              4: irrelevant task status
            100: non SSL connection
            101: irrelevant taskID
    """
    client = HttpClientV1()
    endpoint = f"{api_url_ssl_v1}/task/kill"
    payload = dict(task_id=task_id, broadcast=broadcast)

    rc, response = client.post(endpoint, payload)
    return rc, response
0642 
0643 
def finish_task(task_id, soft=False, broadcast=False):
    """
    Request that a task is finished

    args:
        task_id: jediTaskID of the task to be finished
        soft: If True, new jobs are not generated and the task is
              finished once all remaining jobs are done.
              If False, all remaining jobs are killed and then the
              task is finished
        broadcast: True to push the message to the pilot subscribing the MB
    returns:
        status code
              0: communication succeeded to the panda server
              255: communication failure
        tuple of return code and diagnostic message
              0: request is registered
              1: server error
              2: task not found
              3: permission denied
              4: irrelevant task status
            100: non SSL connection
            101: irrelevant taskID
    """
    client = HttpClientV1()
    endpoint = f"{api_url_ssl_v1}/task/finish"
    payload = dict(task_id=task_id, soft=soft, broadcast=broadcast)

    rc, response = client.post(endpoint, payload)
    return rc, response
0676 
0677 
def reassignTaskToSite(jediTaskID, site, mode=None):
    """
    Move a task to a site. Existing jobs are killed and new jobs are generated at the site

    args:
        jediTaskID: jediTaskID of the task to be reassigned
        site: the site name where the task is reassigned (at most 60 characters)
        mode: If soft, only defined/waiting/assigned/activated jobs are killed. If nokill, no jobs are killed. All jobs are killed by default.
    returns:
        status code
              0: communication succeeded to the panda server
              255: communication failure, too long site name, or the response could not be unpickled
        tuple of return code and diagnostic message
              0: request is registered
              1: server error
              2: task not found
              3: permission denied
              4: irrelevant task status
            100: non SSL connection
            101: irrelevant taskID
    """
    maxSite = 60
    site_too_long = site is not None and len(site) > maxSite
    if site_too_long:
        return EC_Failed, f"site parameter is too long > {maxSite}chars"

    client = HttpClient()
    endpoint = f"{baseURLSSL}/reassignTask"

    payload = {"jediTaskID": jediTaskID, "site": site}
    if mode is not None:
        payload["mode"] = mode

    rc, raw = client.post(endpoint, payload)

    # the response is a pickled object; on failure hand back the raw text plus the error
    try:
        decoded = pickle_loads(raw)
    except Exception as exc:
        failure = f"ERROR reassignTaskToSite : {type(exc)} {exc}"
        return EC_Failed, f"{raw}\n{failure}"
    return rc, decoded
0717 
0718 
def reassignTaskToCloud(jediTaskID, cloud, mode=None):
    """
    Move a task to a cloud. Existing jobs are killed and new jobs are generated in the cloud

    args:
        jediTaskID: jediTaskID of the task to be reassigned
        cloud: the cloud name where the task is reassigned
        mode: If soft, only defined/waiting/assigned/activated jobs are killed. If nokill, no jobs are killed. All jobs are killed by default.
    returns:
        status code
              0: communication succeeded to the panda server
              255: communication failure, or the response could not be unpickled
        tuple of return code and diagnostic message
              0: request is registered
              1: server error
              2: task not found
              3: permission denied
              4: irrelevant task status
            100: non SSL connection
            101: irrelevant taskID
    """
    client = HttpClient()
    endpoint = f"{baseURLSSL}/reassignTask"

    payload = {"jediTaskID": jediTaskID, "cloud": cloud}
    if mode is not None:
        payload["mode"] = mode

    rc, raw = client.post(endpoint, payload)

    # the response is a pickled object; on failure hand back the raw text plus the error
    try:
        decoded = pickle_loads(raw)
    except Exception as exc:
        failure = f"ERROR reassignTaskToCloud : {type(exc)} {exc}"
        return EC_Failed, f"{raw}\n{failure}"
    return rc, decoded
0755 
0756 
def reassignTaskToNucleus(jediTaskID, nucleus, mode=None):
    """
    Reassign a task to a nucleus. Existing jobs are killed and new jobs are generated in the nucleus

    args:
        jediTaskID: jediTaskID of the task to be reassigned
        nucleus: the nucleus name where the task is reassigned
        mode: If soft, only defined/waiting/assigned/activated jobs are killed. If nokill, no jobs are killed. All jobs are killed by default.
    returns:
        status code
              0: communication succeeded to the panda server
              255: communication failure, or the response could not be unpickled
        tuple of return code and diagnostic message
              0: request is registered
              1: server error
              2: task not found
              3: permission denied
              4: irrelevant task status
            100: non SSL connection
            101: irrelevant taskID
    """

    http_client = HttpClient()

    # execute
    url = f"{baseURLSSL}/reassignTask"
    data = {"jediTaskID": jediTaskID, "nucleus": nucleus}
    if mode is not None:
        data["mode"] = mode
    status, output = http_client.post(url, data)
    try:
        return status, pickle_loads(output)
    except Exception:
        error_type, error_value = sys.exc_info()[:2]
        # report the failure under this function's own name (it used to say reassignTaskToCloud)
        error_str = f"ERROR reassignTaskToNucleus : {error_type} {error_value}"
        return EC_Failed, f"{output}\n{error_str}"
0793 
0794 
def uploadLog(log_string, log_file_name):
    """
    Upload a log message to the server as a gzip-compressed file

    args:
        log_string: log message; a str is encoded as UTF-8, bytes are written unchanged
        log_file_name: name of log file on the destination; converted to a string if needed
    returns:
        the return value of the HTTP client's post_files call
        status code
              0: communication succeeded to the panda server
              else: communication failure

    """

    http_client = HttpClientV1()

    # reserve a temporary file and release the handle right away; the content is written through gzip below
    with tempfile.NamedTemporaryFile(delete=False) as fh:
        tmp_path = fh.name

    try:
        if isinstance(log_string, str):
            log_string = log_string.encode("utf-8")
        with gzip.open(tmp_path, mode="wb") as gfh:
            gfh.write(log_string)

        # execute
        url = f"{api_url_ssl_v1}/file_server/upload_jedi_log"

        # sometimes the destination file name comes as an integer (e.g. a JEDI task ID) and it needs to be converted to a string
        log_file_name = str(log_file_name)
        data = {"file": (log_file_name, tmp_path)}
        return http_client.post_files(url, data, encoding="gzip")
    finally:
        # the temporary file is removed even when writing or uploading raises
        os.unlink(tmp_path)
0827 
0828 
def changeTaskPriority(jediTaskID, newPriority):
    """
    Set a new priority for a task

    args:
        jediTaskID: jediTaskID of the task to change the priority
        newPriority: new task priority
    returns:
        status code
              0: communication succeeded to the panda server
              255: communication failure, or the response could not be unpickled
        return code
              0: unknown task
              1: succeeded
              None: database error
    """
    client = HttpClient()
    endpoint = f"{baseURLSSL}/changeTaskPriority"
    payload = {"jediTaskID": jediTaskID, "newPriority": newPriority}

    rc, raw = client.post(endpoint, payload)

    # the response is a pickled object; on failure hand back the raw text plus the error
    try:
        decoded = pickle_loads(raw)
    except Exception as exc:
        failure = f"ERROR changeTaskPriority : {type(exc)} {exc}"
        return EC_Failed, f"{raw}\n{failure}"
    return rc, decoded
0858 
0859 
def set_debug_mode(job_id, mode):
    """
    Switch the debug mode of a job on or off

    args:
        job_id: job_id of the job
        mode: True to turn it on. Oppositely, False
    returns:
        status code
              0: communication succeeded to the panda server
              another: communication failure
        error message
    """
    client = HttpClientV1()
    endpoint = f"{api_url_ssl_v1}/job/set_debug_mode"
    payload = dict(job_id=job_id, mode=mode)

    rc, response = client.post(endpoint, payload)
    return rc, response
0882 
0883 
def retry_task(task_id, no_child_retry=False, discard_events=False, disable_staging_mode=False, keep_gshare_priority=False):
    """
    Request a retry of a task

    args:
        task_id: jediTaskID of the task to retry
        no_child_retry: True not to retry child tasks
        discard_events: discard events
        disable_staging_mode: disable staging mode
        keep_gshare_priority: keep current gshare and priority
    returns:
        status code
              0: communication succeeded to the panda server
              255: communication failure
        tuple of return code and diagnostic message
              0: request is registered
              1: server error
              2: task not found
              3: permission denied
              4: irrelevant task status
            100: non SSL connection
            101: irrelevant taskID
    """

    payload = {"task_id": task_id}

    # optional switches are only sent when enabled, always with the value True
    switches = (
        ("no_child_retry", no_child_retry),
        ("discard_events", discard_events),
        ("disable_staging_mode", disable_staging_mode),
        ("keep_gshare_priority", keep_gshare_priority),
    )
    for key, enabled in switches:
        if enabled:
            payload[key] = True

    endpoint = f"{api_url_ssl_v1}/task/retry"
    status, output = HttpClientV1().post(endpoint, payload)
    return status, output
0923 
0924 
def reload_input(task_id):
    """
    Request a reload of the input of a task

    args:
        task_id: jediTaskID of the task whose input is reloaded
    returns:
        status code
              0: communication succeeded to the panda server
              255: communication failure
        tuple of return code and diagnostic message
              0: request is registered
              1: server error
              2: task not found
              3: permission denied
              4: irrelevant task status
            100: non SSL connection
            101: irrelevant taskID
    """
    endpoint = f"{api_url_ssl_v1}/task/reload_input"
    payload = {"task_id": task_id}

    # the v1 client result is passed through without further decoding
    status, output = HttpClientV1().post(endpoint, payload)
    return status, output
0952 
0953 
def changeTaskWalltime(jediTaskID, wallTime):
    """
    Set the wallTime attribute of a task

    args:
        jediTaskID: jediTaskID of the task whose walltime is changed
        wallTime: new walltime for the task
    returns:
        status code
              0: communication succeeded to the panda server
              255: communication failure, or the response could not be unpickled
        return code
              0: unknown task
              1: succeeded
              None: database error
    """

    endpoint = f"{baseURLSSL}/changeTaskAttributePanda"
    # walltime goes through the generic attribute endpoint with a fixed attribute name
    payload = {
        "jediTaskID": jediTaskID,
        "attrName": "wallTime",
        "attrValue": wallTime,
    }

    client = HttpClient()
    status, output = client.post(endpoint, payload)

    try:
        decoded = pickle_loads(output)
    except Exception as exc:
        error_str = f"ERROR changeTaskWalltime : {type(exc)} {exc}"
        return EC_Failed, f"{output}\n{error_str}"
    return status, decoded
0983 
0984 
def changeTaskCputime(jediTaskID, cpuTime):
    """
    Set the cpuTime attribute of a task

    args:
        jediTaskID: jediTaskID of the task whose CPU time is changed
        cpuTime: new cputime for the task
    returns:
        status code
              0: communication succeeded to the panda server
              255: communication failure, or the response could not be unpickled
        return code
              0: unknown task
              1: succeeded
              None: database error
    """

    endpoint = f"{baseURLSSL}/changeTaskAttributePanda"
    # CPU time goes through the generic attribute endpoint with a fixed attribute name
    payload = {
        "jediTaskID": jediTaskID,
        "attrName": "cpuTime",
        "attrValue": cpuTime,
    }

    client = HttpClient()
    status, output = client.post(endpoint, payload)

    try:
        decoded = pickle_loads(output)
    except Exception as exc:
        error_str = f"ERROR changeTaskCputime : {type(exc)} {exc}"
        return EC_Failed, f"{output}\n{error_str}"
    return status, decoded
1014 
1015 
def changeTaskRamCount(jediTaskID, ramCount):
    """
    Set the ramCount attribute of a task

    args:
        jediTaskID: jediTaskID of the task whose RAM count is changed
        ramCount: new ramCount for the task
    returns:
        status code
              0: communication succeeded to the panda server
              255: communication failure, or the response could not be unpickled
        return code
              0: unknown task
              1: succeeded
              None: database error
    """

    endpoint = f"{baseURLSSL}/changeTaskAttributePanda"
    # RAM count goes through the generic attribute endpoint with a fixed attribute name
    payload = {
        "jediTaskID": jediTaskID,
        "attrName": "ramCount",
        "attrValue": ramCount,
    }

    client = HttpClient()
    status, output = client.post(endpoint, payload)

    try:
        decoded = pickle_loads(output)
    except Exception as exc:
        error_str = f"ERROR changeTaskRamCount : {type(exc)} {exc}"
        return EC_Failed, f"{output}\n{error_str}"
    return status, decoded
1045 
1046 
def changeTaskAttribute(jediTaskID, attrName, attrValue):
    """
    Set an arbitrary attribute of a task

    args:
        jediTaskID: jediTaskID of the task to change the attribute
        attrName: attribute name
        attrValue: new value for the attribute
    returns:
        status code
              0: communication succeeded to the panda server
              255: communication failure, or the response could not be unpickled
        return: a tuple of return code and message
              0: unknown task
              1: succeeded
              2: disallowed to update the attribute
              None: database error
    """

    endpoint = f"{baseURLSSL}/changeTaskAttributePanda"
    payload = {
        "jediTaskID": jediTaskID,
        "attrName": attrName,
        "attrValue": attrValue,
    }

    client = HttpClient()
    status, output = client.post(endpoint, payload)

    try:
        decoded = pickle_loads(output)
    except Exception as exc:
        # the error label carries the endpoint name rather than the function name
        error_str = f"ERROR changeTaskAttributePanda : {type(exc)} {exc}"
        return EC_Failed, f"{output}\n{error_str}"
    return status, decoded
1078 
1079 
def changeTaskSplitRule(jediTaskID, ruleName, ruleValue):
    """
    Change a split rule for a task

    args:
        jediTaskID: jediTaskID of the task to change the rule
        ruleName: rule name
        ruleValue: new value for the rule
    returns:
        status code
              0: communication succeeded to the panda server
              255: communication failure, or the response could not be unpickled
        return: a tuple of return code and message
              0: unknown task
              1: succeeded
              2: disallowed to update the attribute
              None: database error
    """

    endpoint = f"{baseURLSSL}/changeTaskSplitRulePanda"
    # the rule name/value are sent under the attrName/attrValue keys
    payload = {
        "jediTaskID": jediTaskID,
        "attrName": ruleName,
        "attrValue": ruleValue,
    }

    client = HttpClient()
    status, output = client.post(endpoint, payload)

    try:
        decoded = pickle_loads(output)
    except Exception as exc:
        error_str = f"ERROR changeTaskSplitRule : {type(exc)} {exc}"
        return EC_Failed, f"{output}\n{error_str}"
    return status, decoded
1111 
1112 
def pauseTask(jediTaskID):
    """
    Request that a task be paused

    args:
        jediTaskID: jediTaskID of the task to pause
    returns:
        status code
              0: communication succeeded to the panda server
              255: communication failure, or the response could not be unpickled
        tuple of return code and diagnostic message
              0: request is registered
              1: server error
              2: task not found
              3: permission denied
              4: irrelevant task status
            100: non SSL connection
            101: irrelevant taskID
    """

    endpoint = f"{baseURLSSL}/pauseTask"
    payload = {"jediTaskID": jediTaskID}

    client = HttpClient()
    status, output = client.post(endpoint, payload)

    # the response body is expected to be a pickled object
    try:
        decoded = pickle_loads(output)
    except Exception as exc:
        error_str = f"ERROR pauseTask : {type(exc)} {exc}"
        return EC_Failed, f"{output}\n{error_str}"
    return status, decoded
1145 
1146 
def resumeTask(jediTaskID):
    """
    Request that a task be resumed

    args:
        jediTaskID: jediTaskID of the task to resume
    returns:
        status code
              0: communication succeeded to the panda server
              255: communication failure, or the response could not be unpickled
        tuple of return code and diagnostic message
              0: request is registered
              1: server error
              2: task not found
              3: permission denied
              4: irrelevant task status
            100: non SSL connection
            101: irrelevant taskID
    """

    endpoint = f"{baseURLSSL}/resumeTask"
    payload = {"jediTaskID": jediTaskID}

    client = HttpClient()
    status, output = client.post(endpoint, payload)

    # the response body is expected to be a pickled object
    try:
        decoded = pickle_loads(output)
    except Exception as exc:
        error_str = f"ERROR resumeTask : {type(exc)} {exc}"
        return EC_Failed, f"{output}\n{error_str}"
    return status, decoded
1179 
1180 
def avalancheTask(jediTaskID):
    """
    Force avalanche for task

    args:
        jediTaskID: jediTaskID of the task to avalanche
    returns:
        status code
              0: communication succeeded to the panda server
              255: communication failure, or the response could not be unpickled
        tuple of return code and diagnostic message
              0: request is registered
              1: server error
              2: task not found
              3: permission denied
              4: irrelevant task status
            100: non SSL connection
            101: irrelevant taskID
    """

    http_client = HttpClient()

    # execute
    url = f"{baseURLSSL}/avalancheTask"
    data = {"jediTaskID": jediTaskID}
    status, output = http_client.post(url, data)

    # the response body is expected to be a pickled object
    try:
        return status, pickle_loads(output)
    except Exception as exc:
        # label the failure with this function's name (it used to be reported
        # as "resumeTask", which pointed callers at the wrong operation)
        error_str = f"ERROR avalancheTask : {type(exc)} {exc}"
        return EC_Failed, f"{output}\n{error_str}"
1213 
1214 
def increaseAttemptNr(jediTaskID, increase):
    """
    Increase the attempt numbers of a task

    args:
        jediTaskID: jediTaskID of the task to increase attempt numbers
        increase: increase for attempt numbers
    returns:
        status code
              0: communication succeeded to the panda server
              255: communication failure, or the response could not be unpickled
        return code
              0: succeeded
              1: unknown task
              2: invalid task status
              3: permission denied
              4: wrong parameter
              None: database error
    """

    endpoint = f"{baseURLSSL}/increaseAttemptNrPanda"
    # the increment is sent under the key "increasedNr"
    payload = {"jediTaskID": jediTaskID, "increasedNr": increase}

    client = HttpClient()
    status, output = client.post(endpoint, payload)

    try:
        decoded = pickle_loads(output)
    except Exception as exc:
        error_str = f"ERROR increaseAttemptNr : {type(exc)} {exc}"
        return EC_Failed, f"{output}\n{error_str}"
    return status, decoded
1247 
1248 
def killUnfinishedJobs(jediTaskID, code=None, useMailAsID=False):
    """
    Kill unfinished jobs in a task. Normal users can kill only their own jobs.
    People with production VOMS role can kill any jobs.
    Running jobs are killed when next heartbeat comes from the pilot.
    Set code=9 if running jobs need to be killed immediately.

    args:
        jediTaskID: the taskID of the task
        code: specify why the jobs are killed
              2: expire
              3: aborted
              4: expire in waiting
              7: retry by server
              8: re-brokerage
              9: force kill
              50: kill by JEDI
              91: kill user jobs with prod role
        useMailAsID: obsolete (still forwarded to the server)
    returns:
        status code
              0: communication succeeded to the panda server
              255: communication failure, or the response could not be unpickled
        the unpickled server response (historically documented as the list of
        clouds, or Nones if tasks are not yet assigned; verify against the server)
    """

    endpoint = f"{baseURLSSL}/killUnfinishedJobs"
    payload = {
        "jediTaskID": jediTaskID,
        "code": code,
        "useMailAsID": useMailAsID,
    }

    client = HttpClient()
    status, output = client.post(endpoint, payload)

    try:
        decoded = pickle_loads(output)
    except Exception as exc:
        error_str = f"ERROR killUnfinishedJobs : {type(exc)} {exc}"
        # this call also echoes the decoding error to stdout
        print(error_str)
        return EC_Failed, f"{output}\n{error_str}"
    return status, decoded
1288 
1289 
def triggerTaskBrokerage(jediTaskID):
    """
    Trigger task brokerage

    args:
        jediTaskID: jediTaskID of the task to trigger brokerage for
    returns:
        status code
              0: communication succeeded to the panda server
              255: communication failure, or the response could not be unpickled
        return: a tuple of return code and message
              0: unknown task
              1: succeeded
              None: database error
    """

    # brokerage is triggered through the modification-time endpoint with a
    # fixed diffValue of -12; the unit and effect of that value are defined
    # on the server side and are not visible here
    endpoint = f"{baseURLSSL}/changeTaskModTimePanda"
    payload = {"jediTaskID": jediTaskID, "diffValue": -12}

    client = HttpClient()
    status, output = client.post(endpoint, payload)

    try:
        decoded = pickle_loads(output)
    except Exception as exc:
        error_str = f"ERROR triggerTaskBrokerage : {type(exc)} {exc}"
        return EC_Failed, f"{output}\n{error_str}"
    return status, decoded
1318 
1319 
def getPandaIDsWithTaskID(jediTaskID):
    """
    Get PanDA IDs with TaskID

    args:
        jediTaskID: jediTaskID of the task to get the list of PanDA IDs
    returns:
        status code
              0: communication succeeded to the panda server
              255: communication failure, or the response could not be unpickled
        the list of PanDA IDs
    """

    # this query goes to the non-SSL base URL
    endpoint = f"{baseURL}/getPandaIDsWithTaskID"
    payload = {"jediTaskID": jediTaskID}

    client = HttpClient()
    status, output = client.post(endpoint, payload)

    try:
        decoded = pickle_loads(output)
    except Exception as exc:
        error_str = f"ERROR getPandaIDsWithTaskID : {type(exc)} {exc}"
        # this call also echoes the decoding error to stdout
        print(error_str)
        return EC_Failed, f"{output}\n{error_str}"
    return status, decoded
1345 
1346 
def reactivateTask(jediTaskID, keep_attempt_nr=False, trigger_job_generation=False):
    """
    Reactivate task

    args:
        jediTaskID: jediTaskID of the task to be reactivated
        keep_attempt_nr: not to reset attempt numbers when being reactivated
        trigger_job_generation: trigger job generation once being reactivated
    returns:
        status code
              0: communication succeeded to the panda server
              255: communication failure, or the response could not be unpickled
        return: a tuple of return code and message
              0: unknown task
              1: succeeded
              None: database error
    """

    endpoint = f"{baseURLSSL}/reactivateTask"
    payload = {"jediTaskID": jediTaskID}

    # optional switches are only sent when enabled, always with the value True
    switches = (
        ("keep_attempt_nr", keep_attempt_nr),
        ("trigger_job_generation", trigger_job_generation),
    )
    for key, enabled in switches:
        if enabled:
            payload[key] = True

    client = HttpClient()
    status, output = client.post(endpoint, payload)

    try:
        decoded = pickle_loads(output)
    except Exception as exc:
        error_str = f"ERROR reactivateTask : {type(exc)} {exc}"
        return EC_Failed, f"{output}\n{error_str}"
    return status, decoded
1381 
1382 
def getTaskStatus(jediTaskID):
    """
    Get task status for a particular task ID

    args:
        jediTaskID: jediTaskID of the task to get the status of
    returns:
        status code
              0: communication succeeded to the panda server
              255: communication failure, or the response could not be unpickled
        the status string
    """

    # this query goes to the non-SSL base URL
    endpoint = f"{baseURL}/getTaskStatus"
    payload = {"jediTaskID": jediTaskID}

    client = HttpClient()
    status, output = client.post(endpoint, payload)

    try:
        decoded = pickle_loads(output)
    except Exception as exc:
        error_str = f"ERROR getTaskStatus : {type(exc)} {exc}"
        # this call also echoes the decoding error to stdout
        print(error_str)
        return EC_Failed, f"{output}\n{error_str}"
    return status, decoded
1408 
1409 
def reassignShare(jedi_task_ids, share, reassign_running=False):
    """
    Reassign specified tasks (and their jobs) to a new share

    args:
        jedi_task_ids: task ids to act on (sent pickled)
        share: share to be applied to jedi task ids
        reassign_running: flag forwarded to the server (sent pickled);
                          presumably controls whether running jobs are reassigned too
    returns:
        status code
              0: communication succeeded to the panda server
              255: communication failure, or the response could not be unpickled
        return: a tuple of return code and message
              1: logical error
              0: success
              None: database error
    """

    client = HttpClient()

    endpoint = f"{baseURLSSL}/reassignShare"
    # the task id collection and the flag are serialized with pickle, the share is sent as is
    payload = {
        "jedi_task_ids_pickle": pickle_dumps(jedi_task_ids),
        "share": share,
        "reassign_running": pickle_dumps(reassign_running),
    }
    status, output = client.post(endpoint, payload)

    try:
        decoded = pickle_loads(output)
    except Exception as exc:
        error_str = f"ERROR reassignShare : {type(exc)} {exc}"
        return EC_Failed, f"{output}\n{error_str}"
    return status, decoded
1446 
1447 
def getTaskParamsMap(jediTaskID):
    """
    Get task parameter map for a certain task ID

    args:
        jediTaskID: jediTaskID of the task to get taskParamsMap
    returns:
        status code
              0: communication succeeded to the panda server
              255: communication failure, or the response could not be unpickled
        return: a tuple of return code and taskParamsMap
              1: logical error
              0: success
              None: database error
    """

    # this query goes to the non-SSL base URL
    endpoint = f"{baseURL}/getTaskParamsMap"
    payload = {"jediTaskID": jediTaskID}

    client = HttpClient()
    status, output = client.post(endpoint, payload)

    try:
        decoded = pickle_loads(output)
    except Exception as exc:
        error_str = f"ERROR getTaskParamsMap : {type(exc)} {exc}"
        # this call also echoes the decoding error to stdout
        print(error_str)
        return EC_Failed, f"{output}\n{error_str}"
    return status, decoded
1476 
1477 
def setNumSlotsForWP(pandaQueueName, numSlots, gshare=None, resourceType=None, validPeriod=None):
    """
    Set num slots for workload provisioning

    args:
        pandaQueueName: Panda Queue name
        numSlots: the number of slots. 0 to dynamically set based on the number of starting jobs
        gshare: global share. None to set for any global share (default)
        resourceType: resource type. None to set for any resource type (default)
        validPeriod: How long the rule is valid in days. None if no expiration (default)
    returns:
        status code
              0: communication succeeded to the panda server
              255: communication failure, or the response was not valid JSON
        tuple of return code and diagnostic message
              0: succeeded
              1: server error
            100: non SSL connection
            101: missing production role
            102: type error for some parameters
    """

    endpoint = f"{baseURLSSL}/setNumSlotsForWP"
    payload = {"pandaQueueName": pandaQueueName, "numSlots": numSlots}

    # optional parameters are left out of the request entirely when they are None
    optional = (
        ("gshare", gshare),
        ("resourceType", resourceType),
        ("validPeriod", validPeriod),
    )
    for key, value in optional:
        if value is not None:
            payload[key] = value

    client = HttpClient()
    status, output = client.post(endpoint, payload)

    # unlike the pickle-based calls, this endpoint's response is parsed as JSON
    try:
        decoded = json.loads(output)
    except Exception as exc:
        error_str = f"ERROR setNumSlotsForWP : {type(exc)} {exc}"
        return EC_Failed, f"{output}\n{error_str}"
    return status, decoded
1518 
1519 
1520 # enable jumbo jobs
def enableJumboJobs(jediTaskID, totalJumboJobs=1, nJumboPerSite=1):
    """
    Enable jumbo jobs for a task

    args:
        jediTaskID: jediTaskID of the task
        totalJumboJobs: The total number of active jumbo jobs produced for the task. Use 0 to disable jumbo jobs for the task
        nJumboPerSite: The number of active jumbo jobs per site
    returns:
        status code
              0: communication succeeded to the panda server
              255: communication failure, or the response was not valid JSON
        tuple of return code and diagnostic message
              0: succeeded
              1: server error
            100: non SSL connection
            101: missing production role
            102: type error for some parameters
    """

    endpoint = f"{baseURLSSL}/enableJumboJobs"
    # totalJumboJobs is sent under the key "nJumboJobs"
    payload = {"jediTaskID": jediTaskID, "nJumboJobs": totalJumboJobs, "nJumboPerSite": nJumboPerSite}

    client = HttpClient()
    status, output = client.post(endpoint, payload)

    # the response of this endpoint is parsed as JSON
    try:
        decoded = json.loads(output)
    except Exception as exc:
        error_str = f"ERROR /enableJumboJobs : {type(exc)} {exc}"
        return EC_Failed, f"{output}\n{error_str}"
    return status, decoded
1557 
1558 
def sweepPQ(panda_queue, status_list, ce_list, submission_host_list):
    """
    Send a harvester command to panda server in order to sweep a panda queue

    args:
        panda_queue: panda queue name
        status_list: list with statuses to sweep, e.g. ['submitted']
        ce_list: list of CEs belonging to the site or 'ALL'
        submission_host_list: list of submission hosts this applies or 'ALL'
    returns:
        status code
              0: communication succeeded to the panda server
              255: communication failure, or the response was not valid JSON
        return: a tuple of return code and message
              False: logical error
              True: success
    """

    client = HttpClient()

    endpoint = f"{baseURLSSL}/sweepPQ"
    # every argument is JSON-encoded individually before being posted
    payload = {
        "panda_queue": json.dumps(panda_queue),
        "status_list": json.dumps(status_list),
        "ce_list": json.dumps(ce_list),
        "submission_host_list": json.dumps(submission_host_list),
    }
    status, output = client.post(endpoint, payload)

    try:
        decoded = json.loads(output)
    except Exception as exc:
        error_str = f"ERROR sweepPQ : {type(exc)} {exc}"
        return EC_Failed, f"{output}\n{error_str}"
    return status, decoded
1600 
1601 
def send_command_to_job(panda_id, command):
    """
    Send a command to a job

    args:
        panda_id: PandaID of the job
        command: a command string passed to the pilot. max 250 chars
    returns:
        status code
              0: communication succeeded to the panda server
              255: communication failure
        return: a tuple of return code and message
              False: failed
              True: the command received
    """

    endpoint = f"{api_url_ssl_v1}/job/set_command"
    # the PandaID is sent under the key "job_id"
    payload = {"job_id": panda_id, "command": command}

    status, output = HttpClientV1().post(endpoint, payload)
    return status, output
1625 
1626 
def get_banned_users():
    """
    Get list of banned users

    returns:
        status code
              True: communication succeeded and the server reported success
              False: communication failure, or the server reported an error
        the "data" field of the response on success, otherwise a diagnostic string
    """

    endpoint = f"{api_url_ssl_v1}/metaconfig/get_banned_users"
    status, output = HttpClientV1().get(endpoint, {})

    # a non-zero client status is reported as a bad response
    if status != 0:
        return False, f"bad response: {output}"

    # the response is read as a mapping with "success", "message" and "data" entries
    succeeded = output["success"]
    message = output.get("message", "")
    banned = output.get("data")

    if not succeeded:
        return False, f"error message: {message}"
    return True, banned
1654 
1655 
def release_task(jedi_task_id):
    """
    Release task from staging

    args:
        jedi_task_id: jediTaskID of the task to release
    returns:
        status code
              0: communication succeeded to the panda server
              255: communication failure, or the response was not valid JSON
        tuple of return code and diagnostic message
              0: request is registered
              1: server error
              2: task not found
              3: permission denied
              4: irrelevant task status
            100: non SSL connection
            101: irrelevant taskID
    """

    endpoint = f"{baseURLSSL}/release_task"
    payload = {"jedi_task_id": jedi_task_id}

    client = HttpClient()
    status, output = client.post(endpoint, payload)

    # the response of this endpoint is parsed as JSON
    try:
        decoded = json.loads(output)
    except Exception as exc:
        error_str = f"ERROR release_task : failed with {exc!s}"
        return EC_Failed, f"{output}\n{error_str}"
    return status, decoded