Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:39:16

0001 #do not use: #!/usr/bin/env python3
0002 # Licensed under the Apache License, Version 2.0 (the "License");
0003 # you may not use this file except in compliance with the License.
0004 # You may obtain a copy of the License at
0005 # http://www.apache.org/licenses/LICENSE-2.0
0006 #
0007 # Authors:
0008 # - Paul Nilsson, paul.nilsson@cern.ch, 2020
0009 
import argparse
import logging
import os
import re
import sys

from pilot.api.data import StageInClient
from pilot.api.es_data import StageInESClient
from pilot.info import InfoService, FileSpec, infosys
from pilot.util.config import config
from pilot.util.filehandling import establish_logging, write_json, read_json
from pilot.util.tracereport import TraceReport
0022 
# Exit codes of this script. The NO_* codes are returned by verify_args(); GENERAL_ERROR and
# TRANSFER_ERROR are used by the __main__ section.
GENERAL_ERROR = 1          # generic failure
NO_QUEUENAME = 2           # queue name (-q) not set
NO_SCOPES = 3              # --scopes not set
NO_LFNS = 4                # --lfns not set
NO_EVENTTYPE = 5           # --eventtype not set
NO_LOCALSITE = 6           # --localsite not set
NO_REMOTESITE = 7          # --remotesite not set
NO_PRODUSERID = 8          # --produserid not set
NO_JOBID = 9               # --jobid not set
NO_TASKID = 10             # --taskid not set
NO_JOBDEFINITIONID = 11    # --jobdefinitionid not set
TRANSFER_ERROR = 12        # the file transfer raised an error
0036 
0037 
def get_args(argv=None):
    """
    Return the args from the arg parser.

    :param argv: optional list of argument strings to parse. None (the default) makes argparse
                 parse sys.argv[1:], exactly as before.
    :return: args (argparse.Namespace).
    """

    arg_parser = argparse.ArgumentParser()

    arg_parser.add_argument('-d',
                            dest='debug',
                            action='store_true',
                            default=False,
                            help='Enable debug mode for logging messages')
    arg_parser.add_argument('-q',
                            dest='queuename',
                            required=True,
                            help='Queue name (e.g., AGLT2_TEST-condor)')
    arg_parser.add_argument('-w',
                            dest='workdir',
                            required=False,
                            default=os.getcwd(),
                            help='Working directory')
    arg_parser.add_argument('--scopes',
                            dest='scopes',
                            required=False,
                            help='List of Rucio scopes (e.g., mc16_13TeV,mc16_13TeV)')
    arg_parser.add_argument('--lfns',
                            dest='lfns',
                            required=False,
                            help='LFN list (e.g., filename1,filename2)')
    arg_parser.add_argument('--eventtype',
                            dest='eventtype',
                            required=True,
                            help='Event type')
    arg_parser.add_argument('--localsite',
                            dest='localsite',
                            required=True,
                            help='Local site')
    arg_parser.add_argument('--remotesite',
                            dest='remotesite',
                            required=True,
                            help='Remote site')
    arg_parser.add_argument('--produserid',
                            dest='produserid',
                            required=True,
                            help='produserid')
    arg_parser.add_argument('--jobid',
                            dest='jobid',
                            required=True,
                            help='PanDA job id')
    arg_parser.add_argument('--taskid',
                            dest='taskid',
                            required=True,
                            help='PanDA task id')
    arg_parser.add_argument('--jobdefinitionid',
                            dest='jobdefinitionid',
                            required=True,
                            help='Job definition id')
    # boolean options take an explicit value (e.g. "--eventservicemerge True"), converted by str2bool
    arg_parser.add_argument('--eventservicemerge',
                            dest='eventservicemerge',
                            type=str2bool,
                            default=False,
                            help='Event service merge boolean')
    arg_parser.add_argument('--usepcache',
                            dest='usepcache',
                            type=str2bool,
                            default=False,
                            help='pcache boolean from queuedata')
    arg_parser.add_argument('--no-pilot-log',
                            dest='nopilotlog',
                            action='store_true',
                            default=False,
                            help='Do not write the pilot log to file')
    arg_parser.add_argument('--filesizes',
                            dest='filesizes',
                            required=False,
                            help='Replica file sizes')
    arg_parser.add_argument('--checksums',
                            dest='checksums',
                            required=False,
                            help='Replica checksums')
    arg_parser.add_argument('--allowlans',
                            dest='allowlans',
                            required=False,
                            help='Replica allow_lan')
    arg_parser.add_argument('--allowwans',
                            dest='allowwans',
                            required=False,
                            help='Replica allow_wan')
    arg_parser.add_argument('--directaccesslans',
                            dest='directaccesslans',
                            required=False,
                            help='Replica direct_access_lan')
    arg_parser.add_argument('--directaccesswans',
                            dest='directaccesswans',
                            required=False,
                            help='Replica direct_access_wan')
    arg_parser.add_argument('--istars',
                            dest='istars',
                            required=False,
                            help='Replica is_tar')
    arg_parser.add_argument('--usevp',
                            dest='usevp',
                            type=str2bool,
                            default=False,
                            help='Job object boolean use_vp')
    arg_parser.add_argument('--accessmodes',
                            dest='accessmodes',
                            required=False,
                            help='Replica accessmodes')
    arg_parser.add_argument('--storagetokens',
                            dest='storagetokens',
                            required=False,
                            help='Replica storagetokens')
    arg_parser.add_argument('--guids',
                            dest='guids',
                            required=False,
                            help='Replica guids')
    arg_parser.add_argument('--replicadictionary',
                            dest='replicadictionary',
                            required=True,
                            help='Replica dictionary')
    arg_parser.add_argument('--inputdir',
                            dest='inputdir',
                            required=False,
                            default='',
                            help='Input files directory')
    arg_parser.add_argument('--catchall',
                            dest='catchall',
                            required=False,
                            default='',
                            help='PQ catchall field')

    return arg_parser.parse_args(argv)
