File indexing completed on 2026-04-10 08:39:14
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015 import os
0016 import hashlib
0017 import logging
0018 import time
0019
0020 try:
0021 from functools import reduce
0022
0023 except Exception:
0024 pass
0025
0026 from pilot.info import infosys
0027 from pilot.common.exception import PilotException, ErrorCodes, SizeTooLarge, NoLocalSpace, ReplicasNotFound
0028 from pilot.util.auxiliary import show_memory_usage
0029 from pilot.util.config import config
0030 from pilot.util.filehandling import calculate_checksum, write_json
0031 from pilot.util.math import convert_mb_to_b
0032 from pilot.util.parameters import get_maximum_input_sizes
0033 from pilot.util.workernode import get_local_disk_space
0034 from pilot.util.timer import TimeoutException
0035 from pilot.util.tracereport import TraceReport
0036
0037
class StagingClient(object):
    """
    Base Staging Client.

    Holds the copytool configuration per activity and implements the generic
    transfer loop; `transfer_files()` has to be provided by the inheritor.
    """

    # transfer direction ("stage-in" / "stage-out"), overridden by the inheritors
    mode = ""

    # known copytools: copytool name -> module name to import from pilot.copytool
    copytool_modules = {'rucio': {'module_name': 'rucio'},
                        'gfal': {'module_name': 'gfal'},
                        'gfalcopy': {'module_name': 'gfal'},
                        'xrdcp': {'module_name': 'xrdcp'},
                        'mv': {'module_name': 'mv'},
                        'objectstore': {'module_name': 'objectstore'},
                        's3': {'module_name': 's3'},
                        'gs': {'module_name': 'gs'},
                        'lsm': {'module_name': 'lsm'}
                        }

    # schemas used as primary schemas for direct access to WAN (remote) replicas
    direct_remoteinput_allowed_schemas = ['root', 'https']

    # schemas used as primary schemas for direct access to LAN (local) replicas
    direct_localinput_allowed_schemas = ['root', 'dcache', 'dcap', 'file', 'https', 'davs']

    # schemas allowed when looking up WAN replicas for a transfer
    remoteinput_allowed_schemas = ['root', 'gsiftp', 'dcap', 'davs', 'srm', 'storm', 'https']

    def __init__(self, infosys_instance=None, acopytools=None, logger=None, default_copytools='rucio', trace_report=None):
        """
        If `acopytools` is not specified then it will be automatically resolved via infosys.
        In this case `infosys` requires initialization.

        :param infosys_instance: infosys object to use (the module level `infosys` is used if not set)
        :param acopytools: dict of copytool names per activity to be used for transfers.
                           Accepts also list of names or string value without activity passed.
        :param logger: logging.Logger object to use for logging (None means no logging)
        :param default_copytools: copytool name(s) to be used in case of unknown activity passed.
                                  Accepts either list of names or single string value.
        :param trace_report: trace report object (a new TraceReport is created if not set)
        :raise: PilotException if no acopytools settings could be resolved
        """

        super(StagingClient, self).__init__()

        if not logger:
            # no logger passed: use a disabled one so that the logging calls become no-ops
            logger = logging.getLogger(__name__ + '.null')
            logger.disabled = True

        self.logger = logger
        self.infosys = infosys_instance or infosys

        # normalize a plain string / list / tuple into the {activity: [copytools]} form
        if self._is_string(acopytools):
            acopytools = {'default': [acopytools]} if acopytools else {}

        if isinstance(acopytools, (list, tuple)):
            acopytools = {'default': acopytools} if acopytools else {}

        self.acopytools = acopytools or {}

        if self.infosys.queuedata:
            self.set_acopytools()

        if not self.acopytools.get('default'):
            self.acopytools['default'] = self.get_default_copytools(default_copytools)

        # get an initialized trace report (created here if the caller did not pass one)
        self.trace_report = trace_report if trace_report else TraceReport(pq=os.environ.get('PILOT_SITENAME', ''))

        # NOTE(review): the 'default' key is always set above, so this dict is never empty and the
        # branch below looks unreachable -- confirm whether an "all values empty" check was intended
        if not self.acopytools:
            msg = 'failed to initilize StagingClient: no acopytools options found, acopytools=%s' % self.acopytools
            logger.error(msg)
            self.trace_report.update(clientState='BAD_COPYTOOL', stateReason=msg)
            self.trace_report.send()
            raise PilotException("failed to resolve acopytools settings")
        logger.info('configured copytools per activity: acopytools=%s', self.acopytools)

    @staticmethod
    def _is_string(value):
        """
        Check whether `value` is a string (`basestring` where that name exists, otherwise `str`).

        :param value: object to be tested
        :return: True if `value` is a string (boolean).
        """

        try:
            return isinstance(value, basestring)  # noqa: F821 (Python 2 only name)
        except NameError:
            return isinstance(value, str)

    def set_acopytools(self):
        """
        Set the internal acopytools.

        Nothing is changed if acopytools are already defined. Otherwise a copy of
        queuedata.acopytools is used and, if that is empty as well, all names from
        queuedata.copytools are assigned to the 'default' activity.

        :return:
        """

        queuedata = self.infosys.queuedata
        if not self.acopytools:  # resolve from queuedata.acopytools
            self.acopytools = (queuedata.acopytools or {}).copy()
        if not self.acopytools:  # resolve from queuedata.copytools
            self.acopytools = dict(default=list((queuedata.copytools or {}).keys()))

    @staticmethod
    def get_default_copytools(default_copytools):
        """
        Get the default copytools.

        :param default_copytools: copytool name (string) or list of copytool names.
        :return: list with the given name (empty list for an empty string); any non-string
                 value is returned unchanged.
        """

        if StagingClient._is_string(default_copytools):
            return [default_copytools] if default_copytools else []
        return default_copytools

    @classmethod
    def get_preferred_replica(cls, replicas, allowed_schemas):
        """
        Get preferred replica from the `replicas` list suitable for `allowed_schemas`

        :param replicas: list of replica dictionaries (the 'pfn' key is inspected)
        :param allowed_schemas: list of schemas; an empty/None entry matches any non-empty pfn
        :return: first matched replica or None if not found
        """

        for replica in replicas:
            pfn = replica.get('pfn')
            if not pfn:
                continue
            for schema in allowed_schemas:
                if not schema or pfn.startswith('%s://' % schema):
                    return replica

        return None

    def prepare_sources(self, files, activities=None):
        """
        Customize/prepare source data for each entry in `files` optionally checking data for requested `activities`
        (custom StageClient could extend the logic if need)

        :param files: list of `FileSpec` objects to be processed
        :param activities: string or ordered list of activities to resolve `astorages` (optional)
        :return: None
        """

        return

    def prepare_inputddms(self, files, activities=None):
        """
        Populates filespec.inputddms for each entry from `files` list

        Entries with inputddms already set are kept. Otherwise the storages of the first
        activity having a non-empty entry in queuedata.astorages are used, and finally
        the ddmendpoint of the file itself.

        :param files: list of `FileSpec` objects
        :param activities: string or ordered list of activities to resolve astorages (optional, 'read_lan' by default)
        :return: None
        """

        activities = activities or 'read_lan'
        if self._is_string(activities):
            activities = [activities]

        astorages = self.infosys.queuedata.astorages if self.infosys and self.infosys.queuedata else {}

        # take the storages of the first activity that has any
        storages = []
        for aname in activities:
            storages = astorages.get(aname, [])
            if storages:
                break

        for fdat in files:
            if not fdat.inputddms:
                fdat.inputddms = storages
            if not fdat.inputddms and fdat.ddmendpoint:
                fdat.inputddms = [fdat.ddmendpoint]

    @classmethod
    def sort_replicas(cls, replicas, inputddms):
        """
        Sort input replicas: consider first affected replicas from inputddms

        Replicas located at the endpoints listed in `inputddms` come first (in the order of
        `inputddms`), the remaining replicas follow in their original order.

        :param replicas: Prioritized list of replicas [(pfn, dat)]
        :param inputddms: preferred list of ddmendpoint
        :return: sorted `replicas` (the input object itself if `inputddms` is empty)
        """

        if not inputddms:
            return replicas

        # group replicas by ddmendpoint to properly consider the priority of inputddms
        ddmreplicas = {}
        for pfn, xdat in replicas:
            ddmreplicas.setdefault(xdat.get('rse'), []).append((pfn, xdat))

        # replicas from the preferred endpoints first (keep inputddms priorities)
        xreplicas = []
        for ddm in inputddms:
            xreplicas.extend(ddmreplicas.get(ddm) or [])

        # then all remaining replicas in their original order
        for entry in replicas:
            if entry not in xreplicas:
                xreplicas.append(entry)

        # return the re-ordered list (the original input list was returned here by mistake)
        return xreplicas

    def resolve_replicas(self, files, use_vp=False):
        """
        Populates filespec.replicas for each entry from `files` list

            fdat.replicas = [{'ddmendpoint':'ddmendpoint', 'pfn':'replica', 'domain':'domain value'}]

        Replicas are looked up with the Rucio client; filesize and checksum values missing in
        the file specs are filled in from the replica info, mismatches are logged.

        :param files: list of `FileSpec` objects.
        :param use_vp: True for VP jobs (boolean).
        :raise: PilotException if the client location cannot be detected or the Rucio call fails
        :return: `files`
        """

        logger = self.logger

        show_memory_usage()

        xfiles = list(files)

        show_memory_usage()

        if not xfiles:  # no files for replica look-up
            return files

        # load replicas from Rucio (imported here since rucio is only needed for this call)
        from rucio.client import Client
        client = Client()

        show_memory_usage()

        location = self.detect_client_location()
        if not location:
            raise PilotException("Failed to get client location for Rucio", code=ErrorCodes.RUCIOLOCATIONFAILED)

        query = {
            'schemes': ['srm', 'root', 'davs', 'gsiftp', 'https', 'storm'],
            'dids': [dict(scope=e.scope, name=e.lfn) for e in xfiles],
        }
        query.update(sort='geoip', client_location=location)

        # reset the schemas for VP jobs
        if use_vp:
            query['schemes'] = ['root']
            query['rse_expression'] = 'istape=False\\type=SPECIAL'

        # signature lifetime (24 * 3600) passed along with the query
        query.update(signature_lifetime=24 * 3600)

        logger.info('calling rucio.list_replicas() with query=%s', query)

        try:
            replicas = client.list_replicas(**query)
        except Exception as exc:
            raise PilotException("Failed to get replicas from Rucio: %s" % exc, code=ErrorCodes.RUCIOLISTREPLICASFAILED)

        show_memory_usage()

        replicas = list(replicas)
        logger.debug("replicas received from Rucio: %s", replicas)

        files_lfn = dict(((e.scope, e.lfn), e) for e in xfiles)
        for replica in replicas:
            fdat = files_lfn.get((replica['scope'], replica['name']))
            if not fdat:  # not a requested replica
                continue

            # add the replicas to the fdat structure
            fdat = self.add_replicas(fdat, replica)

            # verify filesize and checksum values
            self.trace_report.update(validateStart=time.time())
            status = True
            if fdat.filesize != replica['bytes']:
                logger.warning("Filesize of input file=%s mismatched with value from Rucio replica: filesize=%s, replica.filesize=%s, fdat=%s",
                               fdat.lfn, fdat.filesize, replica['bytes'], fdat)
                status = False

            if not fdat.filesize:
                fdat.filesize = replica['bytes']
                logger.warning("Filesize value for input file=%s is not defined, assigning info from Rucio replica: filesize=%s", fdat.lfn, replica['bytes'])

            for ctype in ['adler32', 'md5']:
                if fdat.checksum.get(ctype) != replica[ctype] and replica[ctype]:
                    logger.warning("Checksum value of input file=%s mismatched with info got from Rucio replica: checksum=%s, replica.checksum=%s, fdat=%s",
                                   fdat.lfn, fdat.checksum, replica[ctype], fdat)
                    status = False

                if not fdat.checksum.get(ctype) and replica[ctype]:
                    fdat.checksum[ctype] = replica[ctype]

            # NOTE(review): the "done" state is reported only when a mismatch was seen -- confirm intent
            if not status:
                logger.info("filesize and checksum verification done")
                self.trace_report.update(clientState="DONE")

        show_memory_usage()

        logger.info('Number of resolved replicas:\n' +
                    '\n'.join(["lfn=%s: replicas=%s, is_directaccess=%s"
                               % (f.lfn, len(f.replicas or []), f.is_directaccess(ensure_replica=False)) for f in files]))

        return files

    def add_replicas(self, fdat, replica):
        """
        Add the replicas to the fdat structure.

        fdat.replicas is reset and filled with the DISK replicas from `replica['pfns']`,
        ordered by their priority value with replicas from fdat.inputddms moved to the front.
        The domain is set to 'lan' for endpoints listed in fdat.inputddms and to 'wan' otherwise;
        replicas of a domain not allowed by fdat.allow_lan / fdat.allow_wan are dropped.

        :param fdat: `FileSpec` object to be updated.
        :param replica: replica dictionary as returned by Rucio ('pfns' key is used).
        :return: updated fdat.
        """

        fdat.replicas = []  # reset replicas list

        # sort replicas by priority value (dict.items() works for both Python 2 and 3)
        sorted_replicas = sorted(replica.get('pfns', {}).items(), key=lambda item: item[1]['priority'])

        # prefer replicas from inputddms first
        xreplicas = self.sort_replicas(sorted_replicas, fdat.inputddms)

        for pfn, xdat in xreplicas:

            if xdat.get('type') != 'DISK':  # consider only DISK replicas
                continue

            ddmendpoint = xdat.get('rse')

            # the domain value reported in xdat is not used: inputddms define what is local (lan)
            domain = 'lan' if ddmendpoint in fdat.inputddms else 'wan'

            if not fdat.allow_lan and domain == 'lan':
                continue
            if not fdat.allow_wan and domain == 'wan':
                continue

            fdat.replicas.append({'pfn': pfn, 'ddmendpoint': ddmendpoint, 'domain': domain})

        if not fdat.replicas:
            self.logger.warning('no replicas were selected (verify replica type, allow_lan/wan and domain values)')

        return fdat

    @classmethod
    def detect_client_location(cls):
        """
        Collect the location info of the client.

        The local IPv4 and IPv6 addresses are read from UDP sockets connected to fixed
        external addresses ('0.0.0.0' and '::' are used if that fails). The site name is
        taken from the PILOT_RUCIO_SITENAME environment variable ('unknown' if not set).

        :return: dictionary with the keys 'ip', 'ip6', 'fqdn' and 'site'.
        """

        import socket

        def local_address(family, target, fallback):
            # return the local address of a UDP socket connected to `target`; always close the socket
            sock = None
            try:
                sock = socket.socket(family, socket.SOCK_DGRAM)
                sock.connect(target)
                return sock.getsockname()[0]
            except Exception:
                return fallback
            finally:
                if sock is not None:
                    sock.close()

        ip = local_address(socket.AF_INET, ("8.8.8.8", 80), '0.0.0.0')
        ip6 = local_address(socket.AF_INET6, ("2001:4860:4860:0:0:0:0:8888", 80), '::')

        site = os.environ.get('PILOT_RUCIO_SITENAME', 'unknown')

        return {'ip': ip,
                'ip6': ip6,
                'fqdn': socket.getfqdn(),
                'site': site}

    def transfer_files(self, copytool, files, **kwargs):
        """
        Apply transfer of given `files` using passed `copytool` module
        Should be implemented by custom Staging Client

        :param copytool: copytool module
        :param files: list of `FileSpec` objects
        :param kwargs: extra kwargs to be passed to copytool transfer handler
        :raise: PilotException in case of controlled error
        """

        raise NotImplementedError()

    def transfer(self, files, activity='default', **kwargs):
        """
        Automatically stage passed files using copy tools related to given `activity`

        The copytools of the first activity with a configured entry are tried one by one
        until no file remains to be transferred.

        :param files: list of `FileSpec` objects
        :param activity: list of activity names used to determine appropriate copytool (prioritized list)
        :param kwargs: extra kwargs to be passed to copytool transfer handler
        :raise: PilotException in case of controlled error
        :return: list of processed `FileSpec` objects
        """

        self.trace_report.update(relativeStart=time.time(), transferStart=time.time())

        # work on a private list so that the caller's `activity` object is never modified
        activity = [activity] if self._is_string(activity) else list(activity)
        if 'default' not in activity:
            activity.append('default')

        copytools = None
        for aname in activity:
            copytools = self.acopytools.get(aname)
            if copytools:
                break

        if not copytools:
            raise PilotException('failed to resolve copytool by preferred activities=%s, acopytools=%s' %
                                 (activity, self.acopytools))

        # populate inputddms if need
        self.prepare_inputddms(files)

        # initialize ddm_activity name for requested files if not set
        is_stagein = self.mode == 'stage-in'
        prefix = 'read' if is_stagein else 'write'
        for fspec in files:
            if fspec.ddm_activity:  # skip already initialized data
                continue
            if is_stagein and os.environ.get('PILOT_ES_EXECUTOR_TYPE', 'generic') == 'raythena':
                fspec.status = 'no_transfer'

            # build a real list: filter() returns a one-shot iterator on Python 3, which is always
            # truthy and cannot be iterated again when another copytool is tried
            ddm_activity = ['%s_wan' % prefix]
            if fspec.ddmendpoint in fspec.inputddms:
                ddm_activity.insert(0, '%s_lan' % prefix)
            fspec.ddm_activity = ddm_activity

        done_states = ['remote_io', 'transferred', 'no_transfer']
        caught_errors = []

        for name in copytools:

            # get remain files that need to be transferred by copytool
            remain_files = [e for e in files if e.status not in done_states]

            if not remain_files:
                break

            module = None  # keep the name defined for the warning below
            try:
                if name not in self.copytool_modules:
                    raise PilotException('passed unknown copytool with name=%s .. skipped' % name,
                                         code=ErrorCodes.UNKNOWNCOPYTOOL)

                module = self.copytool_modules[name]['module_name']
                self.logger.info('trying to use copytool=%s for activity=%s', name, activity)
                copytool = __import__('pilot.copytool.%s' % module, globals(), locals(), [module], 0)
            except PilotException as exc:
                caught_errors.append(exc)
                self.logger.debug('error: %s', exc)
                continue
            except Exception as exc:
                self.logger.warning('failed to import copytool module=%s, error=%s', module, exc)
                continue

            try:
                result = self.transfer_files(copytool, remain_files, activity, **kwargs)
                self.logger.debug('transfer_files() using copytool=%s completed with result=%s', copytool, str(result))
                show_memory_usage()
                break
            except PilotException as exc:
                self.logger.warning('failed to transfer_files() using copytool=%s .. skipped; error=%s', copytool, exc)
                caught_errors.append(exc)
            except TimeoutException as exc:
                self.logger.warning('function timed out: %s', exc)
                caught_errors.append(exc)
            except Exception as exc:
                self.logger.warning('failed to transfer files using copytool=%s .. skipped; error=%s', copytool, exc)
                caught_errors.append(exc)
                import traceback
                self.logger.error(traceback.format_exc())

            # do not try any further copytool if the last failure was a missing output file
            if caught_errors and isinstance(caught_errors[-1], PilotException) and \
                    caught_errors[-1].get_error_code() == ErrorCodes.MISSINGOUTPUTFILE:
                raise caught_errors[-1]

        remain_files = [fspec for fspec in files if fspec.status not in done_states]

        if remain_files:  # failed or incomplete transfer
            # derive the error code from the collected errors (first error has priority)
            if caught_errors and "Cannot authenticate" in str(caught_errors):
                code = ErrorCodes.STAGEINAUTHENTICATIONFAILURE
            elif caught_errors and "bad queue configuration" in str(caught_errors):
                code = ErrorCodes.BADQUEUECONFIGURATION
            elif caught_errors and isinstance(caught_errors[0], PilotException):
                code = caught_errors[0].get_error_code()
            elif caught_errors and isinstance(caught_errors[0], TimeoutException):
                code = ErrorCodes.STAGEINTIMEOUT if is_stagein else ErrorCodes.STAGEOUTTIMEOUT
                self.logger.warning('caught time-out exception: %s', caught_errors[0])
            else:
                code = ErrorCodes.STAGEINFAILED if is_stagein else ErrorCodes.STAGEOUTFAILED
            details = str(caught_errors) + ":" + 'failed to transfer files using copytools=%s' % copytools
            self.logger.fatal(details)
            raise PilotException(details, code=code)

        return files

    def require_protocols(self, files, copytool, activity, local_dir=''):
        """
        Populates fspec.protocols and fspec.turl for each entry in `files` according to preferred fspec.ddm_activity

        :param files: list of `FileSpec` objects
        :param copytool: copytool module (optional `allowed_schemas` / `resolve_surl` attributes are used)
        :param activity: str or ordered list of transfer activity names to resolve acopytools related data
        :param local_dir: if set, used as the only protocol endpoint instead of the storage protocols
        :raise: PilotException if no protocol can be resolved for a file
        :return: None
        """

        allowed_schemas = getattr(copytool, 'allowed_schemas', None)

        if self.infosys and self.infosys.queuedata:
            copytool_name = copytool.__name__.rsplit('.', 1)[-1]
            allowed_schemas = self.infosys.queuedata.resolve_allowed_schemas(activity, copytool_name) or allowed_schemas

        if local_dir:
            # make sure the endpoint ends with a slash (checked once, not per file)
            if files and not local_dir.endswith('/'):
                local_dir += '/'
            for fdat in files:
                fdat.protocols = [{'endpoint': local_dir, 'flavour': '', 'id': 0, 'path': ''}]
        else:
            files = self.resolve_protocols(files)

        ddmconf = self.infosys.resolve_storage_data()

        # the copytool may provide its own surl resolver
        resolve_surl = getattr(copytool, 'resolve_surl', None)
        if not callable(resolve_surl):
            resolve_surl = self.resolve_surl

        for fspec in files:

            protocols = self.resolve_protocol(fspec, allowed_schemas)
            if not protocols and 'mv' not in self.infosys.queuedata.copytools:  # no protocols found
                error = 'Failed to resolve protocol for file=%s, allowed_schemas=%s, fspec=%s' % (fspec.lfn, allowed_schemas, fspec)
                self.logger.error("resolve_protocol: %s", error)
                raise PilotException(error, code=ErrorCodes.NOSTORAGEPROTOCOL)

            # take the first available protocol for the copytool
            # NOTE(review): raises IndexError if no protocol was found while 'mv' is configured -- confirm
            protocol = protocols[0]

            self.logger.info("Resolved protocol to be used for transfer: '%s': lfn='%s'", protocol, fspec.lfn)

            # ddmconf is passed along for possible custom look-ups at the level of the copytool
            resolved = resolve_surl(fspec, protocol, ddmconf, local_dir=local_dir)
            if resolved.get('surl'):
                fspec.turl = resolved['surl']

            if resolved.get('ddmendpoint'):
                fspec.ddmendpoint = resolved['ddmendpoint']

    def resolve_protocols(self, files):
        """
        Populates filespec.protocols for each entry from `files` according to preferred `fspec.ddm_activity` value

            fdat.protocols = [dict(endpoint, path, flavour), ..]

        :param files: list of `FileSpec` objects
        :raise: PilotException if the ddmendpoint of a file is not known to the storage configuration
        :return: `files`
        """

        ddmconf = self.infosys.resolve_storage_data()

        for fdat in files:
            ddm = ddmconf.get(fdat.ddmendpoint)
            if not ddm:
                error = 'Failed to resolve output ddmendpoint by name=%s (from PanDA), please check configuration.' % fdat.ddmendpoint
                self.logger.error("resolve_protocols: %s, fspec=%s", error, fdat)
                raise PilotException(error, code=ErrorCodes.NOSTORAGE)

            # take the protocols of the first activity that has any
            protocols = []
            for aname in fdat.ddm_activity:
                protocols = ddm.arprotocols.get(aname)
                if protocols:
                    break

            fdat.protocols = protocols

        return files

    @classmethod
    def resolve_protocol(cls, fspec, allowed_schemas=None):
        """
        Resolve protocols according to allowed schema

        The result is ordered by schema first (order of `allowed_schemas`), then by
        the order of `fspec.protocols`.

        :param fspec: `FileSpec` instance
        :param allowed_schemas: list of allowed schemas or any if None
        :return: list of dict(endpoint, path, flavour)
        """

        if not fspec.protocols:
            return []

        return [pdat
                for schema in (allowed_schemas or [None])
                for pdat in fspec.protocols
                if schema is None or pdat.get('endpoint', '').startswith("%s://" % schema)]
