Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:39:18

0001 # Licensed under the Apache License, Version 2.0 (the "License");
0002 # you may not use this file except in compliance with the License.
0003 # You may obtain a copy of the License at
0004 # http://www.apache.org/licenses/LICENSE-2.0
0005 #
0006 # Authors:
0007 # - Alexey Anisenkov, alexey.anisenkov@cern.ch, 2017
0008 # - Pavlo Svirin, pavlo.svirin@cern.ch, 2018
0009 # - Paul Nilsson, paul.nilsson@cern.ch, 2018
0010 
0011 import hashlib
0012 import socket
0013 import time
0014 from sys import exc_info
0015 from json import dumps  #, loads
0016 from os import environ, getuid
0017 
0018 from pilot.util.config import config
0019 from pilot.util.constants import get_pilot_version, get_rucio_client_version
0020 from pilot.util.container import execute
0021 #from pilot.util.https import request
0022 
0023 import logging
0024 logger = logging.getLogger(__name__)
0025 
0026 
class TraceReport(dict):
    """
    Dictionary holding the fields of a tracing report, together with the helpers needed to
    initialize it from a job object, verify it and send it to the tracing server using curl.
    """

    def __init__(self, *args, **kwargs):
        """
        Populate the report with default values and then apply any caller supplied fields.

        :param args: positional arguments accepted by dict().
        :param kwargs: keyword arguments accepted by dict(); override the defaults.
        """

        event_version = "%s+%s" % (get_pilot_version(), get_rucio_client_version())
        defs = {  # for reference, see Tracing report document in wiki area of Pilot GitHub repository
            'eventType': '',
            'eventVersion': event_version,  # Pilot+Rucio client version
            'protocol': None,               # set by specific copy tool
            'clientState': 'INIT_REPORT',
            'localSite': environ.get('RUCIO_LOCAL_SITE_ID', ''),
            'remoteSite': '',
            'timeStart': None,
            'catStart': None,
            'relativeStart': None,
            'transferStart': None,
            'validateStart': None,
            'timeEnd': None,
            'dataset': '',
            'version': None,
            'duid': None,
            'filename': None,
            'guid': None,
            'filesize': None,
            'usr': None,
            'appid': None,
            'hostname': '',
            'ip': '',
            'suspicious': '0',
            'usrdn': '',
            'url': None,
            'stateReason': None,
            'uuid': None,
            'taskid': '',
            'pq': environ.get('PILOT_SITENAME', '')
        }

        super(TraceReport, self).__init__(defs)
        self.update(dict(*args, **kwargs))  # apply extra input

    # sitename, dsname, eventType
    def init(self, job):
        """
        Initialize the job specific fields of the report.

        Reads produserid, jobid, taskid and jobdefinitionid from the job object, records the
        start time and tries to detect the local hostname and IP address.

        :param job: job object.
        :return:
        """

        data = {
            'clientState': 'INIT_REPORT',
            'usr': hashlib.md5(job.produserid.encode('utf-8')).hexdigest(),  # anonymise user and pilot id's, Python 2/3
            'appid': job.jobid,
            'usrdn': job.produserid,
            'taskid': job.taskid
        }
        self.update(data)
        self['timeStart'] = time.time()

        # hostname and IP are best-effort: a failed lookup must not prevent the report from being used
        try:
            self['hostname'] = socket.gethostbyaddr(socket.gethostname())[0]
        except Exception:
            logger.debug("unable to detect hostname for trace report")

        try:
            self['ip'] = socket.gethostbyname(socket.gethostname())
        except Exception:
            logger.debug("unable to detect host IP for trace report")

        if job.jobdefinitionid:
            pilot_id = 'ppilot_%s' % job.jobdefinitionid
            self['uuid'] = hashlib.md5(pilot_id.encode('utf-8')).hexdigest()  # hash_pilotid, Python 2/3
        else:
            # all LFNs of one request have the same uuid
            exit_code, stdout, _ = execute('uuidgen -t 2> /dev/null')
            # strip surrounding whitespace (e.g. a trailing newline) so it does not end up in the uuid
            trace_uuid = stdout.strip().replace('-', '') if stdout else ''
            if exit_code or not trace_uuid:
                # uuidgen is missing or failed: fall back to a time-based UUID from the standard library
                import uuid
                trace_uuid = uuid.uuid1().hex
            self['uuid'] = trace_uuid

    def get_value(self, key):
        """
        Return the value stored for the given key.

        :param key: name of the report field (string).
        :return: field value, or None if the key is not present.
        """

        return self.get(key)

    def verify_trace(self):
        """
        Verify the trace consistency.
        Are all required fields set? Remove escape chars from stateReason if present.

        Side effects: stateReason is normalized to a string without backslashes (None becomes
        an empty string) and localSite is overwritten if RUCIO_LOCAL_SITE_ID is set.

        :return: Boolean (True if eventType, localSite and remoteSite are all set).
        """

        # remove any escape characters that might be present in the stateReason field
        state_reason = self.get('stateReason', '')
        if not state_reason:
            state_reason = ''
        elif not hasattr(state_reason, 'replace'):
            # non-string reasons (e.g. an error code or exception object) must not raise here,
            # since send() calls this method outside of its try block
            state_reason = str(state_reason)
        self.update(stateReason=state_reason.replace('\\', ''))

        # overwrite any localSite if RUCIO_LOCAL_SITE_ID is set
        localsite = environ.get('RUCIO_LOCAL_SITE_ID', '')
        if localsite:
            self['localSite'] = localsite

        return bool(self['eventType'] and self['localSite'] and self['remoteSite'])

    def _build_curl_command(self, url):
        """
        Build the curl command that posts this report, serialized as JSON, to the given URL.

        All variable parts are shell quoted so that characters such as quotes, dollar signs,
        backticks and backslashes in the report fields reach curl unmodified.

        :param url: tracing server URL (string).
        :return: curl command (string).
        """

        try:
            from shlex import quote  # Python 3
        except ImportError:
            from pipes import quote  # Python 2

        payload = dumps(self)
        ssl_certificate = self.get_ssl_certificate()

        # NOTE(review): -k tells curl to skip certificate verification, which appears to make
        # --cacert ineffective - confirm that this is intended
        return 'curl --connect-timeout 20 --max-time 120 --cacert %s -v -k -d %s %s' % \
               (quote(str(ssl_certificate)), quote(payload), quote(str(url)))

    def send(self):
        """
        Send trace to rucio server using curl.

        The transfer itself is best-effort: a failing curl command or an unexpected exception is
        logged but does not change the return value.

        :return: Boolean (False only if the report is incomplete and was therefore not sent).
        """

        # only send trace if it is actually required (can be turned off with pilot option)
        if environ.get('PILOT_USE_RUCIO_TRACES', 'True') == 'False':
            logger.debug('rucio trace does not need to be sent')
            return True

        url = config.Rucio.url
        logger.info("tracing server: %s", url)
        logger.info("sending tracing report: %s", str(self))

        if not self.verify_trace():
            logger.warning('cannot send trace since not all fields are set')
            return False

        try:
            cmd = self._build_curl_command(url)
            exit_code, stdout, stderr = execute(cmd, mute=True)
        except Exception:
            # if something fails, log it but ignore
            logger.error('tracing failed: %s', str(exc_info()))
        else:
            if exit_code:
                # curl reports its own errors on stderr, so include it next to the server response
                logger.warning('failed to send traces to rucio: %s (exit code=%s, stderr=%s)',
                               stdout, exit_code, stderr)
            else:
                logger.info("tracing report sent")

        return True

    def get_ssl_certificate(self):
        """
        Return the path to the SSL certificate

        Uses X509_USER_PROXY if set, otherwise /tmp/x509up_u<uid> for the current user id.

        :return: path (string).
        """

        return environ.get('X509_USER_PROXY', '/tmp/x509up_u%s' % getuid())