File indexing completed on 2026-04-09 08:38:46
0001 """
0002 client methods
0003 """
0004
0005 import gzip
0006 import json
0007 import os
0008 import pickle
0009 import socket
0010 import sys
0011 import tempfile
0012
0013 import requests
0014 from pandacommon.pandautils.net_utils import replace_hostname_in_url_randomly
0015
0016 from pandaserver.api.v1.http_client import HttpClient as HttpClientV1
0017 from pandaserver.api.v1.http_client import api_url_ssl as api_url_ssl_v1
0018 from pandaserver.taskbuffer.JobUtils import dump_jobs_json
0019
0020
# Base URLs of the legacy PanDA server endpoints; each can be overridden
# through the PANDA_URL / PANDA_URL_SSL environment variables.
baseURL = os.environ.get("PANDA_URL", "http://pandaserver.cern.ch:25080/server/panda")
baseURLSSL = os.environ.get("PANDA_URL_SSL", "https://pandaserver.cern.ch:25443/server/panda")

# CA certificate directory used by HttpClient._prepare_ssl when X509_CERT_DIR is not usable
DEFAULT_CERT_PATH = "/etc/grid-security/certificates"


# Status code returned by the client functions in this module on failure
EC_Failed = 255
0028
0029
def is_https(url):
    """
    Tell whether a URL uses the HTTPS scheme

    args:
        url: the URL string to inspect
    returns:
        True if the URL begins with "https://", False otherwise
    """
    secure_prefix = "https://"
    return url.startswith(secure_prefix)
0033
0034
def pickle_dumps(obj):
    """
    Serialize an object with pickle protocol 0

    args:
        obj: the object to serialize
    returns:
        the pickled bytes
    """
    text_protocol = 0
    return pickle.dumps(obj, protocol=text_protocol)
0038
0039
def pickle_loads(obj_string):
    """
    Deserialize a pickled object received as text or bytes

    args:
        obj_string: the pickled payload, either str or a bytes-like object
    returns:
        the unpickled object
    raises:
        whatever pickle.loads raises for a malformed payload. For str input the
        genuine unpickling error is propagated instead of being replaced by the
        TypeError of a blind retry on the undecoded string.
    """
    # SECURITY NOTE: pickle.loads can execute arbitrary code. This must only be
    # fed data coming from a trusted PanDA server over a verified connection.
    if isinstance(obj_string, str):
        return pickle.loads(obj_string.encode())
    return pickle.loads(obj_string)
0045
0046
class HttpClient:
    """
    Small wrapper around ``requests`` used by the legacy client functions.

    Authentication uses either an X509 grid proxy (default) or an OIDC ID token
    when PANDA_AUTH is set to "oidc". The request methods return a
    ``(status, output)`` tuple: ``(0, response_text)`` on success and
    ``(255, error_message)`` when ``requests`` raises a ``RequestException``.
    """

    def __init__(self):
        # host verification is on unless PANDA_VERIFY_HOST is exactly "off"
        self.verifyHost = os.environ.get("PANDA_VERIFY_HOST") != "off"

        # request compression flag (not read by any method of this class)
        self.compress = True

        # the grid proxy file serves as both certificate and key; resolve it
        # once so the "no proxy" warning is not printed twice
        proxy_path = self._x509()
        self.ssl_certificate = proxy_path
        self.ssl_key = proxy_path

        # when True, _prepare_headers asks the server for a JSON response
        self.use_json = False

        # OIDC settings are only read when PANDA_AUTH=oidc
        self.oidc = os.getenv("PANDA_AUTH") == "oidc"
        self.auth_vo = os.getenv("PANDA_AUTH_VO") if self.oidc else None
        self.id_token = os.getenv("PANDA_AUTH_ID_TOKEN") if self.oidc else None
        # a "file:<path>" value means the token has to be read from that file
        token_file_prefix = "file:"
        if self.id_token and self.id_token.startswith(token_file_prefix):
            with open(self.id_token[len(token_file_prefix):], "r") as f:
                self.id_token = f.read().strip()

    def _x509(self):
        """
        Locate a readable grid proxy certificate.

        returns:
            the path in X509_USER_PROXY if it is readable, else
            /tmp/x509up_u<uid> if it is readable, else "" (after printing a warning)
        """
        # best effort: any problem with the environment value falls through to the default path
        try:
            env_proxy = os.environ.get("X509_USER_PROXY")
            if env_proxy is not None and os.access(env_proxy, os.R_OK):
                return env_proxy
        except Exception:
            pass

        # default proxy location for the current user
        x509 = f"/tmp/x509up_u{os.getuid()}"
        if os.access(x509, os.R_OK):
            return x509

        # nothing usable was found
        print("No valid grid proxy certificate found")
        return ""

    def _prepare_url(self, url):
        """
        Return ``(url_to_use, use_https)`` for a request.

        The hostname is replaced via replace_hostname_in_url_randomly unless
        PANDA_BEHIND_REAL_LB is present in the environment.
        """
        use_https = is_https(url)
        if "PANDA_BEHIND_REAL_LB" in os.environ:
            modified_url = url
        else:
            modified_url = replace_hostname_in_url_randomly(url)
        return modified_url, use_https

    def _prepare_headers(self):
        """Build the request headers from the OIDC and JSON settings."""
        headers = {}

        if self.oidc:
            headers["Authorization"] = f"Bearer {self.id_token}"
            headers["Origin"] = self.auth_vo

        if self.use_json:
            headers["Accept"] = "application/json"

        return headers

    def _prepare_ssl(self, use_https):
        """
        Return the ``(cert, verify)`` pair to hand to ``requests``.

        For plain HTTP this is ``(None, True)``. For HTTPS the client
        certificate pair is used unless OIDC is active, and ``verify`` is
        False when host verification is disabled, otherwise the first existing
        directory among X509_CERT_DIR and DEFAULT_CERT_PATH, otherwise True.
        """
        cert = None
        verify = True

        if use_https:
            # token authentication does not need a client certificate
            if not self.oidc:
                cert = (self.ssl_certificate, self.ssl_key)

            if not self.verifyHost:
                verify = False
            elif "X509_CERT_DIR" in os.environ and os.path.exists(os.environ["X509_CERT_DIR"]):
                verify = os.environ["X509_CERT_DIR"]
            elif os.path.exists(DEFAULT_CERT_PATH):
                verify = DEFAULT_CERT_PATH

        return cert, verify

    def get(self, url, data):
        """Send a GET request with ``data`` as query parameters; returns ``(status, output)``."""
        url, use_https = self._prepare_url(url)
        headers = self._prepare_headers()
        cert, verify = self._prepare_ssl(use_https)

        try:
            response = requests.get(url, headers=headers, params=data, timeout=600, cert=cert, verify=verify)
            response.raise_for_status()
            return 0, response.text
        except requests.RequestException as e:
            return 255, str(e)

    def post(self, url, data):
        """Send a POST request with ``data`` as the form body; returns ``(status, output)``."""
        url, use_https = self._prepare_url(url)
        headers = self._prepare_headers()
        cert, verify = self._prepare_ssl(use_https)

        try:
            response = requests.post(url, headers=headers, data=data, timeout=600, cert=cert, verify=verify)
            response.raise_for_status()
            return 0, response.text
        except requests.RequestException as e:
            return 255, str(e)

    def post_files(self, url, data):
        """
        Upload files with a multipart POST request.

        args:
            url: the target URL
            data: mapping of form field name to either a file path (str) or a
                  ``(upload_name, file_path)`` pair
        returns:
            ``(0, response_text)`` on success, ``(255, error_message)`` on a
            ``requests`` failure. Errors from opening the files are not caught.
        """
        url, use_https = self._prepare_url(url)
        headers = self._prepare_headers()
        cert, verify = self._prepare_ssl(use_https)

        files = {}
        try:
            for key, value in data.items():
                if isinstance(value, str):
                    # only the path was given
                    files[key] = open(value, "rb")
                else:
                    # (name to send, path to read)
                    files[key] = (value[0], open(value[1], "rb"))
            response = requests.post(url, headers=headers, files=files, timeout=600, cert=cert, verify=verify)
            response.raise_for_status()
            return 0, response.text
        except requests.RequestException as e:
            return 255, str(e)
        finally:
            # close every handle that was opened, whatever happened above
            for entry in files.values():
                file_handler = entry[1] if isinstance(entry, tuple) else entry
                file_handler.close()
