File indexing completed on 2026-04-11 08:41:05
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010 import os
0011 import re
0012 import glob
0013 from time import sleep
0014
0015 from pilot.common.errorcodes import ErrorCodes
0016 from pilot.common.exception import NoSoftwareDir
0017 from pilot.info import infosys
0018 from pilot.util.container import execute
0019 from pilot.util.filehandling import read_file, write_file, copy
0020
0021 from .metadata import get_file_info_from_xml
0022
0023 import logging
# module-level logger for this module
logger = logging.getLogger(__name__)

# error code definitions; the functions in this module return codes such as errors.TRFDOWNLOADFAILURE and errors.CHMODTRF
errors = ErrorCodes()
0027
0028
def get_file_system_root_path():
    """
    Return the root path of the local file system.

    Normally this is "/cvmfs". Sites where the file system is mounted elsewhere (e.g. an HPC) can override it by
    exporting ATLAS_SW_BASE, whose value is then returned as-is (even if it is an empty string).

    :return: path (string)
    """

    site_root = os.environ.get('ATLAS_SW_BASE')
    if site_root is None:
        return '/cvmfs'
    return site_root
0039
0040
def should_pilot_prepare_setup(noexecstrcnv, jobpars, imagename=None):
    """
    Decide whether the pilot has to add the setup to the payload command.

    The decision is made in this order:
    - a container image is given (stand-alone / user defined container): the pilot does not prepare the setup;
    - noExecStrCnv is not set: the pilot prepares the setup;
    - noExecStrCnv is set and jobPars already contain "asetup.sh": the setup is taken from jobPars;
    - noExecStrCnv is set but jobPars lack "asetup.sh": the pilot prepares the setup.

    :param noexecstrcnv: boolean.
    :param jobpars: job parameters (string).
    :param imagename: container image (string).
    :return: boolean (True if the pilot should prepare the setup).
    """

    if imagename:
        return False

    if not noexecstrcnv:
        logger.info("pilot will prepare the setup")
        return True

    if "asetup.sh" in jobpars:
        logger.info("asetup will be taken from jobPars")
        return False

    logger.info("noExecStrCnv is set but asetup command was not found in jobPars (pilot will prepare asetup)")
    return True
0069
0070
def get_alrb_export(add_if=False):
    """
    Return the export command for ATLAS_LOCAL_ROOT_BASE.

    The command points to <root>/atlas.cern.ch/repo/ATLASLocalRootBase, where <root> comes from
    get_file_system_root_path(). An empty string is returned if <root>/atlas.cern.ch/repo does not exist.

    :param add_if: Boolean. True means that the export is wrapped in a shell test so that it only takes effect
                   when $ATLAS_LOCAL_ROOT_BASE is empty or unset.
    :return: export command (string).
    """

    repo = "%s/atlas.cern.ch/repo" % get_file_system_root_path()
    if not os.path.exists(repo):
        return ""

    export = "export ATLAS_LOCAL_ROOT_BASE=%s/ATLASLocalRootBase;" % repo
    if add_if:
        # if [ -z "$ATLAS_LOCAL_ROOT_BASE" ]; then export ATLAS_LOCAL_ROOT_BASE=...; fi;
        export = 'if [ -z \"$ATLAS_LOCAL_ROOT_BASE\" ]; then ' + export + ' fi;'

    return export
