# File indexing completed on 2026-04-10 08:39:15
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014 import copy as objectcopy
0015 import os
0016 import subprocess
0017 import time
0018
0019 try:
0020 import Queue as queue
0021 except Exception:
0022 import queue
0023
0024 from pilot.api.data import StageInClient, StageOutClient
0025 from pilot.api.es_data import StageInESClient
0026 from pilot.control.job import send_state
0027 from pilot.common.errorcodes import ErrorCodes
0028 from pilot.common.exception import ExcThread, PilotException, LogFileCreationFailure
0029
0030 from pilot.util.auxiliary import set_pilot_state, check_for_final_server_update
0031 from pilot.util.common import should_abort
0032 from pilot.util.constants import PILOT_PRE_STAGEIN, PILOT_POST_STAGEIN, PILOT_PRE_STAGEOUT, PILOT_POST_STAGEOUT, LOG_TRANSFER_IN_PROGRESS,\
0033 LOG_TRANSFER_DONE, LOG_TRANSFER_NOT_DONE, LOG_TRANSFER_FAILED, SERVER_UPDATE_RUNNING, MAX_KILL_WAIT_TIME, UTILITY_BEFORE_STAGEIN
0034 from pilot.util.container import execute
0035 from pilot.util.filehandling import remove, write_file
0036 from pilot.util.processes import threads_aborted
0037 from pilot.util.queuehandling import declare_failed_by_kill, put_in_queue
0038 from pilot.util.timing import add_to_pilot_timing
0039 from pilot.util.tracereport import TraceReport
0040 import pilot.util.middleware
0041
0042 import logging
0043 logger = logging.getLogger(__name__)
0044
0045 errors = ErrorCodes()
0046
0047
def control(queues, traces, args):
    """
    Start and monitor the data component threads (copytool_in, copytool_out, queue_monitoring).

    The threads are created from the targets dictionary and started, then monitored in a
    loop until args.graceful_stop is set. Any exception raised inside a thread is reported
    through that thread's bucket.

    :param queues: internal queues for job handling.
    :param traces: tuple containing internal pilot states.
    :param args: Pilot arguments (e.g. containing queue name, queuedata dictionary, etc).
    :return:
    """

    targets = {'copytool_in': copytool_in, 'copytool_out': copytool_out, 'queue_monitoring': queue_monitoring}
    threads = [ExcThread(bucket=queue.Queue(), target=target, kwargs={'queues': queues, 'traces': traces, 'args': args},
                         name=name) for name, target in list(targets.items())]

    [thread.start() for thread in threads]

    # monitor the threads until the pilot is asked to stop
    while not args.graceful_stop.is_set():
        for thread in threads:
            # an exception raised in a thread ends up in its bucket
            bucket = thread.get_bucket()
            try:
                exc = bucket.get(block=False)
            except queue.Empty:
                pass
            else:
                exc_type, exc_obj, exc_trace = exc
                logger.warning("thread \'%s\' received an exception from bucket: %s", thread.name, exc_obj)

            thread.join(0.1)
        time.sleep(0.1)

    time.sleep(0.5)

    logger.debug('data control ending since graceful_stop has been set')
    if args.abort_job.is_set():
        if traces.pilot['command'] == 'aborting':
            logger.warning('jobs are aborting')
        elif traces.pilot['command'] == 'abort':
            logger.warning('data control detected a set abort_job (due to a kill signal)')
            traces.pilot['command'] = 'aborting'

    # only set job_aborted once all relevant threads have finished
    if threads_aborted():
        logger.debug('will proceed to set job_aborted')
        args.job_aborted.set()
    else:
        logger.debug('will not set job_aborted yet')

    logger.debug('[data] control thread has finished')
0095
0096
def skip_special_files(job):
    """
    Consult user defined code whether any files should be skipped during stage-in.

    ATLAS code will e.g. skip DBRelease files since they should already be available in CVMFS.

    :param job: job object.
    :return:
    """

    pilot_user = os.environ.get('PILOT_USER', 'generic').lower()
    module_path = 'pilot.user.%s.common' % pilot_user
    user = __import__(module_path, globals(), locals(), [pilot_user], 0)
    try:
        user.update_stagein(job)
    except Exception as error:
        logger.warning('caught exception: %s', error)
0112
0113
def update_indata(job):
    """
    Remove any files marked as no_transfer from the stage-in list.

    :param job: job object.
    :return:
    """

    # collect first, then remove - never mutate the list while iterating it
    skipped = [fspec for fspec in job.indata if fspec.status == 'no_transfer']
    for fspec in skipped:
        logger.info('removing fspec object (lfn=%s) from list of input files', fspec.lfn)
        job.indata.remove(fspec)
