|
@ -1,397 +0,0 @@
|
|
|
package com.yihu.hos.broker.services;
|
|
|
|
|
|
import com.fasterxml.jackson.core.JsonProcessingException;
|
|
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
|
|
import com.mongodb.*;
|
|
|
import com.yihu.hos.broker.common.constants.MonitorConstant;
|
|
|
import com.yihu.hos.broker.daos.BrokerDao;
|
|
|
import com.yihu.hos.core.datatype.DateUtil;
|
|
|
import com.yihu.hos.core.datatype.NumberUtil;
|
|
|
import com.yihu.hos.core.datatype.StringUtil;
|
|
|
import com.yihu.hos.web.framework.model.bo.Endpoint;
|
|
|
import com.yihu.hos.web.framework.model.bo.ServiceFlow;
|
|
|
import org.json.JSONObject;
|
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
|
import org.springframework.beans.factory.annotation.Value;
|
|
|
import org.springframework.stereotype.Service;
|
|
|
|
|
|
import java.math.BigDecimal;
|
|
|
import java.util.*;
|
|
|
|
|
|
/**
|
|
|
* Created by chenweida on 2016/1/27.
|
|
|
*/
|
|
|
@Service("ServiceMonitorService")
|
|
|
public class ServiceMonitorService {
|
|
|
public static final String BEAN_ID = "ServiceMonitorService";
|
|
|
|
|
|
@Value("${hos.tenant.name}")
|
|
|
private String tenant;
|
|
|
@Autowired
|
|
|
private Mongo mongo;
|
|
|
@Autowired
|
|
|
private BrokerDao brokerDao;
|
|
|
|
|
|
private DBCollection businessLog;
|
|
|
|
|
|
public DBCollection getBusinessLog() {
|
|
|
if (businessLog == null) {
|
|
|
businessLog = mongo.getDB(MonitorConstant.MONITOR_DATABASE).getCollection(MonitorConstant.BUSSINESS_LOG);
|
|
|
}
|
|
|
return businessLog;
|
|
|
}
|
|
|
|
|
|
public void collectServiceHealth() {
|
|
|
try {
|
|
|
Date now = new Date();
|
|
|
Date beforeDate = new Date(now.getTime() - 60000);
|
|
|
String beginTime = DateUtil.toString(beforeDate, DateUtil.DEFAULT_YMDHMSDATE_FORMAT);
|
|
|
String endTime = DateUtil.toString(now, DateUtil.DEFAULT_YMDHMSDATE_FORMAT);
|
|
|
List<ServiceFlow> flowList = brokerDao.getServiceFlowList();
|
|
|
List<Endpoint> endpointList = brokerDao.getEndPointList();
|
|
|
List<String> codeList = new ArrayList<>();
|
|
|
for (ServiceFlow systemServiceFlow : flowList) {
|
|
|
codeList.add(systemServiceFlow.getRouteCode());
|
|
|
}
|
|
|
|
|
|
for (Endpoint systemServiceEndpoint : endpointList) {
|
|
|
codeList.add(systemServiceEndpoint.getCode());
|
|
|
}
|
|
|
|
|
|
bandwidth(beginTime, endTime, codeList);
|
|
|
qps(beginTime, endTime, codeList);
|
|
|
delay(beginTime, endTime, codeList);
|
|
|
usage(beginTime, endTime, codeList);
|
|
|
} catch (Exception e) {
|
|
|
e.printStackTrace();
|
|
|
}
|
|
|
}
|
|
|
|
|
|
public void bandwidth(String beginTime, String endTime, List<String> codeList) {
|
|
|
|
|
|
DBObject match = getMatchFields(beginTime, endTime);
|
|
|
DBObject flowGroup = getFlowGroupFields();
|
|
|
DBObject sort = getSortFields();
|
|
|
DBObject serviceGroup = getServiceGroupFields();
|
|
|
// run aggregation
|
|
|
AggregationOutput flowOutput = getBusinessLog().aggregate(match, flowGroup, sort);
|
|
|
//流程带宽
|
|
|
Map<String, Integer> bandwidthMapF = new HashMap<>();
|
|
|
for (DBObject dbObject : flowOutput.results()) {
|
|
|
Integer count = Integer.parseInt(StringUtil.toString(dbObject.get("count")));
|
|
|
BasicDBObject id = (BasicDBObject) dbObject.get("_id");
|
|
|
String code = StringUtil.toString(id.get("routeId"));
|
|
|
Integer bodyLength = Integer.parseInt(StringUtil.toString(dbObject.get("bodyLength")));
|
|
|
if (bandwidthMapF.containsKey(code)) {
|
|
|
bodyLength = bodyLength + bandwidthMapF.get(code);
|
|
|
bandwidthMapF.put(code, bodyLength);
|
|
|
} else {
|
|
|
bandwidthMapF.put(code, bodyLength);
|
|
|
}
|
|
|
}
|
|
|
//服务带宽
|
|
|
// run aggregation
|
|
|
AggregationOutput serviceOutput = getBusinessLog().aggregate(match, serviceGroup, sort);
|
|
|
Map<String, Integer> bandwidthMapS = new HashMap<>();
|
|
|
for (DBObject dbObject : serviceOutput.results()) {
|
|
|
BasicDBObject id = (BasicDBObject) dbObject.get("_id");
|
|
|
String code = StringUtil.toString(id.get("code"));
|
|
|
Integer bodyLength = Integer.parseInt(StringUtil.toString(dbObject.get("bodyLength")));
|
|
|
if (bandwidthMapS.containsKey(code)) {
|
|
|
bodyLength = bodyLength + bandwidthMapS.get(code);
|
|
|
bandwidthMapS.put(code, bodyLength);
|
|
|
} else {
|
|
|
bandwidthMapS.put(code, bodyLength);
|
|
|
}
|
|
|
|
|
|
}
|
|
|
for (String code : codeList) {
|
|
|
BigDecimal bandwidth = BigDecimal.ZERO;
|
|
|
long interval = getInterval(beginTime, endTime);
|
|
|
if (!StringUtil.isEmpty(bandwidthMapF.get(code))) {
|
|
|
Integer flowCalls = bandwidthMapF.get(code);
|
|
|
bandwidth = NumberUtil.divideBigDecimal(BigDecimal.valueOf(flowCalls), BigDecimal.valueOf(interval));
|
|
|
} else if (!StringUtil.isEmpty(bandwidthMapS.get(code))) {
|
|
|
Integer serviceCalls = bandwidthMapS.get(code) / 2;
|
|
|
bandwidth = NumberUtil.divideBigDecimal(BigDecimal.valueOf(serviceCalls), BigDecimal.valueOf(interval));
|
|
|
}
|
|
|
saveServiceMetrics(code, "bandwidth", bandwidth.toString(), endTime);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
|
|
|
public void qps(String beginTime, String endTime, List<String> codeList) {
|
|
|
DBObject match = getMatchFields(beginTime, endTime);
|
|
|
DBObject flowGroup = getFlowGroupFields();
|
|
|
DBObject sort = getSortFields();
|
|
|
DBObject serviceGroup = getServiceGroupFields();
|
|
|
// run aggregation
|
|
|
AggregationOutput flowOutput = getBusinessLog().aggregate(match, flowGroup, sort);
|
|
|
//流程qps
|
|
|
Map<String, Integer> qpsMapF = new HashMap<>();
|
|
|
for (DBObject dbObject : flowOutput.results()) {
|
|
|
Integer count = Integer.parseInt(StringUtil.toString(dbObject.get("count")));
|
|
|
BasicDBObject id = (BasicDBObject) dbObject.get("_id");
|
|
|
String code = StringUtil.toString(id.get("routeId"));
|
|
|
Integer total = Integer.parseInt(StringUtil.toString(dbObject.get("total")));
|
|
|
if (total == count / 2) {
|
|
|
if (qpsMapF.containsKey(code)) {
|
|
|
Integer flowCalls = qpsMapF.get(code);
|
|
|
qpsMapF.put(code, ++flowCalls);
|
|
|
} else {
|
|
|
qpsMapF.put(code, 1);
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
|
|
|
//服务带宽
|
|
|
// run aggregation
|
|
|
AggregationOutput serviceOutput = getBusinessLog().aggregate(match, serviceGroup, sort);
|
|
|
|
|
|
Map<String, Integer> qpsMapS = new HashMap<>();
|
|
|
for (DBObject dbObject : serviceOutput.results()) {
|
|
|
BasicDBObject id = (BasicDBObject) dbObject.get("_id");
|
|
|
String code = StringUtil.toString(id.get("code"));
|
|
|
if (qpsMapS.containsKey(code)) {
|
|
|
Integer serviceCalls = qpsMapS.get(code);
|
|
|
qpsMapS.put(code, ++serviceCalls);
|
|
|
} else {
|
|
|
qpsMapS.put(code, 1);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
for (String code : codeList) {
|
|
|
BigDecimal qps = BigDecimal.ZERO;
|
|
|
long interval = getInterval(beginTime, endTime);
|
|
|
if (!StringUtil.isEmpty(qpsMapF.get(code))) {
|
|
|
Integer flowCalls = qpsMapF.get(code);
|
|
|
qps = NumberUtil.divideBigDecimal(BigDecimal.valueOf(flowCalls), BigDecimal.valueOf(interval));
|
|
|
} else if (!StringUtil.isEmpty(qpsMapS.get(code))) {
|
|
|
Integer serviceCalls = qpsMapS.get(code) / 2;
|
|
|
qps = NumberUtil.divideBigDecimal(BigDecimal.valueOf(serviceCalls), BigDecimal.valueOf(interval));
|
|
|
}
|
|
|
saveServiceMetrics(code, "qps", qps.toString(), endTime);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
public void usage(String beginTime, String endTime, List<String> codeList) throws JsonProcessingException {
|
|
|
DBObject match = getMatchFields(beginTime, endTime);
|
|
|
DBObject flowGroup = getFlowGroupFields();
|
|
|
DBObject sort = getSortFields();
|
|
|
// run aggregation
|
|
|
AggregationOutput flowOutput = getBusinessLog().aggregate(match, flowGroup, sort);
|
|
|
Map<String, Integer> usageMapSuccessF = new HashMap<>();
|
|
|
Map<String, Integer> usageMapFailF = new HashMap<>();
|
|
|
for (DBObject dbObject : flowOutput.results()) {
|
|
|
Integer total = Integer.parseInt(StringUtil.toString(dbObject.get("total")));
|
|
|
Integer count = Integer.parseInt(StringUtil.toString(dbObject.get("count")));
|
|
|
BasicDBObject id = (BasicDBObject) dbObject.get("_id");
|
|
|
String code = StringUtil.toString(id.get("routeId"));
|
|
|
if (total == count / 2) {
|
|
|
if (usageMapSuccessF.containsKey(code)) {
|
|
|
Integer countTemp = usageMapSuccessF.get(code);
|
|
|
usageMapSuccessF.put(code, ++countTemp);
|
|
|
} else {
|
|
|
usageMapSuccessF.put(code, 1);
|
|
|
}
|
|
|
} else {
|
|
|
if (usageMapFailF.containsKey(code)) {
|
|
|
Integer countTemp = usageMapFailF.get(code);
|
|
|
usageMapFailF.put(code, ++countTemp);
|
|
|
} else {
|
|
|
usageMapFailF.put(code, 1);
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
|
|
|
DBCursor serviceOutput = getBusinessLog().find(getQueryObject(beginTime, endTime));
|
|
|
|
|
|
Map<String, Integer> usageMapSuccessS = new HashMap<>();
|
|
|
Map<String, Integer> usageMapFailS = new HashMap<>();
|
|
|
Map<String, String> serviceMap = new HashMap<>();
|
|
|
for (DBObject dbObject : serviceOutput.toArray()) {
|
|
|
String code = StringUtil.toString(dbObject.get("code"));
|
|
|
String order = StringUtil.toString(dbObject.get("order"));
|
|
|
String breadcrumbId = StringUtil.toString(dbObject.get("breadcrumbId"));
|
|
|
if (serviceMap.containsKey(code + breadcrumbId + order)) {
|
|
|
serviceMap.remove(code + breadcrumbId + order);
|
|
|
if (usageMapSuccessS.containsKey(code)) {
|
|
|
Integer countTemp = usageMapSuccessS.get(code);
|
|
|
usageMapSuccessS.put(code, ++countTemp);
|
|
|
} else {
|
|
|
usageMapSuccessS.put(code, 1);
|
|
|
}
|
|
|
} else {
|
|
|
serviceMap.put(code + breadcrumbId + order, code);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
for (String key : serviceMap.keySet()) {
|
|
|
String code = serviceMap.get(key);
|
|
|
if (usageMapFailS.containsKey(code)) {
|
|
|
Integer count = usageMapFailF.get(code);
|
|
|
usageMapFailF.put(code, ++count);
|
|
|
} else {
|
|
|
usageMapFailF.put(code, 1);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
for (String code : codeList) {
|
|
|
JSONObject result = new JSONObject();
|
|
|
Integer successCountF = isNull(usageMapSuccessF.get(code));
|
|
|
Integer failureCountF = isNull(usageMapFailF.get(code));
|
|
|
Integer successCountS = isNull(usageMapSuccessS.get(code));
|
|
|
Integer failureCountS = isNull(usageMapFailS.get(code));
|
|
|
|
|
|
if (successCountF > 0 || failureCountF > 0) {
|
|
|
result.put("totalCount", successCountF + failureCountF);
|
|
|
result.put("successCount", successCountF);
|
|
|
result.put("failureCount", failureCountF);
|
|
|
} else if (successCountS > 0 || failureCountS > 0) {
|
|
|
result.put("totalCount", successCountS + failureCountS);
|
|
|
result.put("successCount", successCountS);
|
|
|
result.put("failureCount", failureCountS);
|
|
|
} else {
|
|
|
result.put("totalCount", 0);
|
|
|
result.put("successCount", 0);
|
|
|
result.put("failureCount", 0);
|
|
|
}
|
|
|
ObjectMapper objectMapper = new ObjectMapper();
|
|
|
saveServiceMetrics(code, "usage", objectMapper.writeValueAsString(result.toString()), endTime);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
public void delay(String beginTime, String endTime, List<String> codeList) {
|
|
|
DBObject match = getMatchFields(beginTime, endTime);
|
|
|
DBObject flowGroup = getFlowGroupFields();
|
|
|
DBObject sort = getSortFields();
|
|
|
// run aggregation
|
|
|
AggregationOutput flowOutput = getBusinessLog().aggregate(match, flowGroup, sort);
|
|
|
|
|
|
Map<String, BigDecimal> delayMapF = new HashMap<>();
|
|
|
for (DBObject dbObject : flowOutput.results()) {
|
|
|
BasicDBObject id = (BasicDBObject) dbObject.get("_id");
|
|
|
String code = StringUtil.toString(id.get("routeId"));
|
|
|
String begin = StringUtil.toString(dbObject.get("beginTime"));
|
|
|
String end = StringUtil.toString(dbObject.get("endTime"));
|
|
|
long interval = getIntervalExact(begin, end);
|
|
|
if (delayMapF.containsKey(code)) {
|
|
|
BigDecimal flowDelay = delayMapF.get(code);
|
|
|
delayMapF.put(code, BigDecimal.valueOf(interval).add(flowDelay));
|
|
|
} else {
|
|
|
delayMapF.put(code, BigDecimal.valueOf(interval));
|
|
|
}
|
|
|
}
|
|
|
|
|
|
DBCursor serviceOutput = getBusinessLog().find(getQueryObject(beginTime, endTime));
|
|
|
|
|
|
Map<String, BigDecimal> delayMapS = new HashMap<>();
|
|
|
Map<String, DBObject> serviceMap = new HashMap<>();
|
|
|
for (DBObject dbObject : serviceOutput.toArray()) {
|
|
|
String code = StringUtil.toString(dbObject.get("code"));
|
|
|
if (serviceMap.containsKey(code)) {
|
|
|
DBObject dbObjectTemp = serviceMap.remove(code);
|
|
|
String begin = StringUtil.toString(dbObjectTemp.get("fireTimeSource"));
|
|
|
String end = StringUtil.toString(dbObject.get("fireTimeSource"));
|
|
|
long interval = getIntervalExact(begin, end);
|
|
|
if (delayMapS.containsKey(code)) {
|
|
|
BigDecimal delayTemp = delayMapS.get(code);
|
|
|
delayMapS.put(code, delayTemp.add(BigDecimal.valueOf(interval)));
|
|
|
} else {
|
|
|
delayMapS.put(code, BigDecimal.valueOf(interval));
|
|
|
}
|
|
|
} else {
|
|
|
serviceMap.put(code, dbObject);
|
|
|
}
|
|
|
}
|
|
|
for (String code : codeList) {
|
|
|
BigDecimal delay = BigDecimal.ZERO;
|
|
|
if (delayMapF.containsKey(code)) {
|
|
|
delay = delayMapF.get(code);
|
|
|
} else if (delayMapS.containsKey(code)) {
|
|
|
delay = delayMapS.get(code);
|
|
|
}
|
|
|
saveServiceMetrics(code, "delay", delay.toString(), endTime);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
public DBObject getMatchFields(String beginTime, String endTime) {
|
|
|
return new BasicDBObject("$match", getQueryObject(beginTime, endTime));
|
|
|
}
|
|
|
|
|
|
public DBObject getQueryObject(String beginTime, String endTime) {
|
|
|
BasicDBObject queryObject = new BasicDBObject().append(QueryOperators.AND,
|
|
|
new BasicDBObject[]{
|
|
|
new BasicDBObject().append("fireTime",
|
|
|
new BasicDBObject().append(QueryOperators.GTE, DateUtil.toTimestamp(beginTime))),
|
|
|
new BasicDBObject().append("fireTime",
|
|
|
new BasicDBObject().append(QueryOperators.LT, DateUtil.toTimestamp(endTime)))});
|
|
|
|
|
|
return queryObject;
|
|
|
}
|
|
|
|
|
|
|
|
|
public DBObject getServiceGroupFields() {
|
|
|
|
|
|
// Now the $group operation
|
|
|
DBObject groupFields = new BasicDBObject("_id",
|
|
|
new BasicDBObject("id", "$_id")
|
|
|
.append("code", "$code"));
|
|
|
groupFields.put("count", new BasicDBObject("$sum", 1));
|
|
|
groupFields.put("bodyLength", new BasicDBObject("$sum", "$bodyLength"));
|
|
|
groupFields.put("total", new BasicDBObject("$first", "$totalServers"));
|
|
|
return new BasicDBObject("$group", groupFields);
|
|
|
}
|
|
|
|
|
|
public DBObject getFlowGroupFields() {
|
|
|
|
|
|
// Now the $group operation
|
|
|
DBObject groupFields = new BasicDBObject("_id",
|
|
|
new BasicDBObject("breadcrumbId", "$breadcrumbId")
|
|
|
.append("routeId", "$routeId"));
|
|
|
groupFields.put("count", new BasicDBObject("$sum", 1));
|
|
|
groupFields.put("bodyLength", new BasicDBObject("$sum", "$bodyLength"));
|
|
|
groupFields.put("total", new BasicDBObject("$first", "$totalServers"));
|
|
|
groupFields.put("beginTime", new BasicDBObject("$first", "$fireTimeSource"));
|
|
|
groupFields.put("endTime", new BasicDBObject("$last", "$fireTimeSource"));
|
|
|
|
|
|
return new BasicDBObject("$group", groupFields);
|
|
|
}
|
|
|
|
|
|
public DBObject getSortFields() {
|
|
|
DBObject sortFields = new BasicDBObject("_id", 1);
|
|
|
return new BasicDBObject("$sort", sortFields);
|
|
|
}
|
|
|
|
|
|
public long getInterval(String beginTime, String endTime) {
|
|
|
Date from = DateUtil.toTimestamp(beginTime, DateUtil.DEFAULT_YMDHMSDATE_FORMAT);
|
|
|
Date to = DateUtil.toTimestamp(endTime, DateUtil.DEFAULT_YMDHMSDATE_FORMAT);
|
|
|
long interval = (to.getTime() - from.getTime()) / 1000;
|
|
|
return interval;
|
|
|
}
|
|
|
|
|
|
public long getIntervalExact(String beginTime, String endTime) {
|
|
|
Date from = DateUtil.toTimestamp(beginTime, DateUtil.DEFAULT_TIMESTAMP_FORMAT);
|
|
|
Date to = DateUtil.toTimestamp(endTime, DateUtil.DEFAULT_TIMESTAMP_FORMAT);
|
|
|
long interval = (to.getTime() - from.getTime()) / 1000;
|
|
|
return interval;
|
|
|
}
|
|
|
|
|
|
public void saveServiceMetrics(String name, String type, String value, String createTime) {
|
|
|
BasicDBObject document = new BasicDBObject();
|
|
|
document.put("tenant", tenant);
|
|
|
document.put("name", name);
|
|
|
document.put("type", type);
|
|
|
document.put("value", value);
|
|
|
document.put("create_time", createTime);
|
|
|
DBCollection terminal = mongo.getDB(MonitorConstant.MONITOR_DATABASE).getCollection(MonitorConstant.SERVICE);
|
|
|
terminal.save(document);
|
|
|
}
|
|
|
|
|
|
public Integer isNull(Integer count) {
|
|
|
if (count == null) {
|
|
|
count = 0;
|
|
|
}
|
|
|
return count;
|
|
|
}
|
|
|
}
|