File indexing completed on 2026-04-20 07:58:59
0001 import os
0002 import traceback
0003 from typing import Dict, Any
0004
0005 from globus_compute_sdk import Executor, Client
0006 from globus_compute_sdk.errors.error_types import TaskExecutionFailed
0007 from globus_compute_sdk.sdk.shell_function import ShellFunction, ShellResult
0008
0009 from cryptography.fernet import Fernet
0010
0011 from pandaharvester.harvestercore import core_utils
0012
def _remote_write_token(encrypted_token: str, remote_token_path: str, key_file: str) -> str:
    """
    Decrypt a Fernet-encrypted token and write it to remote_token_path atomically.

    Remote function executed on the Globus Compute endpoint (HPC site, for example).
    It reads the Fernet key from key_file, decrypts encrypted_token with it, writes
    the plaintext to a private temporary file in the destination directory, and
    finally moves that file over remote_token_path with os.replace.

    :param encrypted_token: Fernet token (ASCII text) produced on the submitting side
    :param remote_token_path: destination path of the plaintext token file
    :param key_file: path of the file holding the Fernet key
    :return: remote_token_path, unchanged
    :raises OSError: if the key file cannot be read or the token file cannot be written
    :raises cryptography.fernet.InvalidToken: if decryption with the given key fails
    """
    # Imports are kept inside the function body, as in the original; presumably so the
    # function stays self-contained when shipped to the endpoint -- TODO confirm.
    import os
    import tempfile

    from cryptography.fernet import Fernet

    with open(key_file, "rb") as key_fp:
        key = key_fp.read().strip()
    plaintext = Fernet(key).decrypt(encrypted_token.encode("utf-8"))

    dirname = os.path.dirname(remote_token_path)
    if dirname:
        os.makedirs(dirname, exist_ok=True)

    # mkstemp creates a brand-new, uniquely named file that only the creating user can
    # read and write. A fixed "<path>.tmp" name opened with O_CREAT | O_TRUNC would reuse
    # a pre-existing file (keeping whatever permissions it already had) and would be
    # shared between concurrent writers of the same token path.
    # The temp file lives in the destination directory so os.replace stays a rename
    # within one filesystem.
    fd, tmp_path = tempfile.mkstemp(
        dir=dirname or ".",
        prefix=os.path.basename(remote_token_path) + ".",
        suffix=".tmp",
    )
    try:
        with os.fdopen(fd, "wb") as fp:
            fp.write(plaintext)
            fp.flush()
            # Push the data to disk before the rename makes it visible under the final name.
            os.fsync(fp.fileno())
        os.replace(tmp_path, remote_token_path)
    except BaseException:
        # Do not leave a plaintext token behind in a stray temp file on failure.
        try:
            os.unlink(tmp_path)
        except OSError:
            pass
        raise

    return remote_token_path
0040
0041
class GlobusComputeTokenReplicator:
    """
    Wrapper over globus compute client that is used by IamTokenCredManagerRemoteGlobusCompute
    to sync tokens with a remote site (like HPC).
    It first encrypts the token on the local harvester machine using Fernet,
    then submits a Globus Compute task which decrypts and writes the token file on the remote site.
    """

    def __init__(self, gc_cfg: Dict[str, Any], logger):
        """
        Read the configuration, load the local Fernet key and create the Executor.

        An example of gc_cfg is expected to look like:
        {
            "endpoint_id": "<UUID of GC endpoint>",
            "local_key_file": "/path/on/harvester/harvester_gc_token.key",
            "remote_key_file": "/path/on/endpoint/harvester_gc_token.key",
            "task_timeout": 120
        }
        "endpoint_id" and "local_key_file" are mandatory.
        "remote_key_file" is optional and defaults to the value of "local_key_file";
        "task_timeout" is optional and defaults to 120 (passed as the timeout of
        Future.result(), i.e. seconds).

        :param gc_cfg: configuration dictionary as described above
        :param logger: logger object; its debug() and error() methods are used
        :raises RuntimeError: if a mandatory config key is missing
        :raises Exception: whatever reading the key file or constructing Fernet raises
                           (logged with a traceback, then re-raised)
        """
        self.logger = logger

        # Only the two subscript lookups can raise KeyError, so only they sit in the try.
        try:
            self.endpoint_id = gc_cfg["endpoint_id"]
            self.local_key_file = gc_cfg["local_key_file"]
        except KeyError as e:
            raise RuntimeError(f"GlobusComputeTokenReplicator missing required config key: {e}") from e

        self.remote_key_file = gc_cfg.get("remote_key_file", self.local_key_file)
        self.task_timeout = gc_cfg.get("task_timeout", 120)

        try:
            with open(self.local_key_file, "rb") as f:
                key = f.read().strip()
            self.fernet = Fernet(key)
        except Exception:
            self.logger.error(f"Failed to load local Fernet key from {self.local_key_file}\n{traceback.format_exc()}")
            raise

        # Create the Executor last: if the key cannot be loaded, __init__ raises before an
        # Executor exists, so none is left behind without an owner to shut it down.
        self.executor = Executor(endpoint_id=self.endpoint_id)

    def do_it(self, token_str: str, remote_token_path: str) -> bool:
        """
        Encrypt token_str locally and have the remote endpoint write it to remote_token_path.

        The encrypted token is submitted together with the remote key file path to
        _remote_write_token through the Executor, and the result is awaited for at most
        task_timeout.

        :param token_str: plaintext token to replicate
        :param remote_token_path: path of the token file on the remote side
        :return: True if the remote task returned within the timeout, False if the task
                 failed, timed out, or any other exception occurred while submitting or
                 waiting (the error is logged). Errors raised by the local encryption
                 step are not caught here and propagate to the caller.
        """
        # Fernet tokens are URL-safe base64, so they decode cleanly to ASCII text.
        encrypted = self.fernet.encrypt(token_str.encode("utf-8")).decode("ascii")

        try:
            future = self.executor.submit(
                _remote_write_token, encrypted, remote_token_path, self.remote_key_file
            )
            result = future.result(timeout=self.task_timeout)
            self.logger.debug(f"Remote token sync to {remote_token_path} finished with result: {result}")
            return True
        except TaskExecutionFailed as e:
            # The remote function itself raised (e.g. bad key or unwritable path).
            self.logger.error(f"Globus Compute task failed for {remote_token_path}: {e}")
            return False
        except Exception:
            # Everything else, including a timeout while waiting for the result.
            self.logger.error(f"Unexpected error during remote token sync for {remote_token_path}\n{traceback.format_exc()}")
            return False
0095 return False