0088
0089
def get_asetup(asetup=True, alrb=False, add_if=False):
    """
    Define the setup for asetup, i.e. including full path to asetup and setting of ATLAS_LOCAL_ROOT_BASE.

    If ALRB is found on the local file system, the returned string starts with the ATLAS_LOCAL_ROOT_BASE export.
    Unless alrb=True, it then sources atlasLocalSetup.sh and, if asetup=True, $AtlasSetup/scripts/asetup.sh.
    Otherwise the software area is taken from queuedata (appdir) or, failing that, from $VO_ATLAS_SW_DIR. In that
    case only <appdir>/scripts/asetup.sh is sourced, and only if asetup=True.
    The actual asetup script is not needed if the jobPars contain the payload command, but the pilot still needs to
    add the exports and the atlasLocalSetup.

    :param asetup: Boolean. True value means that the pilot should include the asetup command.
    :param alrb: Boolean. True value means that the function should return special setup used with ALRB and containers.
    :param add_if: Boolean. True means that an if statement will be placed around the export.
    :raises: NoSoftwareDir if appdir does not exist.
    :return: setup command, e.g. source <path>/asetup.sh (string, possibly empty).
    """

    alrb_export = get_alrb_export(add_if=add_if)

    if alrb_export:
        # ALRB is available: with alrb=True only the export is wanted
        if alrb:
            return alrb_export
        setup = alrb_export + "source ${ATLAS_LOCAL_ROOT_BASE}/user/atlasLocalSetup.sh --quiet;"
        if asetup:
            setup += "source $AtlasSetup/scripts/asetup.sh"
        return setup

    # no ALRB: fall back to the software area from queuedata, then from the environment
    try:  # infosys may not have been initialised
        appdir = infosys.queuedata.appdir
    except Exception:
        appdir = ""
    if appdir == "":
        appdir = os.environ.get('VO_ATLAS_SW_DIR', '')

    if appdir == "":
        return ""

    if not os.path.exists(appdir):
        msg = 'appdir does not exist: %s' % appdir
        logger.warning(msg)
        raise NoSoftwareDir(msg)

    return "source %s/scripts/asetup.sh" % appdir if asetup else ""
0132
0133
def get_asetup_options(release, homepackage):
    """
    Determine the proper asetup options.

    A leading "Atlas-" is stripped from the release.

    For a user analysis homePackage (containing 'AnalysisTransforms'), the text after 'AnalysisTransforms-' is
    split on '_'. The release is prepended when that remainder is empty or the release is not of the form X.Y.Z.
    For any other homePackage, the homePackage is split on '/', and the release is appended if it is not already
    part of it.

    Empty components (e.g. from an empty homePackage or a trailing '/') are dropped so that no empty asetup
    option is produced. 'notest' is always added, and 'fast' is added when $ATLAS_FAST_ASETUP is set.

    :param release: ATLAS release string.
    :param homepackage: ATLAS homePackage string.
    :return: asetup options (comma separated string).
    """

    asetupopt = []
    release = re.sub('^Atlas-', '', release)

    # is it a user analysis homePackage?
    if 'AnalysisTransforms' in homepackage:
        _homepackage = re.sub('^AnalysisTransforms-*', '', homepackage)
        if _homepackage == '' or re.search(r'^\d+\.\d+\.\d+$', release) is None:
            if release != "":
                asetupopt.append(release)
        if _homepackage != '':
            asetupopt += [part for part in _homepackage.split('_') if part]
    else:
        # ''.split('/') gives [''], which would otherwise become an empty option (",21.0.1,notest")
        asetupopt += [part for part in homepackage.split('/') if part]
        if release not in homepackage and release not in asetupopt:
            asetupopt.append(release)

    # add notest for all setups (not necessary for late releases but harmless to add)
    asetupopt.append('notest')

    # add the fast option if requested through the environment
    if "ATLAS_FAST_ASETUP" in os.environ:
        asetupopt.append('fast')

    return ','.join(asetupopt)
0170
0171
def is_standard_atlas_job(release):
    """
    Tell whether the job is a standard ATLAS job.

    A standard ATLAS job is recognised by a release string starting with 'Atlas-'.

    :param release: Release value (string).
    :return: Boolean. True for a standard ATLAS job.
    """

    standard_prefix = 'Atlas-'
    return release.startswith(standard_prefix)
0182
0183
def set_inds(dataset):
    """
    Set the INDS environmental variable used by runAthena.

    INDS is set to the first comma-separated dataset that is neither a DBRelease nor a '.lib.' dataset. If the
    first such entry is empty, or there is none, a warning is logged and the environment is left unchanged.

    :param dataset: dataset for input files (realDatasetsIn) (string).
    :return: None.
    """

    eligible = (name for name in dataset.split(',') if "DBRelease" not in name and ".lib." not in name)
    inds = next(eligible, "")

    if not inds:
        logger.warning("INDS unknown")
        return

    logger.info("setting INDS environmental variable to: %s", inds)
    os.environ['INDS'] = inds
