File indexing completed on 2026-04-10 08:39:18
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012 from __future__ import print_function
0013 from __future__ import absolute_import
0014
0015 import argparse
0016 import logging
0017 import sys
0018 import threading
0019 import time
0020 from os import getcwd, chdir, environ
0021 from os.path import exists, join
0022 from shutil import rmtree
0023
0024 from pilot.common.errorcodes import ErrorCodes
0025 from pilot.common.exception import PilotException
0026 from pilot.info import infosys
0027 from pilot.util.auxiliary import pilot_version_banner, shell_exit_code
0028 from pilot.util.constants import SUCCESS, FAILURE, ERRNO_NOJOBS, PILOT_START_TIME, PILOT_END_TIME, get_pilot_version, \
0029 SERVER_UPDATE_NOT_DONE, PILOT_MULTIJOB_START_TIME
0030 from pilot.util.filehandling import get_pilot_work_dir, mkdirs, establish_logging
0031 from pilot.util.harvester import is_harvester_mode
0032 from pilot.util.https import https_setup
0033 from pilot.util.timing import add_to_pilot_timing
0034
# module-level ErrorCodes instance; main() reads errors.PANDAQUEUENOTACTIVE from it
errors = ErrorCodes()
0036
0037
def main():
    """
    Entry point of PanDA Pilot 2: prepare the run and launch the selected workflow.

    The function works on the module-level ``args`` object (created in the
    ``__main__`` section) rather than on a parameter.

    :return: the value returned by the workflow's run() function, an error code if the
             queue could not be initialised or is not ACTIVE, or None if the workflow
             raised an exception.
    """

    log = logging.getLogger(__name__)

    # announce the pilot version
    pilot_version_banner()

    # one threading.Event per stop/abort condition
    for event_name in ('graceful_stop', 'abort_job', 'job_aborted'):
        setattr(args, event_name, threading.Event())

    # initial values for job retrieval and signal bookkeeping
    args.retrieve_next_job = True
    args.signal = None
    args.signal_counter = 0
    args.kill_time = 0

    # prepare HTTPS communication unless it was switched off
    if args.use_https:
        https_setup(args, get_pilot_version())

    # initialise the information system for the requested queue and make sure it is usable
    try:
        infosys.init(args.queue)
        queuedata = infosys.queuedata

        if queuedata.state != 'ACTIVE':
            log.critical('specified queue is NOT ACTIVE: %s -- aborting', queuedata.name)
            return errors.PANDAQUEUENOTACTIVE
    except PilotException as error:
        log.fatal(error)
        return error.get_error_code()

    # export site and resource names taken from the queuedata
    environ['PILOT_RUCIO_SITENAME'] = queuedata.site
    environ['PILOT_SITENAME'] = queuedata.resource

    log.info('pilot arguments: %s', str(args))

    # import the module pilot.workflow.<args.workflow>
    module_name = 'pilot.workflow.%s' % args.workflow
    workflow = __import__(module_name, globals(), locals(), [args.workflow], 0)

    # run the workflow; any exception is logged and reported to the caller as None
    try:
        return workflow.run(args)
    except Exception as e:
        log.fatal('main pilot function caught exception: %s', e)
        return None
0096
0097
class Args:
    """
    Empty placeholder class; attributes holding pilot arguments can be attached to its instances.
    """
0103
0104
def str2bool(v):
    """
    Convert a truth value given on the command line to a bool.

    Bool input is returned unchanged. Any other input is compared, case-insensitively
    and with surrounding whitespace ignored, against the accepted spellings
    yes/true/t/y/1 and no/false/f/n/0. Non-string input (e.g. the integer 1) is
    converted with str() first, so it is either accepted or rejected with the same
    exception type as an unknown string.

    :param v: value to convert (bool, string or any object with a string form).
    :return: True or False.
    :raises argparse.ArgumentTypeError: if the value is not a recognised spelling.
    """

    if isinstance(v, bool):
        return v

    try:
        token = v.strip().lower()
    except AttributeError:
        # not a string-like object: fall back to its string form instead of crashing
        token = str(v).strip().lower()

    if token in ('yes', 'true', 't', 'y', '1'):
        return True
    if token in ('no', 'false', 'f', 'n', '0'):
        return False

    raise argparse.ArgumentTypeError('Boolean value expected.')
