File indexing completed on 2026-04-10 08:39:16
0001
0002
0003
0004
0005
0006
0007
0008
0009 import argparse
0010 import os
0011 import logging
0012 import threading
0013 import queue
0014 import ROOT
0015 from collections import namedtuple
0016
0017 from pilot.util.config import config
0018 from pilot.util.filehandling import (
0019 establish_logging,
0020 write_json,
0021 )
0022
0023 logger = logging.getLogger(__name__)
0024
0025
def get_args():
    """
    Parse the command line options of the remote file open script.

    Supported options:
      -d               enable debug mode for logging messages (flag, default False).
      -t               number of concurrent file open threads (int, default 1).
      -w               working directory (default: current working directory).
      --turls          comma separated TURL list (required).
      --no-pilot-log   do not write the pilot log to file (flag, default False).

    :return: args (arg parser object).
    """

    arg_parser = argparse.ArgumentParser()

    arg_parser.add_argument('-d',
                            dest='debug',
                            action='store_true',
                            default=False,
                            help='Enable debug mode for logging messages')
    arg_parser.add_argument('-t',
                            dest='nthreads',
                            default=1,
                            required=False,
                            type=int,
                            help='Number of concurrent file open threads')
    arg_parser.add_argument('-w',
                            dest='workdir',
                            required=False,
                            default=os.getcwd(),
                            help='Working directory')
    # the help text previously lacked the closing parenthesis
    arg_parser.add_argument('--turls',
                            dest='turls',
                            required=True,
                            help='TURL list (e.g., filepath1,filepath2)')
    arg_parser.add_argument('--no-pilot-log',
                            dest='nopilotlog',
                            action='store_true',
                            default=False,
                            help='Do not write the pilot log to file')

    return arg_parser.parse_args()
0062
0063
def message(msg):
    """
    Emit a message through the module logger, or print it to stdout when the logger is not set.
    Note: not using lazy formatting.

    :param msg: message (string).
    :return:
    """

    if logger:
        logger.info(msg)
    else:
        print(msg)
0074
0075
def get_file_lists(turls):
    """
    Return a dictionary with the turls.
    Format: {'turls': <turl list>}

    Surrounding whitespace is stripped from each turl and empty entries are dropped, so an empty
    string or a trailing comma does not produce a bogus '' turl.

    :param turls: comma separated turls (string)
    :return: turls dictionary (the list is empty if turls could not be split).
    """

    try:
        candidates = turls.split(',')
    except (AttributeError, TypeError) as error:
        # turls is not a string (e.g. None) - report it and fall back to an empty list
        message("exception caught: %s" % error)
        candidates = []

    _turls = [turl.strip() for turl in candidates if turl.strip()]

    return {'turls': _turls}
0093
0094
def try_open_file(turl, queues):
    """
    Attempt to open a remote file.
    Successfully opened turls will be put in the queues.opened queue. Unsuccessful turls will be placed in
    the queues.unopened queue. In both cases the turl is finally put in the queues.result queue, which
    signals that this attempt has finished.

    A turl counts as opened when ROOT.TFile.Open() returns a file object for which IsOpen() is true and
    the file could be closed again without an exception.

    :param turl: turl (string).
    :param queues: queues collection (must provide the opened, unopened and result queues).
    :return:
    """

    turl_opened = False
    try:
        message('opening %s' % turl)
        _timeout = 120 * 1000  # value handed to SetOpenTimeout(), logged as milliseconds
        _ = ROOT.TFile.SetOpenTimeout(_timeout)
        message("time-out set to %d ms" % _timeout)
        in_file = ROOT.TFile.Open(turl)
        if in_file and in_file.IsOpen():
            in_file.Close()
            turl_opened = True
            message('closed %s' % turl)
    except Exception as exc:
        message('caught exception: %s' % exc)
    finally:
        # always report the outcome, otherwise the caller waiting on queues.result
        # would only notice this turl through its own time-out
        if turl_opened:
            queues.opened.put(turl)
        else:
            queues.unopened.put(turl)
        queues.result.put(turl)
0122
0123
def spawn_file_open_thread(queues, file_list):
    """
    Start a daemon thread running try_open_file() for the first entry of the file list.

    The entry is removed from the file list. No thread is created if the list is empty.

    :param queues: queue collection.
    :param file_list: files to open (list).
    :return: started thread, or None if the file list was empty.
    """

    try:
        next_turl = file_list.pop(0)
    except IndexError:
        # nothing left to process
        return None

    worker = threading.Thread(target=try_open_file, args=(next_turl, queues), daemon=True)
    worker.start()

    return worker
0145
0146
if __name__ == '__main__':
    # Main function of the remote file open script.
    #
    # Tries to open every turl given with --turls, using up to -t concurrent threads, and writes a
    # dictionary {turl: True/False} (True = opened) as JSON to the working directory.

    # get the args from the arg parser
    args = get_args()
    # NOTE(review): these two assignments override whatever was given with -d and
    # --no-pilot-log on the command line - looks deliberate, confirm
    args.debug = True
    args.nopilotlog = False

    try:
        logname = config.Pilot.remotefileverification_log
    except Exception as error:
        print("caught exception: %s (skipping remote file open verification)" % error)
        raise SystemExit(1)
    else:
        if not logname:
            print("remote file open verification not desired")
            raise SystemExit(0)

    establish_logging(debug=args.debug, nopilotlog=args.nopilotlog, filename=logname)
    logger = logging.getLogger(__name__)

    # get the file info
    file_list_dictionary = get_file_lists(args.turls)
    turls = file_list_dictionary.get('turls')
    processed_turls_dictionary = {}

    # result: one entry per finished attempt; opened/unopened: the outcome of each attempt
    Queues = namedtuple('queues', ['result', 'opened', 'unopened'])
    queues = Queues(result=queue.Queue(), opened=queue.Queue(), unopened=queue.Queue())
    threads = []

    # with zero (or negative) threads nothing would ever be processed and the loop below
    # would only end through its time-out, so always use at least one thread
    nthreads = max(1, args.nthreads)

    message('will attempt to open %d file(s) using %d thread(s)' % (len(turls), nthreads))

    if turls:
        # make N calls to begin with
        for _ in range(nthreads):
            thread = spawn_file_open_thread(queues, turls)
            if thread:
                threads.append(thread)

        # every finished attempt frees a slot: start the next file until the list is exhausted
        while turls:
            try:
                _ = queues.result.get(block=True, timeout=60)
            except queue.Empty:
                message("reached time-out")
                break
            except Exception as error:
                message("caught exception: %s" % error)

            thread = spawn_file_open_thread(queues, turls)
            if thread:
                threads.append(thread)

        # wait until all started threads have finished
        for _thread in threads:
            _thread.join()

        opened_turls = sorted(queues.opened.queue)
        unopened_turls = sorted(queues.unopened.queue)

        for turl in opened_turls:
            processed_turls_dictionary[turl] = True
        for turl in unopened_turls:
            processed_turls_dictionary[turl] = False

        # write dictionary to file with results
        _status = write_json(
            os.path.join(args.workdir, config.Pilot.remotefileverification_dictionary),
            processed_turls_dictionary,
        )
    else:
        message('no TURLs to verify')

    raise SystemExit(0)