File indexing completed on 2026-04-10 08:39:14
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012 import logging
0013
0014 from pilot.api.data import StageInClient, StageOutClient
0015
0016 logger = logging.getLogger(__name__)
0017
0018
0019 class StageInESClient(StageInClient):
0020
0021 def __init__(self, *argc, **kwargs):
0022 super(StageInESClient, self).__init__(*argc, **kwargs)
0023
0024 self.copytool_modules.setdefault('objectstore', {'module_name': 'objectstore'})
0025 self.acopytools.setdefault('es_events_read', ['objectstore'])
0026
0027 def prepare_sources(self, files, activities=None):
0028 """
0029 Customize/prepare source data for each entry in `files` optionally checking data for requested `activities`
0030 (custom StageClient could extend this logic if need)
0031 :param files: list of `FileSpec` objects to be processed
0032 :param activities: string or ordered list of activities to resolve `astorages` (optional)
0033 :return: None
0034
0035 If storage_id is specified, replace ddmendpoint by parsing storage_id
0036 """
0037
0038 if not self.infosys:
0039 self.logger.warning('infosys instance is not initialized: skip calling prepare_sources()')
0040 return
0041
0042 for fspec in files:
0043 if fspec.storage_token:
0044 storage_id, path_convention = fspec.get_storage_id_and_path_convention()
0045 if path_convention and path_convention == 1000:
0046 fspec.scope = 'transient'
0047 if storage_id:
0048 fspec.ddmendpoint = self.infosys.get_ddmendpoint(storage_id)
0049 logger.info("Processed file with storage id: %s", fspec)
0050
0051
0052 class StageOutESClient(StageOutClient):
0053
0054 def __init__(self, *argc, **kwargs):
0055 super(StageOutESClient, self).__init__(*argc, **kwargs)
0056
0057 self.copytool_modules.setdefault('objectstore', {'module_name': 'objectstore'})
0058 self.acopytools.setdefault('es_events', ['objectstore'])