File indexing completed on 2026-04-10 08:39:18
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011 import hashlib
0012 import socket
0013 import time
0014 from sys import exc_info
0015 from json import dumps
0016 from os import environ, getuid
0017
0018 from pilot.util.config import config
0019 from pilot.util.constants import get_pilot_version, get_rucio_client_version
0020 from pilot.util.container import execute
0021
0022
0023 import logging
0024 logger = logging.getLogger(__name__)
0025
0026
class TraceReport(dict):
    """
    Dictionary holding the fields of a trace report, with helpers to populate,
    verify and send the report to the tracing server.
    """

    def __init__(self, *args, **kwargs):
        """
        Create a report pre-populated with default values for all trace fields.

        :param args: positional arguments accepted by dict(); applied on top of the defaults.
        :param kwargs: keyword arguments accepted by dict(); applied on top of the defaults.
        """

        event_version = "%s+%s" % (get_pilot_version(), get_rucio_client_version())
        defs = {
            'eventType': '',
            'eventVersion': event_version,
            'protocol': None,
            'clientState': 'INIT_REPORT',
            'localSite': environ.get('RUCIO_LOCAL_SITE_ID', ''),
            'remoteSite': '',
            'timeStart': None,
            'catStart': None,
            'relativeStart': None,
            'transferStart': None,
            'validateStart': None,
            'timeEnd': None,
            'dataset': '',
            'version': None,
            'duid': None,
            'filename': None,
            'guid': None,
            'filesize': None,
            'usr': None,
            'appid': None,
            'hostname': '',
            'ip': '',
            'suspicious': '0',
            'usrdn': '',
            'url': None,
            'stateReason': None,
            'uuid': None,
            'taskid': '',
            'pq': environ.get('PILOT_SITENAME', '')
        }

        super(TraceReport, self).__init__(defs)
        # caller-supplied values override the defaults
        self.update(dict(*args, **kwargs))

    def init(self, job):
        """
        Initialization.

        Fills in the job related fields, the start time, the local hostname and IP
        (both best effort) and the report uuid.

        :param job: job object (the attributes produserid, jobid, taskid and jobdefinitionid are read).
        :return:
        """

        data = {
            'clientState': 'INIT_REPORT',
            'usr': hashlib.md5(job.produserid.encode('utf-8')).hexdigest(),
            'appid': job.jobid,
            'usrdn': job.produserid,
            'taskid': job.taskid
        }
        self.update(data)
        self['timeStart'] = time.time()

        # hostname and IP are optional; a failed lookup keeps the current value
        try:
            self['hostname'] = socket.gethostbyaddr(socket.gethostname())[0]
        except Exception:
            logger.debug("unable to detect hostname for trace report")

        try:
            self['ip'] = socket.gethostbyname(socket.gethostname())
        except Exception:
            logger.debug("unable to detect host IP for trace report")

        if job.jobdefinitionid:
            # the same job definition id always gives the same uuid
            s = 'ppilot_%s' % job.jobdefinitionid
            self['uuid'] = hashlib.md5(s.encode('utf-8')).hexdigest()
        else:
            self['uuid'] = self._generate_uuid()

    def _generate_uuid(self):
        """
        Generate a time-based uuid without dashes.

        The uuidgen command is tried first; if it fails or prints nothing, the
        uuid module of the standard library is used instead so that the report
        never ends up with an empty uuid.

        :return: uuid (string of hexadecimal digits).
        """

        cmd = 'uuidgen -t 2> /dev/null'
        exit_code, stdout, _ = execute(cmd)

        # strip the trailing newline printed by the command before removing the dashes
        generated = stdout.strip().replace('-', '') if stdout else ''
        if exit_code or not generated:
            import uuid  # local import: only needed when uuidgen is unavailable
            generated = uuid.uuid1().hex

        return generated

    def get_value(self, key):
        """
        Return the value stored for the given key.

        :param key: name of the trace field.
        :return: value of the field, or None if the key is not set.
        """

        return self.get(key, None)

    def verify_trace(self):
        """
        Verify the trace consistency.
        Are all required fields set? Remove escape chars from stateReason if present.

        :return: Boolean.
        """

        # normalize stateReason to a string (None/empty becomes '') and drop backslashes
        state_reason = self.get('stateReason') or ''
        if not isinstance(state_reason, str):
            state_reason = str(state_reason)
        self.update(stateReason=state_reason.replace('\\', ''))

        # a non-empty environment value takes precedence over the stored local site
        localsite = environ.get('RUCIO_LOCAL_SITE_ID', '')
        if localsite:
            self['localSite'] = localsite

        required = ('eventType', 'localSite', 'remoteSite')
        return all(self.get(field) for field in required)

    def _build_curl_command(self, url):
        """
        Build the curl command line used to post this report.

        Every interpolated argument is shell-quoted so that characters such as
        quotes, $, backticks and backslashes in field values reach curl unchanged
        instead of being interpreted by the shell.

        :param url: URL of the tracing server (string).
        :return: command line (string).
        """

        import shlex  # local import: only needed when a report is actually sent

        # NOTE(review): -k turns off certificate verification in curl, which makes
        # --cacert ineffective - confirm that this is intended
        return 'curl --connect-timeout 20 --max-time 120 --cacert %s -v -k -d %s %s' % \
            (shlex.quote(self.get_ssl_certificate()), shlex.quote(dumps(self)), shlex.quote(url))

    def send(self):
        """
        Send trace to rucio server using curl.

        Sending is best effort: a failing curl command or an unexpected exception
        is logged but still reported as True. False is only returned when the
        trace does not pass verify_trace().

        :return: Boolean.
        """

        # traces can be switched off with PILOT_USE_RUCIO_TRACES=False
        if environ.get('PILOT_USE_RUCIO_TRACES', 'True') == 'False':
            logger.debug('rucio trace does not need to be sent')
            return True

        url = config.Rucio.url
        logger.info("tracing server: %s" % url)
        logger.info("sending tracing report: %s" % str(self))

        if not self.verify_trace():
            logger.warning('cannot send trace since not all fields are set')
            return False

        try:
            cmd = self._build_curl_command(url)
            exit_code, stdout, stderr = execute(cmd, mute=True)
        except Exception:
            # if something fails, log it but ignore
            logger.error('tracing failed: %s' % str(exc_info()))
        else:
            if exit_code:
                logger.warning('failed to send traces to rucio: %s (stderr: %s)' % (stdout, stderr))
            else:
                # only claim success when curl actually succeeded
                logger.info("tracing report sent")

        return True

    def get_ssl_certificate(self):
        """
        Return the path to the SSL certificate

        The X509_USER_PROXY environment variable is used when set, otherwise
        /tmp/x509up_u<uid> for the current user id.

        :return: path (string).
        """

        return environ.get('X509_USER_PROXY', '/tmp/x509up_u%s' % getuid())