Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:39:17

0001 #!/usr/bin/env python
0002 # Licensed under the Apache License, Version 2.0 (the "License");
0003 # you may not use this file except in compliance with the License.
0004 # You may obtain a copy of the License at
0005 # http://www.apache.org/licenses/LICENSE-2.0
0006 #
0007 # Authors:
0008 # - Paul Nilsson, paul.nilsson@cern.ch, 2018-2021
0009 
0010 import os
0011 import os.path
0012 import socket
0013 
0014 from pilot.common.exception import FileHandlingFailure
0015 from pilot.util.config import config
0016 from pilot.util.filehandling import write_json, touch, remove, read_json, get_checksum_value
0017 from pilot.util.timing import time_stamp
0018 
0019 import logging
0020 logger = logging.getLogger(__name__)
0021 
0022 
def dump(obj):
    """
    Debugging aid: print every attribute of an object to stdout.

    One line is written per name returned by dir(obj), formatted as
    ``obj.<name> = <repr of the attribute value>``.

    :param obj: object to inspect.
    :return:
    """

    for name in dir(obj):
        value = getattr(obj, name)
        print("obj.%s = %r" % (name, value))
0029 
0030 
def is_harvester_mode(args):
    """
    Decide whether the pilot should consider itself to be running under Harvester.

    Harvester mode is selected when any of the following holds:
    - a Harvester work or data directory was given and the pilot is not updating the server itself,
    - a Harvester event status dump or worker attributes file was given and the pilot is not
      updating the server itself,
    - a Harvester id is present in the environment and the submit mode is 'push' (case-insensitive).

    :param args: Pilot arguments object.
    :return: Boolean.
    """

    # explicit Harvester directories
    has_dirs = args.harvester_workdir != '' or args.harvester_datadir != ''
    if has_dirs and not args.update_server:
        return True

    # explicit Harvester communication files
    has_files = args.harvester_eventstatusdump != '' or args.harvester_workerattributes != ''
    if has_files and not args.update_server:
        return True

    # Harvester identifiers exported to the environment, push mode only
    in_env = 'HARVESTER_ID' in os.environ or 'HARVESTER_WORKER_ID' in os.environ
    return in_env and args.harvester_submitmode.lower() == 'push'
0048 
0049 
def get_job_request_file_name():
    """
    Build the full path of the job request file.

    The file name is taken from the Harvester section of the pilot config and is placed in the
    directory given by the PILOT_HOME environment variable (a KeyError is raised if it is unset).

    :return: job request file name (full path).
    """

    pilot_home = os.environ['PILOT_HOME']
    return os.path.join(pilot_home, config.Harvester.job_request_file)
0059 
0060 
def remove_job_request_file():
    """
    Delete a stale job request file, if one exists.

    A debug message is logged when there is nothing to remove; an info message is logged when
    remove() reports success (return value 0).

    :return:
    """

    path = get_job_request_file_name()

    # nothing to clean up
    if not os.path.exists(path):
        logger.debug('there is no job request file')
        return

    if remove(path) == 0:
        logger.info('removed %s', path)
0074 
0075 
def request_new_jobs(njobs=1):
    """
    Inform Harvester that the pilot is ready to process new jobs by creating a job request file with the desired
    number of jobs.

    The file is a json dictionary of the form {'nJobs': njobs}, written to the path returned by
    get_job_request_file_name().

    :param njobs: Number of jobs. Default is 1 since on grids and clouds the pilot does not know how many jobs it can
    process before it runs out of time.
    :raises FileHandlingFailure: propagated unchanged if write_json() raises it.
    :return:
    """

    path = get_job_request_file_name()
    request = {'nJobs': njobs}

    # write it to file
    try:
        write_json(path, request)
    except FileHandlingFailure:
        # re-raise the caught exception itself (bare raise) so that its message and traceback
        # are kept; raising the bare class would create a new, empty exception
        logger.warning('failed to write job request file %s', path)
        raise
0094 
0095 
def kill_worker():
    """
    Touch the kill_worker file in the pilot launch directory (PILOT_HOME).
    The presence of this file lets Harvester know that the pilot has finished.

    :return:
    """

    kill_file = os.path.join(os.environ['PILOT_HOME'], config.Harvester.kill_worker_file)
    touch(kill_file)
