Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:39:18

0001 #!/usr/bin/env python
0002 # Licensed under the Apache License, Version 2.0 (the "License");
0003 # you may not use this file except in compliance with the License.
0004 # You may obtain a copy of the License at
0005 # http://www.apache.org/licenses/LICENSE-2.0
0006 #
0007 # Authors:
0008 # - Mario Lassnig, mario.lassnig@cern.ch, 2016
0009 # - Paul Nilsson, paul.nilsson@cern.ch, 2018-2021
0010 # - Danila Oleynik danila.oleynik@cern.ch, 2018
0011 
0012 import functools
0013 import logging
0014 import os
0015 import signal
0016 import time
0017 from collections import namedtuple
0018 from datetime import datetime
0019 
0020 try:
0021     from functools import reduce  # Python 3
0022 except Exception:
0023     pass
0024 
0025 from pilot.common.exception import FileHandlingFailure
0026 from pilot.util.auxiliary import set_pilot_state
0027 from pilot.util.config import config
0028 from pilot.util.constants import SUCCESS, FAILURE, PILOT_PRE_GETJOB, PILOT_POST_GETJOB, PILOT_PRE_SETUP, \
0029     PILOT_POST_SETUP, PILOT_PRE_PAYLOAD, PILOT_POST_PAYLOAD, PILOT_PRE_STAGEOUT, PILOT_POST_STAGEOUT, PILOT_PRE_FINAL_UPDATE, PILOT_POST_FINAL_UPDATE
0030 from pilot.util.container import execute
0031 from pilot.util.filehandling import tar_files, write_json, read_json, copy
0032 from pilot.util.harvester import get_initial_work_report, publish_work_report
0033 from pilot.util.timing import add_to_pilot_timing
0034 
0035 logger = logging.getLogger(__name__)
0036 
0037 
def interrupt(args, signum, frame):
    """
    Interrupt function on the receiving end of kill signals.
    This function is forwarded any incoming signals (SIGINT, SIGTERM, etc) and will set abort_job which instructs
    the threads to abort the job.

    :param args: pilot arguments.
    :param signum: signal.
    :param frame: stack/execution frame pointing to the frame that was interrupted by the signal.
    :return:
    """

    try:
        # Python 3: the Signals enum maps the number straight to its symbolic name
        sig_name = signal.Signals(signum).name
    except (ValueError, AttributeError):
        # unknown signal number (or very old interpreter): fall back to the raw number
        sig_name = str(signum)
    logger.info('caught signal: %s', sig_name)

    # always request the graceful stop, even when the name lookup failed
    # (the previous reverse scan of signal.__dict__ could raise IndexError
    # before reaching this line, leaving the threads running)
    args.graceful_stop.set()
