|
@ -1,7 +1,10 @@
|
|
package com.yihu.wlyy.analysis.listener;
|
|
package com.yihu.wlyy.analysis.listener;
|
|
|
|
|
|
|
|
import com.yihu.wlyy.analysis.entity.UserPortrait;
|
|
|
|
import com.yihu.wlyy.analysis.etl.LogDataTransform;
|
|
import com.yihu.wlyy.analysis.model.BusinessDataModel;
|
|
import com.yihu.wlyy.analysis.model.BusinessDataModel;
|
|
import com.yihu.wlyy.analysis.model.OperatorDataModel;
|
|
import com.yihu.wlyy.analysis.model.OperatorDataModel;
|
|
|
|
import com.yihu.wlyy.analysis.repository.UserPortraitDao;
|
|
import net.sf.json.JSONObject;
|
|
import net.sf.json.JSONObject;
|
|
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
|
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
|
import org.slf4j.Logger;
|
|
import org.slf4j.Logger;
|
|
@ -9,7 +12,9 @@ import org.slf4j.LoggerFactory;
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
import org.springframework.data.mongodb.core.MongoTemplate;
|
|
import org.springframework.data.mongodb.core.MongoTemplate;
|
|
import org.springframework.kafka.annotation.KafkaListener;
|
|
import org.springframework.kafka.annotation.KafkaListener;
|
|
|
|
import org.springframework.transaction.annotation.Transactional;
|
|
|
|
|
|
|
|
import java.util.List;
|
|
import java.util.Optional;
|
|
import java.util.Optional;
|
|
|
|
|
|
/**
|
|
/**
|
|
@ -20,12 +25,15 @@ public class LabelDataListener {
|
|
private final Logger logger = LoggerFactory.getLogger(this.getClass());
|
|
private final Logger logger = LoggerFactory.getLogger(this.getClass());
|
|
@Autowired
|
|
@Autowired
|
|
private MongoTemplate mongoTemplate;
|
|
private MongoTemplate mongoTemplate;
|
|
|
|
@Autowired
|
|
|
|
private UserPortraitDao userPortraitDao;
|
|
|
|
|
|
private static String mongoDb_Business_TableName = "WLYY_BUSINESS_LOG";
|
|
private static String mongoDb_Business_TableName = "WLYY_BUSINESS_LOG";
|
|
private static String mongoDb_Operator_TableName = "WLYY_OPERATOR_LOG";
|
|
private static String mongoDb_Operator_TableName = "WLYY_OPERATOR_LOG";
|
|
|
|
|
|
//@Scheduled(cron = "0 0/1 * * * ?") //每分钟执行一次
|
|
//@Scheduled(cron = "0 0/1 * * * ?") //每分钟执行一次
|
|
@KafkaListener(topics = "flumeLog1")
|
|
@KafkaListener(topics = "flumeLog1")
|
|
|
|
@Transactional
|
|
public void labelData(ConsumerRecord<?, ?> record) {
|
|
public void labelData(ConsumerRecord<?, ?> record) {
|
|
Long startTime = System.currentTimeMillis();
|
|
Long startTime = System.currentTimeMillis();
|
|
logger.debug("Kafka开始消费");
|
|
logger.debug("Kafka开始消费");
|
|
@ -36,9 +44,14 @@ public class LabelDataListener {
|
|
JSONObject jsonObject = JSONObject.fromObject(message);
|
|
JSONObject jsonObject = JSONObject.fromObject(message);
|
|
if (jsonObject.has("logType")) {
|
|
if (jsonObject.has("logType")) {
|
|
String logType = jsonObject.getString("logType");
|
|
String logType = jsonObject.getString("logType");
|
|
//根据日志类别保存到mongodb
|
|
|
|
|
|
// 根据日志类别保存到mongodb
|
|
saveLogToMongo(logType, jsonObject);
|
|
saveLogToMongo(logType, jsonObject);
|
|
|
|
|
|
|
|
// 日志分析
|
|
|
|
List<UserPortrait> userPortraitList = LogDataTransform.getLogTransform().transform(jsonObject);
|
|
|
|
if(userPortraitList != null && userPortraitList.size() > 0) {
|
|
|
|
// 日志分析结果保存
|
|
|
|
userPortraitDao.save(userPortraitList);
|
|
|
|
}
|
|
Long endTime = System.currentTimeMillis();
|
|
Long endTime = System.currentTimeMillis();
|
|
Long time = startTime - endTime;
|
|
Long time = startTime - endTime;
|
|
logger.debug("time(ms):" + time);
|
|
logger.debug("time(ms):" + time);
|