File indexing completed on 2026-04-10 08:39:16
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010 import argparse
0011 import os
0012 import re
0013
0014 from pilot.api.data import StageInClient
0015 from pilot.api.es_data import StageInESClient
0016 from pilot.info import InfoService, FileSpec, infosys
0017 from pilot.util.config import config
0018 from pilot.util.filehandling import establish_logging, write_json, read_json
0019 from pilot.util.tracereport import TraceReport
0020
0021 import logging
0022
0023
0024 GENERAL_ERROR = 1
0025 NO_QUEUENAME = 2
0026 NO_SCOPES = 3
0027 NO_LFNS = 4
0028 NO_EVENTTYPE = 5
0029 NO_LOCALSITE = 6
0030 NO_REMOTESITE = 7
0031 NO_PRODUSERID = 8
0032 NO_JOBID = 9
0033 NO_TASKID = 10
0034 NO_JOBDEFINITIONID = 11
0035 TRANSFER_ERROR = 12
0036
0037
def get_args(argv=None):
    """
    Return the args from the arg parser.

    :param argv: optional list of argument strings to parse; None (the default) makes argparse
                 parse sys.argv[1:], as before.
    :return: args (argparse.Namespace).
    """

    arg_parser = argparse.ArgumentParser()

    arg_parser.add_argument('-d',
                            dest='debug',
                            action='store_true',
                            default=False,
                            help='Enable debug mode for logging messages')
    arg_parser.add_argument('-q',
                            dest='queuename',
                            required=True,
                            help='Queue name (e.g., AGLT2_TEST-condor)')
    arg_parser.add_argument('-w',
                            dest='workdir',
                            required=False,
                            default=os.getcwd(),
                            help='Working directory')
    arg_parser.add_argument('--scopes',
                            dest='scopes',
                            required=False,
                            help='List of Rucio scopes (e.g., mc16_13TeV,mc16_13TeV)')
    arg_parser.add_argument('--lfns',
                            dest='lfns',
                            required=False,
                            help='LFN list (e.g., filename1,filename2)')
    arg_parser.add_argument('--eventtype',
                            dest='eventtype',
                            required=True,
                            help='Event type')
    arg_parser.add_argument('--localsite',
                            dest='localsite',
                            required=True,
                            help='Local site')
    arg_parser.add_argument('--remotesite',
                            dest='remotesite',
                            required=True,
                            help='Remote site')
    arg_parser.add_argument('--produserid',
                            dest='produserid',
                            required=True,
                            help='produserid')
    arg_parser.add_argument('--jobid',
                            dest='jobid',
                            required=True,
                            help='PanDA job id')
    arg_parser.add_argument('--taskid',
                            dest='taskid',
                            required=True,
                            help='PanDA task id')
    arg_parser.add_argument('--jobdefinitionid',
                            dest='jobdefinitionid',
                            required=True,
                            help='Job definition id')
    arg_parser.add_argument('--eventservicemerge',
                            dest='eventservicemerge',
                            type=str2bool,
                            default=False,
                            help='Event service merge boolean')
    arg_parser.add_argument('--usepcache',
                            dest='usepcache',
                            type=str2bool,
                            default=False,
                            help='pcache boolean from queuedata')
    arg_parser.add_argument('--no-pilot-log',
                            dest='nopilotlog',
                            action='store_true',
                            default=False,
                            help='Do not write the pilot log to file')
    arg_parser.add_argument('--filesizes',
                            dest='filesizes',
                            required=False,
                            help='Replica file sizes')
    arg_parser.add_argument('--checksums',
                            dest='checksums',
                            required=False,
                            help='Replica checksums')
    arg_parser.add_argument('--allowlans',
                            dest='allowlans',
                            required=False,
                            help='Replica allow_lan')
    arg_parser.add_argument('--allowwans',
                            dest='allowwans',
                            required=False,
                            help='Replica allow_wan')
    arg_parser.add_argument('--directaccesslans',
                            dest='directaccesslans',
                            required=False,
                            help='Replica direct_access_lan')
    arg_parser.add_argument('--directaccesswans',
                            dest='directaccesswans',
                            required=False,
                            help='Replica direct_access_wan')
    arg_parser.add_argument('--istars',
                            dest='istars',
                            required=False,
                            help='Replica is_tar')
    arg_parser.add_argument('--usevp',
                            dest='usevp',
                            type=str2bool,
                            default=False,
                            help='Job object boolean use_vp')
    arg_parser.add_argument('--accessmodes',
                            dest='accessmodes',
                            required=False,
                            help='Replica accessmodes')
    arg_parser.add_argument('--storagetokens',
                            dest='storagetokens',
                            required=False,
                            help='Replica storagetokens')
    arg_parser.add_argument('--guids',
                            dest='guids',
                            required=False,
                            help='Replica guids')
    arg_parser.add_argument('--replicadictionary',
                            dest='replicadictionary',
                            required=True,
                            help='Replica dictionary')
    arg_parser.add_argument('--inputdir',
                            dest='inputdir',
                            required=False,
                            default='',
                            help='Input files directory')
    arg_parser.add_argument('--catchall',
                            dest='catchall',
                            required=False,
                            default='',
                            help='PQ catchall field')

    return arg_parser.parse_args(argv)
