Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:39:14

0001 #!/usr/bin/env python
0002 # Licensed under the Apache License, Version 2.0 (the "License");
0003 # you may not use this file except in compliance with the License.
0004 # You may obtain a copy of the License at
0005 # http://www.apache.org/licenses/LICENSE-2.0
0006 #
0007 # Authors:
0008 # - Wen Guan, wen.guan@cern.ch, 2018
0009 # - Alexey Anisenkov, anisyonk@cern.ch, 2019
0010 # - Paul Nilsson, paul.nilsson@cern.ch, 2021
0011 
0012 import logging
0013 
0014 from pilot.api.data import StageInClient, StageOutClient
0015 
0016 logger = logging.getLogger(__name__)
0017 
0018 
class StageInESClient(StageInClient):
    """
        Stage-in client variant that registers the `objectstore` copytool and
        uses it as the default copytool for the `es_events_read` activity.
    """

    def __init__(self, *args, **kwargs):
        """
            Initialize the base stage-in client, then register the objectstore defaults.

            :param args: positional arguments forwarded unchanged to `StageInClient`
            :param kwargs: keyword arguments forwarded unchanged to `StageInClient`
        """

        super(StageInESClient, self).__init__(*args, **kwargs)

        # setdefault() keeps any value already configured by the base class or the caller
        self.copytool_modules.setdefault('objectstore', {'module_name': 'objectstore'})
        self.acopytools.setdefault('es_events_read', ['objectstore'])

    def prepare_sources(self, files, activities=None):
        """
            Adjust the source details of every entry in `files` that carries a storage token.

            For such an entry the storage id and path convention are extracted from the file spec:
            a path convention of 1000 switches the scope to 'transient', and a non-empty storage id
            replaces the ddmendpoint with the one resolved through infosys.

            :param files: list of `FileSpec` objects to be processed (updated in place)
            :param activities: string or ordered list of activities (optional, not used by this implementation)
            :return: None
        """

        if not self.infosys:
            self.logger.warning('infosys instance is not initialized: skip calling prepare_sources()')
            return

        for entry in files:
            # entries without a storage token are left untouched
            if not entry.storage_token:
                continue

            # FIXME later: the storage id is re-parsed for every file; ideally all of this
            # would be applied once in the FileSpec clean method
            storage_id, path_convention = entry.get_storage_id_and_path_convention()

            if path_convention == 1000:
                entry.scope = 'transient'

            if storage_id:
                entry.ddmendpoint = self.infosys.get_ddmendpoint(storage_id)

            logger.info("Processed file with storage id: %s", entry)
0050 
0051 
class StageOutESClient(StageOutClient):
    """
        Stage-out client variant that registers the `objectstore` copytool and
        uses it as the default copytool for the `es_events` activity.
    """

    def __init__(self, *args, **kwargs):
        """
            Initialize the base stage-out client, then register the objectstore defaults.

            :param args: positional arguments forwarded unchanged to `StageOutClient`
            :param kwargs: keyword arguments forwarded unchanged to `StageOutClient`
        """

        super(StageOutESClient, self).__init__(*args, **kwargs)

        copytool = 'objectstore'

        # setdefault() keeps any value already configured by the base class or the caller
        self.copytool_modules.setdefault(copytool, {'module_name': copytool})
        self.acopytools.setdefault('es_events', [copytool])