0183
0184
0185 """
0186 Client API
0187 """
0188
0189
def submit_jobs(jobs):
    """
    Submit jobs to the PanDA server

    args:
        jobs: the list of JobSpecs; each one gets its creationHost attribute
              set to the fully qualified name of the local host
    returns:
        status code
              0: communication succeeded to the panda server
              255: communication failure
        return code
              True: request is processed
              False: not processed
    """
    # record where the jobs were created
    origin_host = socket.getfqdn()
    for spec in jobs:
        spec.creationHost = origin_host

    # serialize the job specs before sending them
    payload = {"jobs": dump_jobs_json(jobs)}

    client = HttpClientV1()
    endpoint = f"{api_url_ssl_v1}/job/submit"

    status, output = client.post(endpoint, payload)
    return status, output
0219
0220
def get_job_status(job_ids):
    """
    Query the status of jobs

    args:
        job_ids: the list of PandaIDs
    returns:
        status code
              0: communication succeeded to the panda server
              255: communication failure
        the list of JobSpecs (or Nones for non-existing PandaIDs)
    """
    client = HttpClientV1()
    endpoint = f"{api_url_ssl_v1}/pilot/get_job_status"
    status, output = client.post(endpoint, {"job_ids": job_ids})
    return status, output
0239
0240
def kill_jobs(
    job_ids,
    code=None,
    keep_unmerged=False,
    job_sub_status=None,
):
    """
    Kill jobs. Normal users can kill only their own jobs.
    People with production VOMS role can kill any jobs.
    Running jobs are killed when next heartbeat comes from the pilot.
    Set code=9 if running jobs need to be killed immediately.

    args:
        job_ids: the list of PandaIDs
        code: specify why the jobs are killed (only sent when truthy)
              2: expire
              3: aborted
              4: expire in waiting
              7: retry by server
              8: re-brokerage
              9: force kill
              10: fast re-brokerage on overloaded PQs
              50: kill by JEDI
              91: kill user jobs with prod role
        keep_unmerged: set True not to cancel unmerged jobs when pmerge is killed.
        job_sub_status: set job sub status if any
    returns:
        status code
              0: communication succeeded to the panda server
              255: communication failure
        the output of the server call as returned by the HTTP client
    """
    client = HttpClientV1()
    endpoint = f"{api_url_ssl_v1}/job/kill"

    payload = {"job_ids": job_ids}
    if code:
        payload["code"] = code

    # the comparison is deliberately against True itself, as in the original contract
    options = ["keepUnmerged"] if keep_unmerged == True else []
    if job_sub_status:
        options.append(f"jobSubStatus={job_sub_status}")
    if options:
        payload["kill_options"] = options

    status, output = client.post(endpoint, payload)
    return status, output
0292
0293
def reassign_jobs(job_ids):
    """
    Trigger the reassignment of jobs

    args:
        job_ids: the list of IDs sent to the server as "job_ids"
    returns:
        status code
              0: communication succeeded to the panda server
              255: communication failure
        return code
              True: request is processed
              False: not processed
    """
    client = HttpClientV1()
    endpoint = f"{api_url_ssl_v1}/job/reassign"
    status, output = client.post(endpoint, {"job_ids": job_ids})
    return status, output
0315
0316
def job_stats_by_cloud(job_type=None):
    """
    Get job statistics by cloud. Used by panglia monitor in TRIUMF

    args:
        job_type: string with the type of jobs to consider
            None: no type filter is sent to the server
            analysis: analysis jobs
            production: production jobs
    returns:
        status code
              0: communication succeeded to the panda server
              255: communication failure (also returned for an invalid job_type)
        map of the number jobs per job status in each cloud, or the raw server
        output / an error message when the call did not succeed
    """
    # reject unsupported types before contacting the server
    if job_type not in (None, "analysis", "production"):
        print("Invalid job type, must be one of [None, 'analysis', 'production']")
        return EC_Failed, "Invalid job type"

    http_client = HttpClientV1()
    url = f"{api_url_ssl_v1}/statistics/job_stats_by_cloud"

    data = {"type": job_type} if job_type else {}

    status, output = http_client.get(url, data)
    if status != 0 or not output.get("success"):
        print(f"Failed to retrieve job_stats_by_cloud for {job_type}. Status: {status}, Output: {output}")
        return status, output

    # mapping keys are unique, so each cloud appears once and only needs a shallow copy
    statistics = output["data"]
    return 0, {cloud: dict(values) for cloud, values in statistics.items()}
0360
0361
0362
def getJobStatistics(sourcetype=None):
    """Forward sourcetype to job_stats_by_cloud and return its result unchanged."""
    result = job_stats_by_cloud(sourcetype)
    return result