0129
0130
def get_trace_report_variables(job, label='stage-in'):
    """
    Get some of the variables needed for creating the trace report.

    :param job: job object.
    :param label: 'stage-[in|out]' (string).
    :return: event_type (string), localsite (string), remotesite (string).
    """

    stagein = (label == 'stage-in')
    event_type = "get_sm" if stagein else "put_sm"
    if job.is_analysis():
        event_type += "_a"
    # local and remote site default to the same RSE
    localsite = remotesite = get_rse(job.indata if stagein else job.outdata)

    return event_type, localsite, remotesite
0147
0148
def create_trace_report(job, label='stage-in'):
    """
    Create the trace report object.

    :param job: job object.
    :param label: 'stage-[in|out]' (string).
    :return: trace report object.
    """

    event_type, localsite, remotesite = get_trace_report_variables(job, label=label)
    site_name = os.environ.get('PILOT_SITENAME', '')
    report = TraceReport(pq=site_name, localSite=localsite, remoteSite=remotesite,
                         dataset="", eventType=event_type)
    report.init(job)

    return report
0164
0165
def _stage_in(args, job):
    """
    Perform stage-in of all input files, either via a middleware script (container) or via the Data API.

    :param args: pilot args object.
    :param job: job object.
    :return: True in case of success
    """

    # write time stamps to pilot timing file
    add_to_pilot_timing(job.jobid, PILOT_PRE_STAGEIN, time.time(), args)

    # any special files (e.g. DBRelease files) should not be staged in
    skip_special_files(job)

    # remove any files from the stage-in list that were marked as no_transfer above
    update_indata(job)

    label = 'stage-in'

    # should stage-in be done by a script (for containerisation) or by invoking the API (ie classic mode)?
    use_container = pilot.util.middleware.use_middleware_script(job.infosys.queuedata.container_type.get("middleware"))
    if use_container:
        logger.info('stage-in will be done by a script')
        try:
            eventtype, localsite, remotesite = get_trace_report_variables(job, label=label)
            pilot.util.middleware.containerise_middleware(job, job.indata, args.queue, eventtype, localsite, remotesite,
                                                          job.infosys.queuedata.container_options, args.input_dir,
                                                          label=label, container_type=job.infosys.queuedata.container_type.get("middleware"))
        except PilotException as error:
            logger.warning('stage-in containerisation threw a pilot exception: %s', error)
        except Exception as error:
            import traceback
            logger.warning('stage-in containerisation threw an exception: %s', error)
            logger.error(traceback.format_exc())
    else:
        try:
            logger.info('stage-in will not be done in a container')

            # create the trace report for the transfers
            trace_report = create_trace_report(job, label=label)

            # event service merge jobs use a dedicated stage-in client
            if job.is_eventservicemerge:
                client = StageInESClient(job.infosys, logger=logger, trace_report=trace_report)
                activity = 'es_events_read'
            else:
                client = StageInClient(job.infosys, logger=logger, trace_report=trace_report)
                activity = 'pr'
            use_pcache = job.infosys.queuedata.use_pcache
            kwargs = dict(workdir=job.workdir, cwd=job.workdir, usecontainer=False, use_pcache=use_pcache, use_bulk=False,
                          input_dir=args.input_dir, use_vp=job.use_vp, catchall=job.infosys.queuedata.catchall)
            client.prepare_sources(job.indata)
            client.transfer(job.indata, activity=activity, **kwargs)
        except PilotException as error:
            import traceback
            error_msg = traceback.format_exc()
            logger.error(error_msg)
            msg = errors.format_diagnostics(error.get_error_code(), error_msg)
            job.piloterrorcodes, job.piloterrordiags = errors.add_error_code(error.get_error_code(), msg=msg)
        except Exception as error:
            logger.error('failed to stage-in: error=%s', error)

    logger.info('summary of transferred files:')
    for infile in job.indata:
        status = infile.status if infile.status else "(not transferred)"
        logger.info(" -- lfn=%s, status_code=%s, status=%s", infile.lfn, infile.status_code, status)

    # write time stamps to pilot timing file
    add_to_pilot_timing(job.jobid, PILOT_POST_STAGEIN, time.time(), args)

    # stage-in is considered successful if every file was transferred, is directly accessed or deliberately skipped
    remain_files = [infile for infile in job.indata if infile.status not in ['remote_io', 'transferred', 'no_transfer']]
    logger.info("stage-in finished") if not remain_files else logger.info("stage-in failed")

    return not remain_files
