|
@ -29,18 +29,18 @@ public class LabelDataListener {
|
|
|
//@Scheduled(cron = "0 0/1 * * * ?") //每分钟执行一次
|
|
|
@KafkaListener(topics = "flumeLog1")
|
|
|
public void labelData(ConsumerRecord<?, ?> record) {
|
|
|
logger.debug("Kafka开始消费");
|
|
|
logger.info("Kafka开始消费");
|
|
|
Optional<?> kafkaMessage = Optional.ofNullable(record.value());
|
|
|
if (kafkaMessage.isPresent()) {
|
|
|
Object message = kafkaMessage.get();
|
|
|
try {
|
|
|
logger.debug("接受到的消息:" + String.valueOf(message));
|
|
|
logger.info("接受到的消息:" + String.valueOf(message));
|
|
|
String[] value = String.valueOf(message).split(" - ");
|
|
|
if (value.length == 5) {
|
|
|
mongoTemplate.insert(
|
|
|
new LabelDataModel(value[0], value[1], value[2], value[3], value[4]), mongoDbTableName
|
|
|
);
|
|
|
logger.debug("保存成功,message:" + message);
|
|
|
logger.info("保存成功,message:" + message);
|
|
|
} else {
|
|
|
logger.error("数据格式错误,message:" + message);
|
|
|
}
|