0056 
0057 
def run(args):
    """
    Main execution function for the generic HPC workflow.

    Obtains a job from the HPC resource plugin, prepares the work and scratch
    directories, executes the payload command, parses the payload job report,
    packs the log tarball, copies outputs to the shared file system and
    declares them for stage-out. Progress is reported to Harvester throughout
    via the worker attributes file.

    :param args: pilot arguments.
    :returns: traces object.
    """

    # set communication point. Worker report should be placed there, matched with working directory of Harvester
    if args.harvester_workdir:
        communication_point = args.harvester_workdir
    else:
        communication_point = os.getcwd()
    work_report = get_initial_work_report()
    # file names come from the pilot config; the attributes file path is made
    # absolute (relative to the job work directory) further below
    worker_attributes_file = config.Harvester.workerAttributesFile
    worker_stageout_declaration = config.Harvester.StageOutnFile
    payload_report_file = config.Payload.jobreport
    payload_stdout_file = config.Payload.payloadstdout
    payload_stderr_file = config.Payload.payloadstderr

    try:
        logger.info('setting up signal handling')
        signal.signal(signal.SIGINT, functools.partial(interrupt, args))

        logger.info('setting up tracing')
        # NOTE(review): attributes are set on the namedtuple *class* itself,
        # not on an instance — works, but unconventional
        traces = namedtuple('traces', ['pilot'])
        traces.pilot = {'state': SUCCESS,
                        'nr_jobs': 0}

        # the resource plugin name is mandatory for the HPC workflow
        if args.hpc_resource == '':
            logger.critical('hpc resource not specified, cannot continue')
            traces.pilot['state'] = FAILURE
            return traces

        # get the resource reference
        resource = __import__('pilot.resource.%s' % args.hpc_resource, globals(), locals(), [args.hpc_resource], 0)  # Python 2/3

        # get the user reference
        user = __import__('pilot.user.%s.common' % args.pilot_user.lower(), globals(), locals(),
                          [args.pilot_user.lower()], 0)  # Python 2/3

        # get job (and rank)
        add_to_pilot_timing('0', PILOT_PRE_GETJOB, time.time(), args)
        job, rank = resource.get_job(communication_point)
        add_to_pilot_timing(job.jobid, PILOT_POST_GETJOB, time.time(), args)
        # cd to job working directory

        add_to_pilot_timing(job.jobid, PILOT_PRE_SETUP, time.time(), args)
        work_dir = resource.set_job_workdir(job, communication_point)
        work_report['workdir'] = work_dir
        # from here on the attributes file lives inside the job work directory
        worker_attributes_file = os.path.join(work_dir, worker_attributes_file)
        # NOTE(review): "publeshied" is a typo in the log message (runtime string, left as-is here)
        logger.debug("Worker attributes will be publeshied in: {0}".format(worker_attributes_file))

        # report the 'starting' state to Harvester before any heavy work
        set_pilot_state(job=job, state="starting")
        work_report["jobStatus"] = job.state
        publish_work_report(work_report, worker_attributes_file)

        # Get HPC specific setup commands
        logger.info('setup for resource %s: %s' % (args.hpc_resource, str(resource.get_setup())))
        setup_str = "; ".join(resource.get_setup())

        # Prepare job scratch directory (RAM disk etc.)
        job_scratch_dir = resource.set_scratch_workdir(job, work_dir, args)

        # build the payload command: script + parameters, resource-specific
        # fix-ups, then the setup prefix
        my_command = " ".join([job.script, job.script_parameters])
        my_command = resource.command_fix(my_command, job_scratch_dir)
        my_command = setup_str + my_command
        add_to_pilot_timing(job.jobid, PILOT_POST_SETUP, time.time(), args)

        # Basic execution. Should be replaced with something like 'run_payload'
        logger.debug("Going to launch: {0}".format(my_command))
        logger.debug("Current work directory: {0}".format(job_scratch_dir))
        payloadstdout = open(payload_stdout_file, "w")
        payloadstderr = open(payload_stderr_file, "w")

        # report the 'running' state (with start time) before launching
        add_to_pilot_timing(job.jobid, PILOT_PRE_PAYLOAD, time.time(), args)
        set_pilot_state(job=job, state="running")
        work_report["jobStatus"] = job.state
        work_report["startTime"] = datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S")
        start_time = time.asctime(time.localtime(time.time()))
        job.startTime = datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S")
        publish_work_report(work_report, worker_attributes_file)

        # run the payload and collect wall-clock and CPU timings around it
        stime = time.time()
        t0 = os.times()
        exit_code, stdout, stderr = execute(my_command, stdout=payloadstdout, stderr=payloadstderr, shell=True)
        logger.debug("Payload exit code: {0}".format(exit_code))
        t1 = os.times()
        exetime = time.time() - stime
        end_time = time.asctime(time.localtime(time.time()))
        # element-wise os.times() delta; fields are
        # (user, system, children_user, children_system, elapsed)
        t = list(map(lambda x, y: x - y, t1, t0))  # Python 2/3
        # NOTE(review): t[2:3] is a one-element slice (children user time only);
        # if children system time was also intended, the slice would be t[2:4] — confirm
        t_tot = reduce(lambda x, y: x + y, t[2:3])
        job.endTime = datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S")
        payloadstdout.close()
        payloadstderr.close()
        add_to_pilot_timing(job.jobid, PILOT_POST_PAYLOAD, time.time(), args)

        # final job state is decided solely by the payload exit code
        state = 'finished' if exit_code == 0 else 'failed'
        set_pilot_state(job=job, state=state)
        job.exitcode = exit_code

        work_report["startTime"] = job.startTime
        work_report["endTime"] = job.endTime
        work_report["jobStatus"] = job.state
        work_report["cpuConsumptionTime"] = t_tot
        work_report["transExitCode"] = job.exitcode

        # human-readable summary of the payload execution
        log_jobreport = "\nPayload exit code: {0} JobID: {1} \n".format(exit_code, job.jobid)
        log_jobreport += "CPU comsumption time: {0}  JobID: {1} \n".format(t_tot, job.jobid)
        log_jobreport += "Start time: {0}  JobID: {1} \n".format(start_time, job.jobid)
        log_jobreport += "End time: {0}  JobID: {1} \n".format(end_time, job.jobid)
        log_jobreport += "Execution time: {0} sec.  JobID: {1} \n".format(exetime, job.jobid)
        logger.info(log_jobreport)
        log_jobreport = "\nJob report start time: {0}\nJob report end time: {1}".format(job.startTime, job.endTime)
        logger.debug(log_jobreport)

        # Parse job report file and update of work report
        if os.path.exists(payload_report_file):
            payload_report = user.parse_jobreport_data(read_json(payload_report_file))
            work_report.update(payload_report)
            resource.process_jobreport(payload_report_file, job_scratch_dir, work_dir)

        resource.postprocess_workdir(job_scratch_dir)

        # output files should not be packed with logs
        protectedfiles = list(job.output_files.keys())  # Python 2/3

        # log file not produced (yet), so should be excluded
        if job.log_file in protectedfiles:
            protectedfiles.remove(job.log_file)
        else:
            logger.info("Log files was not declared")

        logger.info("Cleanup of working directory")

        # keep the Harvester bookkeeping files out of the cleanup as well
        protectedfiles.extend([worker_attributes_file, worker_stageout_declaration])
        user.remove_redundant_files(job_scratch_dir, protectedfiles)
        res = tar_files(job_scratch_dir, protectedfiles, job.log_file)
        if res > 0:
            raise FileHandlingFailure("Log file tar failed")

        add_to_pilot_timing(job.jobid, PILOT_PRE_STAGEOUT, time.time(), args)
        # Copy of output to shared FS for stageout
        if not job_scratch_dir == work_dir:
            copy_output(job, job_scratch_dir, work_dir)
        add_to_pilot_timing(job.jobid, PILOT_POST_STAGEOUT, time.time(), args)

        logger.info("Declare stage-out")
        add_to_pilot_timing(job.jobid, PILOT_PRE_FINAL_UPDATE, time.time(), args)
        declare_output(job, work_report, worker_stageout_declaration)

        logger.info("All done")
        publish_work_report(work_report, worker_attributes_file)
        traces.pilot['state'] = SUCCESS
        logger.debug("Final report: {0}".format(work_report))
        add_to_pilot_timing(job.jobid, PILOT_POST_FINAL_UPDATE, time.time(), args)

    except Exception as error:
        # best-effort failure report to Harvester
        work_report["jobStatus"] = "failed"
        work_report["exitMsg"] = str(error)
        publish_work_report(work_report, worker_attributes_file)
        logging.exception('exception caught: %s', error)
        # NOTE(review): if the exception fired before 'traces' was assigned
        # (e.g. in signal.signal above), this line and the return below raise
        # NameError/UnboundLocalError — confirm whether that path can occur
        traces.pilot['state'] = FAILURE

    return traces