0241
0242
def get_rse(data, lfn=""):
    """
    Return the ddmEndPoint corresponding to the given lfn.

    If lfn is not provided, the ddmEndPoint of the first file in the list is returned.

    :param data: FileSpec list object.
    :param lfn: local file name (string).
    :return: rse (string)
    """

    if lfn == "":
        try:
            return data[0].ddmendpoint
        except Exception as error:
            logger.warning("exception caught: %s", error)
            logger.warning("end point is currently unknown")
            return "unknown"

    rse = ""
    for fspec in data:
        if fspec.lfn == lfn:
            rse = fspec.ddmendpoint

    if not rse:
        logger.warning("end point is currently unknown")
        rse = "unknown"

    return rse
0272
0273
def stage_in_auto(files):
    """
    Separate dummy implementation for automatic stage-in outside of pilot workflows.

    Should be merged with regular stage-in functionality later, but we need to have
    some operational experience with it first.
    Many things to improve:
    - separate file error handling in the merged case
    - auto-merging of files with same destination into single copytool call

    :param files: list of file dictionaries (destination/scope/name keys).
    :return: the same list, with status, errno and errmsg updated per file.
    """

    # don't spoil the output, we depend on stderr parsing
    os.environ['RUCIO_LOGGING_FORMAT'] = '%(asctime)s %(levelname)s [%(message)s]'

    executable = ['/usr/bin/env',
                  'rucio', '-v', 'download',
                  '--no-subdir']

    # mark files with a non-existing destination as failed right away
    for _file in files:
        if not os.path.exists(_file['destination']):
            _file['status'] = 'failed'
            _file['errmsg'] = 'Destination directory does not exist: %s' % _file['destination']
            _file['errno'] = 1
        else:
            _file['status'] = 'running'
            _file['errmsg'] = 'File not yet successfully downloaded.'
            _file['errno'] = 2

    for _file in files:
        if _file['errno'] == 1:
            continue

        tmp_executable = objectcopy.deepcopy(executable)

        tmp_executable += ['--dir', _file['destination']]
        tmp_executable.append('%s:%s' % (_file['scope'],
                                         _file['name']))
        # universal_newlines=True makes communicate() return str instead of bytes
        # (on Python 3 the 'Details:' parsing below would otherwise always fail)
        process = subprocess.Popen(tmp_executable,
                                   bufsize=-1,
                                   stdout=subprocess.PIPE,
                                   stderr=subprocess.PIPE,
                                   universal_newlines=True)
        _file['errno'] = 2
        while True:
            time.sleep(0.5)
            exit_code = process.poll()
            if exit_code is not None:
                _, stderr = process.communicate()
                if exit_code == 0:
                    _file['status'] = 'done'
                    _file['errno'] = 0
                    _file['errmsg'] = 'File successfully downloaded.'
                else:
                    _file['status'] = 'failed'
                    _file['errno'] = 3
                    try:
                        # the Details: string is the most user-friendly message from rucio
                        _file['errmsg'] = [detail for detail in stderr.split('\n') if detail.startswith('Details:')][0][9:-1]
                    except Exception as error:
                        _file['errmsg'] = 'Could not find rucio error message details - please check stderr directly: %s' % error
                break
            else:
                continue

    return files
0338
0339
def stage_out_auto(files):
    """
    Separate dummy implementation for automatic stage-out outside of pilot workflows.

    Should be merged with regular stage-out functionality later, but we need to have
    some operational experience with it first.

    :param files: list of file dictionaries (file/rse/scope keys plus optional upload options).
    :return: the same list, with status, errno and errmsg updated per file.
    """

    # don't spoil the output, we depend on stderr parsing
    os.environ['RUCIO_LOGGING_FORMAT'] = '%(asctime)s %(levelname)s [%(message)s]'

    executable = ['/usr/bin/env',
                  'rucio', '-v', 'upload']

    # mark files with a non-existing source as failed right away
    for _file in files:
        if not os.path.exists(_file['file']):
            _file['status'] = 'failed'
            _file['errmsg'] = 'Source file does not exist: %s' % _file['file']
            _file['errno'] = 1
        else:
            _file['status'] = 'running'
            _file['errmsg'] = 'File not yet successfully uploaded.'
            _file['errno'] = 2

    for _file in files:
        if _file['errno'] == 1:
            continue

        tmp_executable = objectcopy.deepcopy(executable)

        tmp_executable += ['--rse', _file['rse']]

        # optional upload switches
        if _file.get('no_register'):
            tmp_executable += ['--no-register']

        if _file.get('summary'):
            tmp_executable += ['--summary']

        if 'lifetime' in _file:
            tmp_executable += ['--lifetime', str(_file['lifetime'])]

        if 'guid' in _file:
            tmp_executable += ['--guid', _file['guid']]

        if 'attach' in _file:
            tmp_executable += ['--scope', _file['scope'], '%s:%s' % (_file['attach']['scope'], _file['attach']['name']), _file['file']]
        else:
            tmp_executable += ['--scope', _file['scope'], _file['file']]

        # universal_newlines=True makes communicate() return str instead of bytes
        # (on Python 3 the 'Details:' parsing below would otherwise always fail)
        process = subprocess.Popen(tmp_executable, bufsize=-1, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                                   universal_newlines=True)
        _file['errno'] = 2
        while True:
            time.sleep(0.5)
            exit_code = process.poll()
            if exit_code is not None:
                _, stderr = process.communicate()
                if exit_code == 0:
                    _file['status'] = 'done'
                    _file['errno'] = 0
                    _file['errmsg'] = 'File successfully uploaded.'
                else:
                    _file['status'] = 'failed'
                    _file['errno'] = 3
                    try:
                        # the Details: string is the most user-friendly message from rucio
                        _file['errmsg'] = [detail for detail in stderr.split('\n') if detail.startswith('Details:')][0][9:-1]
                    except Exception as error:
                        _file['errmsg'] = 'Could not find rucio error message details - please check stderr directly: %s' % error
                break
            else:
                continue

    return files