0116
0117
def get_args():
    """
    Define the pilot command-line options and parse the command line.

    :return: the object returned by ArgumentParser.parse_args(), with one attribute per option.
    """

    parser = argparse.ArgumentParser()
    add = parser.add_argument  # short alias, every option below is registered through it

    # logging
    add('--no-pilot-log', dest='nopilotlog', action='store_true', default=False,
        help='Do not write the pilot log to file')

    # work directory
    add('-a', dest='workdir', default="", help='Pilot work directory')

    # debug mode
    add('-d', dest='debug', action='store_true', default=False,
        help='Enable debug mode for logging messages')

    # workflow selection
    add('-w', dest='workflow', default='generic',
        choices=['generic', 'generic_hpc',
                 'production', 'production_hpc',
                 'analysis', 'analysis_hpc',
                 'eventservice_hpc', 'stagein', 'payload_stageout'],
        help='Pilot workflow (default: generic)')

    # pilot lifetime
    add('-l', dest='lifetime', default=324000, required=False, type=int,
        help='Pilot lifetime seconds (default: 324000 s)')

    # queue, resource and site
    add('-q', dest='queue', required=True,
        help='MANDATORY: queue name (e.g., AGLT2_TEST-condor)')
    add('-r', dest='resource', required=False,
        help='OBSOLETE: resource name (e.g., AGLT2_TEST)')
    add('-s', dest='site', required=False,
        help='OBSOLETE: site name (e.g., AGLT2_TEST)')

    # job label
    add('-j', dest='job_label', default='ptest',
        help='Job prod/source label (default: ptest)')

    # version tag and switches that disable default behaviour
    add('-i', dest='version_tag', default='PR',
        help='Version tag (default: PR, optional: RC)')
    add('-z', dest='update_server', action='store_false', default=True,
        help='Disable server updates')
    add('-t', dest='verify_proxy', action='store_false', default=True,
        help='Disable proxy verification')
    add('-u', dest='verify_payload_proxy', action='store_false', default=True,
        help='Disable payload proxy verification')

    # getjob request limits
    add('-v', dest='getjob_requests', default=2, required=False, type=int,
        help='Number of getjob requests')
    add('-x', dest='getjob_failures', default=5, required=False, type=int,
        help='Maximum number of getjob request failures in Harvester mode')

    # certificates
    add('--cacert', dest='cacert', default=None,
        help='CA certificate to use with HTTPS calls to server, commonly X509 proxy',
        metavar='path/to/your/certificate')
    add('--capath', dest='capath', default=None,
        help='CA certificates path',
        metavar='path/to/certificates/')

    # server locations
    add('--url', dest='url', default='', help='PanDA server URL')
    add('-p', dest='port', default=25443, help='PanDA server port')
    add('--queuedata-url', dest='queuedata_url', default='', help='Queuedata server URL')

    # country group
    add('--country-group', dest='country_group', default='',
        help='Country group option for getjob request')

    # working group
    add('--working-group', dest='working_group', default='',
        help='Working group option for getjob request')

    # allow other country
    add('--allow-other-country', dest='allow_other_country', type=str2bool, default=False,
        help='Is the resource allowed to be used outside the privileged group?')

    # allow same user
    add('--allow-same-user', dest='allow_same_user', type=str2bool, default=True,
        help='Multi-jobs will only come from same taskID (and thus same user)')

    # pilot user
    add('--pilot-user', dest='pilot_user', default='generic', required=True,
        help='Pilot user (e.g. name of experiment corresponding to pilot plug-in)')

    # Harvester options, resource type, protocol, cleanup and real-time logging
    add('--harvester-workdir', dest='harvester_workdir', default='',
        help='Harvester work directory')
    add('--harvester-datadir', dest='harvester_datadir', default='',
        help='Harvester data directory')
    add('--harvester-eventstatusdump', dest='harvester_eventstatusdump', default='',
        help='Harvester event status dump json file containing processing status')
    add('--harvester-workerattributes', dest='harvester_workerattributes', default='',
        help='Harvester worker attributes json file containing job status')
    add('--harvester-submit-mode', dest='harvester_submitmode', default='PULL',
        help='Harvester submit mode (PUSH or PULL [default])')
    add('--resource-type', dest='resource_type', default='', type=str,
        choices=['SCORE', 'MCORE', 'SCORE_HIMEM', 'MCORE_HIMEM'],
        help='Resource type; MCORE, SCORE, SCORE_HIMEM or MCORE_HIMEM')
    add('--use-https', dest='use_https', type=str2bool, default=True,
        help='Use HTTPS protocol for communications with server')
    add('--cleanup', dest='cleanup', type=str2bool, default=True,
        help='Cleanup work directory after pilot has finished')
    add('--use-realtime-logging', dest='use_realtime_logging', type=str2bool, default=False,
        help='Use near real-time logging')
    add('--realtime-logging-server', dest='realtime_logging_server', default='',
        help='Near real-time logging server')

    # input/output directories, job type and rucio traces
    add('--input-dir', dest='input_dir', default='', help='Input directory')
    add('--output-dir', dest='output_dir', default='', help='Output directory')
    add('--job-type', dest='jobtype', default='', help='Job type (managed, user)')
    add('--use-rucio-traces', dest='use_rucio_traces', type=str2bool, default=True,
        help='Use rucio traces')

    # HPC options
    add('--hpc-resource', dest='hpc_resource', default='',
        help='Name of the HPC (e.g. Titan)')
    add('--hpc-mode', dest='hpc_mode', default='manytoone',
        help='HPC mode (manytoone, jumbojobs)')
    add('--es-executor-type', dest='executor_type', default='generic',
        help='Event service executor type (generic, raythena)')

    return parser.parse_args()
