File indexing completed on 2026-04-10 08:39:16
0001
0002
0003
0004
0005
0006
0007
0008
0009 import argparse
0010 import os
0011 import re
0012
0013 from pilot.api.data import StageOutClient
0014 from pilot.common.errorcodes import ErrorCodes
0015 from pilot.common.exception import PilotException
0016 from pilot.info import InfoService, FileSpec, infosys
0017 from pilot.util.config import config
0018 from pilot.util.filehandling import establish_logging, write_json
0019 from pilot.util.tracereport import TraceReport
0020
0021 import logging
0022
# Pilot error-code helper; used by the main program to format transfer error diagnostics.
errors = ErrorCodes()


# Status codes of this script.
# The NO_* values are returned by verify_args() when the corresponding argument is missing;
# TRANSFER_ERROR is the exit code used by the main program when the file transfer failed.
GENERAL_ERROR = 1
NO_QUEUENAME = 2
NO_SCOPES = 3
NO_LFNS = 4
NO_EVENTTYPE = 5
NO_LOCALSITE = 6
NO_REMOTESITE = 7
NO_PRODUSERID = 8
NO_JOBID = 9
NO_TASKID = 10
NO_JOBDEFINITIONID = 11
NO_DDMENDPOINTS = 12
NO_DATASETS = 13
NO_GUIDS = 14
TRANSFER_ERROR = 15
0041
0042
def get_args():
    """
    Build the command-line parser of the stage-out script and parse sys.argv.

    The per-file options (--scopes, --lfns, --ddmendpoints, --datasets, --guids) are
    comma-separated strings; they are split later by get_file_lists().

    :return: args (argparse.Namespace with the parsed options).
    """

    arg_parser = argparse.ArgumentParser()

    arg_parser.add_argument('-d',
                            dest='debug',
                            action='store_true',
                            default=False,
                            help='Enable debug mode for logging messages')
    arg_parser.add_argument('-q',
                            dest='queuename',
                            required=True,
                            help='Queue name (e.g., AGLT2_TEST-condor)')
    arg_parser.add_argument('-w',
                            dest='workdir',
                            required=False,
                            default=os.getcwd(),
                            help='Working directory')

    # mandatory plain-string options; all of them share the same argparse settings,
    # and the order is kept since it determines the order in the --help output
    required_options = (
        ('scopes', 'List of Rucio scopes (e.g., mc16_13TeV,mc16_13TeV)'),
        ('lfns', 'LFN list (e.g., filename1,filename2)'),
        ('eventtype', 'Event type'),
        ('ddmendpoints', 'DDM endpoint'),
        ('datasets', 'Dataset'),
        ('guids', 'GUIDs'),
        ('localsite', 'Local site'),
        ('remotesite', 'Remote site'),
        ('produserid', 'produserid'),
        ('jobid', 'PanDA job id'),
        ('taskid', 'PanDA task id'),
        ('jobdefinitionid', 'Job definition id'),
    )
    for name, help_text in required_options:
        arg_parser.add_argument('--' + name,
                                dest=name,
                                required=True,
                                help=help_text)

    arg_parser.add_argument('--eventservicemerge',
                            dest='eventservicemerge',
                            type=str2bool,
                            default=False,
                            help='Event service merge boolean')
    arg_parser.add_argument('--usepcache',
                            dest='usepcache',
                            type=str2bool,
                            default=False,
                            help='pcache boolean from queuedata')
    arg_parser.add_argument('--no-pilot-log',
                            dest='nopilotlog',
                            action='store_true',
                            default=False,
                            help='Do not write the pilot log to file')
    arg_parser.add_argument('--outputdir',
                            dest='outputdir',
                            required=False,
                            default='',
                            help='Output files directory')
    arg_parser.add_argument('--catchall',
                            dest='catchall',
                            required=False,
                            default='',
                            help='PQ catchall field')

    return arg_parser.parse_args()
0141
0142
def str2bool(v):
    """
    Convert a command-line string to a bool (used as an argparse ``type``).

    :param v: value to convert (string, or an already converted bool which is returned as is).
    :return: True for yes/true/t/y/1, False for no/false/f/n/0 (case-insensitive).
    :raises argparse.ArgumentTypeError: if the string is not one of the accepted values.
    """

    if isinstance(v, bool):
        return v

    token = v.lower()
    if token in ('yes', 'true', 't', 'y', '1'):
        return True
    if token in ('no', 'false', 'f', 'n', '0'):
        return False

    raise argparse.ArgumentTypeError('Boolean value expected.')