0413
0414
def write_output(filename, output):
    """
    Write command output to file.

    :param filename: file name (string).
    :param output: command stdout/stderr (string).
    :return:
    """

    try:
        write_file(filename, output, unique=True)
    except PilotException as error:
        logger.warning('failed to write utility output to file: %s, %s', error, output)
        return
    logger.debug('wrote %s', filename)
0430
0431
def write_utility_output(workdir, step, stdout, stderr):
    """
    Write the utility command output to stdout, stderr files to the job.workdir for the current step.

    -> <step>_stdout.txt, <step>_stderr.txt
    Example of step: xcache.

    :param workdir: job workdir (string).
    :param step: utility step (string).
    :param stdout: command stdout (string).
    :param stderr: command stderr (string).
    :return:
    """

    # dump both streams with the same naming scheme
    for suffix, content in (('_stdout.txt', stdout), ('_stderr.txt', stderr)):
        write_output(os.path.join(workdir, step + suffix), content)
0448
0449
def copytool_in(queues, traces, args):
    """
    Call the stage-in function and put the job object in the proper queue.

    :param queues: internal queues for job handling.
    :param traces: tuple containing internal pilot states.
    :param args: Pilot arguments (e.g. containing queue name, queuedata dictionary, etc).
    :return:
    """

    while not args.graceful_stop.is_set():
        time.sleep(0.5)
        try:
            # abort if kill signal arrived too long time ago, ie loop is stuck
            current_time = int(time.time())
            if args.kill_time and current_time - args.kill_time > MAX_KILL_WAIT_TIME:
                logger.warning('loop has run for too long time after first kill signal - will abort')
                break

            # get the next job object awaiting stage-in
            job = queues.data_in.get(block=True, timeout=1)

            # does the user code define any commands that should be run before stage-in?
            pilot_user = os.environ.get('PILOT_USER', 'generic').lower()
            user = __import__('pilot.user.%s.common' % pilot_user, globals(), locals(), [pilot_user], 0)
            cmd = user.get_utility_commands(job=job, order=UTILITY_BEFORE_STAGEIN)
            if cmd:
                _, stdout, stderr = execute(cmd.get('command'))
                logger.debug('stdout=%s', stdout)
                logger.debug('stderr=%s', stderr)

                # let the user code process the command output if needed
                kwargs = {'label': cmd.get('label', 'utility'), 'output': stdout}
                user.post_prestagein_utility_command(**kwargs)

                # write the command output to <label>_stdout.txt, <label>_stderr.txt in the job workdir
                write_utility_output(job.workdir, cmd.get('label', 'utility'), stdout, stderr)

            # place the job in the current stage-in queue (used by the jobs' queue monitoring)
            put_in_queue(job, queues.current_data_in)

            # ready to set the job in running state
            send_state(job, args, 'running')

            # the server might have responded to the state change with an order to kill the job
            if job.state == 'failed':
                logger.warning('job state is \'failed\' - order log transfer and abort copytool_in()')
                job.stageout = 'log'
                put_in_queue(job, queues.data_out)
                break

            os.environ['SERVER_UPDATE'] = SERVER_UPDATE_RUNNING

            if args.abort_job.is_set():
                traces.pilot['command'] = 'abort'
                logger.warning('copytool_in detected a set abort_job pre stage-in (due to a kill signal)')
                declare_failed_by_kill(job, queues.failed_data_in, args.signal)
                break

            if _stage_in(args, job):
                if args.abort_job.is_set():
                    traces.pilot['command'] = 'abort'
                    logger.warning('copytool_in detected a set abort_job post stage-in (due to a kill signal)')
                    declare_failed_by_kill(job, queues.failed_data_in, args.signal)
                    break

                put_in_queue(job, queues.finished_data_in)
                # remove the job object from the current stage-in queue again
                _job = queues.current_data_in.get(block=True, timeout=1)
                if _job:
                    logger.debug('job %s has been removed from the current_data_in queue', _job.jobid)

                # create the input file metadata (not for event service executors)
                if os.environ.get('PILOT_ES_EXECUTOR_TYPE', 'generic') == 'generic':
                    pilot_user = os.environ.get('PILOT_USER', 'generic').lower()
                    user = __import__('pilot.user.%s.metadata' % pilot_user, globals(), locals(), [pilot_user], 0)
                    file_dictionary = get_input_file_dictionary(job.indata)
                    xml = user.create_input_file_metadata(file_dictionary, job.workdir)
                    logger.info('created input file metadata:\n%s', xml)
            else:
                # remove the job object from the current stage-in queue again
                _job = queues.current_data_in.get(block=True, timeout=1)
                if _job:
                    logger.debug('job %s has been removed from the current_data_in queue', _job.jobid)
                logger.warning('stage-in failed, adding job object to failed_data_in queue')
                job.piloterrorcodes, job.piloterrordiags = errors.add_error_code(errors.STAGEINFAILED)
                set_pilot_state(job=job, state="failed")
                traces.pilot['error_code'] = job.piloterrorcodes[0]
                put_in_queue(job, queues.failed_data_in)

                # do not set graceful stop until the pilot has finished sending the final job update
                check_for_final_server_update(args.update_server)
                args.graceful_stop.set()

        except queue.Empty:
            continue

    # proceed to set the job_aborted flag?
    if threads_aborted():
        logger.debug('will proceed to set job_aborted')
        args.job_aborted.set()
    else:
        logger.debug('will not set job_aborted yet')

    logger.debug('[data] copytool_in thread has finished')