0173 
0174 
def str2bool(v):
    """
    Convert a command-line value to a bool (used as an argparse ``type=`` callable).

    :param v: a bool (returned unchanged) or a string.
    :return: True for 'yes'/'true'/'t'/'y'/'1', False for 'no'/'false'/'f'/'n'/'0' (case-insensitive).
    :raises argparse.ArgumentTypeError: if the string is none of the accepted spellings.
    """
    if isinstance(v, bool):
        return v

    lowered = v.lower()
    truthy = ('yes', 'true', 't', 'y', '1')
    falsy = ('no', 'false', 'f', 'n', '0')

    if lowered in truthy:
        return True
    if lowered in falsy:
        return False
    raise argparse.ArgumentTypeError('Boolean value expected.')
0186 
0187 
def verify_args():
    """
    Check that the mandatory arguments are present (deprecated).

    Operates on the module-level ``args`` created in the ``__main__`` section. A missing work
    directory is replaced by the current directory; every other missing argument is reported
    with message() and its error code is returned.

    :return: 0 if all checks pass, otherwise the error code of the first missing argument (int).
    """
    if not args.workdir:
        args.workdir = os.getcwd()

    # (attribute, message, error code) - checked in this exact order, first failure wins
    required = (
        ('queuename', 'queue name not set, cannot initialize InfoService', NO_QUEUENAME),
        ('scopes', 'scopes not set', NO_SCOPES),
        ('lfns', 'LFNs not set', NO_LFNS),
        ('eventtype', 'No event type provided', NO_EVENTTYPE),
        ('localsite', 'No local site provided', NO_LOCALSITE),
        ('remotesite', 'No remote site provided', NO_REMOTESITE),
        ('produserid', 'No produserid provided', NO_PRODUSERID),
        ('jobid', 'No jobid provided', NO_JOBID),
        ('taskid', 'No taskid provided', NO_TASKID),
        ('jobdefinitionid', 'No jobdefinitionid provided', NO_JOBDEFINITIONID),
    )
    for attribute, text, code in required:
        if not getattr(args, attribute):
            message(text)
            return code

    return 0
0238 
0239 
def message(msg):
    """
    Report a message through the module logger, or print it when no logger exists yet.

    The module-level ``logger`` is only created in the ``__main__`` section after logging has been
    established, so it is looked up defensively instead of being referenced directly.

    :param msg: message to report (passed as-is to logger.info() or print()).
    """
    # a direct reference to an undefined global 'logger' would raise NameError
    _logger = globals().get('logger')
    if _logger:
        _logger.info(msg)
    else:
        print(msg)
0242 
0243 
def str_to_int_list(_list):
    """
    Convert each element of a list to int.

    Elements that int() cannot convert become None, so the output has the same length as the input.

    :param _list: iterable of values (typically strings).
    :return: list of int or None.
    """
    def _as_int(value):
        try:
            return int(value)
        except Exception:
            return None

    return [_as_int(value) for value in _list]
0253 
0254 
def str_to_bool_list(_list):
    """
    Replace the tokens 'True', 'False', 'None' and 'NULL' by their Python values.

    Any other element is kept unchanged.

    :param _list: iterable of (string) tokens.
    :return: list with True/False/None substituted where applicable.
    """
    lookup = {"True": True, "False": False, "None": None, "NULL": None}
    converted = []
    for token in _list:
        converted.append(lookup[token] if token in lookup else token)
    return converted