0365
0366
def production_job_stats_by_cloud_and_processing_type():
    """
    Get job statistics by cloud and processing type. Used by panglia monitor in TRIUMF

    returns:
        status code
              0: communication succeeded to the panda server
              255: communication failure
        nested map cloud -> processing type -> job status -> number of jobs,
        or the raw server output when the call did not succeed
    """
    http_client = HttpClientV1()
    url = f"{api_url_ssl_v1}/statistics/production_job_stats_by_cloud_and_processing_type"
    status, output = http_client.get(url, {})

    if status != 0 or not output.get("success"):
        print(f"Failed to retrieve production_job_stats_by_cloud_and_processing_type. Status: {status}, Output: {output}")
        return status, output

    # Keys are unique at every level of the server mapping, so no merging of
    # counts is ever needed: copying each level into plain dicts is sufficient.
    statistics = output["data"]
    aggregated = {
        cloud: {processing_type: dict(status_map) for processing_type, status_map in dict(processing_type_map).items()}
        for cloud, processing_type_map in statistics.items()
    }
    return 0, aggregated
0410
0411
0412
def getJobStatisticsForBamboo(useMorePG=False):
    """
    Return the result of production_job_stats_by_cloud_and_processing_type.

    The useMorePG argument is accepted but not used.
    """
    result = production_job_stats_by_cloud_and_processing_type()
    return result
0415
0416
def job_stats_by_site_and_resource_type(time_window=None):
    """
    Get job statistics per site and resource. Used by panglia monitor in TRIUMF

    args:
       time_window: to count number of jobs that finish/failed/cancelled for last N minutes. 12*60 by default
    returns:
       status code
             0: communication succeeded to the panda server
             255: communication failure
       map of the number jobs per job status in each site and resource
    """
    client = HttpClientV1()
    endpoint = f"{api_url_ssl_v1}/statistics/job_stats_by_site_and_resource_type"

    # the window is only sent when one was given
    params = {"time_window": time_window} if time_window else {}

    status, output = client.get(endpoint, params)

    succeeded = status == 0 and output.get("success")
    if not succeeded:
        print(f"Failed to retrieve job_stats_by_site_and_resource_type. Status: {status}, Output: {output}")
        return status, output

    return 0, output.get("data")
0444
0445
0446
def getJobStatisticsPerSiteResource(timeWindow=None):
    """Forward timeWindow to job_stats_by_site_and_resource_type and return its result unchanged."""
    result = job_stats_by_site_and_resource_type(timeWindow)
    return result
0449
0450
def job_stats_by_site_share_and_resource_type(time_window=None):
    """
    Get job statistics per site, label, and resource

    args:
       time_window: to count number of jobs that finish/failed/cancelled for last N minutes. 12*60 by default
    returns:
       status code
             0: communication succeeded to the panda server
             255: communication failure
       map of the number jobs per job status in each site and resource
    """
    client = HttpClientV1()
    endpoint = f"{api_url_ssl_v1}/statistics/job_stats_by_site_share_and_resource_type"

    # the window is only sent when one was given
    params = {"time_window": time_window} if time_window else {}

    status, output = client.get(endpoint, params)

    succeeded = status == 0 and output.get("success")
    if not succeeded:
        print(f"Failed to retrieve job_stats_by_site_share_and_resource_type. Status: {status}, Output: {output}")
        return status, output

    return 0, output.get("data")
0478
0479
0480
def get_job_statistics_per_site_label_resource(time_window=None):
    """Forward time_window to job_stats_by_site_share_and_resource_type and return its result unchanged."""
    result = job_stats_by_site_share_and_resource_type(time_window)
    return result
0483
0484
def get_site_specs(site_type=None):
    """
    Get list of site specifications. Used by panglia monitor in TRIUMF

    args:
        site_type: type of sites
            all: all sites
            None: defaults to analysis sites
            analysis: analysis sites
            production: production sites
            unified: unified sites
    returns:
        status code
              0: communication succeeded to the panda server
              255: communication failure (also returned for an invalid site_type)
        map of site and attributes, or the raw server output / an error
        message when the call did not succeed
    """
    # validate first so that no client is set up for a request that is never sent
    if site_type not in (None, "all", "analysis", "production", "unified"):
        return EC_Failed, "Invalid site type"

    http_client = HttpClientV1()
    url = f"{api_url_ssl_v1}/metaconfig/get_site_specs"

    data = {"type": site_type} if site_type else {}

    status, output = http_client.get(url, data)

    if status != 0 or not output.get("success"):
        print(f"Failed to retrieve get_site_specs. Status: {status}, Output: {output}")
        return status, output

    return 0, output.get("data")
0519
0520
0521
def getSiteSpecs(siteType=None):
    """Forward siteType to get_site_specs and return its result unchanged."""
    result = get_site_specs(siteType)
    return result
0524
0525
def register_cache_file(user_name: str, file_name: str, file_size: int, checksum: str):
    """
    Register information about the input sandbox that is being stored in PanDA cache

    args:
        user_name: the name of the user
        file_name: the file name
        file_size: the file size
        checksum: md5sum of the file (converted to str before sending)
    returns:
        status code
              0: communication succeeded to the panda server
              else: communication failure
    """
    client = HttpClientV1()
    endpoint = f"{api_url_ssl_v1}/file_server/register_cache_file"

    payload = dict(
        user_name=user_name,
        file_name=file_name,
        file_size=file_size,
        checksum=str(checksum),
    )
    return client.post(endpoint, payload)
0552
0553
def put_file(file):
    """
    Upload input sandbox to PanDA cache

    args:
        file: the file name
    returns:
        status code
              0: communication succeeded to the panda server
              else: communication failure
    """
    client = HttpClientV1()
    endpoint = f"{api_url_ssl_v1}/file_server/upload_cache_file"
    return client.post_files(endpoint, {"file": file})
0571
0572
def touch_file(source_url, file_name):
    """
    Call the touch_cache_file endpoint of a given server

    args:
        source_url: base URL of the server; "/api/v1/file_server/touch_cache_file" is appended to it
        file_name: the file name sent as "file_name"
    returns:
        the result of the HTTP client's post call
    """
    client = HttpClientV1()

    # the endpoint is built from the caller-supplied server rather than the default API URL
    endpoint = f"{source_url}/api/v1/file_server/touch_cache_file"
    return client.post(endpoint, {"file_name": file_name})
0580
0581
def insertTaskParams(taskParams):
    """
    Insert task parameters

    args:
        taskParams: a dictionary of task parameters (sent JSON-encoded)
    returns:
        status code
              0: communication succeeded to the panda server
              255: communication failure
        tuple of return code and JediTaskID
              True: request is processed
              False: not processed
    """
    serialized_params = json.dumps(taskParams)

    client = HttpClient()
    endpoint = f"{baseURLSSL}/insertTaskParams"
    status, output = client.post(endpoint, {"taskParams": serialized_params})

    # the server reply is expected to be a pickled object
    try:
        return status, pickle_loads(output)
    except Exception as exc:
        error_str = f"ERROR insertTaskParams : {type(exc)} {exc}"
        return EC_Failed, f"{output}\n{error_str}"