0565
0566
def copytool_out(queues, traces, args):
    """
    Main stage-out thread.
    Perform stage-out as soon as a job object can be extracted from the data_out queue.

    :param queues: internal queues for job handling.
    :param traces: tuple containing internal pilot states.
    :param args: Pilot arguments (e.g. containing queue name, queuedata dictionary, etc).
    :return:
    """

    cont = True
    logger.debug('entering copytool_out loop')
    if args.graceful_stop.is_set():
        logger.debug('graceful_stop already set')

    # keep track of job ids that have already been staged out (fail-safe against duplicates)
    processed_jobs = []
    while cont:

        time.sleep(0.5)

        # abort if kill signal arrived too long time ago, ie loop is stuck
        current_time = int(time.time())
        if args.kill_time and current_time - args.kill_time > MAX_KILL_WAIT_TIME:
            logger.warning('loop has run for too long time after first kill signal - will abort')
            break

        # check for abort before and after the queue extraction
        abort = should_abort(args, label='data:copytool_out')
        try:
            job = queues.data_out.get(block=True, timeout=1)
            if job:
                # fail-safe: make sure the same job object is not staged out twice
                if processed_jobs:
                    if is_already_processed(queues, processed_jobs):
                        continue

                logger.info('will perform stage-out for job id=%s', job.jobid)

                if args.abort_job.is_set():
                    traces.pilot['command'] = 'abort'
                    logger.warning('copytool_out detected a set abort_job pre stage-out (due to a kill signal)')
                    declare_failed_by_kill(job, queues.failed_data_out, args.signal)
                    break

                if _stage_out_new(job, args):
                    if args.abort_job.is_set():
                        traces.pilot['command'] = 'abort'
                        logger.warning('copytool_out detected a set abort_job post stage-out (due to a kill signal)')

                        break

                    # stage-out ok: remember the job id and pass the job on
                    processed_jobs.append(job.jobid)
                    put_in_queue(job, queues.finished_data_out)
                    logger.debug('job object added to finished_data_out queue')
                else:
                    # stage-out failed
                    put_in_queue(job, queues.failed_data_out)
                    logger.debug('job object added to failed_data_out queue')
            else:
                logger.debug('no returned job - why no exception?')
        except queue.Empty:
            if abort:
                cont = False
                break
            continue

        if abort:
            cont = False
            break

    # proceed to set the job_aborted flag?
    if threads_aborted():
        logger.debug('will proceed to set job_aborted')
        args.job_aborted.set()
    else:
        logger.debug('will not set job_aborted yet')

    logger.debug('[data] copytool_out thread has finished')
