|
@ -1,18 +1,14 @@
|
|
|
package com.yihu.wlyy.analysis.listener;
|
|
|
|
|
|
import com.yihu.wlyy.analysis.model.LabelDataModel;
|
|
|
import com.yihu.wlyy.analysis.model.BusinessDataModel;
|
|
|
import com.yihu.wlyy.analysis.model.OperatorDataModel;
|
|
|
import net.sf.json.JSONObject;
|
|
|
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
|
|
import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
|
import org.springframework.data.mongodb.core.MongoDbUtils;
|
|
|
import org.springframework.data.mongodb.core.MongoTemplate;
|
|
|
import org.springframework.kafka.annotation.KafkaListener;
|
|
|
import org.springframework.kafka.support.SendResult;
|
|
|
import org.springframework.scheduling.annotation.Scheduled;
|
|
|
import org.springframework.stereotype.Component;
|
|
|
import org.springframework.util.concurrent.ListenableFuture;
|
|
|
import org.springframework.util.concurrent.ListenableFutureCallback;
|
|
|
|
|
|
import java.util.Optional;
|
|
|
|
|
@ -20,31 +16,33 @@ import java.util.Optional;
|
|
|
* Created by Administrator on 2017/2/6.
|
|
|
*/
|
|
|
public class LabelDataListener {
|
|
|
|
|
|
private final Logger logger = LoggerFactory.getLogger(this.getClass());
|
|
|
@Autowired
|
|
|
private MongoTemplate mongoTemplate;
|
|
|
|
|
|
private static String mongoDbTableName = "WLYY_ANALYSIS";
|
|
|
private static String mongoDb_Business_TableName = "WLYY_BUSINESS_LOG";
|
|
|
private static String mongoDb_Operator_TableName = "WLYY_OPERATOR_LOG";
|
|
|
|
|
|
//@Scheduled(cron = "0 0/1 * * * ?") //每分钟执行一次
|
|
|
@KafkaListener(topics = "flumeLog1")
|
|
|
public void labelData(ConsumerRecord<?, ?> record) {
|
|
|
Long startTime = System.currentTimeMillis();
|
|
|
logger.info("Kafka开始消费");
|
|
|
logger.debug("Kafka开始消费");
|
|
|
Optional<?> kafkaMessage = Optional.ofNullable(record.value());
|
|
|
if (kafkaMessage.isPresent()) {
|
|
|
Object message = kafkaMessage.get();
|
|
|
try {
|
|
|
logger.info("接受到的消息:" + String.valueOf(message));
|
|
|
String[] value = String.valueOf(message).split(" - ");
|
|
|
if (value.length == 5) {
|
|
|
mongoTemplate.insert(
|
|
|
new LabelDataModel(value[0], value[1], value[2], value[3], value[4]), mongoDbTableName
|
|
|
);
|
|
|
JSONObject jsonObject = JSONObject.fromObject(message);
|
|
|
if (jsonObject.has("logType")) {
|
|
|
String logType = jsonObject.getString("logType");
|
|
|
//根据日志类别保存到mongodb
|
|
|
saveLogToMongo(logType, jsonObject);
|
|
|
|
|
|
Long endTime = System.currentTimeMillis();
|
|
|
Long time = startTime - endTime;
|
|
|
logger.info("time(ms):" + time);
|
|
|
logger.info("保存成功 message:" + message);
|
|
|
logger.debug("time(ms):" + time);
|
|
|
logger.debug("保存成功 message:" + message);
|
|
|
} else {
|
|
|
logger.error("数据格式错误,message:" + message);
|
|
|
}
|
|
@ -55,7 +53,27 @@ public class LabelDataListener {
|
|
|
logger.debug("Kafka结束消费");
|
|
|
}
|
|
|
|
|
|
private void saveLogToMongo(String logType, JSONObject jsonObject) throws Exception {
|
|
|
switch (logType) {
|
|
|
case "1": {
|
|
|
//业务日志
|
|
|
insertMongo(OperatorDataModel.getByJsonObject(jsonObject),mongoDb_Operator_TableName);
|
|
|
break;
|
|
|
}
|
|
|
case "2": {
|
|
|
//操作日志
|
|
|
insertMongo(BusinessDataModel.getByJsonObject(jsonObject),mongoDb_Business_TableName);
|
|
|
break;
|
|
|
}
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
private void insertMongo(Object data, String tableName) {
|
|
|
mongoTemplate.insert(
|
|
|
data, tableName
|
|
|
);
|
|
|
}
|
|
|
// @Scheduled(fixedRate=20000)//每20秒执行一次。开始
|
|
|
// public void testTasks() {
|
|
|
// }
|