Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:39:18

0001 #!/usr/bin/env python
0002 # Licensed under the Apache License, Version 2.0 (the "License");
0003 # you may not use this file except in compliance with the License.
0004 # You may obtain a copy of the License at
0005 # http://www.apache.org/licenses/LICENSE-2.0
0006 #
0007 # Authors:
0008 # - Mario Lassnig, mario.lassnig@cern.ch, 2016-2017
0009 # - Daniel Drizhuk, d.drizhuk@gmail.com, 2017
0010 # - Paul Nilsson, paul.nilsson@cern.ch, 2017-2019
0011 
0012 from __future__ import print_function  # Python 2 (2to3 complains about this)
0013 from __future__ import absolute_import
0014 
0015 import argparse
0016 import logging
0017 import sys
0018 import threading
0019 import time
0020 from os import getcwd, chdir, environ
0021 from os.path import exists, join
0022 from shutil import rmtree
0023 
0024 from pilot.common.errorcodes import ErrorCodes
0025 from pilot.common.exception import PilotException
0026 from pilot.info import infosys
0027 from pilot.util.auxiliary import pilot_version_banner, shell_exit_code
0028 from pilot.util.constants import SUCCESS, FAILURE, ERRNO_NOJOBS, PILOT_START_TIME, PILOT_END_TIME, get_pilot_version, \
0029     SERVER_UPDATE_NOT_DONE, PILOT_MULTIJOB_START_TIME
0030 from pilot.util.filehandling import get_pilot_work_dir, mkdirs, establish_logging
0031 from pilot.util.harvester import is_harvester_mode
0032 from pilot.util.https import https_setup
0033 from pilot.util.timing import add_to_pilot_timing
0034 
0035 errors = ErrorCodes()
0036 
0037 
def main():
    """
    Main function of PanDA Pilot 2.
    Prepare for and execute the requested workflow.

    The function works on the module-level 'args' namespace (created in the __main__ section) and adds
    the threading events and bookkeeping attributes listed below before handing it to the workflow.

    :return: the value returned by workflow.run(args); an error code (int) if the queue could not be
             initialized or is not ACTIVE; None if the workflow raised an exception.
    """

    import importlib  # local import: only needed here, to load the requested workflow module

    # get the logger
    logger = logging.getLogger(__name__)

    # print the pilot version
    pilot_version_banner()

    # define threading events
    args.graceful_stop = threading.Event()
    args.abort_job = threading.Event()
    args.job_aborted = threading.Event()

    # define useful variables
    args.retrieve_next_job = True  # go ahead and download a new job
    args.signal = None  # to store any incoming signals
    args.signal_counter = 0  # keep track of number of received kill signal (suicide counter)
    args.kill_time = 0  # keep track of when first kill signal arrived

    # perform https setup
    if args.use_https:
        https_setup(args, get_pilot_version())

    # initialize InfoService and make sure the queue is usable
    try:
        infosys.init(args.queue)
        if infosys.queuedata.state != 'ACTIVE':
            logger.critical('specified queue is NOT ACTIVE: %s -- aborting', infosys.queuedata.name)
            return errors.PANDAQUEUENOTACTIVE
    except PilotException as error:
        logger.critical(error)
        return error.get_error_code()

    # set the site name for rucio  ## is it really used?
    environ['PILOT_RUCIO_SITENAME'] = infosys.queuedata.site

    # store the site name (taken from the queue data rather than from the obsolete -s option)
    environ['PILOT_SITENAME'] = infosys.queuedata.resource  # TODO: replace with singleton

    # load the requested workflow module; the -w choices match the module names in pilot/workflow/
    logger.info('pilot arguments: %s', str(args))
    workflow = importlib.import_module('pilot.workflow.%s' % args.workflow)

    # execute workflow
    try:
        exit_code = workflow.run(args)
    except Exception as e:
        # keep the traceback in the log: this is the last place the failure can be diagnosed
        logger.critical('main pilot function caught exception: %s', e, exc_info=True)
        # NOTE(review): None is passed on to wrap_up() and from there to shell_exit_code(); confirm it is handled
        exit_code = None

    return exit_code
0096 
0097 
class Args:
    """
    Plain attribute container for the pilot arguments.

    The class declares nothing of its own; code that uses it simply assigns
    whatever attributes it needs on an instance.
    """