0154
0155
def verify_args():
    """
    Check the module-level ``args`` namespace for missing values (deprecated).

    An unset working directory is replaced by the current directory. For every other
    checked argument, the first one found empty is reported via message() and its
    NO_* code is returned.

    :return: 0 if all checked arguments are set, otherwise the NO_* code of the first missing one (int).
    """

    if not args.workdir:
        args.workdir = os.getcwd()

    # (attribute name, message, return code) - evaluated in this order
    checks = (
        ('queuename', 'queue name not set, cannot initialize InfoService', NO_QUEUENAME),
        ('scopes', 'scopes not set', NO_SCOPES),
        ('lfns', 'LFNs not set', NO_LFNS),
        ('eventtype', 'No event type provided', NO_EVENTTYPE),
        ('localsite', 'No local site provided', NO_LOCALSITE),
        ('remotesite', 'No remote site provided', NO_REMOTESITE),
        ('produserid', 'No produserid provided', NO_PRODUSERID),
        ('jobid', 'No jobid provided', NO_JOBID),
        ('ddmendpoints', 'No ddmendpoint provided', NO_DDMENDPOINTS),
        ('datasets', 'No dataset provided', NO_DATASETS),
        ('guids', 'No GUIDs provided', NO_GUIDS),
        ('taskid', 'No taskid provided', NO_TASKID),
        ('jobdefinitionid', 'No jobdefinitionid provided', NO_JOBDEFINITIONID),
    )
    for attribute, text, code in checks:
        if not getattr(args, attribute):
            message(text)
            return code

    return 0
0218
0219
def message(msg):
    """
    Report a message through the module-level logger, or print it if there is none.

    The ``logger`` global is only created by the main program after logging has been
    established, so it is looked up defensively: if it does not exist yet (or is unset)
    the message goes to stdout instead of raising a NameError.

    :param msg: message to report (string or any printable object).
    """

    log = globals().get('logger')
    if log:
        log.info(msg)
    else:
        print(msg)
0222
0223
def get_file_lists(lfns, scopes, ddmendpoints, datasets, guids):
    """
    Split the comma-separated command-line strings into lists.

    :param lfns: comma-separated LFNs (string).
    :param scopes: comma-separated scopes (string).
    :param ddmendpoints: comma-separated DDM endpoints (string).
    :param datasets: comma-separated datasets (string).
    :param guids: comma-separated GUIDs (string).
    :return: tuple of five lists, in the same order as the parameters.
    """

    lfn_list = lfns.split(',')
    scope_list = scopes.split(',')
    ddmendpoint_list = ddmendpoints.split(',')
    dataset_list = datasets.split(',')
    guid_list = guids.split(',')

    return lfn_list, scope_list, ddmendpoint_list, dataset_list, guid_list
0226
0227
class Job:
    """
    Stripped-down stand-in for the Pilot Job class.

    Only carries the four identifiers that the main program passes on to the trace report.
    """

    # class-level defaults, overridden per instance in __init__
    produserid = jobid = taskid = jobdefinitionid = ""

    def __init__(self, produserid="", jobid="", taskid="", jobdefinitionid=""):
        """
        Store the job identifiers.

        :param produserid: user id; any '%20' sequence is turned back into a space (string).
        :param jobid: job id (string).
        :param taskid: task id (string).
        :param jobdefinitionid: job definition id (string).
        """

        self.jobdefinitionid = jobdefinitionid
        self.taskid = taskid
        self.jobid = jobid
        self.produserid = produserid.replace('%20', ' ')
0243
0244
def add_to_dictionary(dictionary, key, value1, value2, value3, value4, value5, value6):
    """
    Store the six values as a list under the given key (an existing entry is replaced).

    The main program uses it as lfn: [status, status_code, surl, turl, checksum, fsize].

    :param dictionary: dictionary to be updated (modified in place).
    :param key: key to be set, e.g. an lfn (string).
    :param value1: first list element (status).
    :param value2: second list element (status_code).
    :param value3: third list element (surl).
    :param value4: fourth list element (turl).
    :param value5: fifth list element (checksum).
    :param value6: sixth list element (fsize).
    :return: the same dictionary object, updated.
    """

    entry = list((value1, value2, value3, value4, value5, value6))
    dictionary[key] = entry

    return dictionary
0263
0264
def extract_error_info(err):
    """
    Pick the error code and the error details out of a diagnostics string.

    :param err: diagnostics text, searched for 'error code: <digits>' and 'details: <text>' (string).
    :return: error code (the matched digits as a string, or the int 0 if not found),
             error message (rest of the 'details: ' line without any '[PilotException(' text and
             surrounding whitespace, or an empty string if not found).
    """

    code_match = re.search(r'error code: (\d+)', err)
    details_match = re.search('details: (.+)', err)

    error_code = code_match.group(1) if code_match else 0

    if details_match:
        error_message = details_match.group(1).replace('[PilotException(', '').strip()
    else:
        error_message = ""

    return error_code, error_message
