|
@ -1,34 +1,57 @@
|
|
|
package com.yihu.wlyy.analysis.listener;
|
|
|
|
|
|
import com.yihu.wlyy.analysis.model.LabelDataModel;
|
|
|
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
|
|
import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
|
import org.springframework.data.mongodb.core.MongoDbUtils;
|
|
|
import org.springframework.data.mongodb.core.MongoTemplate;
|
|
|
import org.springframework.kafka.annotation.KafkaListener;
|
|
|
import org.springframework.kafka.support.SendResult;
|
|
|
import org.springframework.scheduling.annotation.Scheduled;
|
|
|
import org.springframework.stereotype.Component;
|
|
|
import org.springframework.util.concurrent.ListenableFuture;
|
|
|
import org.springframework.util.concurrent.ListenableFutureCallback;
|
|
|
|
|
|
import java.util.Optional;
|
|
|
|
|
|
/**
|
|
|
* Created by Administrator on 2017/2/6.
|
|
|
*/
|
|
|
@Component
|
|
|
public class LabelDataListener {
|
|
|
private final Logger logger = LoggerFactory.getLogger(this.getClass());
|
|
|
@Autowired
|
|
|
private MongoTemplate mongoTemplate;
|
|
|
|
|
|
private static String mongoDbTableName = "WLYY_ANALYSIS";
|
|
|
|
|
|
//@Scheduled(cron = "0 0/1 * * * ?") //每分钟执行一次
|
|
|
@KafkaListener(topics = "flumeLog1")
|
|
|
public void labelData(ConsumerRecord<?, ?> record) {
|
|
|
logger.debug("Kafka开始消费");
|
|
|
Optional<?> kafkaMessage = Optional.ofNullable(record.value());
|
|
|
if (kafkaMessage.isPresent()) {
|
|
|
Object message = kafkaMessage.get();
|
|
|
try {
|
|
|
System.out.println(message);
|
|
|
logger.debug("接受到的消息:" + String.valueOf(message));
|
|
|
String[] value = String.valueOf(message).split(" - ");
|
|
|
if (value.length == 5) {
|
|
|
mongoTemplate.insert(
|
|
|
new LabelDataModel(value[0], value[1], value[2], value[3], value[4]), mongoDbTableName
|
|
|
);
|
|
|
logger.debug("保存成功,message:" + message);
|
|
|
} else {
|
|
|
logger.error("数据格式错误,message:" + message);
|
|
|
}
|
|
|
} catch (Exception e) {
|
|
|
logger.error("将接收到的消息保存到数据库时异常, 消息:{}, 异常:{}", message.toString(), e);
|
|
|
logger.error(e.getMessage());
|
|
|
}
|
|
|
}
|
|
|
logger.debug("Kafka结束消费");
|
|
|
}
|
|
|
|
|
|
|
|
|
// @Scheduled(fixedRate=20000)//每20秒执行一次。开始
|
|
|
// public void testTasks() {
|
|
|
// }
|