0611
0612
def kill_task(task_id, broadcast=False):
    """
    Kill a task

    args:
        task_id: task ID of the task to be killed
        broadcast: True to push the message to the pilot subscribing the MB
    returns:
        status code
              0: communication succeeded to the panda server
              255: communication failure
        tuple of return code and diagnostic message
              0: request is registered
              1: server error
              2: task not found
              3: permission denied
              4: irrelevant task status
              100: non SSL connection
              101: irrelevant taskID
    """
    client = HttpClientV1()
    endpoint = f"{api_url_ssl_v1}/task/kill"
    payload = dict(task_id=task_id, broadcast=broadcast)

    status, output = client.post(endpoint, payload)
    return status, output
0642
0643
def finish_task(task_id, soft=False, broadcast=False):
    """
    Finish a task

    args:
        task_id: ID of the task to be finished
        soft: If True, new jobs are not generated and the task is
              finished once all remaining jobs are done.
              If False, all remaining jobs are killed and then the
              task is finished
        broadcast: True to push the message to the pilot subscribing the MB
    returns:
        status code
              0: communication succeeded to the panda server
              255: communication failure
        tuple of return code and diagnostic message
              0: request is registered
              1: server error
              2: task not found
              3: permission denied
              4: irrelevant task status
              100: non SSL connection
              101: irrelevant taskID
    """
    client = HttpClientV1()
    endpoint = f"{api_url_ssl_v1}/task/finish"
    payload = dict(task_id=task_id, soft=soft, broadcast=broadcast)

    status, output = client.post(endpoint, payload)
    return status, output
0676
0677
def reassignTaskToSite(jediTaskID, site, mode=None):
    """
    Reassign a task to a site. Existing jobs are killed and new jobs are generated at the site

    args:
        jediTaskID: jediTaskID of the task to be reassigned
        site: the site name where the task is reassigned (at most 60 characters)
        mode: If soft, only defined/waiting/assigned/activated jobs are killed. If nokill, no jobs are killed. All jobs are killed by default.
    returns:
        status code
              0: communication succeeded to the panda server
              255: communication failure
        tuple of return code and diagnostic message
              0: request is registered
              1: server error
              2: task not found
              3: permission denied
              4: irrelevant task status
              100: non SSL connection
              101: irrelevant taskID
    """
    # refuse over-long site names before contacting the server
    maxSite = 60
    site_too_long = site is not None and len(site) > maxSite
    if site_too_long:
        return EC_Failed, f"site parameter is too long > {maxSite}chars"

    client = HttpClient()
    endpoint = f"{baseURLSSL}/reassignTask"

    payload = {"jediTaskID": jediTaskID, "site": site}
    if mode is not None:
        payload["mode"] = mode

    status, output = client.post(endpoint, payload)

    # the server reply is expected to be a pickled object
    try:
        return status, pickle_loads(output)
    except Exception as exc:
        error_str = f"ERROR reassignTaskToSite : {type(exc)} {exc}"
        return EC_Failed, f"{output}\n{error_str}"
0717
0718
def reassignTaskToCloud(jediTaskID, cloud, mode=None):
    """
    Reassign a task to a cloud. Existing jobs are killed and new jobs are generated in the cloud

    args:
        jediTaskID: jediTaskID of the task to be reassigned
        cloud: the cloud name where the task is reassigned
        mode: If soft, only defined/waiting/assigned/activated jobs are killed. If nokill, no jobs are killed. All jobs are killed by default.
    returns:
        status code
              0: communication succeeded to the panda server
              255: communication failure
        tuple of return code and diagnostic message
              0: request is registered
              1: server error
              2: task not found
              3: permission denied
              4: irrelevant task status
              100: non SSL connection
              101: irrelevant taskID
    """
    client = HttpClient()
    endpoint = f"{baseURLSSL}/reassignTask"

    payload = {"jediTaskID": jediTaskID, "cloud": cloud}
    if mode is not None:
        payload["mode"] = mode

    status, output = client.post(endpoint, payload)

    # the server reply is expected to be a pickled object
    try:
        return status, pickle_loads(output)
    except Exception as exc:
        error_str = f"ERROR reassignTaskToCloud : {type(exc)} {exc}"
        return EC_Failed, f"{output}\n{error_str}"
0755
0756
def reassignTaskToNucleus(jediTaskID, nucleus, mode=None):
    """
    Reassign a task to a nucleus. Existing jobs are killed and new jobs are generated in the nucleus

    args:
        jediTaskID: jediTaskID of the task to be reassigned
        nucleus: the nucleus name where the task is reassigned
        mode: If soft, only defined/waiting/assigned/activated jobs are killed. If nokill, no jobs are killed. All jobs are killed by default.
    returns:
        status code
              0: communication succeeded to the panda server
              255: communication failure
        tuple of return code and diagnostic message
              0: request is registered
              1: server error
              2: task not found
              3: permission denied
              4: irrelevant task status
              100: non SSL connection
              101: irrelevant taskID
    """
    http_client = HttpClient()

    url = f"{baseURLSSL}/reassignTask"
    data = {"jediTaskID": jediTaskID, "nucleus": nucleus}
    if mode is not None:
        data["mode"] = mode
    status, output = http_client.post(url, data)

    # the server reply is expected to be a pickled object
    try:
        return status, pickle_loads(output)
    except Exception as exc:
        # name this function in the diagnostic (it used to say reassignTaskToCloud)
        error_str = f"ERROR reassignTaskToNucleus : {type(exc)} {exc}"
        return EC_Failed, f"{output}\n{error_str}"
0793
0794
def uploadLog(log_string, log_file_name):
    """
    Upload log

    The message is gzip-compressed into a temporary file, uploaded, and the
    temporary file is removed afterwards even if the upload raises.

    args:
        log_string: log message, as str (encoded to UTF-8) or bytes
        log_file_name: name of log file (converted to str)
    returns:
        status code
              0: communication succeeded to the panda server
              else: communication failure
    """
    http_client = HttpClientV1()

    # gzip works on bytes; accept already-encoded input as well
    if isinstance(log_string, str):
        log_string = log_string.encode("utf-8")

    # reserve a temporary file name and close the handle right away so that no
    # file descriptor stays open while the file is rewritten through gzip
    with tempfile.NamedTemporaryFile(delete=False) as fh:
        tmp_path = fh.name

    try:
        with gzip.open(tmp_path, mode="wb") as gfh:
            gfh.write(log_string)

        url = f"{api_url_ssl_v1}/file_server/upload_jedi_log"

        # sometimes the file name is not a plain string
        data = {"file": (str(log_file_name), tmp_path)}
        return http_client.post_files(url, data, encoding="gzip")
    finally:
        # always clean up the temporary file
        os.unlink(tmp_path)
