File indexing completed on 2026-04-10 08:39:18
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012 import functools
0013 import logging
0014 import os
0015 import signal
0016 import time
0017 from collections import namedtuple
0018 from datetime import datetime
0019
0020 try:
0021 from functools import reduce
0022 except Exception:
0023 pass
0024
0025 from pilot.common.exception import FileHandlingFailure
0026 from pilot.util.auxiliary import set_pilot_state
0027 from pilot.util.config import config
0028 from pilot.util.constants import SUCCESS, FAILURE, PILOT_PRE_GETJOB, PILOT_POST_GETJOB, PILOT_PRE_SETUP, \
0029 PILOT_POST_SETUP, PILOT_PRE_PAYLOAD, PILOT_POST_PAYLOAD, PILOT_PRE_STAGEOUT, PILOT_POST_STAGEOUT, PILOT_PRE_FINAL_UPDATE, PILOT_POST_FINAL_UPDATE
0030 from pilot.util.container import execute
0031 from pilot.util.filehandling import tar_files, write_json, read_json, copy
0032 from pilot.util.harvester import get_initial_work_report, publish_work_report
0033 from pilot.util.timing import add_to_pilot_timing
0034
0035 logger = logging.getLogger(__name__)
0036
0037
def interrupt(args, signum, frame):
    """
    Interrupt function on the receiving end of kill signals.
    This function is forwarded any incoming signals (SIGINT, SIGTERM, etc) and will set abort_job which instructs
    the threads to abort the job.

    :param args: pilot arguments (only args.graceful_stop is used here).
    :param signum: signal number.
    :param frame: stack/execution frame pointing to the frame that was interrupted by the signal.
    :return:
    """

    # resolve the signal name for logging; signal.Signals is the Python 3 way
    # (replaces the old scan over signal.__dict__.iteritems(), which was a
    # Python 2 relic that always failed on Python 3)
    try:
        name = signal.Signals(signum).name
    except (ValueError, AttributeError):
        # unknown/unnamed signal number - fall back to the raw value
        name = str(signum)
    logging.getLogger(__name__).info('caught signal: %s', name)

    # instruct all threads to abort the job
    args.graceful_stop.set()