0105 
0106 
def get_initial_work_report():
    """
    Create the initial work report dictionary for a job that is starting.
    Note: the work_report should also contain all fields defined in parse_jobreport_data().

    The report carries the current logging level name, the local host name and a fresh time stamp;
    the remaining fields are set to neutral start values.

    :return: work report dictionary.
    """

    level_name = logging.getLevelName(logger.getEffectiveLevel())
    hostname = socket.gethostname()
    now = time_stamp()

    return {
        'jobStatus': 'starting',
        'messageLevel': level_name,
        'cpuConversionFactor': 1.0,
        'cpuConsumptionTime': '',
        'node': hostname,
        'workdir': '',
        'timestamp': now,
        'endTime': '',
        'transExitCode': 0,
        'pilotErrorCode': 0,  # only add this in case of failure?
    }
0128 
0129 
def get_event_status_file(args):
    """
    Build the full path of the event status file.

    The file name comes from the Harvester section of the pilot config (stageoutnfile). It is placed
    in the Harvester work directory given in the pilot arguments, or in PILOT_HOME when no Harvester
    work directory was set.

    :param args: Pilot arguments object.
    :return: event status file name (full path).
    """

    logger.debug('config.Harvester.__dict__ : {0}'.format(config.Harvester.__dict__))

    # prefer the Harvester work directory, fall back to the pilot launch directory
    base_dir = args.harvester_workdir if args.harvester_workdir != '' else os.environ['PILOT_HOME']
    path = os.path.join(base_dir, config.Harvester.stageoutnfile)
    logger.debug('event_status_file = {}'.format(path))

    return path
0150 
0151 
def get_worker_attributes_file(args):
    """
    Build the full path of the worker attributes file.

    The file name comes from the Harvester section of the pilot config (workerattributesfile). It is
    placed in the Harvester work directory given in the pilot arguments, or in PILOT_HOME when no
    Harvester work directory was set.

    :param args: Pilot arguments object.
    :return: worker attributes file name (full path).
    """

    logger.debug('config.Harvester.__dict__ : {0}'.format(config.Harvester.__dict__))

    # prefer the Harvester work directory, fall back to the pilot launch directory
    base_dir = args.harvester_workdir if args.harvester_workdir != '' else os.environ['PILOT_HOME']
    path = os.path.join(base_dir, config.Harvester.workerattributesfile)
    logger.debug('worker_attributes_file = {}'.format(path))

    return path
0172 
0173 
def findfile(path, name):
    """
    Locate the first occurrence of a file name below a directory.

    The tree is traversed with os.walk() and the search stops at the first directory that
    contains the file.

    :param path: directory tree to search
    :param name: name of the file to search

    :return: the path to the first instance of the file, or an empty string if it was not found
    """

    matches = (os.path.join(root, name) for root, _, files in os.walk(path) if name in files)
    return next(matches, '')
0188 
0189 
def _build_file_description(fspec, path):
    """
    Assemble the stage-out description dictionary for a single file.

    :param fspec: FileSpec object (filetype, guid, filesize and checksum are read).
    :param path: location of the file on disk (string).
    :return: file description dictionary.
    """

    file_desc = {
        'type': fspec.filetype,
        'path': path,
        'guid': fspec.guid,
        'fsize': fspec.filesize,
        'chksum': get_checksum_value(fspec.checksum),
    }
    logger.debug("File description - {} ".format(file_desc))

    return file_desc


def publish_stageout_files(job, event_status_file):
    """
    Publish the list of files to be staged out by Harvester.

    The log file(s) (job.logdata) and the transferred output file(s) (job.outdata) are looked up in the
    directory tree of the event status file. Their descriptions are written as json to the event status
    file in the format { job_id: [ file description, .. ] }.

    :param job: job object.
    :param event_status_file: event status file name (full path).

    :return: Boolean. True if the file information was written to json, False if there was nothing to
             report or the writing failed.
    """

    # get the harvester workdir from the event_status_file
    work_dir = os.path.dirname(event_status_file)

    file_descriptions = []

    # first look at the logfile information (logdata) from the FileSpec objects
    # note: a log file is declared even if it was not found (its path is then an empty string)
    for fspec in job.logdata:
        logger.debug("File {} will be checked and declared for stage out".format(fspec.lfn))
        # find the first instance of the file
        path = findfile(work_dir, os.path.basename(fspec.surl))
        logger.debug("Found File {} at path - {}".format(fspec.lfn, path))
        file_descriptions.append(_build_file_description(fspec, path))

    # Now look at the output file(s) information (outdata) from the FileSpec objects
    for fspec in job.outdata:
        logger.debug("File {} will be checked and declared for stage out".format(fspec.lfn))
        if fspec.status != 'transferred':
            logger.debug('will not add the output file to the json since it was not produced or transferred')
            continue

        # find the first instance of the file
        filename = os.path.basename(fspec.surl)
        path = findfile(work_dir, filename)
        if not path:
            # pass the file name so that the %s placeholder is actually filled in
            logger.warning('file %s was not found - will not be added to json', filename)
            continue

        logger.debug("Found File {} at path - {}".format(fspec.lfn, path))
        file_descriptions.append(_build_file_description(fspec, path))

    if not file_descriptions:
        logger.debug('No Report for stageout')
        return False

    out_file_report = {job.jobid: file_descriptions}
    if write_json(event_status_file, out_file_report):
        logger.debug('Stagout declared in: {0}'.format(event_status_file))
        logger.debug('Report for stageout: {}'.format(out_file_report))
        return True

    logger.debug('Failed to declare stagout in: {0}'.format(event_status_file))
    return False