0103 
0104 
def str2bool(v):
    """
    Convert a command line string to a boolean (argparse 'type=' helper).

    :param v: a bool (returned unchanged) or a string such as 'yes'/'no', 'true'/'false', 't'/'f', 'y'/'n', '1'/'0'
              (case-insensitive).
    :return: True or False.
    :raises argparse.ArgumentTypeError: if the string is not one of the recognized spellings.
    """

    if isinstance(v, bool):
        return v

    answer = v.lower()
    truthy = ('yes', 'true', 't', 'y', '1')
    falsy = ('no', 'false', 'f', 'n', '0')

    if answer in truthy:
        return True
    if answer in falsy:
        return False

    raise argparse.ArgumentTypeError('Boolean value expected.')
0116 
0117 
def get_args(argv=None):
    """
    Parse and return the pilot command line arguments.

    :param argv: optional list of argument strings to parse instead of the real command line
                 (default None: argparse then reads sys.argv[1:]).
    :return: args (argparse.Namespace).
    """

    arg_parser = argparse.ArgumentParser()

    # pilot log creation
    arg_parser.add_argument('--no-pilot-log',
                            dest='nopilotlog',
                            action='store_true',
                            default=False,
                            help='Do not write the pilot log to file')

    # pilot work directory
    arg_parser.add_argument('-a',
                            dest='workdir',
                            default="",
                            help='Pilot work directory')

    # debug option to enable more log messages
    arg_parser.add_argument('-d',
                            dest='debug',
                            action='store_true',
                            default=False,
                            help='Enable debug mode for logging messages')

    # the choices must match in name the python module in pilot/workflow/
    arg_parser.add_argument('-w',
                            dest='workflow',
                            default='generic',
                            choices=['generic', 'generic_hpc',
                                     'production', 'production_hpc',
                                     'analysis', 'analysis_hpc',
                                     'eventservice_hpc', 'stagein', 'payload_stageout'],
                            help='Pilot workflow (default: generic)')

    # gracefully stop pilot process after hard limit
    arg_parser.add_argument('-l',
                            dest='lifetime',
                            default=324000,
                            required=False,
                            type=int,
                            help='Pilot lifetime seconds (default: 324000 s)')

    # set the appropriate site, resource and queue
    arg_parser.add_argument('-q',
                            dest='queue',
                            required=True,
                            help='MANDATORY: queue name (e.g., AGLT2_TEST-condor)')
    arg_parser.add_argument('-r',
                            dest='resource',
                            required=False,  # From v 2.2.0 the resource name is internally set
                            help='OBSOLETE: resource name (e.g., AGLT2_TEST)')
    arg_parser.add_argument('-s',
                            dest='site',
                            required=False,  # From v 2.2.1 the site name is internally set
                            help='OBSOLETE: site name (e.g., AGLT2_TEST)')

    # job prod/source label
    arg_parser.add_argument('-j',
                            dest='job_label',
                            default='ptest',
                            help='Job prod/source label (default: ptest)')

    # pilot version tag; PR or RC
    arg_parser.add_argument('-i',
                            dest='version_tag',
                            default='PR',
                            help='Version tag (default: PR, optional: RC)')

    # server updates (enabled unless -z is given)
    arg_parser.add_argument('-z',
                            dest='update_server',
                            action='store_false',
                            default=True,
                            help='Disable server updates')

    # proxy verification for the pilot and for the payload (enabled unless -t / -u is given)
    arg_parser.add_argument('-t',
                            dest='verify_proxy',
                            action='store_false',
                            default=True,
                            help='Disable proxy verification')

    arg_parser.add_argument('-u',
                            dest='verify_payload_proxy',
                            action='store_false',
                            default=True,
                            help='Disable payload proxy verification')

    # number of getjob requests
    arg_parser.add_argument('-v',
                            dest='getjob_requests',
                            default=2,
                            required=False,
                            type=int,
                            help='Number of getjob requests')

    # maximum number of tolerated getjob failures (Harvester mode)
    arg_parser.add_argument('-x',
                            dest='getjob_failures',
                            default=5,
                            required=False,
                            type=int,
                            help='Maximum number of getjob request failures in Harvester mode')

    # SSL certificates
    arg_parser.add_argument('--cacert',
                            dest='cacert',
                            default=None,
                            help='CA certificate to use with HTTPS calls to server, commonly X509 proxy',
                            metavar='path/to/your/certificate')
    arg_parser.add_argument('--capath',
                            dest='capath',
                            default=None,
                            help='CA certificates path',
                            metavar='path/to/certificates/')

    # Server URLs and ports
    arg_parser.add_argument('--url',
                            dest='url',
                            default='',  # the proper default is stored in default.cfg
                            help='PanDA server URL')
    # NOTE(review): no type=int here, so a port given with -p is a string while the default is an int
    arg_parser.add_argument('-p',
                            dest='port',
                            default=25443,
                            help='PanDA server port')
    arg_parser.add_argument('--queuedata-url',
                            dest='queuedata_url',
                            default='',
                            help='Queuedata server URL')

    # Country group
    arg_parser.add_argument('--country-group',
                            dest='country_group',
                            default='',
                            help='Country group option for getjob request')

    # Working group
    arg_parser.add_argument('--working-group',
                            dest='working_group',
                            default='',
                            help='Working group option for getjob request')

    # Allow other country
    arg_parser.add_argument('--allow-other-country',
                            dest='allow_other_country',
                            type=str2bool,
                            default=False,
                            help='Is the resource allowed to be used outside the privileged group?')

    # Allow same user
    arg_parser.add_argument('--allow-same-user',
                            dest='allow_same_user',
                            type=str2bool,
                            default=True,
                            help='Multi-jobs will only come from same taskID (and thus same user)')

    # Experiment; since the option is required, its default value is never used
    arg_parser.add_argument('--pilot-user',
                            dest='pilot_user',
                            default='generic',
                            required=True,
                            help='Pilot user (e.g. name of experiment corresponding to pilot plug-in)')

    # Harvester specific options (is_harvester_mode() uses the parsed args to decide whether args.harvester is set)
    arg_parser.add_argument('--harvester-workdir',
                            dest='harvester_workdir',
                            default='',
                            help='Harvester work directory')
    arg_parser.add_argument('--harvester-datadir',
                            dest='harvester_datadir',
                            default='',
                            help='Harvester data directory')
    arg_parser.add_argument('--harvester-eventstatusdump',
                            dest='harvester_eventstatusdump',
                            default='',
                            help='Harvester event status dump json file containing processing status')
    arg_parser.add_argument('--harvester-workerattributes',
                            dest='harvester_workerattributes',
                            default='',
                            help='Harvester worker attributes json file containing job status')
    arg_parser.add_argument('--harvester-submit-mode',
                            dest='harvester_submitmode',
                            default='PULL',
                            help='Harvester submit mode (PUSH or PULL [default])')
    # note: argparse does not check the '' default against choices, so the option may be left unset
    arg_parser.add_argument('--resource-type',
                            dest='resource_type',
                            default='',
                            type=str,
                            choices=['SCORE', 'MCORE', 'SCORE_HIMEM', 'MCORE_HIMEM'],
                            help='Resource type; MCORE, SCORE, SCORE_HIMEM or MCORE_HIMEM')
    arg_parser.add_argument('--use-https',
                            dest='use_https',
                            type=str2bool,
                            default=True,
                            help='Use HTTPS protocol for communications with server')
    arg_parser.add_argument('--cleanup',
                            dest='cleanup',
                            type=str2bool,
                            default=True,
                            help='Cleanup work directory after pilot has finished')
    arg_parser.add_argument('--use-realtime-logging',
                            dest='use_realtime_logging',
                            type=str2bool,
                            default=False,
                            help='Use near real-time logging')
    arg_parser.add_argument('--realtime-logging-server',
                            dest='realtime_logging_server',
                            default='',
                            help='Near real-time logging server')

    # Harvester and Nordugrid specific options
    arg_parser.add_argument('--input-dir',
                            dest='input_dir',
                            default='',
                            help='Input directory')
    arg_parser.add_argument('--output-dir',
                            dest='output_dir',
                            default='',
                            help='Output directory')
    arg_parser.add_argument('--job-type',
                            dest='jobtype',
                            default='',
                            help='Job type (managed, user)')
    arg_parser.add_argument('--use-rucio-traces',
                            dest='use_rucio_traces',
                            type=str2bool,
                            default=True,
                            help='Use rucio traces')

    # HPC options
    arg_parser.add_argument('--hpc-resource',
                            dest='hpc_resource',
                            default='',
                            help='Name of the HPC (e.g. Titan)')
    arg_parser.add_argument('--hpc-mode',
                            dest='hpc_mode',
                            default='manytoone',
                            help='HPC mode (manytoone, jumbojobs)')
    arg_parser.add_argument('--es-executor-type',
                            dest='executor_type',
                            default='generic',
                            help='Event service executor type (generic, raythena)')

    # argv=None makes argparse fall back to sys.argv[1:], i.e. the original behaviour
    return arg_parser.parse_args(argv)
