|  | @ -0,0 +1,195 @@
 | 
	
		
			
				|  |  | package com.yihu.jw.care.util;
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  | import com.yihu.jw.care.dao.device.BaseMqttCallbackRecordDao;
 | 
	
		
			
				|  |  | import com.yihu.jw.entity.care.device.BaseMqttCallbackRecordDO;
 | 
	
		
			
				|  |  | import org.apache.commons.lang3.StringUtils;
 | 
	
		
			
				|  |  | import org.eclipse.paho.client.mqttv3.*;
 | 
	
		
			
				|  |  | import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
 | 
	
		
			
				|  |  | import org.slf4j.Logger;
 | 
	
		
			
				|  |  | import org.slf4j.LoggerFactory;
 | 
	
		
			
				|  |  | import org.springframework.beans.factory.annotation.Autowired;
 | 
	
		
			
				|  |  | import org.springframework.jdbc.core.JdbcTemplate;
 | 
	
		
			
				|  |  | import org.springframework.stereotype.Component;
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  | import java.util.concurrent.ScheduledExecutorService;
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  | /**
 | 
	
		
			
				|  |  |  * 凯旋街道视频
 | 
	
		
			
				|  |  |  * Created by yeshijie on 2023/1/12.
 | 
	
		
			
				|  |  |  */
 | 
	
		
			
				|  |  | @Component
 | 
	
		
			
				|  |  | public class MQTTKXClientUtil {
 | 
	
		
			
				|  |  |     private static Logger logger = LoggerFactory.getLogger(MQTTKXClientUtil.class);
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |     public static final String HOST = "tcp://121.40.135.79";
 | 
	
		
			
				|  |  |     public static final String deviceId = "18202a08724e5573";
 | 
	
		
			
				|  |  |     public static final Integer qos = 1;
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |     @Autowired
 | 
	
		
			
				|  |  |     private JdbcTemplate jdbcTemplate;
 | 
	
		
			
				|  |  |     @Autowired
 | 
	
		
			
				|  |  |     private BaseMqttCallbackRecordDao mqttCallbackRecordDao;
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  | /*    	1、心跳保活,设置为30s:
 | 
	
		
			
				|  |  |     mosquitto_pub -h 121.40.135.79 -t "device/18202a08724e5573/cmd/ipcam_keep_alive" -u admin -P admin -m "30"
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |             2、订阅:
 | 
	
		
			
				|  |  |     mosquitto_sub -h 121.40.135.79 -t "device/18202a08724e5573/response/+"
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |             3、开启视频推流:
 | 
	
		
			
				|  |  |     mosquitto_pub -h 121.40.135.79 -t "device/18202a08724e5573/cmd/ipcam_video_start" -u admin -P admin -m "1"
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |             6、解析步骤2的payload,获取直播SourceURL:
 | 
	
		
			
				|  |  |     如:http://121.40.135.79/live/18202a08724e5573_1673488105807/hls.m3u8
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |             5、关闭视频推流:
 | 
	
		
			
				|  |  |     mosquitto_pub -h 121.40.135.79 -t "device/18202a08724e5573/cmd/ipcam_video_start" -u admin -P admin -m "0"*/
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |     public static final String ipcam_keep_alive = "device/{deviceId}/cmd/ipcam_keep_alive";
 | 
	
		
			
				|  |  |     public static final String TOPIC = "device/{deviceId}/response/+";
 | 
	
		
			
				|  |  |     public static final String ipcam_video_start = "device/{deviceId}/cmd/ipcam_video_start";
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |     private static final String clientid = String.valueOf(System.currentTimeMillis());;
 | 
	
		
			
				|  |  |     private MqttClient client;
 | 
	
		
			
				|  |  |     private MqttConnectOptions options;
 | 
	
		
			
				|  |  |     private String userName = "admin";
 | 
	
		
			
				|  |  |     private String passWord = "admin";
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |     private ScheduledExecutorService scheduler;
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |     public void start() {
 | 
	
		
			
				|  |  |         logger.info("启动监听mqtt-start");
 | 
	
		
			
				|  |  |         try {
 | 
	
		
			
				|  |  |             // host为主机名,clientid即连接MQTT的客户端ID,一般以唯一标识符表示,MemoryPersistence设置clientid的保存形式,默认为以内存保存
 | 
	
		
			
				|  |  |             client = new MqttClient(HOST, clientid, new MemoryPersistence());
 | 
	
		
			
				|  |  |             // MQTT的连接设置
 | 
	
		
			
				|  |  |             options = new MqttConnectOptions();
 | 
	
		
			
				|  |  |             // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,这里设置为true表示每次连接到服务器都以新的身份连接
 | 
	
		
			
				|  |  |             options.setCleanSession(true);
 | 
	
		
			
				|  |  |             // 设置连接的用户名
 | 
	
		
			
				|  |  |             options.setUserName(userName);
 | 
	
		
			
				|  |  |             // 设置连接的密码
 | 
	
		
			
				|  |  |             options.setPassword(passWord.toCharArray());
 | 
	
		
			
				|  |  |             // 设置超时时间 单位为秒
 | 
	
		
			
				|  |  |             options.setConnectionTimeout(10);
 | 
	
		
			
				|  |  |             options.setAutomaticReconnect(true);//设置自动重连
 | 
	
		
			
				|  |  |             // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
 | 
	
		
			
				|  |  |             options.setKeepAliveInterval(20);
 | 
	
		
			
				|  |  |             // 设置回调
 | 
	
		
			
				|  |  |             client.setCallback(new MqttCallback(){
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |                 @Override
 | 
	
		
			
				|  |  |                 public void connectionLost(Throwable throwable) {
 | 
	
		
			
				|  |  |                     // 连接丢失后,一般在这里面进行重连
 | 
	
		
			
				|  |  |                     throwable.printStackTrace();
 | 
	
		
			
				|  |  |                     logger.info("连接断开,可以做重连");
 | 
	
		
			
				|  |  |                 }
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |                 @Override
 | 
	
		
			
				|  |  |                 public void messageArrived(String topic, MqttMessage message) throws Exception {
 | 
	
		
			
				|  |  |                     // subscribe后得到的消息会执行到这里面
 | 
	
		
			
				|  |  |                     BaseMqttCallbackRecordDO mqttCallbackRecordDO = new BaseMqttCallbackRecordDO();
 | 
	
		
			
				|  |  |                     if(StringUtils.isNotBlank(topic)){
 | 
	
		
			
				|  |  |                         String[] deviceId = topic.split("//");
 | 
	
		
			
				|  |  |                         if(deviceId.length>1){
 | 
	
		
			
				|  |  |                             mqttCallbackRecordDO.setDeviceId(deviceId[1]);
 | 
	
		
			
				|  |  |                         }
 | 
	
		
			
				|  |  |                     }
 | 
	
		
			
				|  |  |                     mqttCallbackRecordDO.setTopic(topic);
 | 
	
		
			
				|  |  |                     mqttCallbackRecordDO.setPayload(new String(message.getPayload()));
 | 
	
		
			
				|  |  |                     mqttCallbackRecordDao.save(mqttCallbackRecordDO);
 | 
	
		
			
				|  |  |                     logger.info("接收消息主题 : " + topic+","+"接收消息内容 : " + new String(message.getPayload()));
 | 
	
		
			
				|  |  |                 }
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |                 @Override
 | 
	
		
			
				|  |  |                 public void deliveryComplete(IMqttDeliveryToken token) {
 | 
	
		
			
				|  |  |                     logger.info("deliveryComplete---------" + token.isComplete());
 | 
	
		
			
				|  |  |                 }
 | 
	
		
			
				|  |  |             });
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |             client.connect(options);
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |         } catch (Exception e) {
 | 
	
		
			
				|  |  |             e.printStackTrace();
 | 
	
		
			
				|  |  |         }
 | 
	
		
			
				|  |  |         logger.info("启动监听mqtt-end");
 | 
	
		
			
				|  |  |     }
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |     public MqttClient getClient(){
 | 
	
		
			
				|  |  |         return client;
 | 
	
		
			
				|  |  |     }
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |     /**
 | 
	
		
			
				|  |  |      * 开启关闭直播
 | 
	
		
			
				|  |  |      */
 | 
	
		
			
				|  |  |     public void ipcam_video_start(String open,String deviceId) throws Exception{
 | 
	
		
			
				|  |  |         String payload = open;//
 | 
	
		
			
				|  |  |         MqttMessage message = new MqttMessage(payload.getBytes("UTF-8"));
 | 
	
		
			
				|  |  |         // 设置消息的服务质量
 | 
	
		
			
				|  |  |         message.setQos(qos);
 | 
	
		
			
				|  |  |         message.setRetained(true);
 | 
	
		
			
				|  |  |         // 发布消息
 | 
	
		
			
				|  |  |         String topic = ipcam_video_start.replace("{deviceId}",deviceId);
 | 
	
		
			
				|  |  |         logger.info(topic);
 | 
	
		
			
				|  |  |         getClient().publish(topic, message);
 | 
	
		
			
				|  |  |     }
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |     /**
 | 
	
		
			
				|  |  |      * 心跳
 | 
	
		
			
				|  |  |      */
 | 
	
		
			
				|  |  |     public void heart(String deviceId) throws Exception{
 | 
	
		
			
				|  |  |         String payload = "30";//
 | 
	
		
			
				|  |  |         MqttMessage message = new MqttMessage(payload.getBytes("UTF-8"));
 | 
	
		
			
				|  |  |         // 设置消息的服务质量
 | 
	
		
			
				|  |  |         message.setQos(qos);
 | 
	
		
			
				|  |  |         message.setRetained(true);
 | 
	
		
			
				|  |  |         // 发布消息
 | 
	
		
			
				|  |  |         String topic = ipcam_keep_alive.replace("{deviceId}",deviceId);
 | 
	
		
			
				|  |  |         logger.info(topic);
 | 
	
		
			
				|  |  |         getClient().publish(topic, message);
 | 
	
		
			
				|  |  |     }
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |     /**
 | 
	
		
			
				|  |  |      * 订阅消息
 | 
	
		
			
				|  |  |      */
 | 
	
		
			
				|  |  |     public void subscribe(String deviceId) throws Exception{
 | 
	
		
			
				|  |  |         int[] Qos  = {qos};
 | 
	
		
			
				|  |  |         String topic = TOPIC.replace("{deviceId}",deviceId);
 | 
	
		
			
				|  |  |         logger.info(topic);
 | 
	
		
			
				|  |  |         String[] topic1 = {topic};
 | 
	
		
			
				|  |  |         getClient().subscribe(topic1, Qos);
 | 
	
		
			
				|  |  |     }
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  | //    public static void main(String[] args) throws Exception {
 | 
	
		
			
				|  |  | //        try {
 | 
	
		
			
				|  |  | //            MQTTKXClientUtil mqttkxClientUtil = new MQTTKXClientUtil();
 | 
	
		
			
				|  |  | //            mqttkxClientUtil.start();
 | 
	
		
			
				|  |  | //            logger.info("启动监听mqtt");
 | 
	
		
			
				|  |  | //            // 心跳
 | 
	
		
			
				|  |  | //            heart(client,deviceId);
 | 
	
		
			
				|  |  | //
 | 
	
		
			
				|  |  | //            //订阅消息
 | 
	
		
			
				|  |  | //            subscribe(client,deviceId);
 | 
	
		
			
				|  |  | //
 | 
	
		
			
				|  |  | //            Scanner scanner = new Scanner(System.in);
 | 
	
		
			
				|  |  | //            System.out.println("请输入1-3数字");
 | 
	
		
			
				|  |  | //            String username = scanner.nextLine();
 | 
	
		
			
				|  |  | //            if("1".equals(username)){
 | 
	
		
			
				|  |  | //                //开启直播
 | 
	
		
			
				|  |  | //                logger.info("开启直播");
 | 
	
		
			
				|  |  | //                ipcam_video_start(client,"1",deviceId);
 | 
	
		
			
				|  |  | //            }
 | 
	
		
			
				|  |  | //            System.out.println("请输入密码");
 | 
	
		
			
				|  |  | //            String password = scanner.nextLine();
 | 
	
		
			
				|  |  | //            if("2".equals(password)){
 | 
	
		
			
				|  |  | //                //关闭直播
 | 
	
		
			
				|  |  | //                logger.info("关闭直播");
 | 
	
		
			
				|  |  | //                ipcam_video_start(client,"0",deviceId);
 | 
	
		
			
				|  |  | //            }
 | 
	
		
			
				|  |  | //        }catch (Exception e){
 | 
	
		
			
				|  |  | //            e.printStackTrace();
 | 
	
		
			
				|  |  | //        }
 | 
	
		
			
				|  |  | //    }
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  | }
 |