0645
0646
0647 class StageInClient(StagingClient):
0648
0649 mode = "stage-in"
0650
0651 def resolve_replica(self, fspec, primary_schemas=None, allowed_schemas=None, domain=None):
0652 """
0653 Resolve input replica (matched by `domain` if need) first according to `primary_schemas`,
0654 if not found then look up within `allowed_schemas`
0655 Primary schemas ignore replica priority (used to resolve direct access replica, which could be not with top priority set)
0656 :param fspec: input `FileSpec` objects
0657 :param allowed_schemas: list of allowed schemas or any if None
0658 :return: dict(surl, ddmendpoint, pfn, domain) or None if replica not found
0659 """
0660
0661 if not fspec.replicas:
0662 self.logger.warning('resolve_replica() received no fspec.replicas')
0663 return
0664
0665 allowed_schemas = allowed_schemas or [None]
0666 primary_replica, replica = None, None
0667
0668
0669 replicas = {}
0670
0671 for rinfo in fspec.replicas:
0672
0673 replicas.setdefault(rinfo['ddmendpoint'], []).append(rinfo)
0674
0675 if rinfo['domain'] != domain:
0676 continue
0677 if primary_schemas and not primary_replica:
0678 primary_replica = self.get_preferred_replica([rinfo], primary_schemas)
0679 if not replica:
0680 replica = self.get_preferred_replica([rinfo], allowed_schemas)
0681
0682 if replica and primary_replica:
0683 break
0684
0685 replica = primary_replica or replica
0686
0687 if not replica:
0688 schemas = 'any' if not allowed_schemas[0] else ','.join(allowed_schemas)
0689 pschemas = 'any' if primary_schemas and not primary_schemas[0] else ','.join(primary_schemas or [])
0690
0691 error = 'Failed to find replica for file=%s, domain=%s, allowed_schemas=%s, pschemas=%s, fspec=%s' % (fspec.lfn, domain, schemas, pschemas, fspec)
0692 self.logger.info("resolve_replica: %s", error)
0693 return
0694
0695
0696 rse_replicas = replicas.get(replica['ddmendpoint'], [])
0697 surl = self.get_preferred_replica(rse_replicas, ['srm']) or rse_replicas[0]
0698 self.logger.info("[stage-in] surl (srm replica) from Rucio: pfn=%s, ddmendpoint=%s", surl['pfn'], surl['ddmendpoint'])
0699
0700 return {'surl': surl['pfn'], 'ddmendpoint': replica['ddmendpoint'], 'pfn': replica['pfn'], 'domain': replica['domain']}
0701
0702 def get_direct_access_variables(self, job):
0703 """
0704 Return the direct access settings for the PQ.
0705
0706 :param job: job object.
0707 :return: allow_direct_access (bool), direct_access_type (string).
0708 """
0709
0710 allow_direct_access, direct_access_type = False, ''
0711 if self.infosys.queuedata:
0712 allow_direct_access = self.infosys.queuedata.direct_access_lan or self.infosys.queuedata.direct_access_wan
0713 if self.infosys.queuedata.direct_access_lan:
0714 direct_access_type = 'LAN'
0715 if self.infosys.queuedata.direct_access_wan:
0716 direct_access_type = 'WAN'
0717 else:
0718 self.logger.info('infosys.queuedata is not initialized: direct access mode will be DISABLED by default')
0719
0720 if job and not job.is_analysis() and job.transfertype != 'direct':
0721 allow_direct_access = False
0722 self.logger.info('switched off direct access mode for production job since transfertype=%s', job.transfertype)
0723
0724 return allow_direct_access, direct_access_type
0725
0726 def transfer_files(self, copytool, files, activity=None, **kwargs):
0727 """
0728 Automatically stage in files using the selected copy tool module.
0729
0730 :param copytool: copytool module
0731 :param files: list of `FileSpec` objects
0732 :param kwargs: extra kwargs to be passed to copytool transfer handler
0733
0734 :return: list of processed `FileSpec` objects
0735 :raise: PilotException in case of controlled error
0736 """
0737
0738 if getattr(copytool, 'require_replicas', False) and files:
0739 if files[0].replicas is None:
0740 files = self.resolve_replicas(files, use_vp=kwargs['use_vp'])
0741
0742 allowed_schemas = getattr(copytool, 'allowed_schemas', None)
0743
0744 if self.infosys and self.infosys.queuedata:
0745 copytool_name = copytool.__name__.rsplit('.', 1)[-1]
0746 allowed_schemas = self.infosys.queuedata.resolve_allowed_schemas(activity, copytool_name) or allowed_schemas
0747
0748
0749 if kwargs['use_vp']:
0750 allowed_schemas = ['root']
0751 self.logger.debug('overwrote allowed_schemas for VP job: %s', str(allowed_schemas))
0752
0753 for fspec in files:
0754 resolve_replica = getattr(copytool, 'resolve_replica', None)
0755 resolve_replica = self.resolve_replica if not callable(resolve_replica) else resolve_replica
0756
0757 replica = None
0758
0759
0760
0761 if fspec.allow_lan:
0762
0763 primary_schemas = (self.direct_localinput_allowed_schemas if fspec.direct_access_lan and
0764 fspec.is_directaccess(ensure_replica=False) else None)
0765 replica = resolve_replica(fspec, primary_schemas, allowed_schemas, domain='lan')
0766 else:
0767 self.logger.info("[stage-in] LAN access is DISABLED for lfn=%s (fspec.allow_lan=%s)", fspec.lfn, fspec.allow_lan)
0768
0769 if not replica and fspec.allow_lan:
0770 self.logger.info("[stage-in] No LAN replica found for lfn=%s, primary_schemas=%s, allowed_schemas=%s",
0771 fspec.lfn, primary_schemas, allowed_schemas)
0772
0773
0774 if not replica and fspec.allow_wan:
0775
0776 primary_schemas = (self.direct_remoteinput_allowed_schemas if fspec.direct_access_wan and
0777 fspec.is_directaccess(ensure_replica=False) else None)
0778 xschemas = self.remoteinput_allowed_schemas
0779 allowed_schemas = [schema for schema in allowed_schemas if schema in xschemas] if allowed_schemas else xschemas
0780 replica = resolve_replica(fspec, primary_schemas, allowed_schemas, domain='wan')
0781
0782 if not replica and fspec.allow_wan:
0783 self.logger.info("[stage-in] No WAN replica found for lfn=%s, primary_schemas=%s, allowed_schemas=%s",
0784 fspec.lfn, primary_schemas, allowed_schemas)
0785 if not replica:
0786 raise ReplicasNotFound('No replica found for lfn=%s (allow_lan=%s, allow_wan=%s)' % (fspec.lfn, fspec.allow_lan, fspec.allow_wan))
0787
0788 if replica.get('pfn'):
0789 fspec.turl = replica['pfn']
0790 if replica.get('surl'):
0791 fspec.surl = replica['surl']
0792 if replica.get('ddmendpoint'):
0793 fspec.ddmendpoint = replica['ddmendpoint']
0794 if replica.get('domain'):
0795 fspec.domain = replica['domain']
0796
0797 self.logger.info("[stage-in] found replica to be used for lfn=%s: ddmendpoint=%s, pfn=%s", fspec.lfn, fspec.ddmendpoint, fspec.turl)
0798
0799
0800 if getattr(copytool, 'require_input_protocols', False) and files:
0801 self.require_protocols(files, copytool, activity, local_dir=kwargs['input_dir'])
0802
0803
0804 self.set_status_for_direct_access(files, kwargs.get('workdir', ''))
0805
0806
0807 remain_files = [e for e in files if e.status not in ['direct', 'remote_io', 'transferred', 'no_transfer']]
0808
0809 if not remain_files:
0810 return files
0811
0812 if not copytool.is_valid_for_copy_in(remain_files):
0813 msg = 'input is not valid for transfers using copytool=%s' % copytool
0814 self.logger.warning(msg)
0815 self.logger.debug('input: %s', remain_files)
0816 self.trace_report.update(clientState='NO_REPLICA', stateReason=msg)
0817 self.trace_report.send()
0818 raise PilotException('invalid input data for transfer operation')
0819
0820 if self.infosys:
0821 if self.infosys.queuedata:
0822 kwargs['copytools'] = self.infosys.queuedata.copytools
0823 kwargs['ddmconf'] = self.infosys.resolve_storage_data()
0824 kwargs['activity'] = activity
0825
0826
0827 if getattr(copytool, 'check_availablespace', True):
0828 if self.infosys.queuedata.maxinputsize != -1:
0829 self.check_availablespace(remain_files)
0830 else:
0831 self.logger.info('skipping input file size check since maxinputsize=-1')
0832
0833 show_memory_usage()
0834
0835
0836 kwargs['trace_report'] = self.trace_report
0837 self.logger.info('ready to transfer (stage-in) files: %s', remain_files)
0838
0839
0840
0841
0842 return copytool.copy_in(remain_files, **kwargs)
0843
0844 def set_status_for_direct_access(self, files, workdir):
0845 """
0846 Update the FileSpec status with 'remote_io' for direct access mode.
0847 Should be called only once since the function sends traces
0848
0849 :param files: list of FileSpec objects.
0850 :param workdir: work directory (string).
0851 :return: None
0852 """
0853
0854 for fspec in files:
0855 direct_lan = (fspec.domain == 'lan' and fspec.direct_access_lan and
0856 fspec.is_directaccess(ensure_replica=True, allowed_replica_schemas=self.direct_localinput_allowed_schemas))
0857 direct_wan = (fspec.domain == 'wan' and fspec.direct_access_wan and
0858 fspec.is_directaccess(ensure_replica=True, allowed_replica_schemas=self.direct_remoteinput_allowed_schemas))
0859
0860
0861
0862
0863
0864
0865 if not direct_lan and not direct_wan:
0866 self.logger.debug('direct lan/wan transfer will not be used for lfn=%s', fspec.lfn)
0867 self.logger.debug('lfn=%s, direct_lan=%s, direct_wan=%s, direct_access_lan=%s, direct_access_wan=%s, '
0868 'direct_localinput_allowed_schemas=%s, remoteinput_allowed_schemas=%s, domain=%s',
0869 fspec.lfn, direct_lan, direct_wan, fspec.direct_access_lan, fspec.direct_access_wan,
0870 str(self.direct_localinput_allowed_schemas), str(self.direct_remoteinput_allowed_schemas), fspec.domain)
0871
0872 if direct_lan or direct_wan:
0873 fspec.status_code = 0
0874 fspec.status = 'remote_io'
0875
0876 alrb_xcache_proxy = os.environ.get('ALRB_XCACHE_PROXY', None)
0877 if alrb_xcache_proxy and direct_lan:
0878 fspec.turl = '${ALRB_XCACHE_PROXY}' + fspec.turl
0879
0880 self.logger.info('stage-in: direct access (remote i/o) will be used for lfn=%s (direct_lan=%s, direct_wan=%s), turl=%s',
0881 fspec.lfn, direct_lan, direct_wan, fspec.turl)
0882
0883
0884 localsite = os.environ.get('RUCIO_LOCAL_SITE_ID')
0885 localsite = localsite or fspec.ddmendpoint
0886 self.trace_report.update(localSite=localsite, remoteSite=fspec.ddmendpoint, filesize=fspec.filesize)
0887 self.trace_report.update(filename=fspec.lfn, guid=fspec.guid.replace('-', ''))
0888 self.trace_report.update(scope=fspec.scope, dataset=fspec.dataset)
0889 self.trace_report.update(url=fspec.turl, clientState='FOUND_ROOT', stateReason='direct_access')
0890
0891
0892
0893
0894 if config.Pilot.remotefileverification_log:
0895
0896
0897
0898 _workdir = workdir if os.path.exists(workdir) else '.'
0899 path = os.path.join(_workdir, config.Pilot.base_trace_report)
0900 if not os.path.exists(_workdir):
0901 path = os.path.join('/srv', config.Pilot.base_trace_report)
0902 if not os.path.exists(path):
0903 self.logger.debug('writing base trace report to: %s', path)
0904 write_json(path, self.trace_report)
0905 else:
0906 self.trace_report.send()
0907
def check_availablespace(self, files):
    """
    Check that the input files fit both the allowed input size and the local disk.

    The file sizes of all entries are summed up and compared, in this order, with
    the limit returned by get_maximum_input_sizes() (a zero/empty limit disables
    this check) and with the free space reported by get_local_disk_space() for
    the current working directory. Both reference values are passed through
    convert_mb_to_b() before the comparison.

    :param files: list of FileSpec objects.
    :raise: SizeTooLarge if the total input size exceeds the allowed limit.
    :raise: NoLocalSpace if the total input size exceeds the locally available space.
    """

    # dump what is about to be staged in
    for fspec in files:
        self.logger.debug('lfn=%s filesize=%d accessmode=%s', fspec.lfn, fspec.filesize, fspec.accessmode)

    size_limit = convert_mb_to_b(get_maximum_input_sizes())
    needed = sum(fspec.filesize for fspec in files)

    # a zero limit means "unlimited", so only a non-zero limit can be exceeded
    if size_limit and needed > size_limit:
        raise SizeTooLarge("too many/too large input files (%s). total file size=%s B > maxinputsize=%s B" %
                           (len(files), needed, size_limit))

    self.logger.info("total input file size=%s B within allowed limit=%s B (zero value means unlimited)", needed, size_limit)

    # the free space is measured for the directory the process currently runs in
    free_space = convert_mb_to_b(get_local_disk_space(os.getcwd()))
    self.logger.info("locally available space: %d B", free_space)

    if needed > free_space:
        raise NoLocalSpace("not enough local space for staging input files and run the job (need %d B, but only have %d B)" %
                           (needed, free_space))
