Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:39:16

0001 #do not use: #!/usr/bin/env python3
0002 # Licensed under the Apache License, Version 2.0 (the "License");
0003 # you may not use this file except in compliance with the License.
0004 # You may obtain a copy of the License at
0005 # http://www.apache.org/licenses/LICENSE-2.0
0006 #
0007 # Authors:
0008 # - Paul Nilsson, paul.nilsson@cern.ch, 2020
0009 import argparse
0010 import os
0011 import re
0012 
0013 from pilot.api.data import StageOutClient
0014 from pilot.common.errorcodes import ErrorCodes
0015 from pilot.common.exception import PilotException
0016 from pilot.info import InfoService, FileSpec, infosys
0017 from pilot.util.config import config
0018 from pilot.util.filehandling import establish_logging, write_json
0019 from pilot.util.tracereport import TraceReport
0020 
0021 import logging
0022 
# shared ErrorCodes instance; the main section uses it to format the diagnostics of a failed transfer
errors = ErrorCodes()

# Script-level status codes.
# The NO_* values are returned by verify_args() for the first missing argument,
# TRANSFER_ERROR is the process exit code used by the main section when the transfer failed.
GENERAL_ERROR = 1
NO_QUEUENAME = 2
NO_SCOPES = 3
NO_LFNS = 4
NO_EVENTTYPE = 5
NO_LOCALSITE = 6
NO_REMOTESITE = 7
NO_PRODUSERID = 8
NO_JOBID = 9
NO_TASKID = 10
NO_JOBDEFINITIONID = 11
NO_DDMENDPOINTS = 12
NO_DATASETS = 13
NO_GUIDS = 14
TRANSFER_ERROR = 15
0041 
0042 
def get_args(argv=None):
    """
    Build the argument parser of the stage-out script and parse the command line.

    The list-type options (--scopes, --lfns, --ddmendpoints, --datasets, --guids) are received as plain
    strings; they are split on commas later by get_file_lists().

    :param argv: optional list of argument strings to parse. When None (default), argparse reads
                 sys.argv[1:], i.e. the behaviour is the same as before this parameter existed.
    :return: args (arg parser object).
    """

    arg_parser = argparse.ArgumentParser()

    # logging / environment options
    arg_parser.add_argument('-d',
                            dest='debug',
                            action='store_true',
                            default=False,
                            help='Enable debug mode for logging messages')
    arg_parser.add_argument('-q',
                            dest='queuename',
                            required=True,
                            help='Queue name (e.g., AGLT2_TEST-condor)')
    arg_parser.add_argument('-w',
                            dest='workdir',
                            required=False,
                            default=os.getcwd(),
                            help='Working directory')

    # file description options (comma-separated lists, one entry per file)
    arg_parser.add_argument('--scopes',
                            dest='scopes',
                            required=True,
                            help='List of Rucio scopes (e.g., mc16_13TeV,mc16_13TeV)')
    arg_parser.add_argument('--lfns',
                            dest='lfns',
                            required=True,
                            help='LFN list (e.g., filename1,filename2)')
    arg_parser.add_argument('--eventtype',
                            dest='eventtype',
                            required=True,
                            help='Event type')
    arg_parser.add_argument('--ddmendpoints',
                            dest='ddmendpoints',
                            required=True,
                            help='DDM endpoint')
    arg_parser.add_argument('--datasets',
                            dest='datasets',
                            required=True,
                            help='Dataset')
    arg_parser.add_argument('--guids',
                            dest='guids',
                            required=True,
                            help='GUIDs')

    # trace report options
    arg_parser.add_argument('--localsite',
                            dest='localsite',
                            required=True,
                            help='Local site')
    arg_parser.add_argument('--remotesite',
                            dest='remotesite',
                            required=True,
                            help='Remote site')
    arg_parser.add_argument('--produserid',
                            dest='produserid',
                            required=True,
                            help='produserid')
    arg_parser.add_argument('--jobid',
                            dest='jobid',
                            required=True,
                            help='PanDA job id')
    arg_parser.add_argument('--taskid',
                            dest='taskid',
                            required=True,
                            help='PanDA task id')
    arg_parser.add_argument('--jobdefinitionid',
                            dest='jobdefinitionid',
                            required=True,
                            help='Job definition id')

    # boolean options given as strings (converted by str2bool)
    arg_parser.add_argument('--eventservicemerge',
                            dest='eventservicemerge',
                            type=str2bool,
                            default=False,
                            help='Event service merge boolean')
    arg_parser.add_argument('--usepcache',
                            dest='usepcache',
                            type=str2bool,
                            default=False,
                            help='pcache boolean from queuedata')
    arg_parser.add_argument('--no-pilot-log',
                            dest='nopilotlog',
                            action='store_true',
                            default=False,
                            help='Do not write the pilot log to file')

    # optional extras
    arg_parser.add_argument('--outputdir',
                            dest='outputdir',
                            required=False,
                            default='',
                            help='Output files directory')
    arg_parser.add_argument('--catchall',
                            dest='catchall',
                            required=False,
                            default='',
                            help='PQ catchall field')

    # parse_args(None) falls back to sys.argv[1:]
    return arg_parser.parse_args(argv)