0650
0651
def is_already_processed(queues, processed_jobs):
    """
    Skip stage-out in case the job has already been processed.

    This should not be necessary so this is a fail-safe, but it seems there is a case
    when a job with multiple output files enters the stage-out more than once.

    :param queues: queues object.
    :param processed_jobs: list of already processed jobs.
    :return: True if stage-out queues contain a job object that has already been processed.
    """

    # job ids currently sitting in the stage-out result queues
    staged = [obj.jobid for obj in list(queues.finished_data_out.queue) + list(queues.failed_data_out.queue)]

    for jobid in processed_jobs:
        if jobid in staged:
            logger.warning('output from job %s has already been staged out', jobid)
            time.sleep(5)
            return True

    return False
0676
0677
def get_input_file_dictionary(indata):
    """
    Return an input file dictionary, format: {'guid': 'pfn', ..}.

    Normally use_turl would be set to True if direct access is used.
    Note: any environment variables in the turls will be expanded.

    :param indata: list of FileSpec objects.
    :return: file dictionary.
    """

    file_dictionary = {}

    for fspec in indata:
        # use the turl for direct access, otherwise the lfn
        pfn = fspec.turl if fspec.status == 'remote_io' else fspec.lfn
        pfn = os.path.expandvars(pfn)

        # fail-safe: an empty pfn must not end up in the dictionary
        file_dictionary[fspec.guid] = pfn if pfn else fspec.lfn

    return file_dictionary
0701
0702
def filter_files_for_log(directory):
    """
    Create a file list by recursively scanning the given directory, keeping only small files.

    Only files smaller than 10 bytes are kept.

    :param directory: directory to scan (string).
    :return: list of file paths (list).
    """

    max_file_size = 10
    selected = []
    for root, _, filenames in os.walk(directory):
        for fname in filenames:
            location = os.path.join(root, fname)
            # the file may have disappeared since os.walk() listed it
            if os.path.exists(location) and os.path.getsize(location) < max_file_size:
                selected.append(location)

    return selected
0719
0720
def create_log(workdir, logfile_name, tarball_name, cleanup, input_files=[], output_files=[], is_looping=False, debugmode=False):
    """
    Create the tarball for the job.

    :param workdir: work directory for the job (string).
    :param logfile_name: log file name (string).
    :param tarball_name: tarball name (string).
    :param cleanup: perform cleanup (Boolean).
    :param input_files: list of input files to remove (list).
    :param output_files: list of output files to remove (list).
    :param is_looping: True for looping jobs, False by default (Boolean).
    :param debugmode: True if debug mode has been switched on (Boolean).
    :raises LogFileCreationFailure: in case of log file creation problem.
    :return:
    """

    logger.debug('preparing to create log file (debug mode=%s)', str(debugmode))

    # change to the pilot home directory so the tar command below uses relative paths
    pilot_home = os.environ.get('PILOT_HOME', os.getcwd())
    current_dir = os.getcwd()
    if pilot_home != current_dir:
        os.chdir(pilot_home)

    # perform special (user specific) cleanup before the tarball is created
    if cleanup:
        pilot_user = os.environ.get('PILOT_USER', 'generic').lower()
        user = __import__('pilot.user.%s.common' % pilot_user, globals(), locals(), [pilot_user], 0)
        user.remove_redundant_files(workdir, islooping=is_looping, debugmode=debugmode)

    # remove any present input/output files so they do not end up in the log tarball
    for fname in input_files + output_files:
        path = os.path.join(workdir, fname)
        if os.path.exists(path):
            logger.info('removing file: %s', path)
            remove(path)

    if logfile_name is None or len(logfile_name.strip('/ ')) == 0:
        logger.info('Skipping tarball creation, since the logfile_name is empty')
        return

    # rename the workdir to the tarball name so the tarball gets the expected top-level directory
    newworkdir = os.path.join(os.path.dirname(workdir), tarball_name)
    orgworkdir = workdir
    os.rename(workdir, newworkdir)
    workdir = newworkdir

    fullpath = os.path.join(workdir, logfile_name)
    logger.info('will create archive %s', fullpath)
    try:
        cmd = "pwd;tar cvfz %s %s --dereference --one-file-system; echo $?" % (fullpath, tarball_name)
        _, stdout, _ = execute(cmd)
    except Exception as error:
        raise LogFileCreationFailure(error)
    else:
        if pilot_home != current_dir:
            os.chdir(pilot_home)
        logger.debug('stdout = %s', stdout)
    # restore the original workdir name
    try:
        os.rename(workdir, orgworkdir)
    except Exception as error:
        logger.debug('exception caught: %s', error)