0203
0204
def get_analysis_trf(transform, workdir):
    """
    Prepare to download the user analysis transform with curl.
    The function will verify the download location from a known list of hosts.

    If $HARVESTER_WORKDIR is set, any jobO.*.tar.gz files found there are first copied into the work directory.
    A failed copy is logged, not fatal. If the transform script already exists in the work directory, it is not
    downloaded again. Otherwise the download is tried from the transform's own base URL first, then from the other
    known base URLs, and the downloaded script is made executable (0o755).

    :param transform: full trf path (url) (string).
    :param workdir: work directory (string).
    :return: exit code (int), diagnostics (string), transform_name (string, empty on failure)
    """

    ec = 0
    diagnostics = ""

    # copy any job option tarballs found in $HARVESTER_WORKDIR into the work directory
    harvester_workdir = os.environ.get('HARVESTER_WORKDIR')
    if harvester_workdir is not None:
        search_pattern = "%s/jobO.*.tar.gz" % harvester_workdir
        logger.debug("search_pattern - %s", search_pattern)
        for jobopt_file in glob.glob(search_pattern):
            logger.debug("jobopt_file = %s workdir = %s", jobopt_file, workdir)
            try:
                copy(jobopt_file, workdir)
            except Exception as error:
                logger.error("could not copy file %s to %s : %s", jobopt_file, workdir, error)

    if '/' in transform:
        transform_name = transform.split('/')[-1]
    else:
        logger.warning('did not detect any / in %s (using full transform name)', transform)
        transform_name = transform

    # skip the download if the script is already present in the work directory
    if os.path.exists(os.path.join(workdir, transform_name)):
        logger.info('script %s is already available - no need to download again', transform_name)
        return ec, diagnostics, transform_name

    # the transform must be located under one of the known base URLs
    original_base_url = next((url for url in get_valid_base_urls() if transform.startswith(url)), "")
    if original_base_url == "":
        diagnostics = "invalid base URL: %s" % transform
        return errors.TRFDOWNLOADFAILURE, diagnostics, ""

    # the part of the URL after the base URL; swapped onto each candidate base URL below. A plain prefix swap is
    # used on purpose: the base URL must not be interpreted as a regular expression ('.' matches any character),
    # and only the leading occurrence may be replaced
    relative_path = transform[len(original_base_url):]

    # try to download from the original location first, then from the other base URLs
    status = False
    for base_url in get_valid_base_urls(order=original_base_url):
        trf = base_url + relative_path
        logger.debug("attempting to download script: %s", trf)
        status, diagnostics = download_transform(trf, transform_name, workdir)
        if status:
            break

    if not status:
        return errors.TRFDOWNLOADFAILURE, diagnostics, ""

    logger.info("successfully downloaded script")
    path = os.path.join(workdir, transform_name)
    logger.debug("changing permission of %s to 0o755", path)
    try:
        os.chmod(path, 0o755)
    except Exception as error:
        diagnostics = "failed to chmod %s: %s" % (transform_name, error)
        return errors.CHMODTRF, diagnostics, ""

    return ec, diagnostics, transform_name
0276
0277
def download_transform(url, transform_name, workdir):
    """
    Download the transform from the given url into the work directory.

    When $HARVESTER_WORKDIR is set, curl is not used: the transform is copied from
    $HARVESTER_WORKDIR/<transform_name> instead. Otherwise curl is executed up to three times, waiting 60 s after
    each failed attempt.

    :param url: download URL with path to transform (string).
    :param transform_name: trf name (string).
    :param workdir: work directory (string).
    :return: status (Boolean, True if the transform is in place), diagnostics (string, empty unless an error occurred).
    """

    try:
        from shlex import quote  # Python 3
    except ImportError:
        from pipes import quote  # Python 2

    status = False
    diagnostics = ""
    path = os.path.join(workdir, transform_name)
    # the command uses shell redirection, so it is run by a shell; url and path (whose name is taken from the URL)
    # are quoted so that characters such as $, `, ", ; or blanks are passed literally instead of being expanded
    # or executed by the shell (double quotes alone would still expand $(...) and `...`)
    cmd = 'curl -sS %s > %s' % (quote(url), quote(path))
    trial = 1
    max_trials = 3

    # test if $HARVESTER_WORKDIR is set
    harvester_workdir = os.environ.get('HARVESTER_WORKDIR')
    if harvester_workdir is not None:
        # skip curl by setting max_trials = 0; copy the transform from the Harvester directory instead
        max_trials = 0
        source_path = os.path.join(harvester_workdir, transform_name)
        try:
            copy(source_path, path)
            status = True
        except Exception as error:
            status = False
            diagnostics = "Failed to copy file %s to %s : %s" % (source_path, path, error)
            logger.error(diagnostics)

    # try to download the trf a maximum of max_trials times
    while trial <= max_trials:
        logger.info("executing command [trial %d/%d]: %s", trial, max_trials, cmd)

        exit_code, stdout, stderr = execute(cmd, mute=True)
        if not stdout:
            stdout = "(None)"
        if exit_code != 0:
            diagnostics = "curl command failed: %d, %s, %s" % (exit_code, stdout, stderr)
            logger.warning(diagnostics)
            if trial == max_trials:
                logger.fatal('could not download transform: %s', stdout)
                status = False
                break
            logger.info("will try again after 60 s")
            sleep(60)
        else:
            logger.info("curl command returned: %s", stdout)
            status = True
            break
        trial += 1

    return status, diagnostics