0259 
0260 
def publish_work_report(work_report=None, worker_attributes_file="worker_attributes.json"):
    """
    Publishing of work report to file.
    The work report dictionary should contain the fields defined in get_initial_work_report().

    Note: the given dictionary is updated in place; its 'timestamp' is refreshed and the
    'outputfiles', 'inputfiles' and 'xml' entries are removed before the report is written.

    No exception is raised on failure; any error while writing is logged and reported through
    the return value.

    :param work_report: work report dictionary.
    :param worker_attributes_file: name of the json file to write the report to (string).
    :return: True if the report was written, False otherwise (including for an empty/missing report).
    """

    if not work_report:
        # No work_report return False
        return False

    try:
        work_report['timestamp'] = time_stamp()

        # drop the bulky entries that are not written to the worker attributes file
        for key in ("outputfiles", "inputfiles", "xml"):
            work_report.pop(key, None)

        if write_json(worker_attributes_file, work_report):
            logger.info("work report published: {0}".format(work_report))
            return True

        logger.error("work report publish failed: {0}".format(work_report))
        return False
    except IOError as exc:
        logger.error("work report publish failed (I/O error): {0}".format(exc))
        return False
    except Exception as exc:
        logger.error("write json file failed: {0}".format(exc))
        return False
0296 
0297 
def publish_job_report(job, args, job_report_file="jobReport.json"):
    """
    Copy job report file to make it accessible by Harvester. Shrink job report file.

    The report is read from the job work directory, the 'logfileReport' entry of every executor is
    emptied, and the result is written to the Harvester work directory under the same file name.

    An IOError is caught, logged and reported as False. Other exceptions raised by read_json() or
    write_json() are not handled here and propagate to the caller.

    :param job: job object.
    :param args: Pilot arguments object.
    :param job_report_file: name of job report (string).
    :return: True if the shrunk report was written, False otherwise.
    """

    src_file = os.path.join(job.workdir, job_report_file)
    dst_file = os.path.join(args.harvester_workdir, job_report_file)

    try:
        logger.info(
            "copy of payload report [{0}] to access point: {1}".format(job_report_file, args.harvester_workdir))

        job_report = read_json(src_file)
        if not isinstance(job_report, dict):
            # a missing or unparsable report cannot be shrunk or published
            logger.error("job report {0} could not be read as a dictionary - nothing to publish".format(src_file))
            return False

        # shrink jobReport
        for executor in job_report.get('executor', []):
            if 'logfileReport' in executor:
                executor['logfileReport'] = {}

        return bool(write_json(dst_file, job_report))

    except IOError:
        logger.error("job report copy failed")
        return False
0330 
0331 
def parse_job_definition_file(filename):
    """
    Read a Harvester job definition file and flatten its content into a list of job definitions.

    Harvester stores the definitions as
    dict = { job_id: { key: value, .. }, .. }
    Each entry is returned as a flat dictionary
    dict = { 'jobid': job_id, key: value, .. }
    An empty list is returned when the file yields no job definitions.

    :param filename: file name (string).
    :return: list of job definition dictionaries.
    """

    definitions = read_json(filename)
    if not definitions:
        return []

    # re-package dictionaries
    repackaged = []
    for job_id in definitions:
        entry = dict(jobid=job_id)
        entry.update(definitions[job_id])
        repackaged.append(entry)

    return repackaged
0353 
0354     return job_definitions_list