File indexing completed on 2026-04-10 08:39:17
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010 import os
0011 import os.path
0012 import socket
0013
0014 from pilot.common.exception import FileHandlingFailure
0015 from pilot.util.config import config
0016 from pilot.util.filehandling import write_json, touch, remove, read_json, get_checksum_value
0017 from pilot.util.timing import time_stamp
0018
0019 import logging
0020 logger = logging.getLogger(__name__)
0021
0022
def dump(obj):
    """
    Debugging helper: print every attribute of the given object to stdout.

    :param obj: any object whose attributes should be listed.
    :return:
    """
    for name in dir(obj):
        print("obj.%s = %r" % (name, getattr(obj, name)))
0029
0030
def is_harvester_mode(args):
    """
    Determine whether the pilot is running in Harvester mode.

    Harvester mode is active when any of the Harvester-specific paths/files
    were given on the command line (and server updates are disabled), or when
    the Harvester environment variables are set in push submit mode.

    :param args: Pilot arguments object.
    :return: Boolean.
    """

    # explicit Harvester directories given and no direct server updates
    if (args.harvester_workdir != '' or args.harvester_datadir != '') and not args.update_server:
        return True
    # explicit Harvester communication files given and no direct server updates
    if (args.harvester_eventstatusdump != '' or args.harvester_workerattributes != '') and not args.update_server:
        return True
    # running under Harvester (env vars set) in push mode
    if ('HARVESTER_ID' in os.environ or 'HARVESTER_WORKER_ID' in os.environ) and args.harvester_submitmode.lower() == 'push':
        return True

    return False
0048
0049
def get_job_request_file_name():
    """
    Return the name of the job request file as defined in the pilot config file.
    The file is located in the pilot launch directory (PILOT_HOME).

    :return: job request file name (string).
    """

    pilot_home = os.environ['PILOT_HOME']
    return os.path.join(pilot_home, config.Harvester.job_request_file)
0059
0060
def remove_job_request_file():
    """
    Remove an old job request file when it is no longer needed.
    A debug message is logged if no job request file exists.

    :return:
    """

    job_request_file = get_job_request_file_name()
    if not os.path.exists(job_request_file):
        logger.debug('there is no job request file')
        return

    if remove(job_request_file) == 0:
        logger.info('removed %s', job_request_file)
0074
0075
def request_new_jobs(njobs=1):
    """
    Inform Harvester that the pilot is ready to process new jobs by creating a job request file with the desired
    number of jobs.

    :param njobs: Number of jobs. Default is 1 since on grids and clouds the pilot does not know how many jobs it can
    process before it runs out of time.
    :raises FileHandlingFailure: in case the job request file could not be written.
    :return:
    """

    path = get_job_request_file_name()
    dictionary = {'nJobs': njobs}

    try:
        write_json(path, dictionary)
    except FileHandlingFailure:
        # bug fix: re-raise the caught exception instance (bare `raise`) instead of
        # raising the bare class, which discarded the original message and traceback
        logger.warning('failed to write job request file: %s', path)
        raise
0094
0095
def kill_worker():
    """
    Create (touch) a kill_worker file in the pilot launch directory.
    The presence of this file lets Harvester know that the pilot has finished.

    :return:
    """

    kill_file = os.path.join(os.environ['PILOT_HOME'], config.Harvester.kill_worker_file)
    touch(kill_file)
0105
0106
def get_initial_work_report():
    """
    Prepare the work report dictionary.
    Note: the work_report should also contain all fields defined in parse_jobreport_data().

    :return: work report dictionary.
    """

    return {
        'jobStatus': 'starting',
        'messageLevel': logging.getLevelName(logger.getEffectiveLevel()),
        'cpuConversionFactor': 1.0,
        'cpuConsumptionTime': '',
        'node': socket.gethostname(),
        'workdir': '',
        'timestamp': time_stamp(),
        'endTime': '',
        'transExitCode': 0,
        'pilotErrorCode': 0,
    }
0128
0129
def get_event_status_file(args):
    """
    Return the name of the event_status.dump file as defined in the pilot config file
    and from the pilot arguments.

    :param args: Pilot arguments object.
    :return: event status file name (string).
    """

    logger.debug('config.Harvester.__dict__ : {0}'.format(config.Harvester.__dict__))

    # prefer the Harvester work dir from the arguments, fall back to the launch dir
    work_dir = args.harvester_workdir if args.harvester_workdir != '' else os.environ['PILOT_HOME']
    event_status_file = os.path.join(work_dir, config.Harvester.stageoutnfile)
    logger.debug('event_status_file = {}'.format(event_status_file))

    return event_status_file
0150
0151
def get_worker_attributes_file(args):
    """
    Return the name of the worker attributes file as defined in the pilot config file
    and from the pilot arguments.

    :param args: Pilot arguments object.
    :return: worker attributes file name (string).
    """

    logger.debug('config.Harvester.__dict__ : {0}'.format(config.Harvester.__dict__))

    # prefer the Harvester work dir from the arguments, fall back to the launch dir
    work_dir = args.harvester_workdir if args.harvester_workdir != '' else os.environ['PILOT_HOME']
    worker_attributes_file = os.path.join(work_dir, config.Harvester.workerattributesfile)
    logger.debug('worker_attributes_file = {}'.format(worker_attributes_file))

    return worker_attributes_file