0141 
0142 
def str2bool(v):
    """
    Translate a textual yes/no value into a bool (used as an argparse type converter).

    :param v: value to convert (bool or string); strings are compared case-insensitively.
    :return: the corresponding boolean; a bool argument is returned unchanged.
    :raises argparse.ArgumentTypeError: if the string is not one of the recognised spellings.
    """

    if isinstance(v, bool):
        return v

    spellings = {
        'yes': True, 'true': True, 't': True, 'y': True, '1': True,
        'no': False, 'false': False, 'f': False, 'n': False, '0': False,
    }
    converted = spellings.get(v.lower())
    if converted is None:
        raise argparse.ArgumentTypeError('Boolean value expected.')
    return converted
0154 
0155 
def verify_args():
    """
    Check that the mandatory script arguments are present (deprecated).

    Reads the module-level args object. An unset work directory is replaced by the current directory.
    The remaining arguments are checked in a fixed order; for the first one that is unset, a message is
    reported via message() and the matching NO_* code is returned.

    :return: 0 if all arguments are set, otherwise the NO_* code of the first missing argument (int).
    """
    if not args.workdir:
        args.workdir = os.getcwd()

    # (attribute on args, message to report, code to return) - order matters, the first failure wins
    required = (
        ('queuename', 'queue name not set, cannot initialize InfoService', NO_QUEUENAME),
        ('scopes', 'scopes not set', NO_SCOPES),
        ('lfns', 'LFNs not set', NO_LFNS),
        ('eventtype', 'No event type provided', NO_EVENTTYPE),
        ('localsite', 'No local site provided', NO_LOCALSITE),
        ('remotesite', 'No remote site provided', NO_REMOTESITE),
        ('produserid', 'No produserid provided', NO_PRODUSERID),
        ('jobid', 'No jobid provided', NO_JOBID),
        ('ddmendpoints', 'No ddmendpoint provided', NO_DDMENDPOINTS),
        ('datasets', 'No dataset provided', NO_DATASETS),
        ('guids', 'No GUIDs provided', NO_GUIDS),
        ('taskid', 'No taskid provided', NO_TASKID),
        ('jobdefinitionid', 'No jobdefinitionid provided', NO_JOBDEFINITIONID),
    )

    for attribute, complaint, code in required:
        if not getattr(args, attribute):
            message(complaint)
            return code

    return 0
0218 
0219 
def message(msg):
    """
    Report a message through the module-level logger when there is one, otherwise print it.

    The logger is only created in the main section of the script, so the name may not exist yet (or at
    all, e.g. when the module is imported). It is therefore looked up via globals() instead of being
    referenced directly, which would raise NameError rather than fall back to print().

    :param msg: message to report (any object accepted by print() / logger.info()).
    """
    active_logger = globals().get('logger')
    if active_logger:
        active_logger.info(msg)
    else:
        print(msg)
