File indexing completed on 2026-04-10 08:39:16
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011 import logging
0012 import os
0013 import shutil
0014 import sys
0015 import time
0016
0017 from .jobdescription import JobDescription
0018 from pilot.common.exception import FileHandlingFailure
0019 from pilot.util.config import config
0020 from pilot.util.constants import PILOT_PRE_STAGEIN, PILOT_POST_STAGEIN
0021 from pilot.util.filehandling import read_json, write_json, remove
0022
0023 from pilot.util.timing import add_to_pilot_timing
0024
0025 logger = logging.getLogger(__name__)
0026
0027
def get_job(harvesterpath):
    """
    Return job description in dictionary and MPI rank (if applicable).

    Reads the list of PanDA ids from config.Harvester.jobs_list_file inside
    harvesterpath, then loads the job definition for the id at the current
    rank from that job's work directory.

    :param harvesterpath: path to config.Harvester.jobs_list_file (string).
    :return: job object (None if nothing to execute), rank (int).
    """

    rank = 0  # MPI rank; fixed at 0 here -- only the first id is consumed
    job = None
    logger.info("Going to read job definition from file")

    # Normalize before building any derived paths (the original normalized
    # only after the ids-file check).
    harvesterpath = os.path.abspath(harvesterpath)

    pandaids_list_filename = os.path.join(harvesterpath, config.Harvester.jobs_list_file)
    if not os.path.isfile(pandaids_list_filename):
        logger.info("File with PanDA IDs is missing. Nothing to execute.")
        return job, rank

    pandaids = read_json(pandaids_list_filename)
    logger.info('Got {0} job ids'.format(len(pandaids)))

    # Guard: an empty id list would otherwise raise IndexError below.
    if not pandaids:
        logger.info("List of PanDA IDs is empty. Nothing to execute.")
        return job, rank

    pandaid = pandaids[rank]
    job_workdir = os.path.join(harvesterpath, str(pandaid))

    logger.info('Rank: {2} with job {0} will have work directory {1}'.format(pandaid, job_workdir, rank))

    job_def_filename = os.path.join(job_workdir, config.Harvester.pandajob_file)
    jobs_dict = read_json(job_def_filename)
    job_dict = jobs_dict[str(pandaid)]
    job = JobDescription()
    job.load(job_dict)

    return job, rank
0062
0063
def get_setup(job=None):
    """
    Return the resource specific setup.

    :param job: optional job object (currently unused).
    :return: setup commands (list).
    """

    # Static Titan (ORNL) environment setup, kept as an immutable tuple and
    # returned as a fresh list on every call.
    commands = (
        'source /ccs/proj/csc108/athena_grid_env/setup.sh',
        'source $MODULESHOME/init/bash',
        'tmp_dirname=/tmp/scratch',
        'tmp_dirname+="/tmp"',
        'export TEMP=$tmp_dirname',
        'export TMPDIR=$TEMP',
        'export TMP=$TEMP',
        'export LD_LIBRARY_PATH=/ccs/proj/csc108/AtlasReleases/ldpatch:$LD_LIBRARY_PATH',
        'export ATHENA_PROC_NUMBER=16',
        'export G4ATLAS_SKIPFILEPEEK=1',
        'export PANDA_RESOURCE="ORNL_Titan_MCORE"',
        'export ROOT_TTREECACHE_SIZE=1',
        'export RUCIO_APPID="simul"',
        'export RUCIO_ACCOUNT="pilot"',
        'export CORAL_DBLOOKUP_PATH=/ccs/proj/csc108/AtlasReleases/21.0.15/nfs_db_files',
        'export CORAL_AUTH_PATH=$SW_INSTALL_AREA/DBRelease/current/XMLConfig',
        'export DATAPATH=$SW_INSTALL_AREA/DBRelease/current:$DATAPATH',
        'unset FRONTIER_SERVER',
        ' ',
    )

    return list(commands)