0364 
0365 
def create_main_work_dir(args):
    """
    Create and return the pilot's main work directory.

    If args.workdir is set, the main work directory is derived from it with get_pilot_work_dir() and created;
    otherwise the current working directory is used. The function sets args.mainworkdir and, on success,
    cd's into the main work directory.

    :param args: pilot arguments object (reads args.workdir, sets args.mainworkdir).
    :return: exit code (int, 0 on success), main work directory (string).
    """

    if args.workdir:
        mainworkdir = get_pilot_work_dir(args.workdir)
        try:
            # create the main PanDA Pilot work directory
            mkdirs(mainworkdir)
        except PilotException as error:
            # print to stderr since logging has not been established yet
            print('failed to create workdir at %s -- aborting: %s' % (mainworkdir, error), file=sys.stderr)
            args.mainworkdir = mainworkdir
            # return before chdir(): if the directory does not exist, chdir() would raise and the
            # caller would never see the proper exit code
            return shell_exit_code(error.get_error_code()), mainworkdir
    else:
        mainworkdir = getcwd()

    args.mainworkdir = mainworkdir
    chdir(mainworkdir)

    return 0, mainworkdir
0393 
0394 
def _panda_server_url(url, port):
    """
    Return url with ':<port>' added to its host part, unless the host part already carries a port.

    An explicit port in the URL takes precedence over the port option, and the port is inserted in front of any
    path instead of being appended to it, e.g.
      'https://host'        -> 'https://host:25443'
      'https://host/server' -> 'https://host:25443/server'
      'https://host:443'    -> 'https://host:443'
    An empty url yields ':<port>'.

    :param url: server URL, with or without scheme (string).
    :param port: server port (int or string).
    :return: URL including a port (string).
    """

    head, sep, tail = url.partition('://')
    if sep:
        prefix, remainder = head + sep, tail
    else:
        prefix, remainder = '', url

    # the authority (host[:port]) ends at the first '/', if any
    authority, slash, path = remainder.partition('/')
    _, colon, existing_port = authority.rpartition(':')
    if colon and existing_port.isdigit():
        return url

    return '%s%s:%s%s%s' % (prefix, authority, port, slash, path)