0258 
0259 
def get_file_lists(lfns, scopes, filesizes, checksums, allowlans, allowwans, directaccesslans, directaccesswans, istars,
                   accessmodes, storagetokens, guids):
    """
    Split the comma-separated per-replica command-line values into lists.

    Each argument is parsed independently: an argument that was not given (None) yields an empty
    list, and a parsing failure in one argument no longer empties all the arguments after it.

    :param lfns: comma-separated LFNs (string or None).
    :param scopes: comma-separated scopes (string or None).
    :param filesizes: comma-separated file sizes, converted with str_to_int_list (string or None).
    :param checksums: comma-separated checksums (string or None).
    :param allowlans: comma-separated allow_lan flags, converted with str_to_bool_list (string or None).
    :param allowwans: comma-separated allow_wan flags, converted with str_to_bool_list (string or None).
    :param directaccesslans: comma-separated direct_access_lan flags, converted with str_to_bool_list.
    :param directaccesswans: comma-separated direct_access_wan flags, converted with str_to_bool_list.
    :param istars: comma-separated is_tar flags, converted with str_to_bool_list (string or None).
    :param accessmodes: comma-separated access modes (string or None).
    :param storagetokens: comma-separated storage tokens (string or None).
    :param guids: comma-separated GUIDs (string or None).
    :return: dictionary mapping 'lfns', 'scopes', 'filesizes', ... to the parsed lists.
    """
    def _split(value, converter=None):
        # optional command-line arguments default to None: treat them as "no entries"
        if value is None:
            return []
        try:
            items = value.split(',')
            return converter(items) if converter else items
        except Exception as error:
            message("exception caught: %s" % error)
            return []

    return {'lfns': _split(lfns),
            'scopes': _split(scopes),
            'filesizes': _split(filesizes, str_to_int_list),
            'checksums': _split(checksums),
            'allowlans': _split(allowlans, str_to_bool_list),
            'allowwans': _split(allowwans, str_to_bool_list),
            'directaccesslans': _split(directaccesslans, str_to_bool_list),
            'directaccesswans': _split(directaccesswans, str_to_bool_list),
            'istars': _split(istars, str_to_bool_list),
            'accessmodes': _split(accessmodes),
            'storagetokens': _split(storagetokens),
            'guids': _split(guids)}
0295 
0296 
class Job:
    """
    Minimal stand-in for the Pilot Job class.

    It only carries the data members that the trace report needs.
    """

    # class-level defaults; __init__ always sets instance attributes with the same names
    produserid: str = ""
    jobid: str = ""
    taskid: str = ""
    jobdefinitionid: str = ""

    def __init__(self, produserid="", jobid="", taskid="", jobdefinitionid=""):
        # '%20' sequences in the producer user id are turned back into spaces
        decoded_user = produserid.replace('%20', ' ')
        self.produserid = decoded_user
        self.jobdefinitionid = jobdefinitionid
        self.taskid = taskid
        self.jobid = jobid
0313 
def add_to_dictionary(dictionary, key, value1, value2, value3, value4):
    """
    Store ``key: [value1, value2, value3, value4]`` in the dictionary, replacing any previous entry.

    In this script the entries are lfn: [status, status_code, turl, DDM endpoint], plus one
    'error' entry.

    :param dictionary: dictionary to update in place.
    :param key: lfn (or 'error') key (string).
    :param value1: status (string).
    :param value2: status_code.
    :param value3: turl (string).
    :param value4: DDM endpoint (string).
    :return: the same (updated) dictionary object.
    """
    entry = [value1, value2, value3, value4]
    dictionary.update({key: entry})
    return dictionary
0330 
0331 
def extract_error_info(err):
    """
    Extract the error code and the error details from a stage-in exception message.

    Looks for an 'error code: <digits>' fragment and a 'details: <text>' fragment (the text runs to
    the end of that line). A '[PilotException(' prefix is removed from the details.

    :param err: exception text (string).
    :return: error code (int, 0 if not found), error message (string, empty if not found).
    """
    error_code = 0
    error_message = ""

    _code = re.search(r'error code: (\d+)', err)
    if _code:
        # the group only contains digits, so int() cannot fail; returning an int keeps the type
        # consistent with the default value above
        error_code = int(_code.group(1))

    _msg = re.search(r'details: (.+)', err)
    if _msg:
        error_message = _msg.group(1).replace('[PilotException(', '').strip()

    return error_code, error_message