0827
0828
def changeTaskPriority(jediTaskID, newPriority):
    """
    Change the task priority

    args:
        jediTaskID: jediTaskID of the task to change the priority
        newPriority: new task priority
    returns:
        status code
              0: communication succeeded to the panda server
              255: communication failure
        return code
              0: unknown task
              1: succeeded
              None: database error
    """
    client = HttpClient()
    endpoint = f"{baseURLSSL}/changeTaskPriority"
    payload = dict(jediTaskID=jediTaskID, newPriority=newPriority)
    status, output = client.post(endpoint, payload)

    # the server reply is expected to be a pickled object
    try:
        return status, pickle_loads(output)
    except Exception as exc:
        error_str = f"ERROR changeTaskPriority : {type(exc)} {exc}"
        return EC_Failed, f"{output}\n{error_str}"
0858
0859
def set_debug_mode(job_id, mode):
    """
    Turn debug mode for a job on/off

    args:
        job_id: job_id of the job
        mode: True to turn it on. Oppositely, False
    returns:
        status code
              0: communication succeeded to the panda server
              another: communication failure
        error message
    """
    client = HttpClientV1()
    endpoint = f"{api_url_ssl_v1}/job/set_debug_mode"
    payload = dict(job_id=job_id, mode=mode)

    status, output = client.post(endpoint, payload)
    return status, output
0882
0883
def retry_task(task_id, no_child_retry=False, discard_events=False, disable_staging_mode=False, keep_gshare_priority=False):
    """
    Retry a task

    args:
        task_id: jediTaskID of the task to retry
        no_child_retry: True not to retry child tasks
        discard_events: discard events
        disable_staging_mode: disable staging mode
        keep_gshare_priority: keep current gshare and priority
    returns:
        status code
              0: communication succeeded to the panda server
              255: communication failure
        tuple of return code and diagnostic message
              0: request is registered
              1: server error
              2: task not found
              3: permission denied
              4: irrelevant task status
              100: non SSL connection
              101: irrelevant taskID
    """
    client = HttpClientV1()
    endpoint = f"{api_url_ssl_v1}/task/retry"

    # optional switches: only the ones that are set are sent, always as True
    switches = (
        ("no_child_retry", no_child_retry),
        ("discard_events", discard_events),
        ("disable_staging_mode", disable_staging_mode),
        ("keep_gshare_priority", keep_gshare_priority),
    )
    payload = {"task_id": task_id}
    payload.update({name: True for name, enabled in switches if enabled})

    status, output = client.post(endpoint, payload)
    return status, output
0923
0924
def reload_input(task_id):
    """
    Reload the input for a task

    args:
        task_id: ID of the task whose input is reloaded
    returns:
        status code
              0: communication succeeded to the panda server
              255: communication failure
        tuple of return code and diagnostic message
              0: request is registered
              1: server error
              2: task not found
              3: permission denied
              4: irrelevant task status
              100: non SSL connection
              101: irrelevant taskID
    """
    client = HttpClientV1()
    endpoint = f"{api_url_ssl_v1}/task/reload_input"

    status, output = client.post(endpoint, {"task_id": task_id})
    return status, output
0952
0953
def changeTaskWalltime(jediTaskID, wallTime):
    """
    Change task walltime

    args:
        jediTaskID: jediTaskID of the task to change the walltime
        wallTime: new walltime for the task
    returns:
        status code
              0: communication succeeded to the panda server
              255: communication failure
        return code
              0: unknown task
              1: succeeded
              None: database error
    """
    client = HttpClient()
    endpoint = f"{baseURLSSL}/changeTaskAttributePanda"
    payload = dict(jediTaskID=jediTaskID, attrName="wallTime", attrValue=wallTime)
    status, output = client.post(endpoint, payload)

    # the server reply is expected to be a pickled object
    try:
        return status, pickle_loads(output)
    except Exception as exc:
        error_str = f"ERROR changeTaskWalltime : {type(exc)} {exc}"
        return EC_Failed, f"{output}\n{error_str}"
0983
0984
def changeTaskCputime(jediTaskID, cpuTime):
    """
    Change task CPU time

    args:
        jediTaskID: jediTaskID of the task to change the CPU time
        cpuTime: new cputime for the task
    returns:
        status code
              0: communication succeeded to the panda server
              255: communication failure
        return code
              0: unknown task
              1: succeeded
              None: database error
    """
    client = HttpClient()
    endpoint = f"{baseURLSSL}/changeTaskAttributePanda"
    payload = dict(jediTaskID=jediTaskID, attrName="cpuTime", attrValue=cpuTime)
    status, output = client.post(endpoint, payload)

    # the server reply is expected to be a pickled object
    try:
        return status, pickle_loads(output)
    except Exception as exc:
        error_str = f"ERROR changeTaskCputime : {type(exc)} {exc}"
        return EC_Failed, f"{output}\n{error_str}"
1014
1015
def changeTaskRamCount(jediTaskID, ramCount):
    """
    Change task RAM count

    args:
        jediTaskID: jediTaskID of the task to change the RAM count
        ramCount: new ramCount for the task
    returns:
        status code
              0: communication succeeded to the panda server
              255: communication failure
        return code
              0: unknown task
              1: succeeded
              None: database error
    """
    client = HttpClient()
    endpoint = f"{baseURLSSL}/changeTaskAttributePanda"
    payload = dict(jediTaskID=jediTaskID, attrName="ramCount", attrValue=ramCount)
    status, output = client.post(endpoint, payload)

    # the server reply is expected to be a pickled object
    try:
        return status, pickle_loads(output)
    except Exception as exc:
        error_str = f"ERROR changeTaskRamCount : {type(exc)} {exc}"
        return EC_Failed, f"{output}\n{error_str}"
1045
1046
def changeTaskAttribute(jediTaskID, attrName, attrValue):
    """
    Change task attribute

    args:
        jediTaskID: jediTaskID of the task to change the attribute
        attrName: attribute name
        attrValue: new value for the attribute
    returns:
        status code
              0: communication succeeded to the panda server
              255: communication failure
        return: a tuple of return code and message
              0: unknown task
              1: succeeded
              2: disallowed to update the attribute
              None: database error
    """
    client = HttpClient()
    endpoint = f"{baseURLSSL}/changeTaskAttributePanda"
    payload = dict(jediTaskID=jediTaskID, attrName=attrName, attrValue=attrValue)
    status, output = client.post(endpoint, payload)

    # the server reply is expected to be a pickled object
    try:
        return status, pickle_loads(output)
    except Exception as exc:
        error_str = f"ERROR changeTaskAttributePanda : {type(exc)} {exc}"
        return EC_Failed, f"{output}\n{error_str}"
