Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-20 07:58:59

0001 import os
0002 import traceback
0003 from typing import Dict, Any
0004 
0005 from globus_compute_sdk import Executor, Client
0006 from globus_compute_sdk.errors.error_types import TaskExecutionFailed
0007 from globus_compute_sdk.sdk.shell_function import ShellFunction, ShellResult
0008 
0009 from cryptography.fernet import Fernet
0010 
0011 from pandaharvester.harvestercore import core_utils
0012 
def _remote_write_token(encrypted_token: str, remote_token_path: str, key_file: str) -> str:
    """
    Remote function executed on the Globus Compute endpoint (HPC site, for example).

    Reads the Fernet key from key_file, decrypts encrypted_token with it and writes
    the plaintext to remote_token_path atomically: the data goes to a private,
    uniquely named temporary file in the destination directory, which is then moved
    over the target with os.replace.

    :param encrypted_token: Fernet token in text form, as produced by Fernet.encrypt().decode()
    :param remote_token_path: path of the plaintext token file to (over)write
    :param key_file: path of the file holding the Fernet key
    :return: remote_token_path, once the token file is in place
    :raises OSError: if the key file cannot be read or the token file cannot be written
    :raises cryptography.fernet.InvalidToken: if encrypted_token cannot be decrypted with the key
    """
    # Imports are function-local, as in the original; presumably because only this
    # function (not the surrounding module) is shipped to the endpoint.
    import os
    import tempfile
    from cryptography.fernet import Fernet

    with open(key_file, "rb") as f:
        key = f.read().strip()
    plaintext = Fernet(key).decrypt(encrypted_token.encode("utf-8"))

    dirname = os.path.dirname(remote_token_path)
    if dirname:
        os.makedirs(dirname, exist_ok=True)

    # mkstemp always creates a new file readable/writable only by the owner, so a
    # stale or pre-planted "<path>.tmp" (wrong mode, symlink) can never be reused,
    # and concurrent writers for the same target do not share a temp file.
    # The temp file must live in the target directory so os.replace stays a
    # same-filesystem rename.
    fd, tmp_path = tempfile.mkstemp(
        prefix=os.path.basename(remote_token_path) + ".",
        suffix=".tmp",
        dir=dirname or ".",
    )
    try:
        with os.fdopen(fd, "wb") as fp:
            fp.write(plaintext)
            fp.flush()
            # Push the data to disk before the rename makes it visible.
            os.fsync(fp.fileno())
        os.replace(tmp_path, remote_token_path)
    except BaseException:
        # Do not leave a plaintext token fragment behind on failure.
        try:
            os.unlink(tmp_path)
        except OSError:
            pass
        raise

    return remote_token_path
0040 
0041 
class GlobusComputeTokenReplicator:
    """
    Wrapper over globus compute client that is used by IamTokenCredManagerRemoteGlobusCompute to sync tokens with a remote site (like HPC).
    It first encrypts the token on local harvester machine using Fernet,
    then submits a Globus Compute task which decrypts and writes the token file on the remote site.
    """

    def __init__(self, gc_cfg: Dict[str, Any], logger):
        """
        Read the configuration, load the local Fernet key and create the executor.

        An example of gc_cfg is expected to look like:
        {
          "endpoint_id": "<UUID of GC endpoint>",
          "local_key_file": "/path/on/harvester/harvester_gc_token.key",
          "remote_key_file": "/path/on/endpoint/harvester_gc_token.key",
          "task_timeout": 120
        }
        "endpoint_id" and "local_key_file" are mandatory.
        "remote_key_file" is optional and defaults to the value of "local_key_file".
        "task_timeout" is optional and defaults to 120; it is passed as the timeout
        when waiting for the remote task result.

        :param gc_cfg: configuration dictionary as described above
        :param logger: logger object providing error() and debug()
        :raises RuntimeError: if a mandatory configuration key is missing
        :raises Exception: whatever reading the key file or building the Fernet object raises
                           (logged, then re-raised)
        """
        self.logger = logger

        try:
            self.endpoint_id = gc_cfg["endpoint_id"]
            self.local_key_file = gc_cfg["local_key_file"]
        except KeyError as e:
            raise RuntimeError(f"GlobusComputeTokenReplicator missing required config key: {e}") from e

        # Optional settings: these lookups cannot raise KeyError, so they live outside the try.
        self.remote_key_file = gc_cfg.get("remote_key_file", self.local_key_file)
        self.task_timeout = gc_cfg.get("task_timeout", 120)

        # Load the key before creating the executor, so that a bad key file does not
        # leave behind an Executor that nobody holds a reference to.
        try:
            with open(self.local_key_file, "rb") as f:
                key = f.read().strip()
            self.fernet = Fernet(key)
        except Exception:
            self.logger.error(f"Failed to load local Fernet key from {self.local_key_file}\n{traceback.format_exc()}")
            raise

        self.executor = Executor(endpoint_id=self.endpoint_id)

    def do_it(self, token_str: str, remote_token_path: str) -> bool:
        """
        Encrypt token_str locally and have the endpoint write it to remote_token_path.

        The token is encrypted with the local Fernet key; _remote_write_token is then
        submitted to the executor together with the remote key file path, and the
        call waits up to task_timeout for its result.

        :param token_str: plaintext token to replicate
        :param remote_token_path: path of the token file on the remote site
        :return: True if the remote task returned a result; False if it failed,
                 timed out or any other error occurred while submitting/waiting
                 (errors are logged, not raised). Errors raised by the local
                 encryption step are not caught here.
        """
        # Local import: the module-level import block does not provide this name.
        from concurrent.futures import TimeoutError as FutureTimeoutError

        encrypted = self.fernet.encrypt(token_str.encode("utf-8")).decode("ascii")

        try:
            future = self.executor.submit(
                _remote_write_token, encrypted, remote_token_path, self.remote_key_file
            )
            result = future.result(timeout=self.task_timeout)
            self.logger.debug(f"Remote token sync to {remote_token_path} finished with result: {result}")
            return True
        except TaskExecutionFailed as e:
            self.logger.error(f"Globus Compute task failed for {remote_token_path}: {e}")
            return False
        except FutureTimeoutError:
            # A timeout is an expected operational condition; report it as such
            # instead of as an "unexpected error" with a traceback.
            self.logger.error(f"Globus Compute task for {remote_token_path} did not finish within {self.task_timeout}s")
            return False
        except Exception:
            self.logger.error(f"Unexpected error during remote token sync for {remote_token_path}\n{traceback.format_exc()}")
            return False