def set_environment_variables(args, mainworkdir):
    """
    Set environment variables. To be replaced with singleton implementation.

    This function sets PILOT_WORK_DIR, PILOT_HOME, PILOT_SOURCE_DIR, PILOT_USER, PILOT_VERSION, the server URLs
    and a number of other pilot settings. PILOT_SITENAME is not set here (main() sets it from the queue data).

    :param args: args object.
    :param mainworkdir: work directory (string).
    :return:
    """

    # working directory as set with a pilot option (e.g. ..)
    environ['PILOT_WORK_DIR'] = args.workdir  # TODO: replace with singleton

    # main work directory (e.g. /scratch/PanDA_Pilot2_3908_1537173670)
    environ['PILOT_HOME'] = mainworkdir  # TODO: replace with singleton

    # pilot source directory (e.g. /cluster/home/usatlas1/gram_scratch_hHq4Ns/condorg_oqmHdWxz)
    environ['PILOT_SOURCE_DIR'] = args.sourcedir  # TODO: replace with singleton

    # set the pilot user (e.g. ATLAS)
    environ['PILOT_USER'] = args.pilot_user  # TODO: replace with singleton

    # internal pilot state
    environ['PILOT_JOB_STATE'] = 'startup'  # TODO: replace with singleton

    # set the pilot version
    environ['PILOT_VERSION'] = get_pilot_version()

    # set the default wrap-up/finish instruction
    environ['PILOT_WRAP_UP'] = 'NORMAL'

    # proxy verifications
    environ['PILOT_PROXY_VERIFICATION'] = '%s' % args.verify_proxy
    environ['PILOT_PAYLOAD_PROXY_VERIFICATION'] = '%s' % args.verify_payload_proxy

    # keep track of the server updates, if any
    environ['SERVER_UPDATE'] = SERVER_UPDATE_NOT_DONE

    # set the (HPC) resource name (if set in options)
    environ['PILOT_RESOURCE_NAME'] = args.hpc_resource

    # allow for the possibility of turning off rucio traces
    environ['PILOT_USE_RUCIO_TRACES'] = str(args.use_rucio_traces)

    # event service executor type
    environ['PILOT_ES_EXECUTOR_TYPE'] = args.executor_type

    if args.output_dir:
        environ['PILOT_OUTPUT_DIR'] = args.output_dir

    # keep track of the server urls; the port is only added when the URL does not already specify one
    environ['PANDA_SERVER_URL'] = _panda_server_url(args.url, args.port)
    environ['QUEUEDATA_SERVER_URL'] = '%s' % args.queuedata_url