0093
0094
def set_job_workdir(job, path):
    """
    Point pilot to job working directory (job id).

    Builds the per-job directory under the Harvester access point and makes
    it the current working directory.

    :param job: job object (only job.jobid is read).
    :param path: local path to Harvester access point (string).
    :return: job working directory (string).
    """

    jobid_dir = os.path.join(path, str(job.jobid))
    os.chdir(jobid_dir)
    return jobid_dir
0107
0108
def set_scratch_workdir(job, work_dir, args):
    """
    Copy input files and some db files to RAM disk.

    Stages the sqlite200 and geomDB database files plus the job input files
    into the scratch area (config.HPC.scratch), records stage-in timing, and
    switches the current directory to the per-job scratch directory. If the
    scratch area does not exist, the permanent work directory is returned
    unchanged.

    :param job: job object.
    :param work_dir: job working directory (permanent FS) (string).
    :param args: args dictionary to collect timing metrics.
    :return: job working directory in scratch, or work_dir when scratch is absent (string).
    :raises FileHandlingFailure: if any copy to the RAM disk fails.
    """

    scratch_path = config.HPC.scratch
    job_scratch_dir = os.path.join(scratch_path, str(job.jobid))
    for inp_file in job.input_files:
        job.input_files[inp_file]["scratch_path"] = job_scratch_dir
    logger.debug("Job scratch path: {0}".format(job_scratch_dir))

    # NOTE(review): destinations below are built via string concatenation,
    # so config.HPC.scratch is assumed to end with a path separator -- confirm.
    dst_db_path = 'sqlite200/'
    dst_db_filename = 'ALLP200.db'
    dst_db_path_2 = 'geomDB/'
    dst_db_filename_2 = 'geomDB_sqlite'
    tmp_path = 'tmp/'
    src_file = '/ccs/proj/csc108/AtlasReleases/21.0.15/DBRelease/current/sqlite200/ALLP200.db'
    src_file_2 = '/ccs/proj/csc108/AtlasReleases/21.0.15/DBRelease/current/geomDB/geomDB_sqlite'

    if os.path.exists(scratch_path):
        try:
            add_to_pilot_timing(job.jobid, PILOT_PRE_STAGEIN, time.time(), args)
            logger.debug("Prepare \'tmp\' dir in scratch ")
            if not os.path.exists(scratch_path + tmp_path):
                os.makedirs(scratch_path + tmp_path)
            logger.debug("Prepare dst and copy sqlite db files")
            t0 = time.time()
            if not os.path.exists(scratch_path + dst_db_path):
                os.makedirs(scratch_path + dst_db_path)
            shutil.copyfile(src_file, scratch_path + dst_db_path + dst_db_filename)
            sql_cp_time = time.time() - t0
            logger.debug("Copy of sqlite files took: {0}".format(sql_cp_time))
            logger.debug("Prepare dst and copy geomDB files")
            t0 = time.time()
            if not os.path.exists(scratch_path + dst_db_path_2):
                os.makedirs(scratch_path + dst_db_path_2)
            shutil.copyfile(src_file_2, scratch_path + dst_db_path_2 + dst_db_filename_2)
            geomdb_cp_time = time.time() - t0
            logger.debug("Copy of geomDB files took: {0} s".format(geomdb_cp_time))
            logger.debug("Prepare job scratch dir")
            t0 = time.time()
            if not os.path.exists(job_scratch_dir):
                os.makedirs(job_scratch_dir)
            logger.debug("Copy input file")
            for inp_file in job.input_files:
                logger.debug("Copy: {0} to {1}".format(os.path.join(work_dir, inp_file),
                                                       job.input_files[inp_file]["scratch_path"]))
                shutil.copyfile(os.path.join(work_dir, inp_file),
                                os.path.join(job.input_files[inp_file]["scratch_path"], inp_file))
            input_cp_time = time.time() - t0
            logger.debug("Copy of input files took: {0} s".format(input_cp_time))
        except IOError as e:
            logger.error("I/O error({0}): {1}".format(e.errno, e.strerror))
            logger.error("Copy to scratch failed, execution terminated': \n %s " % (sys.exc_info()[1]))
            raise FileHandlingFailure("Copy to RAM disk failed")
        finally:
            # Record post-stage-in timing even when the copy failed.
            add_to_pilot_timing(job.jobid, PILOT_POST_STAGEIN, time.time(), args)
    else:
        # fix: original message contained a typo ("dos not exist")
        logger.info('Scratch directory (%s) does not exist' % scratch_path)
        return work_dir

    os.chdir(job_scratch_dir)
    logger.debug("Current directory: {0}".format(os.getcwd()))
    # Expose the NFS db files under ./poolcond inside the scratch dir.
    true_dir = '/ccs/proj/csc108/AtlasReleases/21.0.15/nfs_db_files'
    pseudo_dir = "./poolcond"
    os.symlink(true_dir, pseudo_dir)

    return job_scratch_dir