0783
0784
def _do_stageout(job, xdata, activity, queue, title, output_dir=''):
    """
    Use the `StageOutClient` in the Data API to perform stage-out.

    :param job: job object.
    :param xdata: list of FileSpec objects.
    :param activity: copytool activity or preferred list of activities to resolve copytools
    :param queue: PanDA queue (string).
    :param title: type of stage-out (output, log) (string).
    :param output_dir: optional output directory (string).
    :return: True in case of success transfers
    """

    logger.info('prepare to stage-out %d %s file(s)', len(xdata), title)
    label = 'stage-out'

    # should stage-out be done by a script (for containerisation) or by invoking the API (ie classic mode)?
    use_container = pilot.util.middleware.use_middleware_script(job.infosys.queuedata.container_type.get("middleware"))
    if use_container:
        logger.info('stage-out will be done by a script')
        try:
            eventtype, localsite, remotesite = get_trace_report_variables(job, label=label)
            pilot.util.middleware.containerise_middleware(job, xdata, queue, eventtype, localsite, remotesite,
                                                          job.infosys.queuedata.container_options, output_dir,
                                                          label=label, container_type=job.infosys.queuedata.container_type.get("middleware"))
        except PilotException as error:
            logger.warning('stage-out containerisation threw a pilot exception: %s', error)
        except Exception as error:
            logger.warning('stage-out containerisation threw an exception: %s', error)
    else:
        try:
            logger.info('stage-out will not be done in a container')

            # create the trace report for the transfers
            trace_report = create_trace_report(job, label=label)

            client = StageOutClient(job.infosys, logger=logger, trace_report=trace_report)
            kwargs = dict(workdir=job.workdir, cwd=job.workdir, usecontainer=False, job=job, output_dir=output_dir,
                          catchall=job.infosys.queuedata.catchall)

            # prepare the destinations (not for unified queues)
            if job.infosys.queuedata.type != 'unified':
                client.prepare_destinations(xdata, activity)
            client.transfer(xdata, activity, **kwargs)
        except PilotException as error:
            import traceback
            error_msg = traceback.format_exc()
            logger.error(error_msg)
            msg = errors.format_diagnostics(error.get_error_code(), error_msg)
            job.piloterrorcodes, job.piloterrordiags = errors.add_error_code(error.get_error_code(), msg=msg)
        except Exception:
            import traceback
            logger.error(traceback.format_exc())
            # the transfer failure is reflected in the file status list below - do not re-raise
        else:
            logger.debug('stage-out client completed')

    logger.info('summary of transferred files:')
    for iofile in xdata:
        if not iofile.status:
            status = "(not transferred)"
        else:
            status = iofile.status
        logger.info(" -- lfn=%s, status_code=%s, status=%s", iofile.lfn, iofile.status_code, status)

    # success means that every file ended up in state 'transferred'
    remain_files = [iofile for iofile in xdata if iofile.status not in ['transferred']]

    return not remain_files
