File indexing completed on 2025-04-02 09:08:02
0001
0002
0003
0004 #pragma once
0005
0006
0007
0008
0009
0010
0011
0012
0013 #include "spdlog/async.h"
0014 #include "spdlog/details/log_msg.h"
0015 #include "spdlog/details/null_mutex.h"
0016 #include "spdlog/details/synchronous_factory.h"
0017 #include "spdlog/sinks/base_sink.h"
0018 #include <mutex>
0019 #include <spdlog/common.h>
0020
0021
0022 #include <librdkafka/rdkafkacpp.h>
0023
0024 namespace spdlog {
0025 namespace sinks {
0026
0027 struct kafka_sink_config {
0028 std::string server_addr;
0029 std::string produce_topic;
0030 int32_t flush_timeout_ms = 1000;
0031
0032 kafka_sink_config(std::string addr, std::string topic, int flush_timeout_ms = 1000)
0033 : server_addr{std::move(addr)},
0034 produce_topic{std::move(topic)},
0035 flush_timeout_ms(flush_timeout_ms) {}
0036 };
0037
0038 template <typename Mutex>
0039 class kafka_sink : public base_sink<Mutex> {
0040 public:
0041 kafka_sink(kafka_sink_config config)
0042 : config_{std::move(config)} {
0043 try {
0044 std::string errstr;
0045 conf_.reset(RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL));
0046 RdKafka::Conf::ConfResult confRes =
0047 conf_->set("bootstrap.servers", config_.server_addr, errstr);
0048 if (confRes != RdKafka::Conf::CONF_OK) {
0049 throw_spdlog_ex(
0050 fmt_lib::format("conf set bootstrap.servers failed err:{}", errstr));
0051 }
0052
0053 tconf_.reset(RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC));
0054 if (tconf_ == nullptr) {
0055 throw_spdlog_ex(fmt_lib::format("create topic config failed"));
0056 }
0057
0058 producer_.reset(RdKafka::Producer::create(conf_.get(), errstr));
0059 if (producer_ == nullptr) {
0060 throw_spdlog_ex(fmt_lib::format("create producer failed err:{}", errstr));
0061 }
0062 topic_.reset(RdKafka::Topic::create(producer_.get(), config_.produce_topic,
0063 tconf_.get(), errstr));
0064 if (topic_ == nullptr) {
0065 throw_spdlog_ex(fmt_lib::format("create topic failed err:{}", errstr));
0066 }
0067 } catch (const std::exception &e) {
0068 throw_spdlog_ex(fmt_lib::format("error create kafka instance: {}", e.what()));
0069 }
0070 }
0071
0072 ~kafka_sink() { producer_->flush(config_.flush_timeout_ms); }
0073
0074 protected:
0075 void sink_it_(const details::log_msg &msg) override {
0076 producer_->produce(topic_.get(), 0, RdKafka::Producer::RK_MSG_COPY,
0077 (void *)msg.payload.data(), msg.payload.size(), NULL, NULL);
0078 }
0079
0080 void flush_() override { producer_->flush(config_.flush_timeout_ms); }
0081
0082 private:
0083 kafka_sink_config config_;
0084 std::unique_ptr<RdKafka::Producer> producer_ = nullptr;
0085 std::unique_ptr<RdKafka::Conf> conf_ = nullptr;
0086 std::unique_ptr<RdKafka::Conf> tconf_ = nullptr;
0087 std::unique_ptr<RdKafka::Topic> topic_ = nullptr;
0088 };
0089
0090 using kafka_sink_mt = kafka_sink<std::mutex>;
0091 using kafka_sink_st = kafka_sink<spdlog::details::null_mutex>;
0092
0093 }
0094
0095 template <typename Factory = spdlog::synchronous_factory>
0096 inline std::shared_ptr<logger> kafka_logger_mt(const std::string &logger_name,
0097 spdlog::sinks::kafka_sink_config config) {
0098 return Factory::template create<sinks::kafka_sink_mt>(logger_name, config);
0099 }
0100
0101 template <typename Factory = spdlog::synchronous_factory>
0102 inline std::shared_ptr<logger> kafka_logger_st(const std::string &logger_name,
0103 spdlog::sinks::kafka_sink_config config) {
0104 return Factory::template create<sinks::kafka_sink_st>(logger_name, config);
0105 }
0106
0107 template <typename Factory = spdlog::async_factory>
0108 inline std::shared_ptr<spdlog::logger> kafka_logger_async_mt(
0109 std::string logger_name, spdlog::sinks::kafka_sink_config config) {
0110 return Factory::template create<sinks::kafka_sink_mt>(logger_name, config);
0111 }
0112
0113 template <typename Factory = spdlog::async_factory>
0114 inline std::shared_ptr<spdlog::logger> kafka_logger_async_st(
0115 std::string logger_name, spdlog::sinks::kafka_sink_config config) {
0116 return Factory::template create<sinks::kafka_sink_st>(logger_name, config);
0117 }
0118
0119 }