0172
0173
def findfile(path, name):
    """
    Find the first instance of a file in the given directory tree.

    :param path: directory tree to search.
    :param name: name of the file to search for.
    :return: path to the first instance of the file, or '' when not found.
    """

    for root, _, filenames in os.walk(path):
        if name in filenames:
            return os.path.join(root, name)

    return ''
0188
0189
def publish_stageout_files(job, event_status_file):
    """
    Publishing of stage-out file information to the event status file.
    The file descriptions (type, path, guid, size, checksum) are written as a
    json dictionary keyed by the job id.

    :param job: job object.
    :param event_status_file: event status file name (string).
    :return: Boolean. status of writing the file information to a json
    """

    work_dir = os.path.dirname(event_status_file)

    out_file_report = {job.jobid: []}

    # log files are always declared for stage-out
    for fspec in job.logdata:
        logger.debug("File {} will be checked and declared for stage out".format(fspec.lfn))
        path = findfile(work_dir, os.path.basename(fspec.surl))
        logger.debug("Found File {} at path - {}".format(fspec.lfn, path))
        file_desc = _get_file_description(fspec, path)
        logger.debug("File description - {} ".format(file_desc))
        out_file_report[job.jobid].append(file_desc)

    # output files are only declared if they were actually transferred
    for fspec in job.outdata:
        logger.debug("File {} will be checked and declared for stage out".format(fspec.lfn))
        if fspec.status != 'transferred':
            logger.debug('will not add the output file to the json since it was not produced or transferred')
            continue

        filename = os.path.basename(fspec.surl)
        path = findfile(work_dir, filename)
        if not path:
            # bug fix: the original message had a %s placeholder but never passed
            # the file name, so the literal '%s' ended up in the log
            logger.warning('file %s was not found - will not be added to json', filename)
            continue

        logger.debug("Found File {} at path - {}".format(fspec.lfn, path))
        file_desc = _get_file_description(fspec, path)
        logger.debug("File description - {} ".format(file_desc))
        out_file_report[job.jobid].append(file_desc)

    if not out_file_report[job.jobid]:
        logger.debug('No Report for stageout')
        return False

    if write_json(event_status_file, out_file_report):
        logger.debug('Stagout declared in: {0}'.format(event_status_file))
        logger.debug('Report for stageout: {}'.format(out_file_report))
        return True

    logger.debug('Failed to declare stagout in: {0}'.format(event_status_file))
    return False


def _get_file_description(fspec, path):
    """
    Build the json file description dictionary for a single file spec.

    :param fspec: file spec object.
    :param path: local path to the file (string).
    :return: file description dictionary.
    """

    return {'type': fspec.filetype,
            'path': path,
            'guid': fspec.guid,
            'fsize': fspec.filesize,
            'chksum': get_checksum_value(fspec.checksum)}
0259
0260
def publish_work_report(work_report=None, worker_attributes_file="worker_attributes.json"):
    """
    Publishing of work report to file.
    The work report dictionary should contain the fields defined in get_initial_work_report().

    :param work_report: work report dictionary.
    :param worker_attributes_file: worker attributes file name (string).
    :raises FileHandlingFailure: in case of IOError.
    :return: True or False
    """

    if not work_report:
        return False

    try:
        work_report['timestamp'] = time_stamp()
        # the bulky file lists and xml are not needed in the published report
        for key in ("outputfiles", "inputfiles", "xml"):
            work_report.pop(key, None)

        if write_json(worker_attributes_file, work_report):
            logger.info("work report published: {0}".format(work_report))
            return True

        logger.error("work report publish failed: {0}".format(work_report))
        return False
    except IOError:
        logger.error("job report copy failed")
        return False
    except Exception as e:
        logger.error("write json file failed: {0}".format(e))
        return False
0296
0297
def publish_job_report(job, args, job_report_file="jobReport.json"):
    """
    Copy job report file to make it accessible by Harvester. Shrink job report file.

    :param job: job object.
    :param args: Pilot arguments object.
    :param job_report_file: name of job report (string).
    :raises FileHandlingFailure: in case of IOError.
    :return True or False
    """

    src_file = os.path.join(job.workdir, job_report_file)
    dst_file = os.path.join(args.harvester_workdir, job_report_file)

    try:
        logger.info(
            "copy of payload report [{0}] to access point: {1}".format(job_report_file, args.harvester_workdir))

        job_report = read_json(src_file)
        if 'executor' in job_report:
            # shrink the report: the per-executor log-file reports can be very large
            for executor in job_report['executor']:
                if 'logfileReport' in executor:
                    executor['logfileReport'] = {}

        return bool(write_json(dst_file, job_report))
    except IOError:
        logger.error("job report copy failed")
        return False
0330
0331
def parse_job_definition_file(filename):
    """
    This function parses the Harvester job definition file and re-packages the job definition dictionaries.
    The format of the Harvester job definition dictionary is:
    dict = { job_id: { key: value, .. }, .. }
    The function returns a list of these dictionaries each re-packaged as
    dict = { key: value } (where the job_id is now one of the key-value pairs: 'jobid': job_id)

    :param filename: file name (string).
    :return: list of job definition dictionaries.
    """

    job_definitions_dict = read_json(filename)
    if not job_definitions_dict:
        return []

    job_definitions_list = []
    for job_id, job_definition in job_definitions_dict.items():
        repackaged = {'jobid': job_id}
        repackaged.update(job_definition)
        job_definitions_list.append(repackaged)

    return job_definitions_list