1078
1079
def changeTaskSplitRule(jediTaskID, ruleName, ruleValue):
    """
    Change split rule for task

    args:
        jediTaskID: jediTaskID of the task to change the rule
        ruleName: rule name (sent as "attrName")
        ruleValue: new value for the rule (sent as "attrValue")
    returns:
        status code
              0: communication succeeded to the panda server
              255: communication failure
        return: a tuple of return code and message
              0: unknown task
              1: succeeded
              2: disallowed to update the attribute
              None: database error
    """
    client = HttpClient()
    endpoint = f"{baseURLSSL}/changeTaskSplitRulePanda"
    payload = dict(jediTaskID=jediTaskID, attrName=ruleName, attrValue=ruleValue)
    status, output = client.post(endpoint, payload)

    # the server reply is expected to be a pickled object
    try:
        return status, pickle_loads(output)
    except Exception as exc:
        error_str = f"ERROR changeTaskSplitRule : {type(exc)} {exc}"
        return EC_Failed, f"{output}\n{error_str}"
1111
1112
def pauseTask(jediTaskID):
    """
    Pause task

    args:
        jediTaskID: jediTaskID of the task to pause
    returns:
        status code
              0: communication succeeded to the panda server
              255: communication failure
        tuple of return code and diagnostic message
              0: request is registered
              1: server error
              2: task not found
              3: permission denied
              4: irrelevant task status
              100: non SSL connection
              101: irrelevant taskID
    """
    client = HttpClient()
    endpoint = f"{baseURLSSL}/pauseTask"
    status, output = client.post(endpoint, {"jediTaskID": jediTaskID})

    # the server reply is expected to be a pickled object
    try:
        return status, pickle_loads(output)
    except Exception as exc:
        error_str = f"ERROR pauseTask : {type(exc)} {exc}"
        return EC_Failed, f"{output}\n{error_str}"
1145
1146
def resumeTask(jediTaskID):
    """
    Ask the panda server to resume a task

    args:
        jediTaskID: jediTaskID of the task to release
    returns:
        status code
              0: communication succeeded to the panda server
              255: communication failure, or the reply could not be unpickled
        tuple of return code and diagnostic message
              0: request is registered
              1: server error
              2: task not found
              3: permission denied
              4: irrelevant task status
              100: non SSL connection
              101: irrelevant taskID
        When the reply cannot be unpickled, the second element is the raw reply
        followed by an error description instead.
    """
    client = HttpClient()

    payload = {"jediTaskID": jediTaskID}
    ret_code, reply = client.post(f"{baseURLSSL}/resumeTask", payload)

    try:
        decoded = pickle_loads(reply)
    except Exception as exc:
        problem = f"ERROR resumeTask : {type(exc)} {exc}"
        return EC_Failed, f"{reply}\n{problem}"
    return ret_code, decoded
1179
1180
def avalancheTask(jediTaskID):
    """
    Force avalanche for task

    args:
        jediTaskID: jediTaskID of the task to avalanche
    returns:
        status code
              0: communication succeeded to the panda server
              255: communication failure, or the reply could not be unpickled
        tuple of return code and diagnostic message
              0: request is registered
              1: server error
              2: task not found
              3: permission denied
              4: irrelevant task status
              100: non SSL connection
              101: irrelevant taskID
        When the reply cannot be unpickled, the second element is the raw reply
        followed by an "ERROR avalancheTask" description instead.
    """
    # instantiate HTTP client
    http_client = HttpClient()

    # execute
    url = f"{baseURLSSL}/avalancheTask"
    data = {"jediTaskID": jediTaskID}
    status, output = http_client.post(url, data)
    try:
        # NOTE(review): the reply is unpickled, which is only safe when the server is trusted
        return status, pickle_loads(output)
    except Exception:
        error_type, error_value = sys.exc_info()[:2]
        # the label must name this function so that failures are attributed to the right call
        error_str = f"ERROR avalancheTask : {error_type} {error_value}"
        return EC_Failed, f"{output}\n{error_str}"
1213
1214
def increaseAttemptNr(jediTaskID, increase):
    """
    Increase attempt numbers of a task

    args:
        jediTaskID: jediTaskID of the task to increase attempt numbers
        increase: increase for attempt numbers (sent under the "increasedNr" key)
    returns:
        status code
              0: communication succeeded to the panda server
              255: communication failure, or the reply could not be unpickled
        return code
              0: succeeded
              1: unknown task
              2: invalid task status
              3: permission denied
              4: wrong parameter
              None: database error
        When the reply cannot be unpickled, the second element is the raw reply
        followed by an error description instead.
    """
    client = HttpClient()

    payload = {"jediTaskID": jediTaskID, "increasedNr": increase}
    ret_code, reply = client.post(f"{baseURLSSL}/increaseAttemptNrPanda", payload)

    try:
        decoded = pickle_loads(reply)
    except Exception as exc:
        problem = f"ERROR increaseAttemptNr : {type(exc)} {exc}"
        return EC_Failed, f"{reply}\n{problem}"
    return ret_code, decoded
1247
1248
def killUnfinishedJobs(jediTaskID, code=None, useMailAsID=False):
    """
    Kill unfinished jobs in a task. Normal users can kill only their own jobs.
    People with production VOMS role can kill any jobs.
    Running jobs are killed when next heartbeat comes from the pilot.
    Set code=9 if running jobs need to be killed immediately.

    args:
        jediTaskID: the taskID of the task
        code: specify why the jobs are killed
              2: expire
              3: aborted
              4: expire in waiting
              7: retry by server
              8: re-brokerage
              9: force kill
              50: kill by JEDI
              91: kill user jobs with prod role
        useMailAsID: obsolete (still forwarded to the server)
    returns:
        status code
              0: communication succeeded to the panda server
              255: communication failure, or the reply could not be unpickled
        the unpickled server reply
              NOTE(review): previously documented as "the list of clouds (or Nones if
              tasks are not yet assigned)", which looks copied from another call - confirm
        When the reply cannot be unpickled, the error description is printed and the
        second element is the raw reply followed by that description.
    """
    client = HttpClient()

    payload = {"jediTaskID": jediTaskID, "code": code, "useMailAsID": useMailAsID}
    ret_code, reply = client.post(f"{baseURLSSL}/killUnfinishedJobs", payload)

    try:
        decoded = pickle_loads(reply)
    except Exception as exc:
        problem = f"ERROR killUnfinishedJobs : {type(exc)} {exc}"
        print(problem)
        return EC_Failed, f"{reply}\n{problem}"
    return ret_code, decoded
