Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:39:16

0001 # Licensed under the Apache License, Version 2.0 (the "License");
0002 # you may not use this file except in compliance with the License.
0003 # You may obtain a copy of the License at
0004 # http://www.apache.org/licenses/LICENSE-2.0
0005 #
0006 # Authors:
0007 # - Paul Nilsson, paul.nilsson@cern.ch, 2020-2021
0008 
0009 import argparse
0010 import os
0011 import logging
0012 import threading
0013 import queue
0014 import ROOT
0015 from collections import namedtuple
0016 
0017 from pilot.util.config import config
0018 from pilot.util.filehandling import (
0019     establish_logging,
0020     write_json,
0021 )
0022 
0023 logger = logging.getLogger(__name__)
0024 
0025 
def get_args():
    """
    Parse the command line and return the resulting arguments.

    Recognised options:
      -d              enable debug mode for logging messages (flag, default: False).
      -t N            number of concurrent file open threads (int, default: 1).
      -w DIR          working directory (default: current working directory at call time).
      --turls LIST    comma separated TURL list (required).
      --no-pilot-log  do not write the pilot log to file (flag, default: False).

    :return: args (arg parser object).
    """

    arg_parser = argparse.ArgumentParser()

    arg_parser.add_argument('-d',
                            dest='debug',
                            action='store_true',
                            default=False,
                            help='Enable debug mode for logging messages')
    arg_parser.add_argument('-t',
                            dest='nthreads',
                            default=1,
                            required=False,
                            type=int,
                            help='Number of concurrent file open threads')
    arg_parser.add_argument('-w',
                            dest='workdir',
                            required=False,
                            default=os.getcwd(),
                            help='Working directory')
    # the help text previously lacked its closing parenthesis
    arg_parser.add_argument('--turls',
                            dest='turls',
                            required=True,
                            help='TURL list (e.g., filepath1,filepath2)')
    arg_parser.add_argument('--no-pilot-log',
                            dest='nopilotlog',
                            action='store_true',
                            default=False,
                            help='Do not write the pilot log to file')

    return arg_parser.parse_args()
0062 
0063 
def message(msg):
    """
    Send a message to the module level logger, or to stdout if there is no logger.
    Note: the caller is expected to pass an already formatted string (no lazy formatting).

    :param msg: message (string).
    :return:
    """

    if logger:
        logger.info(msg)
    else:
        print(msg)
0074 
0075 
def get_file_lists(turls):
    """
    Return a dictionary with the turls.
    Format: {'turls': <turl list>}

    Surrounding whitespace is stripped from every entry and empty entries (as produced by an
    empty string or a trailing comma) are dropped, so that no attempt is made later on to open
    an empty TURL. Input that cannot be split (e.g. None) is reported and gives an empty list.

    :param turls: comma separated turls (string)
    :return: turls dictionary.
    """

    try:
        candidates = turls.split(',')
    except (AttributeError, TypeError) as error:
        # not a text string: report it and fall back to an empty list
        message("exception caught: %s" % error)
        candidates = []

    _turls = [turl.strip() for turl in candidates if turl.strip()]

    return {'turls': _turls}
0093 
0094 
def try_open_file(turl, queues):
    """
    Attempt to open a remote file.
    Successfully opened turls will be put in the queues.opened queue. Unsuccessful turls will be placed in
    the queues.unopened queue. In both cases the turl is finally put in the queues.result queue, which
    signals to the consumer of that queue that this attempt has finished.

    Any exception raised while opening, checking or closing the file is logged and the turl is
    treated as unopened; the queues are always updated, also when something goes wrong.

    :param turl: turl (string).
    :param queues: queues collection.
    :return:
    """

    turl_opened = False
    try:
        message('opening %s' % turl)
        _timeout = 120 * 1000  # 120 s
        ROOT.TFile.SetOpenTimeout(_timeout)  # the return value is not used
        message("time-out set to %d ms" % _timeout)
        in_file = ROOT.TFile.Open(turl)
        # keep the check and the close inside the try: a failure here must not kill the
        # thread before it has reported back through the queues
        if in_file and in_file.IsOpen():
            in_file.Close()
            turl_opened = True
            message('closed %s' % turl)
    except Exception as exc:
        message('caught exception: %s' % exc)
    finally:
        # always report the outcome, otherwise the reader of queues.result would wait in vain
        if turl_opened:
            queues.opened.put(turl)
        else:
            queues.unopened.put(turl)
        queues.result.put(turl)