0173
0174
def str2bool(v):
    """
    Convert a command-line string to a boolean (used as an argparse ``type=`` helper).

    A bool is returned unchanged. Recognised strings (case-insensitive) are
    'yes', 'true', 't', 'y', '1' for True and 'no', 'false', 'f', 'n', '0' for False.

    :param v: string or bool to convert.
    :return: True or False.
    :raises argparse.ArgumentTypeError: if v is a string that is not a recognised boolean.
    """
    if isinstance(v, bool):
        return v

    lowered = v.lower()
    truthy = ('yes', 'true', 't', 'y', '1')
    falsy = ('no', 'false', 'f', 'n', '0')

    if lowered in truthy:
        return True
    if lowered in falsy:
        return False
    raise argparse.ArgumentTypeError('Boolean value expected.')
0186
0187
def verify_args():
    """
    Check the required arguments in the module-level ``args`` namespace (deprecated).

    An empty work directory is replaced by the current working directory. The remaining
    arguments are then checked in a fixed order; the first one that is unset (falsy) is
    reported through message() and its error code is returned.

    :return: 0 if every checked argument is set, otherwise the error code of the first missing one (int).
    """
    if not args.workdir:
        args.workdir = os.getcwd()

    # (attribute name, message to report, error code), checked in this order
    required = (
        ('queuename', 'queue name not set, cannot initialize InfoService', NO_QUEUENAME),
        ('scopes', 'scopes not set', NO_SCOPES),
        ('lfns', 'LFNs not set', NO_LFNS),
        ('eventtype', 'No event type provided', NO_EVENTTYPE),
        ('localsite', 'No local site provided', NO_LOCALSITE),
        ('remotesite', 'No remote site provided', NO_REMOTESITE),
        ('produserid', 'No produserid provided', NO_PRODUSERID),
        ('jobid', 'No jobid provided', NO_JOBID),
        ('taskid', 'No taskid provided', NO_TASKID),
        ('jobdefinitionid', 'No jobdefinitionid provided', NO_JOBDEFINITIONID),
    )

    for attribute, complaint, code in required:
        if not getattr(args, attribute):
            message(complaint)
            return code

    return 0
0238
0239
def message(msg):
    """
    Report a message through the module logger, or print it when no logger is available.

    The module-level ``logger`` is only created in the ``__main__`` section of this script, so
    when the module is imported (or before logging is set up) the message is printed instead of
    raising NameError.

    :param msg: message to report; passed unchanged to logger.info() or print().
    """
    _logger = globals().get('logger')
    if _logger:
        _logger.info(msg)
    else:
        print(msg)
0242
0243
def str_to_int_list(_list):
    """
    Convert every element of a list to int, using None for elements that cannot be converted.

    :param _list: iterable of values (e.g. strings from a comma-split argument).
    :return: list of int or None, same length and order as the input.
    """
    def _as_int(value):
        # best effort: anything int() rejects becomes None
        try:
            return int(value)
        except Exception:
            return None

    return [_as_int(value) for value in _list]
0253
0254
def str_to_bool_list(_list):
    """
    Map the literal strings "True"/"False"/"None"/"NULL" to True/False/None/None.

    Any other element is passed through unchanged.

    :param _list: iterable of values (e.g. strings from a comma-split argument).
    :return: list with the recognised strings converted, same length and order as the input.
    """
    translation = {"True": True, "False": False, "None": None, "NULL": None}
    converted = []
    for item in _list:
        converted.append(translation[item] if item in translation else item)
    return converted