0364
0365
def create_main_work_dir(args):
    """
    Create and return the pilot's main work directory.

    If args.workdir is set, the main work directory is derived from it with
    get_pilot_work_dir() and created with mkdirs(); otherwise the current working
    directory is used. args.mainworkdir is always set. The function cd's into the
    directory only when it is available: if the directory could not be created, a
    message is printed to stderr and a non-zero exit code is returned without
    attempting chdir() (which would otherwise raise on the missing directory).

    :param args: pilot arguments object.
    :return: exit code (int, 0 on success), main work directory (string).
    """

    if args.workdir != "":
        mainworkdir = get_pilot_work_dir(args.workdir)
        try:
            mkdirs(mainworkdir)
        except PilotException as error:
            # logging is not established at this point, so report on stderr
            print('failed to create workdir at %s -- aborting: %s' % (mainworkdir, error), file=sys.stderr)
            args.mainworkdir = mainworkdir
            # use the public accessor, as main() does, instead of the private attribute
            return shell_exit_code(error.get_error_code()), mainworkdir
    else:
        mainworkdir = getcwd()

    args.mainworkdir = mainworkdir
    chdir(mainworkdir)

    return 0, mainworkdir
0393
0394
def set_environment_variables(args, mainworkdir):
    """
    Export the pilot settings to the process environment.

    Sets PILOT_WORK_DIR, PILOT_HOME, PILOT_SOURCE_DIR, PILOT_USER, PILOT_JOB_STATE,
    PILOT_VERSION, PILOT_WRAP_UP, PILOT_PROXY_VERIFICATION,
    PILOT_PAYLOAD_PROXY_VERIFICATION, SERVER_UPDATE, PILOT_RESOURCE_NAME,
    PILOT_USE_RUCIO_TRACES, PILOT_ES_EXECUTOR_TYPE, PANDA_SERVER_URL and
    QUEUEDATA_SERVER_URL, plus PILOT_OUTPUT_DIR when an output directory was given.

    :param args: args object.
    :param mainworkdir: work directory (string).
    :return:
    """

    environ.update({
        'PILOT_WORK_DIR': args.workdir,          # work directory as given on the command line
        'PILOT_HOME': mainworkdir,               # main work directory
        'PILOT_SOURCE_DIR': args.sourcedir,      # directory the pilot was launched from
        'PILOT_USER': args.pilot_user,
        'PILOT_JOB_STATE': 'startup',            # initial job state
        'PILOT_VERSION': get_pilot_version(),
        'PILOT_WRAP_UP': 'NORMAL',               # initial wrap-up mode
        'PILOT_PROXY_VERIFICATION': str(args.verify_proxy),
        'PILOT_PAYLOAD_PROXY_VERIFICATION': str(args.verify_payload_proxy),
        'SERVER_UPDATE': SERVER_UPDATE_NOT_DONE,
        'PILOT_RESOURCE_NAME': args.hpc_resource,
        'PILOT_USE_RUCIO_TRACES': str(args.use_rucio_traces),
        'PILOT_ES_EXECUTOR_TYPE': args.executor_type,
    })

    # only exported when an output directory was specified
    if args.output_dir:
        environ['PILOT_OUTPUT_DIR'] = args.output_dir

    # append ":<port>" to the server URL unless the URL already contains it
    port_suffix = ":%s" % args.port
    server_url = args.url
    if port_suffix not in server_url:
        server_url = server_url + port_suffix
    environ['PANDA_SERVER_URL'] = server_url
    environ['QUEUEDATA_SERVER_URL'] = str(args.queuedata_url)
