|
@ -28,7 +28,7 @@ public class LabelDataListener {
|
|
|
@KafkaListener(topics = "flumeLog1")
|
|
|
public void labelData(ConsumerRecord<?, ?> record) {
|
|
|
Long startTime = System.currentTimeMillis();
|
|
|
logger.info("Kafka开始消费");
|
|
|
logger.debug("Kafka开始消费");
|
|
|
Optional<?> kafkaMessage = Optional.ofNullable(record.value());
|
|
|
if (kafkaMessage.isPresent()) {
|
|
|
Object message = kafkaMessage.get();
|
|
@ -41,8 +41,8 @@ public class LabelDataListener {
|
|
|
|
|
|
Long endTime = System.currentTimeMillis();
|
|
|
Long time = startTime - endTime;
|
|
|
logger.info("time(ms):" + time);
|
|
|
logger.info("保存成功 message:" + message);
|
|
|
logger.debug("time(ms):" + time);
|
|
|
logger.debug("保存成功 message:" + message);
|
|
|
} else {
|
|
|
logger.error("数据格式错误,message:" + message);
|
|
|
}
|