Back to home page

EIC code displayed by LXR

 
 

    


Warning, /swf-monitor/scripts/pcs-task-cmd is written in an unsupported language. File is not indexed.

0001 #!/usr/bin/env python3
0002 """
0003 pcs-task-cmd — fetch a PCS ProdTask's submission artifact, or submit it.
0004 
0005 Thin HTTP client over swf-monitor's REST API. No Django, no DB access.
0006 The server regenerates from current PCS state on every call, so what
0007 you print here reflects the task + dataset + config as they exist now.
0008 
0009 Inspection mode (stdlib-only; selected with --format):
0010     pcs-task-cmd <name> --format {condor|panda|jedi|dump}
0011 
0012     # condor: bash one-liner (env-prefixed submit_csv.sh)
0013     # panda:  prun command
0014     # jedi:   JSON taskParamMap for Client.insertTaskParams()
0015     # dump:   full JSON view of the task + dataset + tags + config
0016 
0017 Submission mode (one-command JEDI submit, requires pandaclient):
0018     pcs-task-cmd <name> --submit
0019 
0020     Fetches the taskParamMap, calls pandaclient.Client.insertTaskParams()
0021     using the operator's existing PanDA auth (proxy or OIDC token), then
0022     posts the resulting jediTaskID back to swf-monitor so PCS records the
0023     submission. Prints jediTaskID on success.
0024 
0025 Env:
0026     SWFMON_URL    base URL, default https://epic-devcloud.org/prod
0027     SWFMON_TOKEN  optional DRF token (required if record-submission
0028                   endpoint is non-public on this deployment)
0029 """
0030 import argparse
0031 import json
0032 import os
0033 import re
0034 import sys
0035 import urllib.parse
0036 import urllib.request
0037 
0038 
# Base URL of the swf-monitor deployment. The SWFMON_URL environment
# variable, when set, overrides the built-in address.
DEFAULT_URL = os.getenv('SWFMON_URL', 'https://epic-devcloud.org/prod')
0040 
0041 
def _swfmon_request(method, url, body=None):
    """Open an HTTP request against swf-monitor and return the response.

    A non-None ``body`` is serialized to UTF-8 JSON and sent with a
    ``Content-Type: application/json`` header. If the ``SWFMON_TOKEN``
    environment variable is set and non-empty, an
    ``Authorization: Token <token>`` header is added.

    The open response object is returned as-is; closing it (for example by
    using it as a context manager) is left to the caller.
    """
    payload = None if body is None else json.dumps(body).encode('utf-8')
    request_headers = {}
    if payload is not None:
        request_headers['Content-Type'] = 'application/json'

    auth_token = os.environ.get('SWFMON_TOKEN')
    if auth_token:
        request_headers['Authorization'] = f'Token {auth_token}'

    request = urllib.request.Request(
        url,
        data=payload,
        headers=request_headers,
        method=method,
    )
    return urllib.request.urlopen(request)
0054 
0055 
def _fetch_format(base_url, name, fmt):
    """Return the submission artifact for task ``name`` as text.

    Sends a GET to ``<base_url>/pcs/api/prod-tasks/command/`` with ``name``
    and ``fmt`` as query parameters and returns the decoded response body.
    Errors raised by the underlying request propagate to the caller.
    """
    query = urllib.parse.urlencode({'name': name, 'fmt': fmt})
    root = base_url.rstrip("/")
    endpoint = f'{root}/pcs/api/prod-tasks/command/?{query}'
    with _swfmon_request('GET', endpoint) as response:
        raw = response.read()
    return raw.decode()
0061 
0062 
def _record_submission(base_url, name, jedi_task_id, status='submitted'):
    """Report a JEDI task ID for task ``name`` back to swf-monitor.

    Sends a POST to ``<base_url>/pcs/api/prod-tasks/record-submission/``
    with ``name`` as a query parameter and a JSON body carrying
    ``jedi_task_id`` and ``status``. Returns the response body parsed as
    JSON. Errors raised by the request or by JSON parsing propagate.
    """
    query = urllib.parse.urlencode({'name': name})
    root = base_url.rstrip("/")
    endpoint = f'{root}/pcs/api/prod-tasks/record-submission/?{query}'
    payload = {'jedi_task_id': jedi_task_id, 'status': status}
    with _swfmon_request('POST', endpoint, payload) as response:
        reply_text = response.read().decode()
    return json.loads(reply_text)
0069 
0070 
def _do_inspect(base_url, name, fmt):
    """Print the artifact for task ``name`` in format ``fmt``.

    Returns 0 after writing the artifact to stdout, or 1 after reporting an
    HTTP or connection error on stderr.
    """
    try:
        artifact = _fetch_format(base_url, name, fmt)
    except urllib.error.HTTPError as err:
        # HTTPError is handled first because it is a subclass of URLError.
        detail = err.read().decode(errors="replace")
        sys.stderr.write(f'HTTP {err.code}: {detail}\n')
        return 1
    except urllib.error.URLError as err:
        sys.stderr.write(f'Request failed: {err.reason}\n')
        return 1

    out = sys.stdout
    out.write(artifact)
    if out.isatty():
        # Finish the line for interactive use; piped output gets the
        # artifact text with nothing appended.
        out.write('\n')
    return 0