0223 
0224 
def copy_output(job, job_scratch_dir, work_dir):
    """
    Copy the job's output files from the scratch directory to the shared work directory.

    Existing output files (checked relative to the current working directory,
    which is presumably the scratch dir at call time — verify against run())
    are copied from job_scratch_dir to work_dir, then the current directory is
    switched to work_dir. The elapsed copy time is always logged.

    :param job: job object; only job.output_files is read.
    :param job_scratch_dir: source (scratch) directory.
    :param work_dir: destination directory on the shared file system.
    :raises FileHandlingFailure: when a copy operation fails with an IOError.
    :return: 0.
    """
    started = time.time()
    try:
        for name in list(job.output_files.keys()):  # Python 2/3
            if not os.path.exists(name):
                continue
            source = os.path.join(job_scratch_dir, name)
            destination = os.path.join(work_dir, name)
            copy(source, destination)
        os.chdir(work_dir)
    except IOError:
        raise FileHandlingFailure("Copy from scratch dir to access point failed")
    finally:
        logger.info("Copy of outputs took: {0} sec.".format(time.time() - started))
    return 0
0238 
0239 
def declare_output(job, work_report, worker_stageout_declaration):
    """
    Declare the produced output files for stage-out by Harvester.

    Each expected output file that exists is described (file type, absolute
    path, size and — when available — guid) and the collected descriptions are
    written as JSON to the stage-out declaration file. A missing expected
    output file sets the job state to 'failed'.

    :param job: job object (jobid, output_files and log_file are read).
    :param work_report: work report dictionary; an optional 'outputfiles'
        entry may provide guids parsed from the payload job report.
    :param worker_stageout_declaration: path of the JSON stage-out declaration file.
    :return: None
    """
    out_file_report = {}
    out_file_report[job.jobid] = []
    # tolerate a missing 'outputfiles' entry (or a None value) in the work
    # report: the previous direct indexing raised KeyError in that case
    declared_outputs = work_report.get('outputfiles') or {}
    for outfile in list(job.output_files.keys()):  # Python 2/3
        logger.debug("File {} will be checked and declared for stage out".format(outfile))
        if os.path.exists(outfile):
            file_desc = {}
            if outfile == job.log_file:
                file_desc['filetype'] = 'log'
            else:
                file_desc['filetype'] = 'output'
            file_desc['path'] = os.path.abspath(outfile)
            file_desc['fsize'] = os.path.getsize(outfile)
            # prefer the guid declared with the job; fall back to the guid
            # from the payload job report, if one was recorded for this file
            if 'guid' in list(job.output_files[outfile].keys()):  # Python 2/3
                file_desc['guid'] = job.output_files[outfile]['guid']
            elif declared_outputs.get(outfile):
                file_desc['guid'] = declared_outputs[outfile]['guid']
            out_file_report[job.jobid].append(file_desc)
        else:
            logger.info("Expected output file {0} missed. Job {1} will be failed".format(outfile, job.jobid))
            set_pilot_state(job=job, state='failed')

    if out_file_report[job.jobid]:
        write_json(worker_stageout_declaration, out_file_report)
        logger.debug('Stagout declared in: {0}'.format(worker_stageout_declaration))
        logger.debug('Report for stageout: {}'.format(out_file_report))