1288
1289
def triggerTaskBrokerage(jediTaskID):
    """
    Trigger task brokerage

    The request is a post to changeTaskModTimePanda with a fixed diffValue of -12
    (presumably a shift of the task modification time in hours - TODO confirm unit).

    args:
        jediTaskID: jediTaskID of the task to trigger brokerage for
    returns:
        status code
              0: communication succeeded to the panda server
              255: communication failure, or the reply could not be unpickled
        return: a tuple of return code and message
              0: unknown task
              1: succeeded
              None: database error
        When the reply cannot be unpickled, the second element is the raw reply
        followed by an error description instead.
    """
    client = HttpClient()

    payload = {"jediTaskID": jediTaskID, "diffValue": -12}
    ret_code, reply = client.post(f"{baseURLSSL}/changeTaskModTimePanda", payload)

    try:
        decoded = pickle_loads(reply)
    except Exception as exc:
        problem = f"ERROR triggerTaskBrokerage : {type(exc)} {exc}"
        return EC_Failed, f"{reply}\n{problem}"
    return ret_code, decoded
1318
1319
def getPandaIDsWithTaskID(jediTaskID):
    """
    Get PanDA IDs with TaskID

    The request goes to the plain (non-SSL) base URL.

    args:
        jediTaskID: jediTaskID of the task to get list of PanDA IDs
    returns:
        status code
              0: communication succeeded to the panda server
              255: communication failure, or the reply could not be unpickled
        the list of PanDA IDs
        When the reply cannot be unpickled, the error description is printed and the
        second element is the raw reply followed by that description.
    """
    client = HttpClient()

    payload = {"jediTaskID": jediTaskID}
    ret_code, reply = client.post(f"{baseURL}/getPandaIDsWithTaskID", payload)

    try:
        decoded = pickle_loads(reply)
    except Exception as exc:
        problem = f"ERROR getPandaIDsWithTaskID : {type(exc)} {exc}"
        print(problem)
        return EC_Failed, f"{reply}\n{problem}"
    return ret_code, decoded
1345
1346
def reactivateTask(jediTaskID, keep_attempt_nr=False, trigger_job_generation=False):
    """
    Reactivate task

    args:
        jediTaskID: jediTaskID of the task to be reactivated
        keep_attempt_nr: not to reset attempt numbers when being reactivated
        trigger_job_generation: trigger job generation once being reactivated
    returns:
        status code
              0: communication succeeded to the panda server
              255: communication failure, or the reply could not be unpickled
        return: a tuple of return code and message
              0: unknown task
              1: succeeded
              None: database error
        When the reply cannot be unpickled, the second element is the raw reply
        followed by an error description instead.
    """
    client = HttpClient()

    payload = {"jediTaskID": jediTaskID}
    # optional flags are only transmitted (as True) when they are truthy
    optional_flags = (
        ("keep_attempt_nr", keep_attempt_nr),
        ("trigger_job_generation", trigger_job_generation),
    )
    for flag_name, flag_value in optional_flags:
        if flag_value:
            payload[flag_name] = True

    ret_code, reply = client.post(f"{baseURLSSL}/reactivateTask", payload)

    try:
        decoded = pickle_loads(reply)
    except Exception as exc:
        problem = f"ERROR reactivateTask : {type(exc)} {exc}"
        return EC_Failed, f"{reply}\n{problem}"
    return ret_code, decoded
1381
1382
def getTaskStatus(jediTaskID):
    """
    Get task status for a particular task ID

    The request goes to the plain (non-SSL) base URL.

    args:
        jediTaskID: jediTaskID of the task whose status is requested
    returns:
        status code
              0: communication succeeded to the panda server
              255: communication failure, or the reply could not be unpickled
        the status string
        When the reply cannot be unpickled, the error description is printed and the
        second element is the raw reply followed by that description.
    """
    client = HttpClient()

    payload = {"jediTaskID": jediTaskID}
    ret_code, reply = client.post(f"{baseURL}/getTaskStatus", payload)

    try:
        decoded = pickle_loads(reply)
    except Exception as exc:
        problem = f"ERROR getTaskStatus : {type(exc)} {exc}"
        print(problem)
        return EC_Failed, f"{reply}\n{problem}"
    return ret_code, decoded
1408
1409
def reassignShare(jedi_task_ids, share, reassign_running=False):
    """
    Reassign specified tasks (and their jobs) to a new share

    args:
        jedi_task_ids: task ids to act on (pickled before being sent)
        share: share to be applied to jedi task ids
        reassign_running: pickled and sent as "reassign_running"; presumably tells the
                          server whether running jobs are reassigned too - TODO confirm
    returns:
        status code
              0: communication succeeded to the panda server
              255: communication failure, or the reply could not be unpickled
        return: a tuple of return code and message
              1: logical error
              0: success
              None: database error
        When the reply cannot be unpickled, the second element is the raw reply
        followed by an error description instead.
    """
    client = HttpClient()

    # the task id list and the flag travel as pickles, the share as given
    payload = {
        "jedi_task_ids_pickle": pickle_dumps(jedi_task_ids),
        "share": share,
        "reassign_running": pickle_dumps(reassign_running),
    }
    ret_code, reply = client.post(f"{baseURLSSL}/reassignShare", payload)

    try:
        decoded = pickle_loads(reply)
    except Exception as exc:
        problem = f"ERROR reassignShare : {type(exc)} {exc}"
        return EC_Failed, f"{reply}\n{problem}"
    return ret_code, decoded
1446
1447
def getTaskParamsMap(jediTaskID):
    """
    Get task parameter map for a certain task ID

    The request goes to the plain (non-SSL) base URL.

    args:
        jediTaskID: jediTaskID of the task to get taskParamsMap
    returns:
        status code
              0: communication succeeded to the panda server
              255: communication failure, or the reply could not be unpickled
        return: a tuple of return code and taskParamsMap
              1: logical error
              0: success
              None: database error
        When the reply cannot be unpickled, the error description is printed and the
        second element is the raw reply followed by that description.
    """
    client = HttpClient()

    payload = {"jediTaskID": jediTaskID}
    ret_code, reply = client.post(f"{baseURL}/getTaskParamsMap", payload)

    try:
        decoded = pickle_loads(reply)
    except Exception as exc:
        problem = f"ERROR getTaskParamsMap : {type(exc)} {exc}"
        print(problem)
        return EC_Failed, f"{reply}\n{problem}"
    return ret_code, decoded