0056
0057
def run(args):
    """
    Main execution function for the generic HPC workflow.

    Fetches a job via the resource plug-in, runs the payload with the
    resource-specific setup, and publishes work reports and a stage-out
    declaration for Harvester at each step.

    :param args: pilot arguments.
    :returns: traces object.
    """

    # communication point with Harvester: where job and report files are exchanged
    if args.harvester_workdir:
        communication_point = args.harvester_workdir
    else:
        communication_point = os.getcwd()
    work_report = get_initial_work_report()
    worker_attributes_file = config.Harvester.workerAttributesFile
    worker_stageout_declaration = config.Harvester.StageOutnFile
    payload_report_file = config.Payload.jobreport
    payload_stdout_file = config.Payload.payloadstdout
    payload_stderr_file = config.Payload.payloadstderr

    try:
        logger.info('setting up signal handling')
        signal.signal(signal.SIGINT, functools.partial(interrupt, args))

        logger.info('setting up tracing')
        traces = namedtuple('traces', ['pilot'])
        traces.pilot = {'state': SUCCESS,
                        'nr_jobs': 0}

        if args.hpc_resource == '':
            logger.critical('hpc resource not specified, cannot continue')
            traces.pilot['state'] = FAILURE
            return traces

        # import the resource plug-in module for the given HPC resource
        resource = __import__('pilot.resource.%s' % args.hpc_resource, globals(), locals(), [args.hpc_resource], 0)

        # import the user (experiment) specific module
        user = __import__('pilot.user.%s.common' % args.pilot_user.lower(), globals(), locals(),
                          [args.pilot_user.lower()], 0)

        # get a job description from the resource plug-in
        add_to_pilot_timing('0', PILOT_PRE_GETJOB, time.time(), args)
        job, rank = resource.get_job(communication_point)
        add_to_pilot_timing(job.jobid, PILOT_POST_GETJOB, time.time(), args)

        # set up the job working directory and publish the 'starting' state
        add_to_pilot_timing(job.jobid, PILOT_PRE_SETUP, time.time(), args)
        work_dir = resource.set_job_workdir(job, communication_point)
        work_report['workdir'] = work_dir
        worker_attributes_file = os.path.join(work_dir, worker_attributes_file)
        logger.debug("Worker attributes will be publeshied in: {0}".format(worker_attributes_file))

        set_pilot_state(job=job, state="starting")
        work_report["jobStatus"] = job.state
        publish_work_report(work_report, worker_attributes_file)

        # resource specific setup commands, joined into a single shell prefix
        logger.info('setup for resource %s: %s' % (args.hpc_resource, str(resource.get_setup())))
        setup_str = "; ".join(resource.get_setup())

        # prepare the scratch directory and build the payload command line
        job_scratch_dir = resource.set_scratch_workdir(job, work_dir, args)

        my_command = " ".join([job.script, job.script_parameters])
        my_command = resource.command_fix(my_command, job_scratch_dir)
        my_command = setup_str + my_command
        add_to_pilot_timing(job.jobid, PILOT_POST_SETUP, time.time(), args)

        # execute the payload, redirecting stdout/stderr to files
        logger.debug("Going to launch: {0}".format(my_command))
        logger.debug("Current work directory: {0}".format(job_scratch_dir))
        payloadstdout = open(payload_stdout_file, "w")
        payloadstderr = open(payload_stderr_file, "w")

        add_to_pilot_timing(job.jobid, PILOT_PRE_PAYLOAD, time.time(), args)
        set_pilot_state(job=job, state="running")
        work_report["jobStatus"] = job.state
        work_report["startTime"] = datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S")
        start_time = time.asctime(time.localtime(time.time()))
        job.startTime = datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S")
        publish_work_report(work_report, worker_attributes_file)

        # wall-clock and cpu-time bookkeeping around the payload execution
        stime = time.time()
        t0 = os.times()
        exit_code, stdout, stderr = execute(my_command, stdout=payloadstdout, stderr=payloadstderr, shell=True)
        logger.debug("Payload exit code: {0}".format(exit_code))
        t1 = os.times()
        exetime = time.time() - stime
        end_time = time.asctime(time.localtime(time.time()))
        t = list(map(lambda x, y: x - y, t1, t0))
        # NOTE(review): t[2:3] is a single-element slice (children's user time only);
        # t[2:4] (children's user + system time) may have been intended - confirm
        t_tot = reduce(lambda x, y: x + y, t[2:3])
        job.endTime = datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S")
        payloadstdout.close()
        payloadstderr.close()
        add_to_pilot_timing(job.jobid, PILOT_POST_PAYLOAD, time.time(), args)

        # record the payload outcome in the job and the work report
        state = 'finished' if exit_code == 0 else 'failed'
        set_pilot_state(job=job, state=state)
        job.exitcode = exit_code

        work_report["startTime"] = job.startTime
        work_report["endTime"] = job.endTime
        work_report["jobStatus"] = job.state
        work_report["cpuConsumptionTime"] = t_tot
        work_report["transExitCode"] = job.exitcode

        log_jobreport = "\nPayload exit code: {0} JobID: {1} \n".format(exit_code, job.jobid)
        log_jobreport += "CPU comsumption time: {0} JobID: {1} \n".format(t_tot, job.jobid)
        log_jobreport += "Start time: {0} JobID: {1} \n".format(start_time, job.jobid)
        log_jobreport += "End time: {0} JobID: {1} \n".format(end_time, job.jobid)
        log_jobreport += "Execution time: {0} sec. JobID: {1} \n".format(exetime, job.jobid)
        logger.info(log_jobreport)
        log_jobreport = "\nJob report start time: {0}\nJob report end time: {1}".format(job.startTime, job.endTime)
        logger.debug(log_jobreport)

        # merge the payload job report (if any) into the work report
        if os.path.exists(payload_report_file):
            payload_report = user.parse_jobreport_data(read_json(payload_report_file))
            work_report.update(payload_report)
            resource.process_jobreport(payload_report_file, job_scratch_dir, work_dir)

        resource.postprocess_workdir(job_scratch_dir)

        # files that must survive the working directory cleanup
        protectedfiles = list(job.output_files.keys())

        # the log file itself must not be packed into the log tarball
        if job.log_file in protectedfiles:
            protectedfiles.remove(job.log_file)
        else:
            logger.info("Log files was not declared")

        logger.info("Cleanup of working directory")

        protectedfiles.extend([worker_attributes_file, worker_stageout_declaration])
        user.remove_redundant_files(job_scratch_dir, protectedfiles)
        res = tar_files(job_scratch_dir, protectedfiles, job.log_file)
        if res > 0:
            raise FileHandlingFailure("Log file tar failed")

        add_to_pilot_timing(job.jobid, PILOT_PRE_STAGEOUT, time.time(), args)
        # copy outputs back to the access point if the scratch dir differs from it
        if not job_scratch_dir == work_dir:
            copy_output(job, job_scratch_dir, work_dir)
        add_to_pilot_timing(job.jobid, PILOT_POST_STAGEOUT, time.time(), args)

        logger.info("Declare stage-out")
        add_to_pilot_timing(job.jobid, PILOT_PRE_FINAL_UPDATE, time.time(), args)
        declare_output(job, work_report, worker_stageout_declaration)

        logger.info("All done")
        publish_work_report(work_report, worker_attributes_file)
        traces.pilot['state'] = SUCCESS
        logger.debug("Final report: {0}".format(work_report))
        add_to_pilot_timing(job.jobid, PILOT_POST_FINAL_UPDATE, time.time(), args)

    except Exception as error:
        # NOTE(review): if the exception is raised before 'traces' is assigned,
        # both the assignment and the return below raise NameError - confirm intended
        work_report["jobStatus"] = "failed"
        work_report["exitMsg"] = str(error)
        publish_work_report(work_report, worker_attributes_file)
        logging.exception('exception caught: %s', error)
        traces.pilot['state'] = FAILURE

    return traces