0222 
0223 
def get_file_lists(lfns, scopes, ddmendpoints, datasets, guids):
    """
    Split the comma-separated command line strings into lists.

    :param lfns: comma-separated LFNs (string).
    :param scopes: comma-separated scopes (string).
    :param ddmendpoints: comma-separated DDM endpoints (string).
    :param datasets: comma-separated datasets (string).
    :param guids: comma-separated GUIDs (string).
    :return: tuple of five lists, in the same order as the parameters.
    """
    comma_separated = (lfns, scopes, ddmendpoints, datasets, guids)
    return tuple(field.split(',') for field in comma_separated)
0226 
0227 
class Job:
    """
    Stripped-down version of the Pilot Job class, holding only the data members needed for the trace report.
    """

    # class-level defaults, all empty strings
    produserid = jobid = taskid = jobdefinitionid = ""

    def __init__(self, produserid="", jobid="", taskid="", jobdefinitionid=""):
        """
        Store the job identifiers on the instance.

        :param produserid: user id; any '%20' sequence is replaced by a blank (string).
        :param jobid: job id.
        :param taskid: task id.
        :param jobdefinitionid: job definition id.
        """
        decoded_user = produserid.replace('%20', ' ')
        self.produserid = decoded_user
        self.jobid, self.taskid, self.jobdefinitionid = jobid, taskid, jobdefinitionid
0243 
0244 
def add_to_dictionary(dictionary, key, value1, value2, value3, value4, value5, value6):
    """
    Store the six values as a list under the given key; an existing entry for the key is replaced.
    As used by this script, the entry is lfn: [status, status_code, surl, turl, checksum, fsize].

    :param dictionary: dictionary to be updated (modified in place).
    :param key: lfn key to be added (string).
    :param value1: status (first list element).
    :param value2: status_code (second list element).
    :param value3: surl (third list element).
    :param value4: turl (fourth list element).
    :param value5: checksum (fifth list element).
    :param value6: fsize (sixth list element).
    :return: the same dictionary object, after the update.
    """

    entry = [value1, value2, value3, value4, value5, value6]
    dictionary[key] = entry
    return dictionary
0263 
0264 
def extract_error_info(err):
    """
    Pull the error code and the error details out of a diagnostics string.

    The code is taken from the first 'error code: <digits>' occurrence and the message from the text
    following the first 'details: ' (up to the end of that line), with any '[PilotException(' removed
    and surrounding whitespace stripped.

    :param err: diagnostics (string).
    :return: error code (the matched digits as a string, or the int 0 when not found),
             error message (string, empty when not found).
    """

    code_match = re.search(r'error code: (\d+)', err)
    details_match = re.search('details: (.+)', err)

    # note: a matched code stays a string, only the fallback value is an int
    error_code = code_match.group(1) if code_match else 0

    if details_match:
        error_message = details_match.group(1).replace('[PilotException(', '').strip()
    else:
        error_message = ""

    return error_code, error_message
