bonus-edge-proxy/src/mqtt/mqtt_client.cpp

85 lines
2.8 KiB
C++
Raw Normal View History

2025-10-14 18:10:15 +08:00
// 文件名: mqtt_client.cpp
#include "mqtt_client.h"
#include "spdlog/spdlog.h"
MqttClient::MqttClient(const std::string& server_uri, const std::string& client_id)
: client_(server_uri, client_id) {
2025-10-14 18:10:15 +08:00
conn_opts_.set_clean_session(true);
conn_opts_.set_automatic_reconnect(true);
2025-10-14 18:10:15 +08:00
// <<< MODIFIED: 修改连接成功的回调逻辑 >>>
// 这个回调现在会先打印日志,然后检查并调用外部设置的回调函数
client_.set_connected_handler([this](const std::string& cause) {
spdlog::info("MQTT client connected: {}", cause);
2025-10-14 18:10:15 +08:00
// 如果外部设置了回调函数,就调用它
if (m_connected_handler) {
m_connected_handler(cause);
}
});
client_.set_connection_lost_handler([](const std::string& cause) {
spdlog::error("MQTT connection lost: {}", cause);
});
spdlog::info("MQTT client created for broker at {}", server_uri);
}
void MqttClient::connect() {
try {
spdlog::info("Connecting to MQTT broker...");
client_.connect(conn_opts_)->wait();
} catch (const mqtt::exception& exc) {
spdlog::error("Failed to connect to MQTT broker: {}", exc.what());
2025-10-14 18:10:15 +08:00
throw;
}
}
void MqttClient::disconnect() {
try {
spdlog::info("Disconnecting from MQTT broker...");
client_.disconnect()->wait();
} catch (const mqtt::exception& exc) {
spdlog::error("Error disconnecting from MQTT broker: {}", exc.what());
}
}
void MqttClient::publish(const std::string& topic, const std::string& payload, int qos, bool retained) {
if (!client_.is_connected()) {
spdlog::warn("MQTT client is not connected. Cannot publish message.");
return;
}
try {
auto msg = mqtt::make_message(topic, payload, qos, retained);
2025-10-14 18:10:15 +08:00
client_.publish(msg)->wait_for(std::chrono::seconds(2));
spdlog::debug("Published message to topic '{}': {}", topic, payload);
} catch (const mqtt::exception& exc) {
spdlog::error("Failed to publish to topic '{}': {}", topic, exc.what());
}
}
void MqttClient::subscribe(const std::string& topic, int qos) {
if (!client_.is_connected()) {
spdlog::warn("MQTT client is not connected. Cannot subscribe to topic.");
return;
}
try {
spdlog::info("Subscribing to topic '{}' with QoS {}", topic, qos);
client_.subscribe(topic, qos)->wait();
} catch (const mqtt::exception& exc) {
spdlog::error("Failed to subscribe to topic '{}': {}", topic, exc.what());
}
}
void MqttClient::set_message_callback(message_callback cb) {
client_.set_message_callback(std::move(cb));
2025-10-14 18:10:15 +08:00
}
// <<< MODIFIED: 新增方法的实现 >>>
void MqttClient::set_connected_handler(connection_handler cb) {
m_connected_handler = std::move(cb);
}
bool MqttClient::is_connected() const {
return client_.is_connected();
}