0223
0224
def copy_output(job, job_scratch_dir, work_dir):
    """
    Copy the job output files from the scratch directory to the access point.

    After copying, the current working directory is changed to work_dir.
    The elapsed copy time is always logged, even on failure.

    :param job: job object (job.output_files keys are the file names to copy).
    :param job_scratch_dir: scratch directory the payload ran in.
    :param work_dir: access point directory to copy the outputs to.
    :raises FileHandlingFailure: if a copy operation fails with an IOError.
    :return: 0 on success.
    """

    log = logging.getLogger(__name__)
    cp_start = time.time()
    try:
        for outfile in list(job.output_files.keys()):
            if os.path.exists(outfile):
                copy(os.path.join(job_scratch_dir, outfile), os.path.join(work_dir, outfile))
        os.chdir(work_dir)
    except IOError:
        raise FileHandlingFailure("Copy from scratch dir to access point failed")
    finally:
        # only log here; a 'return' inside 'finally' would silently swallow
        # the FileHandlingFailure raised above (the original bug)
        cp_time = time.time() - cp_start
        log.info("Copy of outputs took: {0} sec.".format(cp_time))
    return 0
0238
0239
def declare_output(job, work_report, worker_stageout_declaration):
    """
    Declare the produced output files for stage-out.

    Builds a per-file description (filetype, path, fsize and, when available,
    guid) for every expected output file of the job and writes the collected
    report to the stage-out declaration (JSON) file. If an expected output
    file is missing, the job state is set to 'failed'.

    File existence is checked relative to the current working directory
    (presumably the job scratch/work dir at this point - see run()).

    :param job: job object (jobid, output_files, log_file are used).
    :param work_report: work report dictionary; guids may be provided under
                        its optional 'outputfiles' key.
    :param worker_stageout_declaration: path of the stage-out declaration file.
    :return:
    """

    out_file_report = {}
    out_file_report[job.jobid] = []
    # guids reported by the payload, if any; .get() guards against a missing
    # 'outputfiles' key (the old work_report['outputfiles'] raised KeyError)
    reported_guids = work_report.get('outputfiles') or {}
    for outfile in list(job.output_files.keys()):
        logger.debug("File {} will be checked and declared for stage out".format(outfile))
        if os.path.exists(outfile):
            file_desc = {}
            if outfile == job.log_file:
                file_desc['filetype'] = 'log'
            else:
                file_desc['filetype'] = 'output'
            file_desc['path'] = os.path.abspath(outfile)
            file_desc['fsize'] = os.path.getsize(outfile)
            if 'guid' in job.output_files[outfile]:
                file_desc['guid'] = job.output_files[outfile]['guid']
            elif reported_guids.get(outfile):
                # .get() guards against the file being absent from the payload
                # report (the old direct indexing raised KeyError)
                file_desc['guid'] = reported_guids[outfile]['guid']
            out_file_report[job.jobid].append(file_desc)
        else:
            logger.info("Expected output file {0} missed. Job {1} will be failed".format(outfile, job.jobid))
            set_pilot_state(job=job, state='failed')

    if out_file_report[job.jobid]:
        write_json(worker_stageout_declaration, out_file_report)
        logger.debug('Stagout declared in: {0}'.format(worker_stageout_declaration))
        logger.debug('Report for stageout: {}'.format(out_file_report))