0122 
0123 
def spawn_file_open_thread(queues, file_list):
    """
    Start a daemon thread running try_open_file() for the next turl in the list.

    The first entry is removed from file_list (i.e. the list is modified in place). If the list
    is empty, no thread is created.

    :param queues: queue collection.
    :param file_list: files to open (list).
    :return: thread (None if there was nothing left to open).
    """

    try:
        next_turl = file_list.pop(0)
    except IndexError:
        # nothing left to process
        return None

    # create and start thread for the current turl
    worker = threading.Thread(target=try_open_file, args=(next_turl, queues), daemon=True)
    worker.start()

    return worker
0145 
0146 
if __name__ == '__main__':
    # Main function of the remote file open script.
    #
    # Flow: parse the arguments, set up logging, try to open every TURL using up to N concurrent
    # threads, and finally write a {turl: <opened, True/False>} dictionary as JSON to the work directory.
    # The script exits with 1 if the log name cannot be read from the config, otherwise with 0.

    # get the args from the arg parser
    args = get_args()
    # NOTE(review): the two assignments below override whatever was given with -d / --no-pilot-log.
    # They are kept since the surrounding system may rely on them - confirm whether this is intended.
    args.debug = True
    args.nopilotlog = False

    # exit() is only provided by the site module and is meant for interactive use,
    # so SystemExit is raised directly instead
    try:
        logname = config.Pilot.remotefileverification_log
    except Exception as error:
        print("caught exception: %s (skipping remote file open verification)" % error)
        raise SystemExit(1) from error
    if not logname:
        print("remote file open verification not desired")
        raise SystemExit(0)

    establish_logging(debug=args.debug, nopilotlog=args.nopilotlog, filename=logname)
    logger = logging.getLogger(__name__)

    # get the file info
    file_list_dictionary = get_file_lists(args.turls)
    turls = file_list_dictionary.get('turls')
    processed_turls_dictionary = {}

    # create an actual instance of the named tuple (rather than setting attributes on the class)
    Queues = namedtuple('queues', ['result', 'opened', 'unopened'])
    queues = Queues(result=queue.Queue(), opened=queue.Queue(), unopened=queue.Queue())
    threads = []

    # with zero (or negative) threads nothing would ever be started and the loop below
    # would only end by its time-out, so always use at least one thread
    nthreads = max(1, args.nthreads)

    message('will attempt to open %d file(s) using %d thread(s)' % (len(turls), nthreads))

    if turls:
        # make N calls to begin with
        for _ in range(nthreads):
            thread = spawn_file_open_thread(queues, turls)
            if thread:
                threads.append(thread)

        timedout = False
        while turls:

            # wait for any of the running threads to finish before starting the next one
            try:
                queues.result.get(block=True, timeout=60)
            except queue.Empty:
                message("reached time-out")
                timedout = True
                break
            except Exception as error:
                message("caught exception: %s" % error)

            thread = spawn_file_open_thread(queues, turls)
            if thread:
                threads.append(thread)

        if timedout:
            # the remaining TURLs were never attempted and will not appear in the dictionary
            message("%d file(s) were not attempted due to the time-out" % len(turls))

        # wait until all threads have finished
        for _thread in threads:
            _thread.join()

        opened_turls = sorted(queues.opened.queue)
        unopened_turls = sorted(queues.unopened.queue)

        for turl in opened_turls:
            processed_turls_dictionary[turl] = True
        for turl in unopened_turls:
            processed_turls_dictionary[turl] = False

        # write dictionary to file with results
        # NOTE(review): the returned status is currently not acted upon
        _status = write_json(os.path.join(args.workdir, config.Pilot.remotefileverification_dictionary), processed_turls_dictionary)
    else:
        message('no TURLs to verify')

    raise SystemExit(0)