0280 
0281 
if __name__ == '__main__':
    # Main section of the stage-out script: parse the arguments, transfer the listed output files with
    # StageOutClient, write the per-file status dictionary to a JSON file in the work directory and
    # exit with 0 on success or TRANSFER_ERROR on failure.

    # get the args from the arg parser
    args = get_args()
    # NOTE: debug logging and writing of the log file are forced on here, overriding -d and --no-pilot-log
    args.debug = True
    args.nopilotlog = False

    establish_logging(debug=args.debug, nopilotlog=args.nopilotlog, filename=config.Pilot.stageoutlog)
    logger = logging.getLogger(__name__)

    # verify_args() is deprecated and not called; the mandatory options are declared required in get_args()

    # get the file info
    lfns, scopes, ddmendpoints, datasets, guids = get_file_lists(args.lfns, args.scopes, args.ddmendpoints, args.datasets, args.guids)
    if len(lfns) != len(scopes) or len(lfns) != len(ddmendpoints) or len(lfns) != len(datasets) or len(lfns) != len(guids):
        # NOTE(review): this is only reported; the zip() below stops at the shortest list, so surplus
        # entries are not transferred - confirm whether this should abort instead
        message('file lists not same length: len(lfns)=%d, len(scopes)=%d, len(ddmendpoints)=%d, len(datasets)=%d, len(guids)=%d' %
                (len(lfns), len(scopes), len(ddmendpoints), len(datasets), len(guids)))

    # generate the trace report
    trace_report = TraceReport(pq=os.environ.get('PILOT_SITENAME', ''), localSite=args.localsite,
                               remoteSite=args.remotesite, dataset="", eventType=args.eventtype)
    job = Job(produserid=args.produserid, jobid=args.jobid, taskid=args.taskid, jobdefinitionid=args.jobdefinitionid)
    trace_report.init(job)

    # NOTE(review): a failure here is only reported and the script continues - confirm that this
    # best-effort behaviour is intended
    try:
        infoservice = InfoService()
        infoservice.init(args.queuename, infosys.confinfo, infosys.extinfo)
        infosys.init(args.queuename)  # is this correct? otherwise infosys.queuedata doesn't get set
    except Exception as e:
        message(e)

    # perform stage-out (all files in a single transfer call)
    err = ""
    errcode = 0
    activity = 'pw'

    client = StageOutClient(infoservice, logger=logger, trace_report=trace_report)
    kwargs = dict(workdir=args.workdir, cwd=args.workdir, usecontainer=False, job=job, output_dir=args.outputdir,
                  catchall=args.catchall)  # , mode='stage-out')

    xfiles = []
    for lfn, scope, dataset, ddmendpoint, _guid in zip(lfns, scopes, datasets, ddmendpoints, guids):
        file_info = {'scope': scope, 'lfn': lfn, 'workdir': args.workdir, 'dataset': dataset,
                     'ddmendpoint': ddmendpoint, 'ddmendpoint_alt': None}
        xfiles.append(FileSpec(type='output', **file_info))

        # prod analy unification: use destination preferences from PanDA server for unified queues
        # NOTE(review): called once per iteration with the list accumulated so far (as before) - confirm
        # whether a single call after the loop would be sufficient
        if infoservice.queuedata.type != 'unified':
            client.prepare_destinations(xfiles,
                                        activity)  ## FIX ME LATER: split activities: for astorages and for copytools (to unify with ES workflow)

    try:
        client.transfer(xfiles, activity=activity, **kwargs)
    except PilotException as error:
        import traceback
        error_msg = traceback.format_exc()
        logger.error(error_msg)
        # remember the pilot error code in case it cannot be parsed back from the diagnostics below
        errcode = error.get_error_code()
        # fall back to the traceback so that a failure can never leave err empty
        err = errors.format_diagnostics(errcode, error_msg) or error_msg
    except Exception as error:
        # str() of an exception without arguments is empty, which would be mistaken for success
        err = str(error) or repr(error)
        errcode = -1
        message(err)

    # put file statuses in a dictionary to be written to file
    file_dictionary = {}  # { 'error': [error_diag, -1], 'lfn1': [status, status_code], 'lfn2':.., .. }
    if xfiles:
        message('stageout script summary of transferred files:')
        for fspec in xfiles:
            add_to_dictionary(file_dictionary, fspec.lfn, fspec.status, fspec.status_code,
                              fspec.surl, fspec.turl, fspec.checksum.get('adler32'), fspec.filesize)
            status = fspec.status if fspec.status else "(not transferred)"
            message(" -- lfn=%s, status_code=%s, status=%s, surl=%s, turl=%s, checksum=%s, filesize=%s" %
                    (fspec.lfn, fspec.status_code, status, fspec.surl, fspec.turl, fspec.checksum.get('adler32'), fspec.filesize))

    # add error info, if any
    if err:
        parsed_code, parsed_message = extract_error_info(err)
        # only replace the original code/diagnostics with what could actually be extracted; otherwise a
        # failure whose text lacks the 'error code:' / 'details:' markers would be reported as a success
        if parsed_code:
            errcode = parsed_code
        if parsed_message:
            err = parsed_message
    add_to_dictionary(file_dictionary, 'error', err, errcode, None, None, None, None)
    path = os.path.join(args.workdir, config.Container.stageout_status_dictionary)
    if os.path.exists(path):
        # do not overwrite an already existing status file
        path += '.log'
    write_json(path, file_dictionary)
    if err:
        message("containerised file transfers failed: %s" % err)
        raise SystemExit(TRANSFER_ERROR)

    message("wrote %s" % path)
    message("containerised file transfers finished")
    raise SystemExit(0)