目录
- 前言
- 前置依赖
- 1. MQTT配置类
- 代码解析
- 1.1 MQTT客户端工厂
- 1.2 MQTT消息订阅适配器
- 1.3 MQTT消息发布适配器
- 1.4 消息通道
- 2. MQTT消息监听器
- 代码解析
- 2.1 消息处理逻辑
- 3. MQTT消息网关
- 代码解析
- 4. MQTT服务接口
- 代码解析
- 5. 使用说明
- 5.1 配置MQTT参数
- 5.2 实现IMqttService接口
- 5.3 发送MQTT消息
- 6. 总结
前言
本文将详细介绍如何使用Java和Spring Integration框架实现MQTT协议的对接。代码包括MQTT客户端的配置、消息的订阅与发布、以及消息的处理逻辑。
前置依赖
<!-- MQTT 依赖 -->
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
1. MQTT配置类
代码解析
MqttConfig类是MQTT的核心配置类,负责MQTT客户端的初始化、连接选项的设置以及消息通道的创建。
package com.ruoyi.framework.config;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import java.io.IOException;
@Configuration
public class MqttConfig {
@Value("${mqtt.broker-url}")
private String brokerUrl;
@Value("${mqtt.client-id}")
private String clientId;
@Bean
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
MqttConnectOptions options = new MqttConnectOptions();
options.setServerURIs(new String[]{brokerUrl}); // Broker 地址
options.setAutomaticReconnect(true); // 自动重连
factory.setConnectionOptions(options);
System.out.println("Connecting to broker: " + brokerUrl + " OK.");
return factory;
}
@Bean
public MqttPahoMessageDrivenChannelAdapter mqttInbound() {
MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(
clientId + "-inbound", // 客户端ID(唯一)
mqttClientFactory(), // 使用工厂创建客户端
"testSub/#" // 订阅的主题
);
adapter.setOutputChannelName("mqttInputChannel"); // 关键:绑定到输入通道 消息输出通道
adapter.setQos(1); // 设置 QoS 级别
return adapter;
}
// 出站适配器(发送)
@Bean
@ServiceActivator(inputChannel = "mqttOutboundChannel")
public MessageHandler mqttOutbound() {
MqttPahoMessageHandler handler = new MqttPahoMessageHandler(
clientId + "-outbound",
mqttClientFactory()
);
handler.setAsync(true);
handler.setDefaultQos(1);
return handler;
}
@Bean
public MessageChannel mqttInputChannel() {
return new DirectChannel(); // 使用直连通道
}
// 出站通道(发送消息)
@Bean
public MessageChannel mqttOutboundChannel() {
return new DirectChannel();
}
}
1.1 MQTT客户端工厂
mqttClientFactory方法创建了一个MqttPahoClientFactory实例,用于配置MQTT客户端的连接选项。
@Bean
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
MqttConnectOptions options = new MqttConnectOptions();
options.setServerURIs(new String[]{brokerUrl}); // 设置MQTT服务端地址
options.setAutomaticReconnect(true); // 开启自动重连
factory.setConnectionOptions(options);
System.out.println("Connecting to broker: " + brokerUrl + " OK.");
return factory;
}
brokerUrl:MQTT服务端的地址,通常为tcp://<IP>:<端口>。automaticReconnect:开启自动重连功能,确保网络波动时客户端能够自动恢复连接。
1.2 MQTT消息订阅适配器
mqttInbound方法创建了一个MqttPahoMessageDrivenChannelAdapter实例,用于订阅MQTT主题并接收消息。
@Bean
public MqttPahoMessageDrivenChannelAdapter mqttInbound() {
MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(
clientId + "-inbound", mqttClientFactory(), "testSub/#");
adapter.setOutputChannelName("mqttInputChannel"); // 绑定到输入通道
adapter.setQos(1); // 设置QoS级别
return adapter;
}
clientId + "-inbound":客户端ID,需保证唯一性。"testSub/#":订阅的主题,#表示匹配所有子主题。
1.3 MQTT消息发布适配器
mqttOutbound方法创建了一个MqttPahoMessageHandler实例,用于发布消息到MQTT主题。
@Bean
@ServiceActivator(inputChannel = "mqttOutboundChannel")
public MessageHandler mqttOutbound() {
MqttPahoMessageHandler handler = new MqttPahoMessageHandler(
clientId + "-outbound", mqttClientFactory());
handler.setAsync(true); // 异步发送
handler.setDefaultQos(1); // 设置QoS级别
return handler;
}
clientId + "-outbound":客户端ID,需保证唯一性。mqttOutboundChannel:消息发送通道。
1.4 消息通道
mqttInputChannel和mqttOutboundChannel方法分别创建了输入和输出通道,用于消息的传递。
@Bean
public MessageChannel mqttInputChannel()js {
return new DirectChannel(); // 直连通道
}
@Bean
public MessageChannel mqttOutboundChannel() {
return new DirectChannel();
}
2. MQTT消息监听器
代码解析
MqttMessageListener类负责处理从MQTT主题接收到的消息
Ⅰ
package com.ruoyi.framework.mqtt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import org.springframework.stereotype.Component;
@Component
public class MqttMessageListener {
@Autowired
private IMqttService mqttService;
// 处理入站消息
@Bean
@ServiceActivator(inputChannel = "mqttInputChannel")
public MessageHandler handler() {
return new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
String topic = message.getHeaders().get("mqtt_receivedTopic").toString();
String payload = message.getPayload().toString();
Logger log = LoggerFactory.getLogger(MqttMessageListener.class);
log.info("[MQTT] 收到消息: topic={}, payload={}", topic, payload);
try {
if (topic.startsWith("heartbeat/")) { //心跳上报
mqttService.handleHeartbeat(payload);
} else if (topic.startsWith("report/")) { //数据上报
mqttService.handleReport(payload);
}
} catch (Exception e) {
log.error("[MQTT] 消息处理失败: {}", e.getMessage());
}
}
};
}
}
或Ⅱ
package com.ruoyi.framework.mqtt;
import com.alibaba.fastjson2.JSON;
import com.ruoyi.common.constant.MessageConstants;
import com.ruoyi.power.domain.protocol.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import org.springframework.stereotype.Component;
@Component
public class MqttMessageListener {
@Autowired
private IMqttService mqttService;
// 处理入站消息
@Bean
@ServiceActivator(inputChannel = "mqttInputChannel")
public MessageHandler handler() {
return new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
String topic = message.getHeaders().get("mqtt_receivedTopic").toString();
String payload = message.getPayload().toString();
Loghttp://www.devze.comger log = LoggerFactory.getLogger(MqttMessageListener.class);
log.info("[MQTT] 收到消息: topic={}, payload={}", topic, payload);
try {
if (topic.startsWith("testSub/")) {
BaseMessage baseMsg = JSON.parseobject(payload, BaseMessage.class);
switch (baseMsg.getType()) {
case MessageConstants.HEART_BEAT:
HeartbeatMessage heartbeat = JSON.parseObject(payload, HeartbeatMessage.class);
mqttService.handleHeartbeat(heartbeat);
break;
case MessageConstants.REPORT, MessageConstants.GET_CURRENT_DATA:
ReportMessage report = JSON.parseObject(payload, ReportMessage.class);
mqttService.handleReport(report);
break;
case MessageConstants.ALARM:
AlarmMessage alarm = JSON.parseObject(payload, AlarmMessage.class);
js mqttService.handleAlarm(alarm);
break;
case MessageConstants.CALL_ACK:
mqttService.handleCallReadAck(baseMsg);
break;
case MessageConstants.CONTROL_ACK:
ControlMessage controlAck = JSON.parseObject(payload, ControlMessage.class);
mqttService.handleControlAck(controlAck);
break;
default:
System.err.println("Unknown message type: " + baseMsg.getType());
}
} else if (topic.startsWith("report/allpoints")) {
BaseMessage baseMsg = JSON.parseObject(payload, BaseMessage.class);
switch (baseMsg.getType()) {
// 如果没收到callAck 则代表采集器没收到callRead
case MessageConstants.CALL_ACK:
mqttService.handleCallReadAck(baseMsg);
break;
case MessageConstants.REPORT, MessageConstants.GET_CURRENT_DATA:
ReportMessage report = JSON.parseObject(payload, ReportMessage.class);
mqttService.handleReport(report);
break;
case MessageConstants.CONTROL_ACK:
ControlMessage controlAck = JSON.parseObject(payload, ControlMessage.class);
mqttService.handleControlAck(controlAck);
break;
case MessageConstants.MULTIVALUESET_ACK:
MultiValueSetMessage multvaluesetAck = JSON.parseObject(payload, MultiValueSetMessage.class);
mqttService.handleMultiValueSet(multvaluesetAck);
break;
}
}
} catch (Exception e) {
log.error("[MQTT] 消息处理失败: {}", e.getMessage());
}
}
};
}
}
2.1 消息处理逻辑
handler方法是一个MessageHandler,用于处理从mqttInputChannel接收到的消息。
@Bean
@ServiceActivator(inputChannel = "mqttInputChannel")
public MessageHandler handler() {
return new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
String topic = message.getHeaders().get("mqtt_receivedTopic").toString(); // 获取主题
String payload = message.getPayload().toString(); // 获取消息内容
Logger log = LoggerFactory.getLogger(MqttMessageListener.class);
log.info("[MQTT] 收到消息: topic={}, payload={}", topic, payload);
try {
if (topic.startsWith("testSub/")) {
// 处理订阅主题为testSub的消息
BaseMessage baseMsg = JSON.parseObject(payload, BaseMessage.class);
switch (baseMsg.getType()) {
case MessageConstants.HEART_BEAT:
mqttService.handleHeartbeat(JSON.parseObject(payload, HeartbeatMessage.class));
break;
case MessageConstants.REPORT:
mqttService.handleReport(JSON.parseObject(payload, ReportMessage.class));
break;
// 其他消息类型的处理逻辑
}
} else if (topic.startsWith("report/allpoints")) {
// 处理订阅主题为report/allpoints的消息
}
} catch (Exception e) {
log.error("[MQTT] 消息处理失败: {}", e.getMessage());
}
}
};
}
mqtt_receivedTopic:从消息头中获取主题。payload:消息内容,通常是JSON格式的字符串。- 使用
switch语句根据消息类型调用不同的处理方法。
3. MQTT消息网关
代码解析
MqttMessageGateway接口提供了一个简单的发送消息的方法。
package com.ruoyi.framework.mqtt;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttMessageGateway {
void sendMessage(@Header(MqttHeaders.TOPIC) String topic, String payload);
}
@MessagingGateway:声明一个消息网关,defaultRequestChannel指定默认的发送通道。@Header(MqttHeaders.TOPIC):指定消息的主题。
使用示例:
@Autowired
private MqttMessageGateway mqttMessageGateway;
public void publishMessage() {
mqttMessageGateway.sendMessage("testSub/topic", "{\"key\":\"value\"}");
}
4. MQTT服务接口
代码解析
IMqttService接口定义了处理不同类型消息的方法。
package com.ruoyi.framework.mqtt;
import com.ruoyi.power.domain.protocol.*;
public interface IMqttService {
/**
* 处理心跳数据
* @param heartbeat MQTT 消息内容
*/
void handleHeartbeat(HeartbeatMessage heartbeat);
/**
* 处理上报数据
* @param report MQTT 消息内容
*/
void handleReport(ReportMessage report);
/**
* 服务器发送遥控命令到采集器
* 服务器发送遥调命令到采集器
* @param controlMessage 遥控命令
*/
void sendControl(ControlMessage controlMessage);
/**
* 处理上报仪表报警
* @param alarm 报警内容
* @return String 配置内容
*/
void handleAlarm(AlarmMessage alarm);
/**
* 下发控制命令到指定网关
* @param saleid 配电站ID
* @param gateid 网关ID
*/
php void sendCallRead(String saleid, String gateid, String startTime, String endTime);
/**
* 采集器响应召读命令(响应召读命令回复包,不代表召读时间段的数据一定存在,采集器收到召读命令后首先回复
* 此数据包,下一不再查找相应历史数据, 存在即发送,不存在不发送 )
* @param baseMsg 采集器响应召读命令(
*/
void handleCallReadAck(BaseMessage baseMsg);
/**
* 采集器发送执行结果到服务器
* @param controlAck
*/
void handleControlAck(ControlMessage controlAck);
/**
* 由服务器发布获取数据命令到采集器
* @param baseMessage
*/
void getCurrentData(BaseMessage baseMessage);
/**
*
* @param multiValueSetMessage
*/
void sendMultiValueSet(MultiValueSetMessage multiValueSetMessage);
/**
* 处理相应采集器接收到服务器的命令
* @param multiValueSetMessage
*/
void handleMultiValueSet(MultiValueSetMessage multiValueSetMessage);
}
- 每个方法对应一种消息类型的处理逻辑。
- 实现该接口的类需要提供具体的业务逻辑。
5. 使用说明
5.1 配置MQTT参数
在application.yml中配置MQTT的相关参数:
mqtt: broker-url: tcp://127.0.0.1:1883 client-id: mqtt-client-123
5.2 实现IMqttService接口
创建一个类实现IMqttService接口,并提供具体的业务逻辑。例如:
package com.ruoyi.framework.mqtt;
import com.alibaba.fastjson2.JSON;
import com.baomidou.myBATisplus.core.toolkit.ObjectUtils;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.ruoyi.common.constant.MessageConstants;
import com.ruoyi.common.constant.OperationConstants;
import com.ruoyi.common.constant.ResultConstants;
import com.ruoyi.common.utils.bean.BeanUtils;
import com.ruoyi.power.config.CustomIdGenerator;
import com.ruoyi.power.domain.*;
import com.ruoyi.power.domain.protocol.*;
import com.ruoyi.power.mapper.*;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.time.LocalDateTime;
import java.util.*;
@Slf4j
@Service
public class MqttServiceImpl implements IMqttService {
@Autowired
private MqttMessageGateway mqttGateway;
@Autowired
private HeartBeatMapper heartbeatMapper;
@Autowired
private GatewayInfoMapper gatewayInfoMapper;
@Autowired
private ReportMapper reportMapper;
@Autowired
private ReportMeterMapper reportMeterMapper;
@Autowired
private AlarmMapper alarmMapper;
@Autowired
private AlarmDetailMapper alarmDetailMapper;
@Autowired
private ControlMapper controlMapper;
// 处理心跳数据
@Override
public void handleHeartbeat(HeartbeatMessage heartbeat) {
try {
// 心跳存储到数据库
HeartBeat heartBeat = new HeartBeat();
heartBeat.setId(String.valueOf(CustomIdGenerator.SEQUENCE.nextId()));
heartBeat.setGateId(heartbeat.getGateid());
heartBeat.setType(heartbeat.getType());
heartBeat.setSaleId(heartbeat.getSaleid());
heartBeat.setTime(heartbeat.getTime());
heartBeat.setOperation(heartbeat.getOperation());
heartbeatMapper.insertHeartBeat(heartBeat);
log.info("[心跳数据] 存储成功: substationId={}, gatewayId={}",
heartbeat.getSaleid(), heartbeat.getGateid());
// 查询或创建网关记录
GatewayInfo gatewayInfo = gatewayInfoMapper.selectOne(Wrappers.<GatewayInfo>lambdaQuery().eq(GatewayInfo::getGateid, heartbeat.getGateid()));
if(ObjectUtils.isNull(gatewayInfo)) {
createNewGateway(heartbeat.getSaleid(), heartbeat.getGateid());
} else {
gatewayInfo.setLastHeartbeatTime(LocalDateTime.now());
gatewayInfo.setUpdateTime(LocalDateTime.now());
int updated = gatewayInfoMapper.updateGatewayInfo(gatewayInfo);
if(updated == 0) {
log.warn("心跳更新冲突 saleid:{}, gateid:{}", heartbeat.getSaleid(), heartbeat.getGateid());
}
}
// 如果网关请求心跳,响应心跳
sendHeartbeat(heartbeat.getSaleid(), heartbeat.getGateid(), heartbeat);
} catch (Exception e) {
log.error("[心跳数据] 处理失败: {}", e.getMessage());
}
}
// 创建新网关记录
private void createNewGateway(String saleid, String gateid) {
GatewayInfo newGateway = new GatewayInfo();
newGateway.setId(String.valueOf(CustomIdGenerator.SEQUENCE.nextId()));
newGateway.setSaleid(saleid);
newGateway.setGateid(gateid);
newGateway.setLastHeartbeatTime(LocalDateTime.now());
newGateway.setStatus("0");
newGateway.setCheckInterval(60L); // 默认间隔
newGateway.setCreateTime(LocalDateTime.now());
gatewayInfoMapper.insertGatewayInfo(newGateway);
}
// 下发心跳
private void sendHeartbeat(String saleid, String gateid, HeartbeatMessage heartbeat) {
String topic = String.format("report/allpoints", saleid, gateid);
heartbeat.setOperation(OperationConstants.TIME);
mqttGateway.sendMessage(topic, JSON.toJSONString(heartbeat));
log.info("[配置下发] topic={}, config={}", topic, JSON.toJSONString(heartbeat));
}
// 处理上报数据
@Override
public void handleReport(ReportMessage report) {
try {
// 存储到仪表信息表 转换为仪表信息表(meterMapper)
String reportId = createReportData(report);
// 批量存储仪表数据
List<ReportMeter> meterEntities = report.getMeter().stream()
.map(m -> {
ReportMeter entity = new ReportMeter();
entity.setId(String.valueOf(CustomIdGenerator.SEQUENCE.nextId()));
entity.setReportId(reportId);
entity.setMeterId(m.getId());
entity.setStatus(m.getStatus());
entity.setName(m.getName());
entity.setValuesJson(JSON.toJSONString(m.getValues()));
return entity;
}).toList();
for (ReportMeter meter : meterEntities) {
reportMeterMapper.insertReportMeter(meter);
}
log.info("[上报数据] 存储成功: substationId={}, gatewayId={}",
report.getSaleid(), report.getGateid());
} catch (Exception e) {
log.error("[上报数据] 处理失败: {}", e.getMessage());
}
}
// 创建新数据记录
private String createReportData(ReportMessage report) {
Report rep = new Report();
rep.setId(String.valueOf(CustomIdGenerator.SEQUENCE.nextId()));
rep.setSaleid(report.getSaleid());
rep.setGateid(report.getGateid());
rep.setTime(report.getTime());
rep.setType(report.getType());
rep.setSequence(report.getSequence());
rep.setSource(report.getSource());
rep.setCreateTime(LocalDateTime.now());
reportMapper.insert(rep);
return rep.getId();
}
// 下发控制命令
@Override
public void sendControl(ControlMessage controlMessage) {
ControlMessage message = new ControlMessage();
message.setSaleid(controlMessage.getSaleid());
message.setGateid(controlMessage.getGateid());
message.setType(controlMessage.getType());
message.setCuuid(LocalDateTime.now().toString());
message.setTime(LocalDateTime.now());
message.setMeterid(controlMessage.getMeterid());
message.setName(controlMessage.getName());
message.setFunctionid(controlMessage.getFunctionid());
message.setValue(controlMessage.getValue());
// 存储到控制记录表
createControl(controlMessjavascriptage);
String topic = String.format("report/allpoints", message);
mqttGateway.sendMessage(topic, JSON.toJSONString(message));
log.info("[控制命令] topic={}, command={}", topic, JSON.toJSONString(message));
}
private void createControl(ControlMessage controlMessage) {
Control control = new Control();
control.setId(String.valueOf(CustomIdGenerator.SEQUENCE.nextId()));
control.setSaleid(controlMessage.getSaleid());
control.setGateid(controlMessage.getGateid());
control.setType(controlMessage.getType());
control.setCuuid(controlMessage.getCuuid());
control.setTime(controlMessage.getTime());
control.setMeterid(controlMessage.getMeterid());
control.setName(controlMessage.getName());
control.setFunctionid(controlMessage.getFunctionid());
control.setValue(controlMessage.getValue());
control.setResult(controlMessage.getResult());
control.setErrordesc(controlMessage.getErrordesc());
controlMapper.insertControl(control);
}
@Override
public void handleAlarm(AlarmMessage alarmMessage) {
try {
// 存储报警信息表 转换为报警信息表(alarmMapper)
String alarmId = createAlarmData(alarmMessage);
// 批量存储仪表数据
List<AlarmDetail> alarmEntities = alarmMessage.getFunction().stream()
.map(m -> {
AlarmDetail entity = new AlarmDetail();
entity.setId(String.valueOf(CustomIdGenerator.SEQUENCE.nextId()));
entity.setAlarmId(alarmId);
entity.setPtId(m.getId());
entity.setAlarmType(m.getAlarmType());
entity.setLabel(m.getLabel());
entity.setCurrentValue(m.getCurrentValue());
entity.setSettingValue(m.getSettingValue());
entity.setLevel(m.getLevel());
return entity;
}).toList();
for (AlarmDetail alarm : alarmEntities) {
alarmDetailMapper.insertAlarmDetail(alarm);
}
log.info("[上报数据] 存储成功: substationId={}, gatewayId={}",
alarmMessage.getSaleid(), alarmMessage.getGateid());
} catch (Exception e) {
log.error("[上报数据] 处理失败: {}", e.getMessage());
}
}
@Override
public void sendCallRead(String saleid, String gateid, String startTime, String endTime) {
HashMap<String, String> protocol = new HashMap<>();
protocol.put("saleid", saleid);
protocol.put("gateid", gateid);
protocol.put("type", MessageConstants.CALL_READ);
protocol.put("time", String.valueOf(LocalDateTime.now()));
protocol.put("startTime", startTime);
protocol.put("endTime", endTime);
String topic = String.format("report/allpoints", saleid, gateid);
mqttGateway.sendMessage(topic, JSON.toJSONString(protocol));
log.info("[控制命令] topic={}, command={}", topic, JSON.toJSONString(protocol));
}
@Override
public void handleCallReadAck(BaseMessage baseMsg) {
Report report = new Report();
report.setId(String.valueOf(CustomIdGenerator.SEQUENCE.nextId()));
report.setSaleid(baseMsg.getSaleid());
report.setGateid(baseMsg.getGateid());
report.setTime(baseMsg.getTime());
report.setType(baseMsg.getType());
reportMapper.insert(report);
}
@Override
public void handleControlAck(ControlMessage controlAck) {
if(ResultConstants.FAILURE.equals(controlAck.getResult())) {
createControl(controlAck);
// 配置或设备问题,记录错误并报警
log.error("控制失败(不可重试): {}", controlAck.getErrordesc());
} else if(ResultConstants.SUCCESS.equals(controlAck.getResult())) {
createControl(controlAck);
log.info("控制成功: {}", controlAck.getCuuid());
}
}
@Override
public void getCurrentData(BaseMessage baseMessage) {
String topic = String.format("report/allpoints", baseMessage.getSaleid(), baseMessage.getGateid());
mqttGateway.sendMessage(topic, JSON.toJSONString(baseMessage));
log.info("[控制命令] topic={}, command={}", topic, JSON.toJSONString(baseMessage));
}
@Override
public void sendMultiValueSet(MultiValueSetMessage multiValueSetMessage) {
}
@Override
public void handleMultiValueSet(MultiValueSetMessage multiValueSetMessage) {
String topic = String.format("report/allpoints", multiValueSetMessage.getSaleid(), multiValueSetMessage.getGateid());
ControlMessage controlMessage = new ControlMessage();
try {
mqttGateway.sendMessage(topic, JSON.toJSONString(multiValueSetMessage));
log.info("[控制命令] topic={}, command={}", topic, JSON.toJSONString(multiValueSetMessage));
if(ResultConstants.SUCCESS.equals(multiValueSetMessage.getResult())) {
BeanUtils.copyProperties(multiValueSetMessage, controlMessage);
createControl(controlMessage);
log.info("[控制命令] topic={}, command={}", topic, JSON.toJSONString(multiValueSetMessage));
} else if(ResultConstants.FAILURE.equals(multiValueSetMessage.getResult())){
BeanUtils.copyProperties(multiValueSetMessage, controlMessage);
createControl(controlMessage);
log.error("[控制失败] topic={}, command={}", topic, JSON.toJSONString(multiValueSetMessage));
}
} catch (Exception e) {
log.error("[控制命令] topic={}, command={}", topic, JSON.toJSONString(multiValueSetMessage));
}
}
private String createAlarmData(AlarmMessage alarmMessage) {
Alarm alarm = new Alarm();
alarm.setId(String.valueOf(CustomIdGenerator.SEQUENCE.nextId()));
alarm.setSaleid(alarmMessage.getSaleid());
alarm.setGateid(alarmMessage.getGateid());
alarm.setTime(alarmMessage.getTime());
alarm.setType(alarmMessage.getType());
alarm.setSequence(alarmMessage.getSequence());
alarm.setName(alarmMessage.getName());
alarm.setMeterid(alarmMessage.getMeterid());
alarm.setCreateTime(LocalDateTime.now());
alarmMapper.insert(alarm);
return alarm.getId();
}
}
5.3 发送MQTT消息
通过MqttMessageGateway发送消息:
@Autowired
private MqttMessageGateway mqttMessageGateway;
public void sendTestMessage() {
mqttMessageGateway.sendMessage("testSub/topic", "{\"key\":\"value\"}");
}
6. 总结
本文介绍了如何使用Spring Integration框架实现MQTT协议的对接,包括客户端的配置、消息的订阅与发布、以及消息的处理逻辑。通过上述代码,您可以快速实现Java与MQTT的集成,并根据业务需求扩展消息的处理逻辑。
到此这篇关于Java对接MQTT协议的文章就介绍到这了,更多相关Java对接MQTT协议内容请搜索编程客栈(www.devze.com)以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程客栈(www.devze.com)!
加载中,请稍侯......
精彩评论