Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:39:16

0001 #!/usr/bin/env python
0002 # Licensed under the Apache License, Version 2.0 (the "License");
0003 # you may not use this file except in compliance with the License.
0004 # You may obtain a copy of the License at
0005 # http://www.apache.org/licenses/LICENSE-2.0
0006 #
0007 # Authors:
0008 # - Paul Nilsson, paul.nilsson@cern.ch, 2018-2019
0009 # - Danila Oleynik danila.oleynik@cern.ch, 2018
0010 
0011 import logging
0012 import os
0013 import shutil
0014 import sys
0015 import time
0016 
0017 from .jobdescription import JobDescription  # Python 2/3
0018 from pilot.common.exception import FileHandlingFailure
0019 from pilot.util.config import config
0020 from pilot.util.constants import PILOT_PRE_STAGEIN, PILOT_POST_STAGEIN
0021 from pilot.util.filehandling import read_json, write_json, remove
0022 #from pilot.util.mpi import get_ranks_info
0023 from pilot.util.timing import add_to_pilot_timing
0024 
0025 logger = logging.getLogger(__name__)
0026 
0027 
0028 def get_job(harvesterpath):
0029     """
0030     Return job description in dictionary and MPI rank (if applicable)
0031 
0032     :param harvesterpath: path to config.Harvester.jobs_list_file (string).
0033     :return: job object, rank (int).
0034     """
0035 
0036     rank = 0
0037     job = None
0038     logger.info("Going to read job definition from file")
0039 
0040     pandaids_list_filename = os.path.join(harvesterpath, config.Harvester.jobs_list_file)
0041     if not os.path.isfile(pandaids_list_filename):
0042         logger.info("File with PanDA IDs are missing. Nothing to execute.")
0043         return job, rank
0044 
0045     harvesterpath = os.path.abspath(harvesterpath)
0046     #rank, max_ranks = get_ranks_info()
0047 
0048     pandaids = read_json(pandaids_list_filename)
0049     logger.info('Got {0} job ids'.format(len(pandaids)))
0050     pandaid = pandaids[rank]
0051     job_workdir = os.path.join(harvesterpath, str(pandaid))
0052 
0053     logger.info('Rank: {2} with job {0} will have work directory {1}'.format(pandaid, job_workdir, rank))
0054 
0055     job_def_filename = os.path.join(job_workdir, config.Harvester.pandajob_file)
0056     jobs_dict = read_json(job_def_filename)
0057     job_dict = jobs_dict[str(pandaid)]
0058     job = JobDescription()
0059     job.load(job_dict)
0060 
0061     return job, rank
0062 
0063 
0064 def get_setup(job=None):
0065     """
0066     Return the resource specific setup.
0067 
0068     :param job: optional job object.
0069     :return: setup commands (list).
0070     """
0071 
0072     setup_commands = ['source /ccs/proj/csc108/athena_grid_env/setup.sh',
0073                       'source $MODULESHOME/init/bash',
0074                       'tmp_dirname=/tmp/scratch',
0075                       'tmp_dirname+="/tmp"',
0076                       'export TEMP=$tmp_dirname',
0077                       'export TMPDIR=$TEMP',
0078                       'export TMP=$TEMP',
0079                       'export LD_LIBRARY_PATH=/ccs/proj/csc108/AtlasReleases/ldpatch:$LD_LIBRARY_PATH',
0080                       'export ATHENA_PROC_NUMBER=16',
0081                       'export G4ATLAS_SKIPFILEPEEK=1',
0082                       'export PANDA_RESOURCE=\"ORNL_Titan_MCORE\"',
0083                       'export ROOT_TTREECACHE_SIZE=1',
0084                       'export RUCIO_APPID=\"simul\"',
0085                       'export RUCIO_ACCOUNT=\"pilot\"',
0086                       'export CORAL_DBLOOKUP_PATH=/ccs/proj/csc108/AtlasReleases/21.0.15/nfs_db_files',
0087                       'export CORAL_AUTH_PATH=$SW_INSTALL_AREA/DBRelease/current/XMLConfig',
0088                       'export DATAPATH=$SW_INSTALL_AREA/DBRelease/current:$DATAPATH',
0089                       'unset FRONTIER_SERVER',
0090                       ' ']
0091 
0092     return setup_commands
0093 
0094 
0095 def set_job_workdir(job, path):
0096     """
0097     Point pilot to job working directory (job id).
0098 
0099     :param job: job object.
0100     :param path: local path to Harvester access point (string).
0101     :return: job working directory (string).
0102     """
0103     work_dir = os.path.join(path, str(job.jobid))
0104     os.chdir(work_dir)
0105 
0106     return work_dir
0107 
0108 
0109 def set_scratch_workdir(job, work_dir, args):
0110     """
0111     Copy input files and some db files to RAM disk.
0112 
0113     :param job: job object.
0114     :param work_dir: job working directory (permanent FS) (string).
0115     :param args: args dictionary to collect timing metrics.
0116     :return: job working directory in scratch (string).
0117     """
0118 
0119     scratch_path = config.HPC.scratch
0120     job_scratch_dir = os.path.join(scratch_path, str(job.jobid))
0121     for inp_file in job.input_files:
0122         job.input_files[inp_file]["scratch_path"] = job_scratch_dir
0123     logger.debug("Job scratch path: {0}".format(job_scratch_dir))
0124     # special data, that should be preplaced in RAM disk
0125     dst_db_path = 'sqlite200/'
0126     dst_db_filename = 'ALLP200.db'
0127     dst_db_path_2 = 'geomDB/'
0128     dst_db_filename_2 = 'geomDB_sqlite'
0129     tmp_path = 'tmp/'
0130     src_file = '/ccs/proj/csc108/AtlasReleases/21.0.15/DBRelease/current/sqlite200/ALLP200.db'
0131     src_file_2 = '/ccs/proj/csc108/AtlasReleases/21.0.15/DBRelease/current/geomDB/geomDB_sqlite'
0132 
0133     if os.path.exists(scratch_path):
0134         try:
0135             add_to_pilot_timing(job.jobid, PILOT_PRE_STAGEIN, time.time(), args)
0136             logger.debug("Prepare \'tmp\' dir in scratch ")
0137             if not os.path.exists(scratch_path + tmp_path):
0138                 os.makedirs(scratch_path + tmp_path)
0139             logger.debug("Prepare dst and copy sqlite db files")
0140             t0 = time.time()
0141             if not os.path.exists(scratch_path + dst_db_path):
0142                 os.makedirs(scratch_path + dst_db_path)
0143             shutil.copyfile(src_file, scratch_path + dst_db_path + dst_db_filename)
0144             logger.debug("")
0145             sql_cp_time = time.time() - t0
0146             logger.debug("Copy of sqlite files took: {0}".format(sql_cp_time))
0147             logger.debug("Prepare dst and copy geomDB files")
0148             t0 = time.time()
0149             if not os.path.exists(scratch_path + dst_db_path_2):
0150                 os.makedirs(scratch_path + dst_db_path_2)
0151             shutil.copyfile(src_file_2, scratch_path + dst_db_path_2 + dst_db_filename_2)
0152             geomdb_cp_time = time.time() - t0
0153             logger.debug("Copy of geomDB files took: {0} s".format(geomdb_cp_time))
0154             logger.debug("Prepare job scratch dir")
0155             t0 = time.time()
0156             if not os.path.exists(job_scratch_dir):
0157                 os.makedirs(job_scratch_dir)
0158             logger.debug("Copy input file")
0159             for inp_file in job.input_files:
0160                 logger.debug("Copy: {0} to {1}".format(os.path.join(work_dir, inp_file),
0161                                                        job.input_files[inp_file]["scratch_path"]))
0162                 shutil.copyfile(os.path.join(work_dir, inp_file),
0163                                 os.path.join(job.input_files[inp_file]["scratch_path"], inp_file))
0164             input_cp_time = time.time() - t0
0165             logger.debug("Copy of input files took: {0} s".format(input_cp_time))
0166         except IOError as e:
0167             logger.error("I/O error({0}): {1}".format(e.errno, e.strerror))
0168             logger.error("Copy to scratch failed, execution terminated': \n %s " % (sys.exc_info()[1]))
0169             raise FileHandlingFailure("Copy to RAM disk failed")
0170         finally:
0171             add_to_pilot_timing(job.jobid, PILOT_POST_STAGEIN, time.time(), args)
0172     else:
0173         logger.info('Scratch directory (%s) dos not exist' % scratch_path)
0174         return work_dir
0175 
0176     os.chdir(job_scratch_dir)
0177     logger.debug("Current directory: {0}".format(os.getcwd()))
0178     true_dir = '/ccs/proj/csc108/AtlasReleases/21.0.15/nfs_db_files'
0179     pseudo_dir = "./poolcond"
0180     os.symlink(true_dir, pseudo_dir)
0181 
0182     return job_scratch_dir
0183 
0184 
0185 def process_jobreport(payload_report_file, job_scratch_path, job_communication_point):
0186     """
0187     Copy job report file to make it accessible by Harvester. Shrink job report file.
0188 
0189     :param payload_report_file: name of job report (string).
0190     :param job_scratch_path: path to scratch directory (string).
0191     :param job_communication_point: path to updated job report accessible by Harvester (string).
0192     :raises FileHandlingFailure: in case of IOError.
0193     """
0194 
0195     src_file = os.path.join(job_scratch_path, payload_report_file)
0196     dst_file = os.path.join(job_communication_point, payload_report_file)
0197 
0198     try:
0199         logger.info(
0200             "Copy of payload report [{0}] to access point: {1}".format(payload_report_file, job_communication_point))
0201         # shrink jobReport
0202         job_report = read_json(src_file)
0203         if 'executor' in job_report:
0204             for executor in job_report['executor']:
0205                 if 'logfileReport' in executor:
0206                     executor['logfileReport'] = {}
0207 
0208         write_json(dst_file, job_report)
0209 
0210     except IOError:
0211         logger.error("Job report copy failed, execution terminated':  \n %s " % (sys.exc_info()[1]))
0212         raise FileHandlingFailure("Job report copy from RAM failed")
0213 
0214 
0215 def postprocess_workdir(workdir):
0216     """
0217     Post-processing of working directory. Unlink paths.
0218 
0219     :param workdir: path to directory to be processed (string).
0220     :raises FileHandlingFailure: in case of IOError.
0221     """
0222 
0223     pseudo_dir = "poolcond"
0224     try:
0225         if os.path.exists(pseudo_dir):
0226             remove(os.path.join(workdir, pseudo_dir))
0227     except IOError:
0228         raise FileHandlingFailure("Post processing of working directory failed")
0229 
0230 
0231 def command_fix(command, job_scratch_dir):
0232     """
0233     Modification of payload parameters, to be executed on Titan on RAM disk. Some cleanup.
0234 
0235     :param command: payload command (string).
0236     :param job_scratch_dir: local path to input files (string).
0237     :return: updated/fixed payload command (string).
0238     """
0239 
0240     subs_a = command.split()
0241     for i in range(len(subs_a)):
0242         if i > 0:
0243             if '(' in subs_a[i] and not subs_a[i][0] == '"':
0244                 subs_a[i] = '"' + subs_a[i] + '"'
0245             if subs_a[i].startswith("--inputEVNTFile"):
0246                 filename = subs_a[i].split("=")[1]
0247                 subs_a[i] = subs_a[i].replace(filename, os.path.join(job_scratch_dir, filename))
0248 
0249     fixed_command = ' '.join(subs_a)
0250     fixed_command = fixed_command.strip()
0251     fixed_command = fixed_command.replace('--DBRelease="all:current"', '')  # avoid Frontier reading
0252 
0253     return fixed_command