0280
0281
if __name__ == '__main__':
    # Main program of the stage-out script: parse the arguments, set up logging, transfer the
    # requested output files with StageOutClient and write a per-file status dictionary (JSON)
    # to the work directory. Exit code is 0 on success, TRANSFER_ERROR if the transfer failed and
    # GENERAL_ERROR if the script could not get as far as the transfer.
    import sys
    import traceback

    # get the args from the arg parser
    args = get_args()
    # NOTE(review): the parsed -d and --no-pilot-log values are overridden here, i.e. debug
    # logging to the stage-out log file is always on - confirm that this is intentional
    args.debug = True
    args.nopilotlog = False

    establish_logging(debug=args.debug, nopilotlog=args.nopilotlog, filename=config.Pilot.stageoutlog)
    logger = logging.getLogger(__name__)

    # get the file info; each list must contain exactly one entry per file
    lfns, scopes, ddmendpoints, datasets, guids = get_file_lists(args.lfns, args.scopes, args.ddmendpoints,
                                                                 args.datasets, args.guids)
    if len({len(lfns), len(scopes), len(ddmendpoints), len(datasets), len(guids)}) != 1:
        message('file lists not same length: len(lfns)=%d, len(scopes)=%d, len(ddmendpoints)=%d, len(datasets)=%d, len(guids)=%d' %
                (len(lfns), len(scopes), len(ddmendpoints), len(datasets), len(guids)))
        # zip() below would silently drop the unmatched entries, so do not continue
        sys.exit(GENERAL_ERROR)

    # generate the trace report
    trace_report = TraceReport(pq=os.environ.get('PILOT_SITENAME', ''), localSite=args.localsite,
                               remoteSite=args.remotesite, dataset="", eventType=args.eventtype)
    job = Job(produserid=args.produserid, jobid=args.jobid, taskid=args.taskid, jobdefinitionid=args.jobdefinitionid)
    trace_report.init(job)

    # initialise the queue information; a failure is reported but, as before, only fatal if
    # the InfoService object could not even be created (it is needed by the transfer client)
    infoservice = None
    try:
        infoservice = InfoService()
        infoservice.init(args.queuename, infosys.confinfo, infosys.extinfo)
        infosys.init(args.queuename)
    except Exception as exc:
        message(exc)
    if infoservice is None:
        sys.exit(GENERAL_ERROR)

    # perform stage-out
    err = ""
    errcode = 0
    activity = 'pw'

    client = StageOutClient(infoservice, logger=logger, trace_report=trace_report)
    kwargs = dict(workdir=args.workdir, cwd=args.workdir, usecontainer=False, job=job, output_dir=args.outputdir,
                  catchall=args.catchall)

    # one output FileSpec per file (the guids are validated above but not passed on)
    xfiles = [FileSpec(type='output', scope=scope, lfn=lfn, workdir=args.workdir, dataset=dataset,
                       ddmendpoint=ddmendpoint, ddmendpoint_alt=None)
              for lfn, scope, dataset, ddmendpoint in zip(lfns, scopes, datasets, ddmendpoints)]

    # NOTE(review): the indentation of this file was lost, so it cannot be told whether the
    # destination preparation originally ran once per file inside the loop or once afterwards;
    # it is done once here on the complete list - TODO confirm against the upstream script
    if infoservice.queuedata.type != 'unified':
        client.prepare_destinations(xfiles, activity)

    try:
        client.transfer(xfiles, activity=activity, **kwargs)
    except PilotException as error:
        error_msg = traceback.format_exc()
        logger.error(error_msg)
        errcode = error.get_error_code()
        err = errors.format_diagnostics(errcode, error_msg)
    except Exception as error:
        err = str(error)
        errcode = -1
        message(err)

    # put the file statuses in a dictionary to be written to file
    file_dictionary = {}
    if xfiles:
        message('stageout script summary of transferred files:')
        for fspec in xfiles:
            adler32 = fspec.checksum.get('adler32')
            add_to_dictionary(file_dictionary, fspec.lfn, fspec.status, fspec.status_code,
                              fspec.surl, fspec.turl, adler32, fspec.filesize)
            status = fspec.status or "(not transferred)"
            message(" -- lfn=%s, status_code=%s, status=%s, surl=%s, turl=%s, checksum=%s, filesize=%s" %
                    (fspec.lfn, fspec.status_code, status, fspec.surl, fspec.turl, adler32, fspec.filesize))

    # add error info, if any
    if err:
        # extract_error_info() returns (0, "") for text without the 'error code:'/'details:'
        # markers (e.g. a generic exception message); keep the original values in that case,
        # otherwise the failure would be forgotten and the script would exit with 0
        parsed_code, parsed_message = extract_error_info(err)
        errcode = parsed_code or errcode
        err = parsed_message or err
    add_to_dictionary(file_dictionary, 'error', err, errcode, None, None, None, None)

    path = os.path.join(args.workdir, config.Container.stageout_status_dictionary)
    if os.path.exists(path):
        # do not overwrite an already existing status file, write next to it instead
        path += '.log'
    write_json(path, file_dictionary)

    if err:
        message("containerised file transfers failed: %s" % err)
        sys.exit(TRANSFER_ERROR)

    message("wrote %s" % path)
    message("containerised file transfers finished")
    sys.exit(0)