0852
0853
def _stage_out_new(job, args):
    """
    Stage-out of all output files.
    If job.stageout=log then only log files will be transferred.

    :param job: job object.
    :param args: pilot args object.
    :return: True in case of success, False otherwise.
    """

    # write time stamps to pilot timing file
    add_to_pilot_timing(job.jobid, PILOT_PRE_STAGEOUT, time.time(), args)

    is_success = True

    if not job.outdata or job.is_eventservice:
        logger.info('this job does not have any output files, only stage-out log file')
        job.stageout = 'log'

    # transfer the output files (unless only the log is wanted)
    if job.stageout != 'log':
        if not _do_stageout(job, job.outdata, ['pw', 'w'], args.queue, title='output', output_dir=args.output_dir):
            is_success = False
            logger.warning('transfer of output file(s) failed')

    if job.stageout in ['log', 'all'] and job.logdata:
        # the log transfer is only attempted once
        status = job.get_status('LOG_TRANSFER')
        if status != LOG_TRANSFER_NOT_DONE:
            logger.warning('log transfer already attempted')
            return False

        job.status['LOG_TRANSFER'] = LOG_TRANSFER_IN_PROGRESS
        logfile = job.logdata[0]

        # create the log tarball before staging it out
        try:
            tarball_name = 'tarball_PandaJob_%s_%s' % (job.jobid, job.infosys.pandaqueue)
            input_files = [fspec.lfn for fspec in job.indata]
            output_files = [fspec.lfn for fspec in job.outdata]
            create_log(job.workdir, logfile.lfn, tarball_name, args.cleanup,
                       input_files=input_files, output_files=output_files,
                       is_looping=errors.LOOPINGJOB in job.piloterrorcodes, debugmode=job.debug)
        except LogFileCreationFailure as error:
            logger.warning('failed to create tar file: %s', error)
            set_pilot_state(job=job, state="failed")
            job.piloterrorcodes, job.piloterrordiags = errors.add_error_code(errors.LOGFILECREATIONFAILURE)
            return False

        if not _do_stageout(job, [logfile], ['pl', 'pw', 'w'], args.queue, title='log', output_dir=args.output_dir):
            is_success = False
            logger.warning('log transfer failed')
            job.status['LOG_TRANSFER'] = LOG_TRANSFER_FAILED
        else:
            job.status['LOG_TRANSFER'] = LOG_TRANSFER_DONE
    elif not job.logdata:
        logger.info('no log was defined - will not create log file')
        job.status['LOG_TRANSFER'] = LOG_TRANSFER_DONE

    # write time stamps to pilot timing file
    add_to_pilot_timing(job.jobid, PILOT_POST_STAGEOUT, time.time(), args)

    # generate the fileinfo details for the transferred files (to be sent to the server)
    fileinfo = {}
    for iofile in job.outdata + job.logdata:
        if iofile.status in ['transferred']:
            fileinfo[iofile.lfn] = {'guid': iofile.guid,
                                    'fsize': iofile.filesize,
                                    'adler32': iofile.checksum.get('adler32'),
                                    'surl': iofile.turl}

    job.fileinfo = fileinfo

    if not is_success:
        # set a generic stage-out error (a more precise error code might have been set already)
        job.piloterrorcodes, job.piloterrordiags = errors.add_error_code(errors.STAGEOUTFAILED)
        set_pilot_state(job=job, state="failed")
        logger.warning('stage-out failed')
        return False

    logger.info('stage-out finished correctly')

    # only change the job state if it has not been set already (unless it is the stageout state)
    if not job.state or (job.state and job.state == 'stageout'):
        logger.debug('changing job state from %s to finished', job.state)
        set_pilot_state(job=job, state="finished")

    return is_success
0947
0948
def queue_monitoring(queues, traces, args):
    """
    Monitoring of Data queues.

    :param queues: internal queues for job handling.
    :param traces: tuple containing internal pilot states.
    :param args: Pilot arguments (e.g. containing queue name, queuedata dictionary, etc).
    :return:
    """

    while True:
        time.sleep(0.5)
        if traces.pilot['command'] == 'abort':
            logger.warning('data queue monitor saw the abort instruction')
            args.graceful_stop.set()

        # check for abort now, act on it at the end of the loop iteration
        abort = should_abort(args, label='data:queue_monitoring')

        # monitor the failed_data_in queue (jobs that failed stage-in)
        try:
            job = queues.failed_data_in.get(block=True, timeout=1)
        except queue.Empty:
            pass
        else:
            # only the log will be staged out for a job that failed stage-in
            job.stageout = "log"

            if not _stage_out_new(job, args):
                logger.info("job %s failed during stage-in and stage-out of log, adding job object to failed_data_outs queue", job.jobid)
                put_in_queue(job, queues.failed_data_out)
            else:
                logger.info("job %s failed during stage-in, adding job object to failed_jobs queue", job.jobid)
                put_in_queue(job, queues.failed_jobs)

        # monitor the finished_data_out queue (jobs whose stage-out succeeded)
        try:
            job = queues.finished_data_out.get(block=True, timeout=1)
        except queue.Empty:
            pass
        else:
            # the payload itself may still have failed even if stage-out succeeded
            if job.transexitcode == 0 and job.exitcode == 0 and job.piloterrorcodes == []:
                logger.info('finished stage-out for finished payload, adding job to finished_jobs queue')
                put_in_queue(job, queues.finished_jobs)
            else:
                logger.info('finished stage-out (of log) for failed payload')
                put_in_queue(job, queues.failed_jobs)

        # monitor the failed_data_out queue (jobs whose stage-out failed)
        try:
            job = queues.failed_data_out.get(block=True, timeout=1)
        except queue.Empty:
            pass
        else:
            # attempt a log transfer for the failed job
            job.stageout = "log"
            set_pilot_state(job=job, state="failed")
            if not _stage_out_new(job, args):
                logger.info("job %s failed during stage-out", job.jobid)

            put_in_queue(job, queues.failed_jobs)

        if abort:
            break

    # proceed to set the job_aborted flag?
    if threads_aborted():
        logger.debug('will proceed to set job_aborted')
        args.job_aborted.set()
    else:
        logger.debug('will not set job_aborted yet')

    logger.debug('[data] queue_monitor thread has finished')