0084 
0085 
def _do_submit(base_url, name):
    """Submit the named ProdTask to JEDI and record the task ID in PCS.

    Fetches the ``jedi`` taskParamMap from swf-monitor, passes it to
    ``pandaclient.Client.insertTaskParams()``, extracts the jediTaskID from
    the reply message, and posts that ID back through ``_record_submission``.

    Args:
        base_url: swf-monitor base URL.
        name: ProdTask name.

    Returns:
        0 on full success (summary printed on stdout).
        1 if anything fails before a jediTaskID has been obtained.
        2 if JEDI accepted the task but recording it in PCS failed; the
          jediTaskID is still reported on stderr in that case.
    """
    # 1. Fetch taskParamMap from PCS.
    try:
        params_json = _fetch_format(base_url, name, 'jedi')
    except urllib.error.HTTPError as e:
        sys.stderr.write(f'PCS HTTP {e.code} fetching taskParamMap: '
                         f'{e.read().decode(errors="replace")}\n')
        return 1
    except urllib.error.URLError as e:
        sys.stderr.write(f'PCS request failed: {e.reason}\n')
        return 1
    try:
        task_params = json.loads(params_json)
    except json.JSONDecodeError as e:
        sys.stderr.write(f'Could not parse taskParamMap JSON: {e}\n')
        return 1

    # 2. Lazy-import pandaclient — only required for --submit path.
    try:
        from pandaclient import Client  # noqa: WPS433
    except ImportError as e:
        sys.stderr.write(
            f'pandaclient not importable ({e}). Install panda-client and '
            f'ensure your PanDA auth context (proxy or OIDC token) is live.\n'
        )
        return 1

    # 3. Submit to JEDI. Returns (comms_status, (inner_code, message)) on
    #    comms success; (255, output) on comms failure.
    try:
        comms_status, result = Client.insertTaskParams(task_params)
    except Exception as e:
        sys.stderr.write(f'Client.insertTaskParams raised: {e}\n')
        return 1
    if comms_status != 0:
        sys.stderr.write(f'PanDA comms failure (status={comms_status}): {result}\n')
        return 1
    try:
        inner_code, message = result
    except (TypeError, ValueError):
        sys.stderr.write(f'Unexpected response shape: {result}\n')
        return 1
    if inner_code != 0:
        sys.stderr.write(
            f'JEDI rejected submission: code={inner_code} msg={message}\n'
        )
        return 1

    # 4. Parse jediTaskID from message. Server-side message formats
    #    (panda-server/.../task_event_module.py):
    #      "succeeded. new jediTaskID=12345"
    #      "reactivation accepted. jediTaskID=12345 ..."
    #      "<status>. new tasks params have been set to jediTaskID=12345. "
    #    str() keeps a non-string message from raising TypeError here: JEDI
    #    has already accepted the task, so report cleanly instead of crashing.
    m = re.search(r'jediTaskID=(\d+)', str(message))
    if not m:
        sys.stderr.write(f'Could not parse jediTaskID from message: {message}\n')
        return 1
    jedi_task_id = int(m.group(1))

    # 5. Record back to PCS. If write-back fails we still surface the
    #    jediTaskID — submission succeeded, only PCS bookkeeping failed.
    try:
        _record_submission(base_url, name, jedi_task_id)
    except urllib.error.HTTPError as e:
        sys.stderr.write(
            f'Submitted to JEDI as jediTaskID={jedi_task_id}, but PCS '
            f'record-submission HTTP {e.code}: '
            f'{e.read().decode(errors="replace")}\n'
        )
        return 2
    except urllib.error.URLError as e:
        sys.stderr.write(
            f'Submitted to JEDI as jediTaskID={jedi_task_id}, but PCS '
            f'record-submission request failed: {e.reason}\n'
        )
        return 2
    except Exception as e:
        # The task already exists in JEDI, so no bookkeeping failure may
        # hide its ID. This covers e.g. a non-JSON or undecodable reply
        # (ValueError) and connection errors raised while reading the
        # response body, which are not wrapped in URLError.
        sys.stderr.write(
            f'Submitted to JEDI as jediTaskID={jedi_task_id}, but PCS '
            f'record-submission failed: {e!r}\n'
        )
        return 2

    sys.stdout.write(
        f'Submitted: jediTaskID={jedi_task_id} (PCS status=submitted)\n'
        f'  PanDA message: {message}\n'
    )
    return 0
0168 
0169 
def main() -> int:
    """Parse the command line and run the selected mode.

    Exactly one of ``--format`` (inspection) or ``--submit`` (submission)
    must be supplied. Returns the exit code produced by that mode.
    """
    # The first line of the module docstring doubles as the --help summary.
    summary = __doc__.strip().splitlines()[0]
    parser = argparse.ArgumentParser(description=summary)
    parser.add_argument('name', help='ProdTask name')

    exclusive = parser.add_mutually_exclusive_group(required=True)
    exclusive.add_argument(
        '--format',
        choices=['condor', 'panda', 'jedi', 'dump'],
        help='inspection mode: print submission artifact',
    )
    exclusive.add_argument(
        '--submit',
        action='store_true',
        help='submission mode: send taskParamMap to JEDI '
             'and record the resulting jediTaskID in PCS',
    )
    parser.add_argument(
        '--url',
        default=DEFAULT_URL,
        help=f'swf-monitor base URL (default: {DEFAULT_URL})',
    )
    opts = parser.parse_args()

    if not opts.submit:
        return _do_inspect(opts.url, opts.name, opts.format)
    return _do_submit(opts.url, opts.name)
0186 
0187 
# Script entry point: the integer returned by main() becomes the exit status.
if __name__ == '__main__':
    raise SystemExit(main())