0450 
0451 
def wrap_up(initdir, mainworkdir, args):
    """
    Perform cleanup and terminate logging.

    Removes the main work directory (when it differs from the launch directory and cleanup is enabled),
    calls kill_worker() in Harvester mode, derives the final exit code from the module-level 'trace'
    (the value returned by main()) and shuts down logging.

    Exit code:
    - if trace.pilot['error_code'] can be read, it is used, except that SUCCESS is used when more than
      one job was processed (trace.pilot['nr_jobs'] > 1);
    - otherwise trace itself is used (e.g. an error code returned early by main()).

    :param initdir: launch directory (string).
    :param mainworkdir: main work directory (string).
    :param args: pilot arguments object.
    :return: exit code converted with shell_exit_code() (int).
    """

    # cleanup pilot workdir if created (step out of it first)
    if initdir != mainworkdir and args.cleanup:
        chdir(initdir)
        try:
            rmtree(mainworkdir)
        except Exception as e:
            logging.warning("failed to remove %s: %s", mainworkdir, e)
        else:
            logging.info("removed %s", mainworkdir)

    # in Harvester mode, create a kill_worker file that will instruct Harvester that the pilot has finished
    if args.harvester:
        from pilot.util.harvester import kill_worker
        kill_worker()

    try:
        exit_code = trace.pilot['error_code']
    except Exception:
        # trace is not a job trace object (e.g. an int error code or None returned by main())
        exit_code = trace
    else:
        logging.info('traces error code: %d', exit_code)
        nr_jobs = trace.pilot['nr_jobs']
        if nr_jobs <= 1:
            if exit_code != 0:
                logging.info('an exit code was already set: %d (will be converted to a standard shell code)', exit_code)
        else:
            # more than one job was processed: report overall success
            logging.getLogger(__name__).info('pilot has finished (%d jobs were processed)', nr_jobs)
            exit_code = SUCCESS
        # NOTE(review): trace.pilot['state'] (FAILURE / ERRNO_NOJOBS) is not consulted. The earlier state-based
        # branches sat behind the two exhaustive nr_jobs checks above and could never run; confirm whether
        # e.g. a pilot that processed no jobs should report ERRNO_NOJOBS.

    logging.info('pilot has finished')
    logging.shutdown()

    return shell_exit_code(exit_code)
0503 
0504 
def get_pilot_source_dir():
    """
    Locate the pilot source directory.

    When the current directory holds a 'pilot2' sub-directory containing pilot.py (the wrapper untarred the
    sources there), that sub-directory is returned. In every other case (pilot launched from within the source
    dir, or pilot.py not found at all) the current directory is returned.

    :return: full path to pilot source directory.
    """

    launch_dir = getcwd()
    nested_src = join(launch_dir, 'pilot2')
    if exists(join(nested_src, 'pilot.py')):
        return nested_src

    # pilot.py is either directly in the launch dir or missing; logging is not set up yet, so any
    # failure caused by a missing source tree is left to surface later
    return launch_dir
0520 
0521 
if __name__ == '__main__':
    # Entry point of the pilot module. Note that main() and wrap_up() read the module-level names
    # 'args' and 'trace' that are assigned below.

    args = get_args()

    # main harvester control boolean
    args.harvester = is_harvester_mode(args)

    # bookkeeping containers; TODO: move to singleton (timing) / job object (job status)?
    args.timing = {}  # pilot timing dictionary
    args.job_status = {}  # e.g. used to keep track of log transfers

    # T0 time stamps
    add_to_pilot_timing('0', PILOT_START_TIME, time.time(), args)
    add_to_pilot_timing('1', PILOT_MULTIJOB_START_TIME, time.time(), args)

    # the launch directory is recorded as the source directory (get_pilot_source_dir() is not used here)
    args.sourcedir = getcwd()

    # if requested by the wrapper via a pilot option, create the main pilot workdir and cd into it
    exit_code, mainworkdir = create_main_work_dir(args)
    if exit_code != 0:
        sys.exit(exit_code)

    # environment variables (to be replaced with singleton implementation), then standard logging
    set_environment_variables(args, mainworkdir)
    establish_logging(debug=args.debug, nopilotlog=args.nopilotlog)

    # run the requested workflow
    trace = main()

    # the final time stamp has to be stored now, before wrap_up() purges the main work dir
    add_to_pilot_timing('0', PILOT_END_TIME, time.time(), args, store=False)

    # cleanup, logging shutdown and final exit code
    exit_code = wrap_up(args.sourcedir, mainworkdir, args)
    sys.exit(exit_code)