|
@ -29,6 +29,7 @@ public class LabelDataListener {
|
|
//@Scheduled(cron = "0 0/1 * * * ?") //每分钟执行一次
|
|
//@Scheduled(cron = "0 0/1 * * * ?") //每分钟执行一次
|
|
@KafkaListener(topics = "flumeLog1")
|
|
@KafkaListener(topics = "flumeLog1")
|
|
public void labelData(ConsumerRecord<?, ?> record) {
|
|
public void labelData(ConsumerRecord<?, ?> record) {
|
|
|
|
Long startTime = System.currentTimeMillis();
|
|
logger.info("Kafka开始消费");
|
|
logger.info("Kafka开始消费");
|
|
Optional<?> kafkaMessage = Optional.ofNullable(record.value());
|
|
Optional<?> kafkaMessage = Optional.ofNullable(record.value());
|
|
if (kafkaMessage.isPresent()) {
|
|
if (kafkaMessage.isPresent()) {
|
|
@ -40,7 +41,10 @@ public class LabelDataListener {
|
|
mongoTemplate.insert(
|
|
mongoTemplate.insert(
|
|
new LabelDataModel(value[0], value[1], value[2], value[3], value[4]), mongoDbTableName
|
|
new LabelDataModel(value[0], value[1], value[2], value[3], value[4]), mongoDbTableName
|
|
);
|
|
);
|
|
logger.info("保存成功,message:" + message);
|
|
|
|
|
|
Long endTime = System.currentTimeMillis();
|
|
|
|
Long time = startTime - endTime;
|
|
|
|
logger.info("time(ms):" + time);
|
|
|
|
logger.info("保存成功 message:" + message);
|
|
} else {
|
|
} else {
|
|
logger.error("数据格式错误,message:" + message);
|
|
logger.error("数据格式错误,message:" + message);
|
|
}
|
|
}
|