0347 
0348 
if __name__ == '__main__':
    # Main section of the stage-in script: read the replica dictionary, stage in the files and write
    # their statuses to the stage-in status dictionary (JSON) in the work directory.

    # get the args from the arg parser
    args = get_args()
    # debug logging and the pilot log file are always enabled, regardless of -d / --no-pilot-log
    args.debug = True
    args.nopilotlog = False

    establish_logging(debug=args.debug, nopilotlog=args.nopilotlog, filename=config.Pilot.stageinlog)
    logger = logging.getLogger(__name__)

    # note: verify_args() is deprecated and the comma-separated file arguments (--lfns, --scopes, ...)
    # are not used here; all file info is taken from the replica dictionary

    # get the file info
    try:
        replica_dictionary = read_json(os.path.join(args.workdir, args.replicadictionary))
    except Exception as e:
        message('exception caught reading json: %s' % e)
        sys.exit(GENERAL_ERROR)

    # generate the trace report
    trace_report = TraceReport(pq=os.environ.get('PILOT_SITENAME', ''), localSite=args.localsite, remoteSite=args.remotesite, dataset="",
                               eventType=args.eventtype)
    job = Job(produserid=args.produserid, jobid=args.jobid, taskid=args.taskid, jobdefinitionid=args.jobdefinitionid)
    trace_report.init(job)

    infoservice = None
    try:
        infoservice = InfoService()
        infoservice.init(args.queuename, infosys.confinfo, infosys.extinfo)
        infosys.init(args.queuename)  # is this correct? otherwise infosys.queuedata doesn't get set
    except Exception as e:
        message(e)
    if infoservice is None:
        # the stage-in clients below are built from the InfoService; without it the script would
        # otherwise fail with a NameError
        message('failed to create InfoService, cannot stage in files')
        sys.exit(GENERAL_ERROR)

    # perform stage-in (single transfers)
    err = ""
    errcode = 0
    if args.eventservicemerge:
        client = StageInESClient(infoservice, logger=logger, trace_report=trace_report)
        activity = 'es_events_read'
    else:
        client = StageInClient(infoservice, logger=logger, trace_report=trace_report)
        activity = 'pr'
    kwargs = dict(workdir=args.workdir, cwd=args.workdir, usecontainer=False, use_pcache=args.usepcache, use_bulk=False,
                  use_vp=args.usevp, input_dir=args.inputdir, catchall=args.catchall)
    xfiles = []
    for lfn, replica in replica_dictionary.items():
        fdict = {'scope': replica['scope'],
                 'lfn': lfn,
                 'guid': replica['guid'],
                 'workdir': args.workdir,
                 'filesize': replica['filesize'],
                 'checksum': replica['checksum'],
                 'allow_lan': replica['allowlan'],
                 'allow_wan': replica['allowwan'],
                 'direct_access_lan': replica['directaccesslan'],
                 'direct_access_wan': replica['directaccesswan'],
                 'is_tar': replica['istar'],
                 'accessmode': replica['accessmode'],
                 'storage_token': replica['storagetoken']}
        xfiles.append(FileSpec(type='input', **fdict))

    try:
        client.prepare_sources(xfiles)
        client.transfer(xfiles, activity=activity, **kwargs)
    except Exception as e:
        # fall back to repr() so that an exception with an empty message is still recorded as an error
        err = str(e) or repr(e)
        errcode = -1
        message(err)

    # put file statuses in a dictionary to be written to file
    file_dictionary = {}  # { 'error': [error_diag, -1, None, None], 'lfn1': [status, status_code, turl, ddmendpoint], .. }
    if xfiles:
        message('stagein script summary of transferred files:')
        for fspec in xfiles:
            add_to_dictionary(file_dictionary, fspec.lfn, fspec.status, fspec.status_code, fspec.turl, fspec.ddmendpoint)
            status = fspec.status if fspec.status else "(not transferred)"
            message(" -- lfn=%s, ddmendpoint=%s, status_code=%s, status=%s" % (fspec.lfn, fspec.ddmendpoint, fspec.status_code, status))

    # add error info, if any
    if err:
        _code, _diag = extract_error_info(err)
        # only replace the raw code/text when they could be parsed; otherwise a failure whose message
        # lacks 'details: ...' would end up with an empty diagnostic and be reported as a success below
        errcode = _code if _code else errcode
        err = _diag if _diag else err
    add_to_dictionary(file_dictionary, 'error', err, errcode, None, None)
    write_json(os.path.join(args.workdir, config.Container.stagein_status_dictionary), file_dictionary)
    if err:
        message("containerised file transfers failed: %s" % err)
        sys.exit(TRANSFER_ERROR)

    message("containerised file transfers finished")
    sys.exit(0)