0450
0451
def wrap_up(initdir, mainworkdir, args):
    """
    Perform cleanup and terminate logging.

    Besides its parameters, this function reads the module-level ``trace`` variable,
    which the ``__main__`` section assigns from the return value of main().

    :param initdir: launch directory (string).
    :param mainworkdir: main work directory (string).
    :param args: pilot arguments object.
    :return: the selected exit code after conversion by shell_exit_code().
    """

    exit_code = 0

    # remove the main work directory, but only if it differs from the launch directory
    # and cleanup was requested; a failed removal is logged and otherwise ignored
    if initdir != mainworkdir and args.cleanup:
        chdir(initdir)
        try:
            rmtree(mainworkdir)
        except Exception as e:
            logging.warning("failed to remove %s: %s", mainworkdir, e)
        else:
            logging.info("removed %s", mainworkdir)

    # in Harvester mode, call kill_worker() (imported only when needed)
    if args.harvester:
        from pilot.util.harvester import kill_worker
        kill_worker()

    try:
        exit_code = trace.pilot['error_code']
    except Exception:
        # trace has no usable 'pilot' mapping (main() can also return an error code or None);
        # pass the value on unchanged
        exit_code = trace
    else:
        logging.info('traces error code: %d', exit_code)
        # NOTE(review): as nested here, the 'nr_jobs > 0' branch is only reached when
        # nr_jobs > 1, so the inner 'nr_jobs == 1' case and the two 'state' branches below
        # cannot be reached -- confirm the intended nesting before relying on them
        if trace.pilot['nr_jobs'] <= 1:
            if exit_code != 0:
                logging.info('an exit code was already set: %d (will be converted to a standard shell code)', exit_code)
        elif trace.pilot['nr_jobs'] > 0:
            if trace.pilot['nr_jobs'] == 1:
                logging.getLogger(__name__).info('pilot has finished (%d job was processed)', trace.pilot['nr_jobs'])
            else:
                logging.getLogger(__name__).info('pilot has finished (%d jobs were processed)', trace.pilot['nr_jobs'])
            exit_code = SUCCESS
        elif trace.pilot['state'] == FAILURE:
            logging.critical('pilot workflow failure -- aborting')
        elif trace.pilot['state'] == ERRNO_NOJOBS:
            logging.critical('pilot did not process any events -- aborting')
            exit_code = ERRNO_NOJOBS
    logging.info('pilot has finished')
    # flush and close all logging handlers; nothing is logged after this point
    logging.shutdown()

    return shell_exit_code(exit_code)
0503
0504
def get_pilot_source_dir():
    """
    Locate the pilot source directory relative to the current working directory.

    :return: <cwd>/pilot2 if that directory contains pilot.py, otherwise the current working directory.
    """

    here = getcwd()
    candidate = join(here, 'pilot2')

    # prefer a pilot2 sub-directory that holds pilot.py; in every other case use cwd
    if exists(join(candidate, 'pilot.py')):
        return candidate
    return here
0520
0521
if __name__ == '__main__':
    """
    Main function of pilot module.
    """

    # parse the command line; 'args' is a module-level name that main() reads directly
    args = get_args()

    # store the result of the Harvester mode check on args (read later by wrap_up())
    args.harvester = is_harvester_mode(args)

    # start with an empty timing dictionary on args
    args.timing = {}

    # start with an empty job status dictionary on args
    args.job_status = {}

    # register the pilot start time and the multi-job start time using the current time
    add_to_pilot_timing('0', PILOT_START_TIME, time.time(), args)
    add_to_pilot_timing('1', PILOT_MULTIJOB_START_TIME, time.time(), args)

    # remember the launch directory; must be taken before create_main_work_dir() changes directory
    args.sourcedir = getcwd()

    # create the main work directory and cd into it; give up immediately on failure
    exit_code, mainworkdir = create_main_work_dir(args)
    if exit_code != 0:
        sys.exit(exit_code)

    # export the pilot settings to the environment
    set_environment_variables(args, mainworkdir)

    # set up logging using the debug and no-pilot-log options
    establish_logging(debug=args.debug, nopilotlog=args.nopilotlog)

    # run the pilot; 'trace' is a module-level name that wrap_up() reads directly
    trace = main()

    # register the pilot end time
    add_to_pilot_timing('0', PILOT_END_TIME, time.time(), args, store=False)

    # clean up, shut down logging and determine the exit code
    exit_code = wrap_up(args.sourcedir, mainworkdir, args)

    # hand the exit code back to the calling shell
    sys.exit(exit_code)
0566 sys.exit(exit_code)