Warning, /swf-monitor/scripts/pcs-task-cmd is written in an unsupported language. File is not indexed.
0001 #!/usr/bin/env python3
0002 """
0003 pcs-task-cmd — fetch a PCS ProdTask's submission artifact, or submit it.
0004
0005 Thin HTTP client over swf-monitor's REST API. No Django, no DB access.
0006 The server regenerates from current PCS state on every call, so what
0007 you print here reflects the task + dataset + config as they exist now.
0008
0009 Inspection mode (default; stdlib-only):
0010 pcs-task-cmd <name> --format {condor|panda|jedi|dump}
0011
0012 # condor: bash one-liner (env-prefixed submit_csv.sh)
0013 # panda: prun command
0014 # jedi: JSON taskParamMap for Client.insertTaskParams()
0015 # dump: full JSON view of the task + dataset + tags + config
0016
0017 Submission mode (one-command JEDI submit, requires pandaclient):
0018 pcs-task-cmd <name> --submit
0019
0020 Fetches the taskParamMap, calls pandaclient.Client.insertTaskParams()
0021 using the operator's existing PanDA auth (proxy or OIDC token), then
0022 posts the resulting jediTaskID back to swf-monitor so PCS records the
0023 submission. Prints jediTaskID on success.
0024
0025 Env:
0026 SWFMON_URL base URL, default https://epic-devcloud.org/prod
0027 SWFMON_TOKEN optional DRF token (required if record-submission
0028 endpoint is non-public on this deployment)
0029 """
0030 import argparse
0031 import json
0032 import os
0033 import re
0034 import sys
0035 import urllib.parse
0036 import urllib.request
0037
0038
0039 DEFAULT_URL = os.environ.get('SWFMON_URL', 'https://epic-devcloud.org/prod')
0040
0041
0042 def _swfmon_request(method, url, body=None):
0043 """Make a swf-monitor request with optional token and JSON body."""
0044 data = None
0045 headers = {}
0046 if body is not None:
0047 data = json.dumps(body).encode('utf-8')
0048 headers['Content-Type'] = 'application/json'
0049 token = os.environ.get('SWFMON_TOKEN')
0050 if token:
0051 headers['Authorization'] = f'Token {token}'
0052 req = urllib.request.Request(url, data=data, headers=headers, method=method)
0053 return urllib.request.urlopen(req)
0054
0055
0056 def _fetch_format(base_url, name, fmt):
0057 qs = urllib.parse.urlencode({'name': name, 'fmt': fmt})
0058 url = f'{base_url.rstrip("/")}/pcs/api/prod-tasks/command/?{qs}'
0059 with _swfmon_request('GET', url) as resp:
0060 return resp.read().decode()
0061
0062
0063 def _record_submission(base_url, name, jedi_task_id, status='submitted'):
0064 qs = urllib.parse.urlencode({'name': name})
0065 url = f'{base_url.rstrip("/")}/pcs/api/prod-tasks/record-submission/?{qs}'
0066 with _swfmon_request('POST', url,
0067 {'jedi_task_id': jedi_task_id, 'status': status}) as resp:
0068 return json.loads(resp.read().decode())
0069
0070
0071 def _do_inspect(base_url, name, fmt):
0072 try:
0073 body = _fetch_format(base_url, name, fmt)
0074 except urllib.error.HTTPError as e:
0075 sys.stderr.write(f'HTTP {e.code}: {e.read().decode(errors="replace")}\n')
0076 return 1
0077 except urllib.error.URLError as e:
0078 sys.stderr.write(f'Request failed: {e.reason}\n')
0079 return 1
0080 sys.stdout.write(body)
0081 if sys.stdout.isatty():
0082 sys.stdout.write('\n')
0083 return 0
0084
0085
0086 def _do_submit(base_url, name):
0087 # 1. Fetch taskParamMap from PCS.
0088 try:
0089 params_json = _fetch_format(base_url, name, 'jedi')
0090 except urllib.error.HTTPError as e:
0091 sys.stderr.write(f'PCS HTTP {e.code} fetching taskParamMap: '
0092 f'{e.read().decode(errors="replace")}\n')
0093 return 1
0094 except urllib.error.URLError as e:
0095 sys.stderr.write(f'PCS request failed: {e.reason}\n')
0096 return 1
0097 try:
0098 task_params = json.loads(params_json)
0099 except json.JSONDecodeError as e:
0100 sys.stderr.write(f'Could not parse taskParamMap JSON: {e}\n')
0101 return 1
0102
0103 # 2. Lazy-import pandaclient — only required for --submit path.
0104 try:
0105 from pandaclient import Client # noqa: WPS433
0106 except ImportError as e:
0107 sys.stderr.write(
0108 f'pandaclient not importable ({e}). Install panda-client and '
0109 f'ensure your PanDA auth context (proxy or OIDC token) is live.\n'
0110 )
0111 return 1
0112
0113 # 3. Submit to JEDI. Returns (comms_status, (inner_code, message)) on
0114 # comms success; (255, output) on comms failure.
0115 try:
0116 comms_status, result = Client.insertTaskParams(task_params)
0117 except Exception as e:
0118 sys.stderr.write(f'Client.insertTaskParams raised: {e}\n')
0119 return 1
0120 if comms_status != 0:
0121 sys.stderr.write(f'PanDA comms failure (status={comms_status}): {result}\n')
0122 return 1
0123 try:
0124 inner_code, message = result
0125 except (TypeError, ValueError):
0126 sys.stderr.write(f'Unexpected response shape: {result}\n')
0127 return 1
0128 if inner_code != 0:
0129 sys.stderr.write(
0130 f'JEDI rejected submission: code={inner_code} msg={message}\n'
0131 )
0132 return 1
0133
0134 # 4. Parse jediTaskID from message. Server-side message formats
0135 # (panda-server/.../task_event_module.py):
0136 # "succeeded. new jediTaskID=12345"
0137 # "reactivation accepted. jediTaskID=12345 ..."
0138 # "<status>. new tasks params have been set to jediTaskID=12345. "
0139 m = re.search(r'jediTaskID=(\d+)', message)
0140 if not m:
0141 sys.stderr.write(f'Could not parse jediTaskID from message: {message}\n')
0142 return 1
0143 jedi_task_id = int(m.group(1))
0144
0145 # 5. Record back to PCS. If write-back fails we still surface the
0146 # jediTaskID — submission succeeded, only PCS bookkeeping failed.
0147 try:
0148 _record_submission(base_url, name, jedi_task_id)
0149 except urllib.error.HTTPError as e:
0150 sys.stderr.write(
0151 f'Submitted to JEDI as jediTaskID={jedi_task_id}, but PCS '
0152 f'record-submission HTTP {e.code}: '
0153 f'{e.read().decode(errors="replace")}\n'
0154 )
0155 return 2
0156 except urllib.error.URLError as e:
0157 sys.stderr.write(
0158 f'Submitted to JEDI as jediTaskID={jedi_task_id}, but PCS '
0159 f'record-submission request failed: {e.reason}\n'
0160 )
0161 return 2
0162
0163 sys.stdout.write(
0164 f'Submitted: jediTaskID={jedi_task_id} (PCS status=submitted)\n'
0165 f' PanDA message: {message}\n'
0166 )
0167 return 0
0168
0169
0170 def main() -> int:
0171 ap = argparse.ArgumentParser(description=__doc__.strip().splitlines()[0])
0172 ap.add_argument('name', help='ProdTask name')
0173 mode = ap.add_mutually_exclusive_group(required=True)
0174 mode.add_argument('--format', choices=['condor', 'panda', 'jedi', 'dump'],
0175 help='inspection mode: print submission artifact')
0176 mode.add_argument('--submit', action='store_true',
0177 help='submission mode: send taskParamMap to JEDI '
0178 'and record the resulting jediTaskID in PCS')
0179 ap.add_argument('--url', default=DEFAULT_URL,
0180 help=f'swf-monitor base URL (default: {DEFAULT_URL})')
0181 args = ap.parse_args()
0182
0183 if args.submit:
0184 return _do_submit(args.url, args.name)
0185 return _do_inspect(args.url, args.name, args.format)
0186
0187
0188 if __name__ == '__main__':
0189 sys.exit(main())