0939
0940
class StageOutClient(StagingClient):
    """
    Staging client used for stage-out: resolves the destination of output files,
    constructs destination SURLs and delegates the transfer to a copytool.
    """

    mode = "stage-out"

    def prepare_destinations(self, files, activities):
        """
        Resolve destination RSE (filespec.ddmendpoint) for each entry from `files` according to requested `activities`.

        The first activity (in the given order) with storages associated in
        `queuedata.astorages` defines the list of allowed (local) destinations and
        the first entry of that list is taken as default destination.
        Files without a destination get the default one. Files requesting a
        destination outside of the allowed list are redirected to the default one,
        while the originally requested endpoint is preserved in `ddmendpoint_alt`.

        :param files: list of FileSpec objects to be processed (entries are updated in place).
        :param activities: activity name or ordered list of activities to be used to resolve astorages.
        :return: updated fspec entries (the same `files` object).
        :raise: PilotException if the activity list is empty, or if no storages are associated
                with the activities and 'mv' is not among the queue copytools.
        """

        if not self.infosys.queuedata:
            # no queue data available: there is nothing to resolve destinations against
            return files

        # accept a single activity name as well (Python 2 and 3 compatible string check)
        try:
            string_types = (str, unicode)  # noqa: F821 - Python 2 only
        except NameError:
            string_types = (str,)  # Python 3
        if isinstance(activities, string_types):
            activities = [activities]

        if not activities:
            raise PilotException("Failed to resolve destination: passed empty activity list. Internal error.",
                                 code=ErrorCodes.INTERNALPILOTPROBLEM, state='INTERNAL_ERROR')

        astorages = self.infosys.queuedata.astorages or {}

        # log and error messages below always refer to the first (preferred) activity
        activity = activities[0]
        storages = None
        for name in activities:
            storages = astorages.get(name, {})
            if storages:
                break

        if not storages:
            if 'mv' in self.infosys.queuedata.copytools:
                # the missing storage definition is ignored when 'mv' is among the copytools
                return files
            raise PilotException("Failed to resolve destination: no associated storages defined for activity=%s (%s)"
                                 % (activity, ','.join(activities)), code=ErrorCodes.NOSTORAGE, state='NO_ASTORAGES_DEFINED')

        # take the first associated storage as the default destination
        ddm = storages[0]

        self.logger.info("[prepare_destinations][%s]: allowed (local) destinations: %s", activity, storages)
        self.logger.info("[prepare_destinations][%s]: resolved default destination ddm=%s", activity, ddm)

        for e in files:
            if not e.ddmendpoint:
                self.logger.info("[prepare_destinations][%s]: fspec.ddmendpoint is not set for lfn=%s"
                                 " .. will use default ddm=%s as (local) destination", activity, e.lfn, ddm)
                e.ddmendpoint = ddm
            elif e.ddmendpoint not in storages:
                self.logger.info("[prepare_destinations][%s]: Requested fspec.ddmendpoint=%s is not in the list of allowed (local) destinations"
                                 " .. will consider default ddm=%s for transfer and tag %s as alt. location", activity, e.ddmendpoint, ddm, e.ddmendpoint)
                # remember the requested endpoint BEFORE it gets overwritten by the default one
                e.ddmendpoint_alt = e.ddmendpoint
                e.ddmendpoint = ddm

        return files

    @classmethod
    def get_path(cls, scope, lfn, prefix='rucio'):
        """
        Construct a partial Rucio PFN using the scope and the LFN.

        Layout of the result:
        <scope parts split by '.'>/md5(<scope>:<lfn>)[0:2]/md5(<scope>:<lfn>)[2:4]/<lfn>

        :param scope: scope of the file; each dot-separated part becomes a path component.
        :param lfn: logical file name.
        :param prefix: accepted for interface compatibility; it is currently not part of the returned path.
        :return: relative path (str) with empty components removed.
        """

        did = '%s:%s' % (scope, lfn)
        hash_hex = hashlib.md5(did.encode('utf-8')).hexdigest()

        parts = scope.split('.') + [hash_hex[0:2], hash_hex[2:4], lfn]

        # skip empty parts to avoid double '/' characters in the result
        return '/'.join(part for part in parts if part)

    def resolve_surl(self, fspec, protocol, ddmconf, **kwargs):
        """
        Get final destination SURL for file to be transferred.
        Can be customized at the level of specific copytool.

        :param fspec: file spec object; its `ddmendpoint`, `scope` and `lfn` are used.
        :param protocol: suggested protocol (mapping; the 'endpoint' and 'path' values are used).
        :param ddmconf: full ddmconf data (mapping of ddmendpoint name to storage data).
        :param kwargs: only `local_dir` is read here; if it is set, the ddmendpoint checks are skipped.
        :return: dict with key 'surl'.
        :raise: PilotException if the ddmendpoint cannot be resolved or is not deterministic.
        """

        local_dir = kwargs.get('local_dir', '')
        if not local_dir:
            # only deterministic destinations are supported, unless a local directory is used
            ddm = ddmconf.get(fspec.ddmendpoint)
            if not ddm:
                raise PilotException('Failed to resolve ddmendpoint by name=%s' % fspec.ddmendpoint)

            if not ddm.is_deterministic:
                raise PilotException('resolve_surl(): Failed to construct SURL for non deterministic ddm=%s: '
                                     'NOT IMPLEMENTED' % fspec.ddmendpoint, code=ErrorCodes.NONDETERMINISTICDDM)

        surl = protocol.get('endpoint', '') + os.path.join(protocol.get('path', ''), self.get_path(fspec.scope, fspec.lfn))
        return {'surl': surl}

    def transfer_files(self, copytool, files, activity, **kwargs):
        """
        Automatically stage out files using the selected copy tool module.

        Each file is verified and completed first (local source path, file size,
        adler32 checksum). Afterwards the protocols are resolved if the copytool
        asks for it, the input is validated by the copytool and the transfer is
        delegated to `copytool.copy_out()`.

        :param copytool: copytool module
        :param files: list of `FileSpec` objects
        :param activity: ordered list of preferred activity names to resolve SE protocols
        :param kwargs: extra kwargs to be passed to copytool transfer handler
                       (`workdir` and `output_dir` are also read by this method)

        :return: the output of the copytool transfer operation
        :raise: PilotException in case of controlled error
        """

        # check that the files exist before the actual processing,
        # populate the file size if needed and calculate the checksum
        for fspec in files:

            if not fspec.ddmendpoint:
                # a missing destination is only tolerated when 'mv' is among the copytools
                if 'mv' not in self.infosys.queuedata.copytools:
                    msg = 'no output RSE defined for file=%s' % fspec.lfn
                    self.logger.error(msg)
                    raise PilotException(msg, code=ErrorCodes.NOSTORAGE, state='NO_OUTPUTSTORAGE_DEFINED')

            pfn = fspec.surl or getattr(fspec, 'pfn', None) or os.path.join(kwargs.get('workdir', ''), fspec.lfn)
            if not os.path.exists(pfn) or not os.access(pfn, os.R_OK):
                msg = "output pfn file/directory does not exist: %s" % pfn
                self.logger.error(msg)
                self.trace_report.update(clientState='MISSINGOUTPUTFILE', stateReason=msg)
                self.trace_report.send()
                raise PilotException(msg, code=ErrorCodes.MISSINGOUTPUTFILE, state="FILE_INFO_FAIL")

            if not fspec.filesize:
                fspec.filesize = os.path.getsize(pfn)

            if not fspec.filesize:
                # still no size: refuse to transfer an empty file
                msg = 'output file has size zero: %s' % fspec.lfn
                self.logger.fatal(msg)
                raise PilotException(msg, code=ErrorCodes.ZEROFILESIZE, state="ZERO_FILE_SIZE")

            fspec.surl = pfn
            fspec.activity = activity
            if os.path.isfile(pfn) and not fspec.checksum.get('adler32'):
                fspec.checksum['adler32'] = calculate_checksum(pfn)

        # prepare files (resolve protocol/transfer url) unless the copytool opts out
        if getattr(copytool, 'require_protocols', True) and files:
            output_dir = kwargs.get('output_dir', "")
            self.require_protocols(files, copytool, activity, local_dir=output_dir)

        if not copytool.is_valid_for_copy_out(files):
            self.logger.warning('Input is not valid for transfers using copytool=%s', copytool)
            self.logger.debug('Input: %s', files)
            raise PilotException('Invalid input for transfer operation')

        self.logger.info('ready to transfer (stage-out) files: %s', files)

        if self.infosys:
            kwargs['copytools'] = self.infosys.queuedata.copytools

            # pass the storage data (ddmconf) on to the copytool as well
            kwargs['ddmconf'] = self.infosys.resolve_storage_data()

        # NOTE(review): an empty `files` list is only rejected here, i.e. after the copytool validation above
        if not files:
            msg = 'nothing to stage-out - an internal Pilot error has occurred'
            self.logger.fatal(msg)
            raise PilotException(msg, code=ErrorCodes.INTERNALPILOTPROBLEM)

        # add the trace report
        kwargs['trace_report'] = self.trace_report

        return copytool.copy_out(files, **kwargs)
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163