0183
0184
def process_jobreport(payload_report_file, job_scratch_path, job_communication_point):
    """
    Copy job report file to make it accessible by Harvester. Shrink job report file.

    The 'logfileReport' section of every executor entry is emptied before
    the report is written to the communication point.

    :param payload_report_file: name of job report (string).
    :param job_scratch_path: path to scratch directory (string).
    :param job_communication_point: path to updated job report accessible by Harvester (string).
    :raises FileHandlingFailure: in case of IOError.
    """

    source = os.path.join(job_scratch_path, payload_report_file)
    destination = os.path.join(job_communication_point, payload_report_file)

    try:
        logger.info(
            "Copy of payload report [{0}] to access point: {1}".format(payload_report_file, job_communication_point))

        report = read_json(source)
        # Shrink the report: drop the bulky per-executor log file sections.
        for executor in report.get('executor', []):
            if 'logfileReport' in executor:
                executor['logfileReport'] = {}

        write_json(destination, report)

    except IOError:
        logger.error("Job report copy failed, execution terminated': \n %s " % (sys.exc_info()[1]))
        raise FileHandlingFailure("Job report copy from RAM failed")
0213
0214
def postprocess_workdir(workdir):
    """
    Post-processing of working directory. Unlink paths.

    Removes the 'poolcond' link/directory created inside workdir during
    stage-in, if present.

    :param workdir: path to directory to be processed (string).
    :raises FileHandlingFailure: in case of IOError.
    """

    pseudo_dir = "poolcond"
    pseudo_path = os.path.join(workdir, pseudo_dir)
    try:
        # fix: the original tested the bare relative name "poolcond" against
        # the current directory while removing workdir/poolcond; check the
        # same absolute path that is removed.
        if os.path.exists(pseudo_path):
            remove(pseudo_path)
    except IOError:
        raise FileHandlingFailure("Post processing of working directory failed")
0229
0230
def command_fix(command, job_scratch_dir):
    """
    Modification of payload parameters, to be executed on Titan on RAM disk. Some cleanup.

    Three fixes are applied to the payload command line:
      * arguments (never the executable itself) containing '(' are wrapped
        in double quotes unless already quoted;
      * the value of --inputEVNTFile=<name> is prefixed with job_scratch_dir;
      * the option '--DBRelease="all:current"' is dropped.

    :param command: payload command (string).
    :param job_scratch_dir: local path to input files (string).
    :return: updated/fixed payload command (string).
    """

    tokens = command.split()
    # Skip index 0: the executable must not be quoted or rewritten.
    for i in range(1, len(tokens)):
        if '(' in tokens[i] and not tokens[i][0] == '"':
            tokens[i] = '"' + tokens[i] + '"'
        # robustness: only rewrite when a value is actually attached with '='
        # (a bare '--inputEVNTFile' token used to raise IndexError).
        if tokens[i].startswith("--inputEVNTFile") and "=" in tokens[i]:
            filename = tokens[i].split("=")[1]
            tokens[i] = tokens[i].replace(filename, os.path.join(job_scratch_dir, filename))

    fixed_command = ' '.join(tokens).strip()
    fixed_command = fixed_command.replace('--DBRelease="all:current"', '')

    return fixed_command
0253 return fixed_command