File indexing completed on 2026-04-09 07:58:22
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012 import argparse
0013 import logging
0014
0015 import os
0016 import re
0017 import json
0018
0019 import configparser
0020
0021
0022 is_unicode_defined = True
0023 try:
0024 _ = unicode('test')
0025 except NameError:
0026 is_unicode_defined = False
0027
0028
0029 def is_string(value):
0030 if is_unicode_defined and type(value) in [str, unicode] or type(value) in [str]:
0031 return True
0032 else:
0033 return False
0034
0035
0036 def as_parse_env(dct):
0037 for key in dct:
0038 value = dct[key]
0039 if is_string(value) and '$' in value:
0040 env_matches = re.findall('\$\{*([^\}]+)\}*', value)
0041 for env_name in env_matches:
0042 if env_name not in os.environ:
0043 print("WARN: %s is defined in configmap but is not defined in environments" % env_name)
0044 else:
0045 env_name1 = r'${%s}' % env_name
0046 env_name2 = r'$%s' % env_name
0047 value = value.replace(env_name1, os.environ.get(env_name)).replace(env_name2, os.environ.get(env_name))
0048 dct[key] = value
0049 return dct
0050
0051
0052 def convert_section_json(data_conf):
0053 for section in data_conf:
0054 for item in data_conf[section]:
0055 if type(data_conf[section][item]) in [list, tuple, dict]:
0056 data_conf[section][item] = json.dumps(data_conf[section][item])
0057 return data_conf
0058
0059
0060 def merge_configs(source_file_path, dest_file_path):
0061 """
0062 Merge configuration file.
0063 """
0064
0065 if source_file_path and dest_file_path:
0066 if os.path.exists(source_file_path) and os.path.exists(dest_file_path):
0067 with open(source_file_path, 'r') as f:
0068 data = json.load(f, object_hook=as_parse_env)
0069 if dest_file_path in data:
0070 data_conf = data[dest_file_path]
0071 data_conf = convert_section_json(data_conf)
0072 parser = configparser.ConfigParser()
0073 parser.read(dest_file_path)
0074 parser.read_dict(data_conf)
0075 with open(dest_file_path, 'w') as dest_file:
0076 parser.write(dest_file)
0077
0078
0079 def create_oidc_token():
0080 if 'PANDA_AUTH_ID_TOKEN' in os.environ:
0081 config_root = os.environ.get('PANDA_CONFIG_ROOT', '/tmp')
0082 token_file = os.path.join(config_root, '.token')
0083 token = {"id_token": os.environ.get('PANDA_AUTH_ID_TOKEN', ""),
0084 "token_type": "Bearer"}
0085
0086 with open(token_file, 'w') as f:
0087 f.write(json.dumps(token))
0088
0089
0090 logging.getLogger().setLevel(logging.INFO)
0091 parser = argparse.ArgumentParser(description="Merge configuration file from configmap")
0092 parser.add_argument('-s', '--source', default=None, help='Source config file path (in .json format)')
0093 parser.add_argument('-d', '--destination', default=None, help='Destination file path')
0094 parser.add_argument('-c', '--create_oidc_token', default=False, action='store_true', help='Create the oidc token based on environment')
0095 args = parser.parse_args()
0096
0097 merge_configs(args.source, args.destination)
0098 if args.create_oidc_token:
0099 create_oidc_token()