Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:39:15

0001 #!/usr/bin/env python
0002 # Licensed under the Apache License, Version 2.0 (the "License");
0003 # you may not use this file except in compliance with the License.
0004 # You may obtain a copy of the License at
0005 # http://www.apache.org/licenses/LICENSE-2.0
0006 #
0007 # Authors:
0008 # - Mario Lassnig, mario.lassnig@cern.ch, 2016-2017
0009 # - Daniel Drizhuk, d.drizhuk@gmail.com, 2017
0010 # - Paul Nilsson, paul.nilsson@cern.ch, 2017-2021
0011 # - Wen Guan, wen.guan@cern.ch, 2018
0012 # - Alexey Anisenkov, anisyonk@cern.ch, 2018
0013 
0014 import copy as objectcopy
0015 import os
0016 import subprocess
0017 import time
0018 
0019 try:
0020     import Queue as queue  # noqa: N813
0021 except Exception:
0022     import queue  # Python 3
0023 
0024 from pilot.api.data import StageInClient, StageOutClient
0025 from pilot.api.es_data import StageInESClient
0026 from pilot.control.job import send_state
0027 from pilot.common.errorcodes import ErrorCodes
0028 from pilot.common.exception import ExcThread, PilotException, LogFileCreationFailure
0029 #from pilot.util.config import config
0030 from pilot.util.auxiliary import set_pilot_state, check_for_final_server_update  #, abort_jobs_in_queues
0031 from pilot.util.common import should_abort
0032 from pilot.util.constants import PILOT_PRE_STAGEIN, PILOT_POST_STAGEIN, PILOT_PRE_STAGEOUT, PILOT_POST_STAGEOUT, LOG_TRANSFER_IN_PROGRESS,\
0033     LOG_TRANSFER_DONE, LOG_TRANSFER_NOT_DONE, LOG_TRANSFER_FAILED, SERVER_UPDATE_RUNNING, MAX_KILL_WAIT_TIME, UTILITY_BEFORE_STAGEIN
0034 from pilot.util.container import execute
0035 from pilot.util.filehandling import remove, write_file
0036 from pilot.util.processes import threads_aborted
0037 from pilot.util.queuehandling import declare_failed_by_kill, put_in_queue
0038 from pilot.util.timing import add_to_pilot_timing
0039 from pilot.util.tracereport import TraceReport
0040 import pilot.util.middleware
0041 
0042 import logging
0043 logger = logging.getLogger(__name__)
0044 
0045 errors = ErrorCodes()
0046 
0047 
0048 def control(queues, traces, args):
0049 
0050     targets = {'copytool_in': copytool_in, 'copytool_out': copytool_out, 'queue_monitoring': queue_monitoring}
0051     threads = [ExcThread(bucket=queue.Queue(), target=target, kwargs={'queues': queues, 'traces': traces, 'args': args},
0052                          name=name) for name, target in list(targets.items())]  # Python 2/3
0053 
0054     [thread.start() for thread in threads]
0055 
0056     # if an exception is thrown, the graceful_stop will be set by the ExcThread class run() function
0057     while not args.graceful_stop.is_set():
0058         for thread in threads:
0059             bucket = thread.get_bucket()
0060             try:
0061                 exc = bucket.get(block=False)
0062             except queue.Empty:
0063                 pass
0064             else:
0065                 exc_type, exc_obj, exc_trace = exc
0066                 logger.warning("thread \'%s\' received an exception from bucket: %s", thread.name, exc_obj)
0067 
0068                 # deal with the exception
0069                 # ..
0070 
0071             thread.join(0.1)
0072             time.sleep(0.1)
0073 
0074         time.sleep(0.5)
0075 
0076     logger.debug('data control ending since graceful_stop has been set')
0077     if args.abort_job.is_set():
0078         if traces.pilot['command'] == 'aborting':
0079             logger.warning('jobs are aborting')
0080         elif traces.pilot['command'] == 'abort':
0081             logger.warning('data control detected a set abort_job (due to a kill signal)')
0082             traces.pilot['command'] = 'aborting'
0083 
0084             # find all running jobs and stop them, find all jobs in queues relevant to this module
0085             #abort_jobs_in_queues(queues, args.signal)
0086 
0087     # proceed to set the job_aborted flag?
0088     if threads_aborted():
0089         logger.debug('will proceed to set job_aborted')
0090         args.job_aborted.set()
0091     else:
0092         logger.debug('will not set job_aborted yet')
0093 
0094     logger.debug('[data] control thread has finished')
0095 
0096 
0097 def skip_special_files(job):
0098     """
0099     Consult user defined code if any files should be skipped during stage-in.
0100     ATLAS code will skip DBRelease files e.g. as they should already be available in CVMFS.
0101 
0102     :param job: job object.
0103     :return:
0104     """
0105 
0106     pilot_user = os.environ.get('PILOT_USER', 'generic').lower()
0107     user = __import__('pilot.user.%s.common' % pilot_user, globals(), locals(), [pilot_user], 0)  # Python 2/3
0108     try:
0109         user.update_stagein(job)
0110     except Exception as error:
0111         logger.warning('caught exception: %s', error)
0112 
0113 
0114 def update_indata(job):
0115     """
0116     In case file were marked as no_transfer files, remove them from stage-in.
0117 
0118     :param job: job object.
0119     :return:
0120     """
0121 
0122     toberemoved = []
0123     for fspec in job.indata:
0124         if fspec.status == 'no_transfer':
0125             toberemoved.append(fspec)
0126     for fspec in toberemoved:
0127         logger.info('removing fspec object (lfn=%s) from list of input files', fspec.lfn)
0128         job.indata.remove(fspec)
0129 
0130 
0131 def get_trace_report_variables(job, label='stage-in'):
0132     """
0133     Get some of the variables needed for creating the trace report.
0134 
0135     :param job: job object
0136     :param label: 'stage-[in|out]' (string).
0137     :return: event_type (string), localsite (string), remotesite (string).
0138     """
0139 
0140     event_type = "get_sm" if label == 'stage-in' else "put_sm"
0141     if job.is_analysis():
0142         event_type += "_a"
0143     data = job.indata if label == 'stage-in' else job.outdata
0144     localsite = remotesite = get_rse(data)
0145 
0146     return event_type, localsite, remotesite
0147 
0148 
0149 def create_trace_report(job, label='stage-in'):
0150     """
0151     Create the trace report object.
0152 
0153     :param job: job object.
0154     :param label: 'stage-[in|out]' (string).
0155     :return: trace report object.
0156     """
0157 
0158     event_type, localsite, remotesite = get_trace_report_variables(job, label=label)
0159     trace_report = TraceReport(pq=os.environ.get('PILOT_SITENAME', ''), localSite=localsite, remoteSite=remotesite,
0160                                dataset="", eventType=event_type)
0161     trace_report.init(job)
0162 
0163     return trace_report
0164 
0165 
0166 def _stage_in(args, job):
0167     """
0168         :return: True in case of success
0169     """
0170 
0171     # tested ok:
0172     #logger.info('testing sending SIGUSR1')
0173     #import signal
0174     #os.kill(os.getpid(), signal.SIGUSR1)
0175 
0176     # write time stamps to pilot timing file
0177     add_to_pilot_timing(job.jobid, PILOT_PRE_STAGEIN, time.time(), args)
0178 
0179     # any DBRelease files should not be staged in
0180     skip_special_files(job)
0181 
0182     # now that the trace report has been created, remove any files that are not to be transferred (DBRelease files) from the indata list
0183     update_indata(job)
0184 
0185     label = 'stage-in'
0186 
0187     # should stage-in be done by a script (for containerisation) or by invoking the API (ie classic mode)?
0188     use_container = pilot.util.middleware.use_middleware_script(job.infosys.queuedata.container_type.get("middleware"))
0189     if use_container:
0190         logger.info('stage-in will be done by a script')
0191         try:
0192             eventtype, localsite, remotesite = get_trace_report_variables(job, label=label)
0193             pilot.util.middleware.containerise_middleware(job, job.indata, args.queue, eventtype, localsite, remotesite,
0194                                                           job.infosys.queuedata.container_options, args.input_dir,
0195                                                           label=label, container_type=job.infosys.queuedata.container_type.get("middleware"))
0196         except PilotException as error:
0197             logger.warning('stage-in containerisation threw a pilot exception: %s', error)
0198         except Exception as error:
0199             import traceback
0200             logger.warning('stage-in containerisation threw an exception: %s', error)
0201             logger.error(traceback.format_exc())
0202     else:
0203         try:
0204             logger.info('stage-in will not be done in a container')
0205 
0206             # create the trace report
0207             trace_report = create_trace_report(job, label=label)
0208 
0209             if job.is_eventservicemerge:
0210                 client = StageInESClient(job.infosys, logger=logger, trace_report=trace_report)
0211                 activity = 'es_events_read'
0212             else:
0213                 client = StageInClient(job.infosys, logger=logger, trace_report=trace_report)
0214                 activity = 'pr'
0215             use_pcache = job.infosys.queuedata.use_pcache
0216             kwargs = dict(workdir=job.workdir, cwd=job.workdir, usecontainer=False, use_pcache=use_pcache, use_bulk=False,
0217                           input_dir=args.input_dir, use_vp=job.use_vp, catchall=job.infosys.queuedata.catchall)
0218             client.prepare_sources(job.indata)
0219             client.transfer(job.indata, activity=activity, **kwargs)
0220         except PilotException as error:
0221             import traceback
0222             error_msg = traceback.format_exc()
0223             logger.error(error_msg)
0224             msg = errors.format_diagnostics(error.get_error_code(), error_msg)
0225             job.piloterrorcodes, job.piloterrordiags = errors.add_error_code(error.get_error_code(), msg=msg)
0226         except Exception as error:
0227             logger.error('failed to stage-in: error=%s', error)
0228 
0229     logger.info('summary of transferred files:')
0230     for infile in job.indata:
0231         status = infile.status if infile.status else "(not transferred)"
0232         logger.info(" -- lfn=%s, status_code=%s, status=%s", infile.lfn, infile.status_code, status)
0233 
0234     # write time stamps to pilot timing file
0235     add_to_pilot_timing(job.jobid, PILOT_POST_STAGEIN, time.time(), args)
0236 
0237     remain_files = [infile for infile in job.indata if infile.status not in ['remote_io', 'transferred', 'no_transfer']]
0238     logger.info("stage-in finished") if not remain_files else logger.info("stage-in failed")
0239 
0240     return not remain_files
0241 
0242 
0243 def get_rse(data, lfn=""):
0244     """
0245     Return the ddmEndPoint corresponding to the given lfn.
0246     If lfn is not provided, the first ddmEndPoint will be returned.
0247 
0248     :param data: FileSpec list object.
0249     :param lfn: local file name (string).
0250     :return: rse (string)
0251     """
0252 
0253     rse = ""
0254 
0255     if lfn == "":
0256         try:
0257             return data[0].ddmendpoint
0258         except Exception as error:
0259             logger.warning("exception caught: %s", error)
0260             logger.warning("end point is currently unknown")
0261             return "unknown"
0262 
0263     for fspec in data:
0264         if fspec.lfn == lfn:
0265             rse = fspec.ddmendpoint
0266 
0267     if rse == "":
0268         logger.warning("end point is currently unknown")
0269         rse = "unknown"
0270 
0271     return rse
0272 
0273 
0274 def stage_in_auto(files):
0275     """
0276     Separate dummy implementation for automatic stage-in outside of pilot workflows.
0277     Should be merged with regular stage-in functionality later, but we need to have
0278     some operational experience with it first.
0279     Many things to improve:
0280      - separate file error handling in the merged case
0281      - auto-merging of files with same destination into single copytool call
0282     """
0283 
0284     # don't spoil the output, we depend on stderr parsing
0285     os.environ['RUCIO_LOGGING_FORMAT'] = '%(asctime)s %(levelname)s [%(message)s]'
0286 
0287     executable = ['/usr/bin/env',
0288                   'rucio', '-v', 'download',
0289                   '--no-subdir']
0290 
0291     # quickly remove non-existing destinations
0292     for _file in files:
0293         if not os.path.exists(_file['destination']):
0294             _file['status'] = 'failed'
0295             _file['errmsg'] = 'Destination directory does not exist: %s' % _file['destination']
0296             _file['errno'] = 1
0297         else:
0298             _file['status'] = 'running'
0299             _file['errmsg'] = 'File not yet successfully downloaded.'
0300             _file['errno'] = 2
0301 
0302     for _file in files:
0303         if _file['errno'] == 1:
0304             continue
0305 
0306         tmp_executable = objectcopy.deepcopy(executable)
0307 
0308         tmp_executable += ['--dir', _file['destination']]
0309         tmp_executable.append('%s:%s' % (_file['scope'],
0310                                          _file['name']))
0311         process = subprocess.Popen(tmp_executable,
0312                                    bufsize=-1,
0313                                    stdout=subprocess.PIPE,
0314                                    stderr=subprocess.PIPE)
0315         _file['errno'] = 2
0316         while True:
0317             time.sleep(0.5)
0318             exit_code = process.poll()
0319             if exit_code is not None:
0320                 _, stderr = process.communicate()
0321                 if exit_code == 0:
0322                     _file['status'] = 'done'
0323                     _file['errno'] = 0
0324                     _file['errmsg'] = 'File successfully downloaded.'
0325                 else:
0326                     _file['status'] = 'failed'
0327                     _file['errno'] = 3
0328                     try:
0329                         # the Details: string is set in rucio: lib/rucio/common/exception.py in __str__()
0330                         _file['errmsg'] = [detail for detail in stderr.split('\n') if detail.startswith('Details:')][0][9:-1]
0331                     except Exception as error:
0332                         _file['errmsg'] = 'Could not find rucio error message details - please check stderr directly: %s' % error
0333                 break
0334             else:
0335                 continue
0336 
0337     return files
0338 
0339 
0340 def stage_out_auto(files):
0341     """
0342     Separate dummy implementation for automatic stage-out outside of pilot workflows.
0343     Should be merged with regular stage-out functionality later, but we need to have
0344     some operational experience with it first.
0345     """
0346 
0347     # don't spoil the output, we depend on stderr parsing
0348     os.environ['RUCIO_LOGGING_FORMAT'] = '%(asctime)s %(levelname)s [%(message)s]'
0349 
0350     executable = ['/usr/bin/env',
0351                   'rucio', '-v', 'upload']
0352 
0353     # quickly remove non-existing destinations
0354     for _file in files:
0355         if not os.path.exists(_file['file']):
0356             _file['status'] = 'failed'
0357             _file['errmsg'] = 'Source file does not exist: %s' % _file['file']
0358             _file['errno'] = 1
0359         else:
0360             _file['status'] = 'running'
0361             _file['errmsg'] = 'File not yet successfully uploaded.'
0362             _file['errno'] = 2
0363 
0364     for _file in files:
0365         if _file['errno'] == 1:
0366             continue
0367 
0368         tmp_executable = objectcopy.deepcopy(executable)
0369 
0370         tmp_executable += ['--rse', _file['rse']]
0371 
0372         if 'no_register' in list(_file.keys()) and _file['no_register']:  # Python 2/3
0373             tmp_executable += ['--no-register']
0374 
0375         if 'summary' in list(_file.keys()) and _file['summary']:  # Python 2/3
0376             tmp_executable += ['--summary']
0377 
0378         if 'lifetime' in list(_file.keys()):  # Python 2/3
0379             tmp_executable += ['--lifetime', str(_file['lifetime'])]
0380 
0381         if 'guid' in list(_file.keys()):  # Python 2/3
0382             tmp_executable += ['--guid', _file['guid']]
0383 
0384         if 'attach' in list(_file.keys()):  # Python 2/3
0385             tmp_executable += ['--scope', _file['scope'], '%s:%s' % (_file['attach']['scope'], _file['attach']['name']), _file['file']]
0386         else:
0387             tmp_executable += ['--scope', _file['scope'], _file['file']]
0388 
0389         process = subprocess.Popen(tmp_executable, bufsize=-1, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
0390         _file['errno'] = 2
0391         while True:
0392             time.sleep(0.5)
0393             exit_code = process.poll()
0394             if exit_code is not None:
0395                 _, stderr = process.communicate()
0396                 if exit_code == 0:
0397                     _file['status'] = 'done'
0398                     _file['errno'] = 0
0399                     _file['errmsg'] = 'File successfully uploaded.'
0400                 else:
0401                     _file['status'] = 'failed'
0402                     _file['errno'] = 3
0403                     try:
0404                         # the Details: string is set in rucio: lib/rucio/common/exception.py in __str__()
0405                         _file['errmsg'] = [detail for detail in stderr.split('\n') if detail.startswith('Details:')][0][9:-1]
0406                     except Exception as error:
0407                         _file['errmsg'] = 'Could not find rucio error message details - please check stderr directly: %s' % error
0408                 break
0409             else:
0410                 continue
0411 
0412     return files
0413 
0414 
0415 def write_output(filename, output):
0416     """
0417     Write command output to file.
0418 
0419     :param filename: file name (string).
0420     :param output: command stdout/stderr (string).
0421     :return:
0422     """
0423 
0424     try:
0425         write_file(filename, output, unique=True)
0426     except PilotException as error:
0427         logger.warning('failed to write utility output to file: %s, %s', error, output)
0428     else:
0429         logger.debug('wrote %s', filename)
0430 
0431 
0432 def write_utility_output(workdir, step, stdout, stderr):
0433     """
0434     Write the utility command output to stdout, stderr files to the job.workdir for the current step.
0435     -> <step>_stdout.txt, <step>_stderr.txt
0436     Example of step: xcache.
0437 
0438     :param workdir: job workdir (string).
0439     :param step: utility step (string).
0440     :param stdout: command stdout (string).
0441     :param stderr: command stderr (string).
0442     :return:
0443     """
0444 
0445     # dump to files
0446     write_output(os.path.join(workdir, step + '_stdout.txt'), stdout)
0447     write_output(os.path.join(workdir, step + '_stderr.txt'), stderr)
0448 
0449 
0450 def copytool_in(queues, traces, args):
0451     """
0452     Call the stage-in function and put the job object in the proper queue.
0453 
0454     :param queues: internal queues for job handling.
0455     :param traces: tuple containing internal pilot states.
0456     :param args: Pilot arguments (e.g. containing queue name, queuedata dictionary, etc).
0457     :return:
0458     """
0459 
0460     while not args.graceful_stop.is_set():
0461         time.sleep(0.5)
0462         try:
0463             # abort if kill signal arrived too long time ago, ie loop is stuck
0464             current_time = int(time.time())
0465             if args.kill_time and current_time - args.kill_time > MAX_KILL_WAIT_TIME:
0466                 logger.warning('loop has run for too long time after first kill signal - will abort')
0467                 break
0468 
0469             # extract a job to stage-in its input
0470             job = queues.data_in.get(block=True, timeout=1)
0471 
0472             # does the user want to execute any special commands before stage-in?
0473             pilot_user = os.environ.get('PILOT_USER', 'generic').lower()
0474             user = __import__('pilot.user.%s.common' % pilot_user, globals(), locals(), [pilot_user], 0)  # Python 2/3
0475             cmd = user.get_utility_commands(job=job, order=UTILITY_BEFORE_STAGEIN)
0476             if cmd:
0477                 # xcache debug
0478                 #_, _stdout, _stderr = execute('pgrep -x xrootd | awk \'{print \"ps -p \"$1\" -o args --no-headers --cols 300\"}\' | sh')
0479                 #logger.debug('[before xcache start] stdout=%s', _stdout)
0480                 #logger.debug('[before xcache start] stderr=%s', _stderr)
0481 
0482                 _, stdout, stderr = execute(cmd.get('command'))
0483                 logger.debug('stdout=%s', stdout)
0484                 logger.debug('stderr=%s', stderr)
0485 
0486                 # xcache debug
0487                 #_, _stdout, _stderr = execute('pgrep -x xrootd | awk \'{print \"ps -p \"$1\" -o args --no-headers --cols 300\"}\' | sh')
0488                 #logger.debug('[after xcache start] stdout=%s', _stdout)
0489                 #logger.debug('[after xcache start] stderr=%s', _stderr)
0490 
0491                 # perform any action necessary after command execution (e.g. stdout processing)
0492                 kwargs = {'label': cmd.get('label', 'utility'), 'output': stdout}
0493                 user.post_prestagein_utility_command(**kwargs)
0494 
0495                 # write output to log files
0496                 write_utility_output(job.workdir, cmd.get('label', 'utility'), stdout, stderr)
0497 
0498             # place it in the current stage-in queue (used by the jobs' queue monitoring)
0499             put_in_queue(job, queues.current_data_in)
0500 
0501             # ready to set the job in running state
0502             send_state(job, args, 'running')
0503 
0504             # note: when sending a state change to the server, the server might respond with 'tobekilled'
0505             if job.state == 'failed':
0506                 logger.warning('job state is \'failed\' - order log transfer and abort copytool_in()')
0507                 job.stageout = 'log'  # only stage-out log file
0508                 put_in_queue(job, queues.data_out)
0509                 break
0510 
0511             os.environ['SERVER_UPDATE'] = SERVER_UPDATE_RUNNING
0512 
0513             if args.abort_job.is_set():
0514                 traces.pilot['command'] = 'abort'
0515                 logger.warning('copytool_in detected a set abort_job pre stage-in (due to a kill signal)')
0516                 declare_failed_by_kill(job, queues.failed_data_in, args.signal)
0517                 break
0518 
0519             if _stage_in(args, job):
0520                 if args.abort_job.is_set():
0521                     traces.pilot['command'] = 'abort'
0522                     logger.warning('copytool_in detected a set abort_job post stage-in (due to a kill signal)')
0523                     declare_failed_by_kill(job, queues.failed_data_in, args.signal)
0524                     break
0525 
0526                 put_in_queue(job, queues.finished_data_in)
0527                 # remove the job from the current stage-in queue
0528                 _job = queues.current_data_in.get(block=True, timeout=1)
0529                 if _job:
0530                     logger.debug('job %s has been removed from the current_data_in queue', _job.jobid)
0531 
0532                 # now create input file metadata if required by the payload
0533                 if os.environ.get('PILOT_ES_EXECUTOR_TYPE', 'generic') == 'generic':
0534                     pilot_user = os.environ.get('PILOT_USER', 'generic').lower()
0535                     user = __import__('pilot.user.%s.metadata' % pilot_user, globals(), locals(), [pilot_user], 0)  # Python 2/3
0536                     file_dictionary = get_input_file_dictionary(job.indata)
0537                     xml = user.create_input_file_metadata(file_dictionary, job.workdir)
0538                     logger.info('created input file metadata:\n%s', xml)
0539             else:
0540                 # remove the job from the current stage-in queue
0541                 _job = queues.current_data_in.get(block=True, timeout=1)
0542                 if _job:
0543                     logger.debug('job %s has been removed from the current_data_in queue', _job.jobid)
0544                 logger.warning('stage-in failed, adding job object to failed_data_in queue')
0545                 job.piloterrorcodes, job.piloterrordiags = errors.add_error_code(errors.STAGEINFAILED)
0546                 set_pilot_state(job=job, state="failed")
0547                 traces.pilot['error_code'] = job.piloterrorcodes[0]
0548                 put_in_queue(job, queues.failed_data_in)
0549                 # do not set graceful stop if pilot has not finished sending the final job update
0550                 # i.e. wait until SERVER_UPDATE is DONE_FINAL
0551                 check_for_final_server_update(args.update_server)
0552                 args.graceful_stop.set()
0553 
0554         except queue.Empty:
0555             continue
0556 
0557     # proceed to set the job_aborted flag?
0558     if threads_aborted():
0559         logger.debug('will proceed to set job_aborted')
0560         args.job_aborted.set()
0561     else:
0562         logger.debug('will not set job_aborted yet')
0563 
0564     logger.debug('[data] copytool_in thread has finished')
0565 
0566 
0567 def copytool_out(queues, traces, args):
0568     """
0569     Main stage-out thread.
0570     Perform stage-out as soon as a job object can be extracted from the data_out queue.
0571 
0572     :param queues: internal queues for job handling.
0573     :param traces: tuple containing internal pilot states.
0574     :param args: Pilot arguments (e.g. containing queue name, queuedata dictionary, etc).
0575     :return:
0576     """
0577 
0578     cont = True
0579     logger.debug('entering copytool_out loop')
0580     if args.graceful_stop.is_set():
0581         logger.debug('graceful_stop already set')
0582 
0583     processed_jobs = []
0584     while cont:
0585 
0586         time.sleep(0.5)
0587 
0588         # abort if kill signal arrived too long time ago, ie loop is stuck
0589         current_time = int(time.time())
0590         if args.kill_time and current_time - args.kill_time > MAX_KILL_WAIT_TIME:
0591             logger.warning('loop has run for too long time after first kill signal - will abort')
0592             break
0593 
0594         # check for abort, print useful messages and include a 1 s sleep
0595         abort = should_abort(args, label='data:copytool_out')
0596         try:
0597             job = queues.data_out.get(block=True, timeout=1)
0598             if job:
0599                 # hack to prevent stage-out to be called more than once for same job object (can apparently happen
0600                 # in multi-output jobs)
0601                 # should not be necessary unless job object is added to queues.data_out more than once - check this
0602                 # for multiple output files
0603                 if processed_jobs:
0604                     if is_already_processed(queues, processed_jobs):
0605                         continue
0606 
0607                 logger.info('will perform stage-out for job id=%s', job.jobid)
0608 
0609                 if args.abort_job.is_set():
0610                     traces.pilot['command'] = 'abort'
0611                     logger.warning('copytool_out detected a set abort_job pre stage-out (due to a kill signal)')
0612                     declare_failed_by_kill(job, queues.failed_data_out, args.signal)
0613                     break
0614 
0615                 if _stage_out_new(job, args):
0616                     if args.abort_job.is_set():
0617                         traces.pilot['command'] = 'abort'
0618                         logger.warning('copytool_out detected a set abort_job post stage-out (due to a kill signal)')
0619                         #declare_failed_by_kill(job, queues.failed_data_out, args.signal)
0620                         break
0621 
0622                     #queues.finished_data_out.put(job)
0623                     processed_jobs.append(job.jobid)
0624                     put_in_queue(job, queues.finished_data_out)
0625                     logger.debug('job object added to finished_data_out queue')
0626                 else:
0627                     #queues.failed_data_out.put(job)
0628                     put_in_queue(job, queues.failed_data_out)
0629                     logger.debug('job object added to failed_data_out queue')
0630             else:
0631                 logger.debug('no returned job - why no exception?')
0632         except queue.Empty:
0633             if abort:
0634                 cont = False
0635                 break
0636             continue
0637 
0638         if abort:
0639             cont = False
0640             break
0641 
0642     # proceed to set the job_aborted flag?
0643     if threads_aborted():
0644         logger.debug('will proceed to set job_aborted')
0645         args.job_aborted.set()
0646     else:
0647         logger.debug('will not set job_aborted yet')
0648 
0649     logger.debug('[data] copytool_out thread has finished')
0650 
0651 
0652 def is_already_processed(queues, processed_jobs):
0653     """
0654     Skip stage-out in case the job has already been processed.
0655     This should not be necessary so this is a fail-safe but it seems there is a case when a job with multiple output
0656     files enters the stage-out more than once.
0657 
0658     :param queues: queues object.
0659     :param processed_jobs: list of already processed jobs.
0660     :return: True if stage-out queues contain a job object that has already been processed.
0661     """
0662 
0663     snapshots = list(queues.finished_data_out.queue) + list(queues.failed_data_out.queue)
0664     jobids = [obj.jobid for obj in snapshots]
0665     found = False
0666 
0667     for jobid in processed_jobs:
0668         if jobid in jobids:
0669             logger.warning('output from job %s has already been staged out', jobid)
0670             found = True
0671             break
0672     if found:
0673         time.sleep(5)
0674 
0675     return found
0676 
0677 
0678 def get_input_file_dictionary(indata):
0679     """
0680     Return an input file dictionary.
0681     Format: {'guid': 'pfn', ..}
0682     Normally use_turl would be set to True if direct access is used.
0683     Note: any environment variables in the turls will be expanded
0684 
0685     :param indata: list of FileSpec objects.
0686     :return: file dictionary.
0687     """
0688 
0689     ret = {}
0690 
0691     for fspec in indata:
0692         ret[fspec.guid] = fspec.turl if fspec.status == 'remote_io' else fspec.lfn
0693         ret[fspec.guid] = os.path.expandvars(ret[fspec.guid])
0694 
0695         # correction for ND and mv
0696         # in any case use the lfn instead of pfn since there are trf's that have problems with pfn's
0697         if not ret[fspec.guid]:   # this case never works (turl/lfn is always non empty), deprecated code?
0698             ret[fspec.guid] = fspec.lfn
0699 
0700     return ret
0701 
0702 
0703 def filter_files_for_log(directory):
0704     """
0705     Create a file list recursi
0706     :param directory:
0707     :return:
0708     """
0709     filtered_files = []
0710     maxfilesize = 10
0711     for root, _, filenames in os.walk(directory):
0712         for filename in filenames:
0713             location = os.path.join(root, filename)
0714             if os.path.exists(location):  # do not include broken links
0715                 if os.path.getsize(location) < maxfilesize:
0716                     filtered_files.append(location)
0717 
0718     return filtered_files
0719 
0720 
0721 def create_log(workdir, logfile_name, tarball_name, cleanup, input_files=[], output_files=[], is_looping=False, debugmode=False):
0722     """
0723     Create the tarball for the job.
0724 
0725     :param workdir: work directory for the job (string).
0726     :param logfile_name: log file name (string).
0727     :param tarball_name: tarball name (string).
0728     :param cleanup: perform cleanup (Boolean).
0729     :param input_files: list of input files to remove (list).
0730     :param output_files: list of output files to remove (list).
0731     :param is_looping: True for looping jobs, False by default (Boolean).
0732     :param debugmode: True if debug mode has been switched on (Boolean).
0733     :raises LogFileCreationFailure: in case of log file creation problem.
0734     :return:
0735     """
0736 
0737     logger.debug('preparing to create log file (debug mode=%s)', str(debugmode))
0738 
0739     # PILOT_HOME is the launch directory of the pilot (or the one specified in pilot options as pilot workdir)
0740     pilot_home = os.environ.get('PILOT_HOME', os.getcwd())
0741     current_dir = os.getcwd()
0742     if pilot_home != current_dir:
0743         os.chdir(pilot_home)
0744 
0745     # perform special cleanup (user specific) prior to log file creation
0746     if cleanup:
0747         pilot_user = os.environ.get('PILOT_USER', 'generic').lower()
0748         user = __import__('pilot.user.%s.common' % pilot_user, globals(), locals(), [pilot_user], 0)  # Python 2/3
0749         user.remove_redundant_files(workdir, islooping=is_looping, debugmode=debugmode)
0750 
0751     # remove any present input/output files before tarring up workdir
0752     for fname in input_files + output_files:
0753         path = os.path.join(workdir, fname)
0754         if os.path.exists(path):
0755             logger.info('removing file: %s', path)
0756             remove(path)
0757 
0758     if logfile_name is None or len(logfile_name.strip('/ ')) == 0:
0759         logger.info('Skipping tarball creation, since the logfile_name is empty')
0760         return
0761 
0762     # rename the workdir for the tarball creation
0763     newworkdir = os.path.join(os.path.dirname(workdir), tarball_name)
0764     orgworkdir = workdir
0765     os.rename(workdir, newworkdir)
0766     workdir = newworkdir
0767 
0768     fullpath = os.path.join(workdir, logfile_name)  # /some/path/to/dirname/log.tgz
0769     logger.info('will create archive %s', fullpath)
0770     try:
0771         cmd = "pwd;tar cvfz %s %s --dereference --one-file-system; echo $?" % (fullpath, tarball_name)
0772         _, stdout, _ = execute(cmd)
0773     except Exception as error:
0774         raise LogFileCreationFailure(error)
0775     else:
0776         if pilot_home != current_dir:
0777             os.chdir(pilot_home)
0778         logger.debug('stdout = %s', stdout)
0779     try:
0780         os.rename(workdir, orgworkdir)
0781     except Exception as error:
0782         logger.debug('exception caught: %s', error)
0783 
0784 
0785 def _do_stageout(job, xdata, activity, queue, title, output_dir=''):
0786     """
0787     Use the `StageOutClient` in the Data API to perform stage-out.
0788 
0789     :param job: job object.
0790     :param xdata: list of FileSpec objects.
0791     :param activity: copytool activity or preferred list of activities to resolve copytools
0792     :param title: type of stage-out (output, log) (string).
0793     :param queue: PanDA queue (string).
0794     :return: True in case of success transfers
0795     """
0796 
0797     logger.info('prepare to stage-out %d %s file(s)', len(xdata), title)
0798     label = 'stage-out'
0799 
0800     # should stage-in be done by a script (for containerisation) or by invoking the API (ie classic mode)?
0801     use_container = pilot.util.middleware.use_middleware_script(job.infosys.queuedata.container_type.get("middleware"))
0802     if use_container:
0803         logger.info('stage-out will be done by a script')
0804         try:
0805             eventtype, localsite, remotesite = get_trace_report_variables(job, label=label)
0806             pilot.util.middleware.containerise_middleware(job, xdata, queue, eventtype, localsite, remotesite,
0807                                                           job.infosys.queuedata.container_options, output_dir,
0808                                                           label=label, container_type=job.infosys.queuedata.container_type.get("middleware"))
0809         except PilotException as error:
0810             logger.warning('stage-out containerisation threw a pilot exception: %s', error)
0811         except Exception as error:
0812             logger.warning('stage-out containerisation threw an exception: %s', error)
0813     else:
0814         try:
0815             logger.info('stage-out will not be done in a container')
0816 
0817             # create the trace report
0818             trace_report = create_trace_report(job, label=label)
0819 
0820             client = StageOutClient(job.infosys, logger=logger, trace_report=trace_report)
0821             kwargs = dict(workdir=job.workdir, cwd=job.workdir, usecontainer=False, job=job, output_dir=output_dir,
0822                           catchall=job.infosys.queuedata.catchall)  #, mode='stage-out')
0823             # prod analy unification: use destination preferences from PanDA server for unified queues
0824             if job.infosys.queuedata.type != 'unified':
0825                 client.prepare_destinations(xdata, activity)  ## FIX ME LATER: split activities: for astorages and for copytools (to unify with ES workflow)
0826             client.transfer(xdata, activity, **kwargs)
0827         except PilotException as error:
0828             import traceback
0829             error_msg = traceback.format_exc()
0830             logger.error(error_msg)
0831             msg = errors.format_diagnostics(error.get_error_code(), error_msg)
0832             job.piloterrorcodes, job.piloterrordiags = errors.add_error_code(error.get_error_code(), msg=msg)
0833         except Exception:
0834             import traceback
0835             logger.error(traceback.format_exc())
0836             # do not raise the exception since that will prevent also the log from being staged out
0837             # error = PilotException("stageOut failed with error=%s" % e, code=ErrorCodes.STAGEOUTFAILED)
0838         else:
0839             logger.debug('stage-out client completed')
0840 
0841     logger.info('summary of transferred files:')
0842     for iofile in xdata:
0843         if not iofile.status:
0844             status = "(not transferred)"
0845         else:
0846             status = iofile.status
0847         logger.info(" -- lfn=%s, status_code=%s, status=%s", iofile.lfn, iofile.status_code, status)
0848 
0849     remain_files = [iofile for iofile in xdata if iofile.status not in ['transferred']]
0850 
0851     return not remain_files
0852 
0853 
0854 def _stage_out_new(job, args):
0855     """
0856     Stage-out of all output files.
0857     If job.stageout=log then only log files will be transferred.
0858 
0859     :param job: job object.
0860     :param args: pilot args object.
0861     :return: True in case of success, False otherwise.
0862     """
0863 
0864     #logger.info('testing sending SIGUSR1')
0865     #import signal
0866     #os.kill(os.getpid(), signal.SIGUSR1)
0867 
0868     # write time stamps to pilot timing file
0869     add_to_pilot_timing(job.jobid, PILOT_PRE_STAGEOUT, time.time(), args)
0870 
0871     is_success = True
0872 
0873     if not job.outdata or job.is_eventservice:
0874         logger.info('this job does not have any output files, only stage-out log file')
0875         job.stageout = 'log'
0876 
0877     if job.stageout != 'log':  ## do stage-out output files
0878         if not _do_stageout(job, job.outdata, ['pw', 'w'], args.queue, title='output', output_dir=args.output_dir):
0879             is_success = False
0880             logger.warning('transfer of output file(s) failed')
0881 
0882     if job.stageout in ['log', 'all'] and job.logdata:  ## do stage-out log files
0883         # prepare log file, consider only 1st available log file
0884         status = job.get_status('LOG_TRANSFER')
0885         if status != LOG_TRANSFER_NOT_DONE:
0886             logger.warning('log transfer already attempted')
0887             return False
0888 
0889         job.status['LOG_TRANSFER'] = LOG_TRANSFER_IN_PROGRESS
0890         logfile = job.logdata[0]
0891 
0892         try:
0893             tarball_name = 'tarball_PandaJob_%s_%s' % (job.jobid, job.infosys.pandaqueue)
0894             input_files = [fspec.lfn for fspec in job.indata]
0895             output_files = [fspec.lfn for fspec in job.outdata]
0896             create_log(job.workdir, logfile.lfn, tarball_name, args.cleanup,
0897                        input_files=input_files, output_files=output_files,
0898                        is_looping=errors.LOOPINGJOB in job.piloterrorcodes, debugmode=job.debug)
0899         except LogFileCreationFailure as error:
0900             logger.warning('failed to create tar file: %s', error)
0901             set_pilot_state(job=job, state="failed")
0902             job.piloterrorcodes, job.piloterrordiags = errors.add_error_code(errors.LOGFILECREATIONFAILURE)
0903             return False
0904 
0905         if not _do_stageout(job, [logfile], ['pl', 'pw', 'w'], args.queue, title='log', output_dir=args.output_dir):
0906             is_success = False
0907             logger.warning('log transfer failed')
0908             job.status['LOG_TRANSFER'] = LOG_TRANSFER_FAILED
0909         else:
0910             job.status['LOG_TRANSFER'] = LOG_TRANSFER_DONE
0911     elif not job.logdata:
0912         logger.info('no log was defined - will not create log file')
0913         job.status['LOG_TRANSFER'] = LOG_TRANSFER_DONE
0914 
0915     # write time stamps to pilot timing file
0916     add_to_pilot_timing(job.jobid, PILOT_POST_STAGEOUT, time.time(), args)
0917 
0918     # generate fileinfo details to be send to Panda
0919     fileinfo = {}
0920     for iofile in job.outdata + job.logdata:
0921         if iofile.status in ['transferred']:
0922             fileinfo[iofile.lfn] = {'guid': iofile.guid,
0923                                     'fsize': iofile.filesize,
0924                                     'adler32': iofile.checksum.get('adler32'),
0925                                     'surl': iofile.turl}
0926 
0927     job.fileinfo = fileinfo
0928 
0929     # WARNING THE FOLLOWING RESETS ANY PREVIOUS STAGEOUT ERRORS
0930     if not is_success:
0931         # set error code + message (a more precise error code might have been set already)
0932         job.piloterrorcodes, job.piloterrordiags = errors.add_error_code(errors.STAGEOUTFAILED)
0933         set_pilot_state(job=job, state="failed")
0934         logger.warning('stage-out failed')
0935         return False
0936 
0937     logger.info('stage-out finished correctly')
0938 
0939     if not job.state or (job.state and job.state == 'stageout'):  # is the job state already set? if so, don't change the state (unless it's the stageout state)
0940         logger.debug('changing job state from %s to finished', job.state)
0941         set_pilot_state(job=job, state="finished")
0942 
0943     # send final server update since all transfers have finished correctly
0944     # send_state(job, args, 'finished', xml=dumps(fileinfodict))
0945 
0946     return is_success
0947 
0948 
0949 def queue_monitoring(queues, traces, args):
0950     """
0951     Monitoring of Data queues.
0952 
0953     :param queues: internal queues for job handling.
0954     :param traces: tuple containing internal pilot states.
0955     :param args: Pilot arguments (e.g. containing queue name, queuedata dictionary, etc).
0956     :return:
0957     """
0958 
0959     while True:  # will abort when graceful_stop has been set
0960         time.sleep(0.5)
0961         if traces.pilot['command'] == 'abort':
0962             logger.warning('data queue monitor saw the abort instruction')
0963             args.graceful_stop.set()
0964 
0965         # abort in case graceful_stop has been set, and less than 30 s has passed since MAXTIME was reached (if set)
0966         # (abort at the end of the loop)
0967         abort = should_abort(args, label='data:queue_monitoring')
0968 
0969         # monitor the failed_data_in queue
0970         try:
0971             job = queues.failed_data_in.get(block=True, timeout=1)
0972         except queue.Empty:
0973             pass
0974         else:
0975             # stage-out log file then add the job to the failed_jobs queue
0976             job.stageout = "log"
0977 
0978             # TODO: put in data_out queue instead?
0979 
0980             if not _stage_out_new(job, args):
0981                 logger.info("job %s failed during stage-in and stage-out of log, adding job object to failed_data_outs queue", job.jobid)
0982                 put_in_queue(job, queues.failed_data_out)
0983             else:
0984                 logger.info("job %s failed during stage-in, adding job object to failed_jobs queue", job.jobid)
0985                 put_in_queue(job, queues.failed_jobs)
0986 
0987         # monitor the finished_data_out queue
0988         try:
0989             job = queues.finished_data_out.get(block=True, timeout=1)
0990         except queue.Empty:
0991             pass
0992         else:
0993             # use the payload/transform exitCode from the job report if it exists
0994             if job.transexitcode == 0 and job.exitcode == 0 and job.piloterrorcodes == []:
0995                 logger.info('finished stage-out for finished payload, adding job to finished_jobs queue')
0996                 #queues.finished_jobs.put(job)
0997                 put_in_queue(job, queues.finished_jobs)
0998             else:
0999                 logger.info('finished stage-out (of log) for failed payload')
1000                 #queues.failed_jobs.put(job)
1001                 put_in_queue(job, queues.failed_jobs)
1002 
1003         # monitor the failed_data_out queue
1004         try:
1005             job = queues.failed_data_out.get(block=True, timeout=1)
1006         except queue.Empty:
1007             pass
1008         else:
1009             # attempt to upload the log in case the previous stage-out failure was not an SE error
1010             job.stageout = "log"
1011             set_pilot_state(job=job, state="failed")
1012             if not _stage_out_new(job, args):
1013                 logger.info("job %s failed during stage-out", job.jobid)
1014 
1015             put_in_queue(job, queues.failed_jobs)
1016 
1017         if abort:
1018             break
1019 
1020     # proceed to set the job_aborted flag?
1021     if threads_aborted():
1022         logger.debug('will proceed to set job_aborted')
1023         args.job_aborted.set()
1024     else:
1025         logger.debug('will not set job_aborted yet')
1026 
1027     logger.debug('[data] queue_monitor thread has finished')