0333
0334
def get_valid_base_urls(order=None):
    """
    Return the base URLs from which the user analysis transform may be downloaded.

    If order is given (non-empty), it is placed first and removed from the rest of the list. It is prepended even
    if it is not one of the known URLs.
    E.g. order=http://atlpan.web.cern.ch/atlpan -> ['http://atlpan.web.cern.ch/atlpan', ...]
    NOTE: the URL list may be out of date.

    :param order: base URL to put first (string).
    :return: valid base URLs (list).
    """

    known_urls = [
        "http://www.usatlas.bnl.gov",
        "https://www.usatlas.bnl.gov",
        "http://pandaserver.cern.ch",
        "http://atlpan.web.cern.ch/atlpan",
        "https://atlpan.web.cern.ch/atlpan",
        "http://classis01.roma1.infn.it",
        "http://atlas-install.roma1.infn.it",
    ]

    if not order:
        return known_urls

    return [order] + [candidate for candidate in known_urls if candidate != order]
0364
0365
def get_payload_environment_variables(cmd, job_id, task_id, attempt_nr, processing_type, site_name, analysis_job):
    """
    Return a list of shell statements setting the environment variables needed by the payload.

    PandaID, PanDA_TaskID and INDS are read from the pilot environment (PANDAID, PanDA_TaskID, INDS), defaulting
    to 'unknown'; job_id and task_id are only used for FRONTIER_ID. If the command contains "Merge_tf" and
    ATHENA_PROC_NUMBER is set, statements unsetting ATHENA_PROC_NUMBER and ATHENA_CORE_NUMBER are added.
    RUCIO_APPID is only exported when processing_type is not an empty string.

    :param cmd: payload execution command (string).
    :param job_id: PanDA job id (string).
    :param task_id: PanDA task id (string).
    :param attempt_nr: PanDA job attempt number (int).
    :param processing_type: processing type (string).
    :param site_name: site name (string).
    :param analysis_job: True for user analysis jobs, False otherwise (boolean).
    :return: list of environment variables needed by the payload (list of strings).
    """

    env = os.environ
    variables = [
        "export PANDA_RESOURCE='%s';" % site_name,
        'export FRONTIER_ID="[%s_%s]";' % (task_id, job_id),
        "export CMSSW_VERSION=$FRONTIER_ID;",
        "export PandaID=%s;" % env.get('PANDAID', 'unknown'),
        "export PanDA_TaskID='%s';" % env.get('PanDA_TaskID', 'unknown'),
        "export PanDA_AttemptNr='%d';" % attempt_nr,
        "export INDS='%s';" % env.get('INDS', 'unknown'),
    ]

    if "Merge_tf" in cmd and 'ATHENA_PROC_NUMBER' in env:
        variables.extend(["unset ATHENA_PROC_NUMBER;", "unset ATHENA_CORE_NUMBER;"])

    if analysis_job:
        variables.append("export ROOT_TTREECACHE_SIZE=1;")
        # ROOTCORE_NCPUS follows ATHENA_PROC_NUMBER; missing or non-integer values fall back to 1
        try:
            ncpus = int(env.get('ATHENA_PROC_NUMBER'))
        except Exception:
            ncpus = 1
        variables.append("export ROOTCORE_NCPUS=%d;" % ncpus)

    if processing_type != "":
        variables.append("export RUCIO_APPID='%s';" % processing_type)
    else:
        logger.warning("RUCIO_APPID needs job.processingType but it is not set!")
    variables.append("export RUCIO_ACCOUNT='%s';" % env.get('RUCIO_ACCOUNT', 'pilot'))

    return variables