0258
0259
def get_file_lists(lfns, scopes, filesizes, checksums, allowlans, allowwans, directaccesslans, directaccesswans, istars,
                   accessmodes, storagetokens, guids):
    """
    Split the comma-separated replica arguments into per-field lists.

    Each field is parsed independently. A field that cannot be split (e.g. an optional command-line
    argument that was not given and is therefore None) is reported through message() and yields an
    empty list. The other fields are unaffected, so the result no longer depends on argument order.

    :param lfns: comma-separated LFNs (string).
    :param scopes: comma-separated scopes (string).
    :param filesizes: comma-separated file sizes (string); unparsable entries become None.
    :param checksums: comma-separated checksums (string).
    :param allowlans: comma-separated allow_lan flags (string, 'True'/'False'/'None'/'NULL').
    :param allowwans: comma-separated allow_wan flags (string, same encoding as allowlans).
    :param directaccesslans: comma-separated direct_access_lan flags (string, same encoding).
    :param directaccesswans: comma-separated direct_access_wan flags (string, same encoding).
    :param istars: comma-separated is_tar flags (string, same encoding).
    :param accessmodes: comma-separated access modes (string).
    :param storagetokens: comma-separated storage tokens (string).
    :param guids: comma-separated GUIDs (string).
    :return: dictionary mapping 'lfns', 'scopes', 'filesizes', 'checksums', 'allowlans', 'allowwans',
             'directaccesslans', 'directaccesswans', 'istars', 'accessmodes', 'storagetokens', 'guids'
             to lists.
    """

    def _parse(name, value, convert=None):
        # Split one field; a missing/non-string value only empties this field.
        try:
            items = value.split(',')
        except (AttributeError, TypeError) as error:
            message("exception caught while parsing %s: %s" % (name, error))
            return []
        return convert(items) if convert else items

    return {'lfns': _parse('lfns', lfns),
            'scopes': _parse('scopes', scopes),
            'filesizes': _parse('filesizes', filesizes, str_to_int_list),
            'checksums': _parse('checksums', checksums),
            'allowlans': _parse('allowlans', allowlans, str_to_bool_list),
            'allowwans': _parse('allowwans', allowwans, str_to_bool_list),
            'directaccesslans': _parse('directaccesslans', directaccesslans, str_to_bool_list),
            'directaccesswans': _parse('directaccesswans', directaccesswans, str_to_bool_list),
            'istars': _parse('istars', istars, str_to_bool_list),
            'accessmodes': _parse('accessmodes', accessmodes),
            'storagetokens': _parse('storagetokens', storagetokens),
            'guids': _parse('guids', guids)}
0295
0296
class Job:
    """
    Minimal stand-in for the Pilot Job class, holding only the fields the trace report needs.
    """

    # class-level defaults; every instance overwrites them in __init__
    produserid = ""
    jobid = ""
    taskid = ""
    jobdefinitionid = ""

    def __init__(self, produserid="", jobid="", taskid="", jobdefinitionid=""):
        """
        :param produserid: production user id; every '%20' in it is turned back into a space.
        :param jobid: PanDA job id.
        :param taskid: PanDA task id.
        :param jobdefinitionid: job definition id.
        """
        self.jobdefinitionid = jobdefinitionid
        self.taskid = taskid
        self.jobid = jobid
        self.produserid = ' '.join(produserid.split('%20'))
0312
0313
def add_to_dictionary(dictionary, key, value1, value2, value3, value4):
    """
    Store key -> [value1, value2, value3, value4] in dictionary, replacing any existing entry.

    In this script the entry is lfn -> [status, status_code, turl, DDM endpoint], or
    'error' -> [message, code, None, None].

    :param dictionary: dictionary to update in place.
    :param key: key to set (string).
    :param value1: first list element (status).
    :param value2: second list element (status_code).
    :param value3: third list element (turl).
    :param value4: fourth list element (DDM endpoint).
    :return: the same dictionary object, after the update.
    """
    dictionary.update({key: [value1, value2, value3, value4]})
    return dictionary
0330
0331
def extract_error_info(err):
    """
    Extract an error code and message from a transfer exception string.

    :param err: exception text (string).
    :return: tuple (error_code, error_message).
             error_code is the digit string after 'error code: ', or the int 0 if that pattern is absent.
             error_message is the rest of the line after 'details: ', with '[PilotException(' removed and
             surrounding whitespace stripped, or '' if that pattern is absent.
    """
    code_match = re.search(r'error code: (\d+)', err)
    error_code = code_match.group(1) if code_match else 0

    details_match = re.search('details: (.+)', err)
    if details_match:
        error_message = details_match.group(1).replace('[PilotException(', '').strip()
    else:
        error_message = ""

    return error_code, error_message