1476
1477
def setNumSlotsForWP(pandaQueueName, numSlots, gshare=None, resourceType=None, validPeriod=None):
    """
    Set num slots for workload provisioning

    args:
        pandaQueueName: Panda Queue name
        numSlots: the number of slots. 0 to dynamically set based on the number of starting jobs
        gshare: global share. None to set for any global share (default)
        resourceType: resource type. None to set for any resource type (default)
        validPeriod: How long the rule is valid in days. None if no expiration (default)
    returns:
        status code
              0: communication succeeded to the panda server
              255: communication failure, or the reply is not valid JSON
        tuple of return code and diagnostic message
              0: succeeded
              1: server error
              100: non SSL connection
              101: missing production role
              102: type error for some parameters
        When the reply cannot be parsed as JSON, the second element is the raw reply
        followed by an error description instead.
    """
    client = HttpClient()

    payload = {"pandaQueueName": pandaQueueName, "numSlots": numSlots}
    # optional settings are left out of the request entirely when they are None
    optional_settings = {"gshare": gshare, "resourceType": resourceType, "validPeriod": validPeriod}
    payload.update({key: value for key, value in optional_settings.items() if value is not None})

    ret_code, reply = client.post(f"{baseURLSSL}/setNumSlotsForWP", payload)

    try:
        decoded = json.loads(reply)
    except Exception as exc:
        problem = f"ERROR setNumSlotsForWP : {type(exc)} {exc}"
        return EC_Failed, f"{reply}\n{problem}"
    return ret_code, decoded
1518
1519
1520
def enableJumboJobs(jediTaskID, totalJumboJobs=1, nJumboPerSite=1):
    """
    Enable jumbo jobs for a task

    args:
        jediTaskID: jediTaskID of the task
        totalJumboJobs: The total number of active jumbo jobs produced for the task. Use 0 to disable jumbo jobs for the task
                        (sent to the server under the "nJumboJobs" key)
        nJumboPerSite: The number of active jumbo jobs per site
    returns:
        status code
              0: communication succeeded to the panda server
              255: communication failure, or the reply is not valid JSON
        tuple of return code and diagnostic message
              0: succeeded
              1: server error
              100: non SSL connection
              101: missing production role
              102: type error for some parameters
        When the reply cannot be parsed as JSON, the second element is the raw reply
        followed by an "ERROR enableJumboJobs" description instead.
    """
    # instantiate HTTP client
    http_client = HttpClient()

    # execute
    url = f"{baseURLSSL}/enableJumboJobs"
    data = {
        "jediTaskID": jediTaskID,
        "nJumboJobs": totalJumboJobs,
        "nJumboPerSite": nJumboPerSite,
    }
    status, output = http_client.post(url, data)
    try:
        return status, json.loads(output)
    except Exception:
        error_type, error_value = sys.exc_info()[:2]
        # label follows the "ERROR <function> :" form used by the other client calls (no leading slash)
        error_str = f"ERROR enableJumboJobs : {error_type} {error_value}"
        return EC_Failed, f"{output}\n{error_str}"
1557
1558
def sweepPQ(panda_queue, status_list, ce_list, submission_host_list):
    """
    Send a harvester command to panda server in order to sweep a panda queue

    Every argument is JSON-encoded before it is posted.

    args:
        panda_queue: panda queue name
        status_list: list with statuses to sweep, e.g. ['submitted']
        ce_list: list of CEs belonging to the site or 'ALL'
        submission_host_list: list of submission hosts this applies or 'ALL'
    returns:
        status code
              0: communication succeeded to the panda server
              255: communication failure, or the reply is not valid JSON
        return: a tuple of return code and message
              False: logical error
              True: success
        When the reply cannot be parsed as JSON, the second element is the raw reply
        followed by an error description instead.
    """
    client = HttpClient()

    payload = {
        "panda_queue": json.dumps(panda_queue),
        "status_list": json.dumps(status_list),
        "ce_list": json.dumps(ce_list),
        "submission_host_list": json.dumps(submission_host_list),
    }
    ret_code, reply = client.post(f"{baseURLSSL}/sweepPQ", payload)

    try:
        decoded = json.loads(reply)
    except Exception as exc:
        problem = f"ERROR sweepPQ : {type(exc)} {exc}"
        return EC_Failed, f"{reply}\n{problem}"
    return ret_code, decoded
1600
1601
def send_command_to_job(panda_id, command):
    """
    Send a command to a job

    Uses the v1 API client and endpoint (job/set_command); the pair returned by the
    client's post call is handed back unchanged.

    args:
        panda_id: PandaID of the job (sent under the "job_id" key)
        command: a command string passed to the pilot. max 250 chars
    returns:
        status code
              0: communication succeeded to the panda server
              255: communication failure
        return: a tuple of return code and message
              False: failed
              True: the command received
    """
    client = HttpClientV1()
    payload = {"job_id": panda_id, "command": command}

    ret_code, reply = client.post(f"{api_url_ssl_v1}/job/set_command", payload)

    return ret_code, reply
1625
1626
def get_banned_users():
    """
    Get list of banned users

    Uses the v1 API client and endpoint (metaconfig/get_banned_users).

    returns:
        status code
              True: communication succeeded to the panda server
              False: communication failure
        second element
              the "data" entry of the reply when its "success" entry is truthy
              "bad response: ..." when the client reports a non-zero status
              "error message: ..." (built from the reply's "message" entry) otherwise
    """
    client = HttpClientV1()
    ret_code, reply = client.get(f"{api_url_ssl_v1}/metaconfig/get_banned_users", {})

    # a non-zero status means the request itself did not go through
    if ret_code != 0:
        return False, f"bad response: {reply}"

    succeeded = reply["success"]
    server_message = reply.get("message", "")
    banned = reply.get("data", None)

    if not succeeded:
        return False, f"error message: {server_message}"
    return True, banned
1654
1655
def release_task(jedi_task_id):
    """
    Release task from staging

    args:
        jedi_task_id: jediTaskID of the task to release
    returns:
        status code
              0: communication succeeded to the panda server
              255: communication failure, or the reply is not valid JSON
        tuple of return code and diagnostic message
              0: request is registered
              1: server error
              2: task not found
              3: permission denied
              4: irrelevant task status
              100: non SSL connection
              101: irrelevant taskID
        When the reply cannot be parsed as JSON, the second element is the raw reply
        followed by an error description instead.
    """
    client = HttpClient()

    payload = {"jedi_task_id": jedi_task_id}
    ret_code, reply = client.post(f"{baseURLSSL}/release_task", payload)

    try:
        decoded = json.loads(reply)
    except Exception as exc:
        problem = f"ERROR release_task : failed with {exc!s}"
        return EC_Failed, f"{reply}\n{problem}"
    return ret_code, decoded