0411
0412
def get_writetoinput_filenames(writetofile):
    """
    Extract the writeToFile file name(s).

    writetofile holds '^'-separated entries of the form '<name>:<LFN>'. For every entry containing ':', the part
    before the first ':' is returned, with '.pool.root.' replaced by '.txt.'. Entries without ':' are ignored.
    writeToFile='tmpin_mc16_13TeV.blah:AOD.15760866._000002.pool.root.1'
    -> return ['tmpin_mc16_13TeV.blah']

    :param writetofile: string containing file name information.
    :return: list of file names
    """

    return [
        entry.partition(':')[0].replace('.pool.root.', '.txt.')
        for entry in writetofile.split('^')
        if ':' in entry
    ]
0432
0433
def replace_lfns_with_turls(cmd, workdir, filename, infiles, writetofile=""):
    """
    Replace all LFNs with full TURLs in the payload execution command.

    This function is used with direct access in production jobs. Athena requires a full TURL instead of LFN.
    The TURLs are read from the metadata file workdir/filename with get_file_info_from_xml(). An input file that
    appears in the command but has no entry in the metadata is left unchanged and a warning is logged. If
    writetofile is set, the LFNs listed in the corresponding input list files in workdir are replaced as well.

    :param cmd: payload execution command (string).
    :param workdir: location of metadata file (string).
    :param filename: metadata file name (string).
    :param infiles: list of input files (LFNs).
    :param writetofile: writeToFile job parameter, '^'-separated '<list file>:<LFN>' entries (string).
    :return: updated cmd (string).
    """

    metadata_path = os.path.join(workdir, filename)
    if not os.path.exists(metadata_path):
        logger.warning("could not find file: %s (cannot locate TURLs for direct access)", filename)
        return cmd

    turl_dictionary = {}  # { LFN: TURL, .. }
    file_info_dictionary = get_file_info_from_xml(workdir, filename=filename)
    for inputfile in infiles:
        if inputfile not in cmd:
            continue
        file_info = file_info_dictionary.get(inputfile)
        if not file_info:
            # previously an uncaught KeyError; keep the LFN and let the problem be visible in the log instead
            logger.warning("no TURL found for %s in %s (LFN not replaced in the run command)", inputfile, filename)
            continue
        turl = file_info[0]
        turl_dictionary[inputfile] = turl
        # do not replace again if the TURL is already part of the command
        if turl not in cmd:
            cmd = cmd.replace(inputfile, turl)
            logger.info("replaced '%s' with '%s' in the run command", inputfile, turl)

    if writetofile and turl_dictionary:
        _replace_lfns_in_input_lists(workdir, writetofile, turl_dictionary)

    return cmd


def _replace_lfns_in_input_lists(workdir, writetofile, turl_dictionary):
    """
    Replace LFNs with TURLs in the input list files named by the writeToFile parameter.

    Every line whose basename is a key of turl_dictionary is replaced by the TURL; empty lines are dropped and other
    lines are kept. A list file is only rewritten when the result is non-empty; a missing list file is logged.

    :param workdir: directory containing the list files (string).
    :param writetofile: writeToFile job parameter (string).
    :param turl_dictionary: { LFN: TURL, .. } (dictionary).
    :return: None.
    """

    for list_name in get_writetoinput_filenames(writetofile):
        path = os.path.join(workdir, list_name)
        if not os.path.exists(path):
            logger.warning("file does not exist: %s", path)
            continue

        new_lines = []
        for line in read_file(path).split('\n'):
            lfn = os.path.basename(line)
            if lfn in turl_dictionary:
                new_lines.append(turl_dictionary[lfn])
            elif line:
                new_lines.append(line)

        lines = '\n'.join(new_lines)
        if lines:
            write_file(path, lines)
0486 return cmd