0347
0348
if __name__ == '__main__':
    # Main section of the stage-in script: read the replica dictionary prepared by the pilot,
    # stage in the listed files and write a status dictionary describing the outcome.
    import sys  # sys.exit does not depend on the site module, unlike the exit() builtin

    args = get_args()
    # Debug logging and the pilot log file are forced on regardless of -d / --no-pilot-log.
    args.debug = True
    args.nopilotlog = False

    establish_logging(debug=args.debug, nopilotlog=args.nopilotlog, filename=config.Pilot.stageinlog)
    logger = logging.getLogger(__name__)

    try:
        replica_dictionary = read_json(os.path.join(args.workdir, args.replicadictionary))
    except Exception as e:
        message('exception caught reading json: %s' % e)
        sys.exit(GENERAL_ERROR)
    # NOTE(review): read_json may return None on failure instead of raising - confirm in pilot.util.filehandling
    if not isinstance(replica_dictionary, dict):
        message('failed to read replica dictionary from %s' % args.replicadictionary)
        sys.exit(GENERAL_ERROR)

    trace_report = TraceReport(pq=os.environ.get('PILOT_SITENAME', ''), localSite=args.localsite,
                               remoteSite=args.remotesite, dataset="", eventType=args.eventtype)
    job = Job(produserid=args.produserid, jobid=args.jobid, taskid=args.taskid, jobdefinitionid=args.jobdefinitionid)
    trace_report.init(job)

    # Initialisation failures are logged; only a missing InfoService object is fatal, since the
    # stage-in clients below cannot be created without it.
    infoservice = None
    try:
        infoservice = InfoService()
        infoservice.init(args.queuename, infosys.confinfo, infosys.extinfo)
        infosys.init(args.queuename)
    except Exception as e:
        message(e)
    if infoservice is None:
        message('failed to create InfoService - cannot stage in files')
        sys.exit(GENERAL_ERROR)

    if args.eventservicemerge:
        client = StageInESClient(infoservice, logger=logger, trace_report=trace_report)
        activity = 'es_events_read'
    else:
        client = StageInClient(infoservice, logger=logger, trace_report=trace_report)
        activity = 'pr'
    kwargs = dict(workdir=args.workdir, cwd=args.workdir, usecontainer=False, use_pcache=args.usepcache, use_bulk=False,
                  use_vp=args.usevp, input_dir=args.inputdir, catchall=args.catchall)

    # one input FileSpec per LFN in the replica dictionary
    xfiles = []
    for lfn, replica in replica_dictionary.items():
        xfiles.append(FileSpec(type='input',
                               scope=replica['scope'],
                               lfn=lfn,
                               guid=replica['guid'],
                               workdir=args.workdir,
                               filesize=replica['filesize'],
                               checksum=replica['checksum'],
                               allow_lan=replica['allowlan'],
                               allow_wan=replica['allowwan'],
                               direct_access_lan=replica['directaccesslan'],
                               direct_access_wan=replica['directaccesswan'],
                               is_tar=replica['istar'],
                               accessmode=replica['accessmode'],
                               storage_token=replica['storagetoken']))

    # Track failure with an explicit flag: an exception may have an empty message.
    transfer_failed = False
    err = ""
    try:
        client.prepare_sources(xfiles)
        client.transfer(xfiles, activity=activity, **kwargs)
    except Exception as e:
        transfer_failed = True
        err = str(e) or e.__class__.__name__
        message(err)

    file_dictionary = {}
    if xfiles:
        message('stagein script summary of transferred files:')
        for fspec in xfiles:
            add_to_dictionary(file_dictionary, fspec.lfn, fspec.status, fspec.status_code, fspec.turl, fspec.ddmendpoint)
            status = fspec.status if fspec.status else "(not transferred)"
            message(" -- lfn=%s, ddmendpoint=%s, status_code=%s, status=%s" % (fspec.lfn, fspec.ddmendpoint, fspec.status_code, status))

    if transfer_failed:
        errcode, error_message = extract_error_info(err)
        # keep the raw exception text when it does not follow the 'details: ...' format,
        # otherwise the failure would be recorded (and reported) with an empty message
        err = error_message or err
        add_to_dictionary(file_dictionary, 'error', err, errcode, None, None)
    write_json(os.path.join(args.workdir, config.Container.stagein_status_dictionary), file_dictionary)
    if transfer_failed:
        message("containerised file transfers failed: %s" % err)
        sys.exit(TRANSFER_ERROR)

    message("containerised file transfers finished")
    sys.exit(0)