Ver código fonte

修改服务监控统计逻辑

airhead 8 anos atrás
pai
commit
bf1e51f899

+ 0 - 40
hos-broker/src/main/java/com/yihu/hos/broker/common/camelrouter/CrawlerMongoRouter.java

@ -1,40 +0,0 @@
package com.yihu.hos.broker.common.camelrouter;
import com.yihu.hos.broker.common.processor.CrawlerMongoProcessor;
import org.apache.camel.builder.RouteBuilder;
import org.springframework.stereotype.Component;
@Component
public class CrawlerMongoRouter extends RouteBuilder {
    @Override
    public void configure() throws Exception {
//        from("quartz://myGroup/myTimerName?cron=0 0/1 * * * ? ")
//            .process(new CrawlerMongoProcessor())
//                .to("stream:out");
//            .process(new CrawlerMongoProcessor())
//            .to("mongodb:mongo?database=runtime&collection=arbiterServer&operation=findOneByQuery")
//            .split(simple("${body}"))
//            .process(new AddMongoProcessor("runtime", "arbiterServer"))
//            .to("bean:centerMongoService?method=save")
//            .process(new CrawlerMongoProcessor())
//            .to("mongodb:mongo?database=runtime&collection=brokerServer&operation=findOneByQuery")
//            .split(simple("${body}"))
//            .process(new AddMongoProcessor("runtime", "brokerServer"))
//            .to("bean:centerMongoService?method=save")
//            .process(new CrawlerMongoProcessor())
//            .to("mongodb:mongo?database=runtime&collection=endpoint&operation=findOneByQuery")
//            .split(simple("${body}"))
//            .process(new AddMongoProcessor("runtime", "endpoint"))
//            .to("bean:centerMongoService?method=save")
//            .process(new CrawlerMongoProcessor())
//            .to("mongodb:mongo?database=log&collection=server&operation=findOneByQuery")
//            .split(simple("${body}"))
//            .process(new AddMongoProcessor("log", "server"))
//            .to("bean:centerMongoService?method=save")
//            .process(new CrawlerMongoProcessor())
//            .to("mongodb:mongo?database=log&collection=service&operation=findOneByQuery")
//            .split(simple("${body}"))
//            .process(new AddMongoProcessor("log", "service"))
//            .to("bean:centerMongoService?method=save");
    }
}

+ 16 - 0
hos-broker/src/main/java/com/yihu/hos/broker/common/camelrouter/MonitorRouterBulider.java

@ -0,0 +1,16 @@
package com.yihu.hos.broker.common.camelrouter;
import org.apache.camel.builder.RouteBuilder;
import org.springframework.stereotype.Component;
/**
 * Created by l4qiang on 2017-06-16.
 */
@Component
public class MonitorRouterBulider extends RouteBuilder {
    @Override
    public void configure() throws Exception {
        from("quartz://monitor/monitorTimer?cron=0 0/1 * * * ?")
                .to("bean:serviceMonitorService?method=monitor");
    }
}

+ 11 - 12
hos-broker/src/main/java/com/yihu/hos/broker/common/constants/MonitorConstant.java

@ -5,17 +5,16 @@ package com.yihu.hos.broker.common.constants;
 * @vsrsion 1.0
 * Created at 2016/11/3.
 */
public class MonitorConstant {
    public static String MONITOR_DATABASE = "log";
    public static String SERVER = "server";
    public static String SERVICE = "service";
    public static String BUSSINESS_LOG = "businessLog";
    public static String HOST = "host";     //这个是否需要,可以在runtime库中使用brokerServer?
    public static String CPU = "cpu";
    public static String FILES = "files";
    public static String MEMORY = "memory";
    public static String NET = "net";
public interface MonitorConstant {
    String DATABASE = "log";
    String SERVER = "server";
    String SERVICE = "service";
    String SERVICE_METRICS = "serviceMetrics";
    String BUSINESS = "business";
    String HOST = "host";     //这个是否需要,可以在runtime库中使用brokerServer?
    String CPU = "cpu";
    String FILES = "files";
    String MEMORY = "memory";
    String NET = "net";
}

+ 0 - 41
hos-broker/src/main/java/com/yihu/hos/broker/common/processor/AddMongoProcessor.java

@ -1,41 +0,0 @@
package com.yihu.hos.broker.common.processor;
import com.mongodb.BasicDBObject;
import com.mongodb.QueryOperators;
import com.yihu.hos.core.datatype.DateUtil;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.bson.types.ObjectId;
import org.json.JSONArray;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
/**
 * Created by Zdm on 2016/7/13.
 */
public class AddMongoProcessor implements Processor {
    private String database;
    private String collection;
    public AddMongoProcessor(String database, String collection) {
        this.database = database;
        this.collection = collection;
    }
    @Override
    public void process(Exchange exchange) throws Exception {
        Map record = exchange.getIn().getBody(Map.class);
        Map info = new HashMap();
        info.put("database", database);
        info.put("collection", collection);
        ObjectId objectId = (ObjectId) record.get("_id");
        record.put("_id", objectId.toString());
        JSONArray jsonArray = new JSONArray();
        jsonArray.put(info);
        jsonArray.put(record);
        exchange.getIn().setBody(jsonArray);
    }
}

+ 1 - 1
hos-broker/src/main/java/com/yihu/hos/broker/common/scheduler/MonitorScheduler.java

@ -44,7 +44,7 @@ public class MonitorScheduler {
    }
    public void collectServiceHealth() {
        serviceMonitorService.collectServiceHealth();
//        serviceMonitorService.collectServiceHealth();
    }
    /**

+ 63 - 0
hos-broker/src/main/java/com/yihu/hos/broker/models/LogOffset.java

@ -0,0 +1,63 @@
package com.yihu.hos.broker.models;
import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.index.Indexed;
import org.springframework.data.mongodb.core.mapping.Document;
import javax.persistence.Entity;
import java.util.Date;
/**
 * Created by l4qiang on 2017-06-16.
 */
@Document
public class LogOffset {
    @Id
    private String id;
    @Indexed
    private String tenant;
    @Indexed
    private String name;    //需要采集的目标
    private Date offset;
    private Date createTime;
    public String getId() {
        return id;
    }
    public void setId(String id) {
        this.id = id;
    }
    public String getTenant() {
        return tenant;
    }
    public void setTenant(String tenant) {
        this.tenant = tenant;
    }
    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }
    public Date getOffset() {
        return offset;
    }
    public void setOffset(Date offset) {
        this.offset = offset;
    }
    public Date getCreateTime() {
        return createTime;
    }
    public void setCreateTime(Date createTime) {
        this.createTime = createTime;
    }
}

+ 2 - 3
hos-broker/src/main/java/com/yihu/hos/broker/services/ServerMonitorService.java

@ -10,7 +10,6 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import java.util.Date;
import java.util.List;
@Service("ServerMonitorService")
@ -31,7 +30,7 @@ public class ServerMonitorService {
    public void collectEnvHealth() {
        try {
            BasicDBObject result = new BasicDBObject();
            DBCollection terminal = mongo.getDB(MonitorConstant.MONITOR_DATABASE).getCollection(MonitorConstant.SERVER);
            DBCollection terminal = mongo.getDB(MonitorConstant.DATABASE).getCollection(MonitorConstant.SERVER);
            result.put("tenant", tenant);
            result.put("create_date", DateUtil.getCurrentString(DateUtil.DEFAULT_YMDHMSDATE_FORMAT));
            result.put("create_time", DateUtil.getSysDateTime());
@ -68,7 +67,7 @@ public class ServerMonitorService {
    }
    public void checkHost() {
        DBCollection terminal = mongo.getDB(MonitorConstant.MONITOR_DATABASE).getCollection(MonitorConstant.HOST);
        DBCollection terminal = mongo.getDB(MonitorConstant.DATABASE).getCollection(MonitorConstant.HOST);
        BasicDBObject queryObject = new BasicDBObject().append(QueryOperators.AND,
                new BasicDBObject[]{

+ 95 - 363
hos-broker/src/main/java/com/yihu/hos/broker/services/ServiceMonitorService.java

@ -1,397 +1,129 @@
package com.yihu.hos.broker.services;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.mongodb.*;
import com.mongodb.MongoClient;
import com.mongodb.client.AggregateIterable;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.UpdateOptions;
import com.yihu.hos.broker.common.constants.MonitorConstant;
import com.yihu.hos.broker.daos.BrokerDao;
import com.yihu.hos.core.datatype.DateUtil;
import com.yihu.hos.core.datatype.NumberUtil;
import com.yihu.hos.core.datatype.StringUtil;
import com.yihu.hos.web.framework.model.bo.Endpoint;
import com.yihu.hos.web.framework.model.bo.ServiceFlow;
import org.json.JSONObject;
import com.yihu.hos.broker.models.LogOffset;
import org.bson.Document;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.mongodb.core.MongoOperations;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.stereotype.Service;
import java.math.BigDecimal;
import java.util.*;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Locale;
/**
 * 统计流程信息
 * <p>
 * Created by chenweida on 2016/1/27.
 */
@Service("ServiceMonitorService")
public class ServiceMonitorService {
    public static final String BEAN_ID = "ServiceMonitorService";
    @Value("${hos.tenant.name}")
    private String tenant;
    @Autowired
    private Mongo mongo;
    private MongoClient mongoClient;
    @Autowired
    private BrokerDao brokerDao;
    private MongoOperations mongoOperations;
    private DBCollection businessLog;
    public DBCollection getBusinessLog() {
        if (businessLog == null) {
            businessLog = mongo.getDB(MonitorConstant.MONITOR_DATABASE).getCollection(MonitorConstant.BUSSINESS_LOG);
        }
        return businessLog;
    }
    public void collectServiceHealth() {
    public void monitor() {
        try {
            Date now = new Date();
            Date beforeDate = new Date(now.getTime() - 60000);
            String beginTime = DateUtil.toString(beforeDate, DateUtil.DEFAULT_YMDHMSDATE_FORMAT);
            String endTime = DateUtil.toString(now, DateUtil.DEFAULT_YMDHMSDATE_FORMAT);
            List<ServiceFlow> flowList = brokerDao.getServiceFlowList();
            List<Endpoint> endpointList = brokerDao.getEndPointList();
            List<String> codeList = new ArrayList<>();
            for (ServiceFlow systemServiceFlow : flowList) {
                codeList.add(systemServiceFlow.getRouteCode());
            MongoDatabase database = mongoClient.getDatabase(MonitorConstant.DATABASE);
            Query query = new Query();
            query.addCriteria(Criteria.where("name").is(MonitorConstant.SERVICE));  //Service表中已有的数据时间。
            LogOffset offset = mongoOperations.findOne(query, LogOffset.class);
            Date begin = new Date();
            Date end = new Date();
            if (offset != null) {
                begin = offset.getOffset();
            }
            for (Endpoint systemServiceEndpoint : endpointList) {
                codeList.add(systemServiceEndpoint.getCode());
            }
            DateFormat df = new SimpleDateFormat("EEE MMM dd yyyy HH:mm:ss z", Locale.ENGLISH);
            String beginTime = df.format(begin);
            String endTime = df.format(end);
            bandwidth(beginTime, endTime, codeList);
            qps(beginTime, endTime, codeList);
            delay(beginTime, endTime, codeList);
            usage(beginTime, endTime, codeList);
            mapReduce(database, beginTime, endTime);
            AggregateIterable<Document> documents = aggregate(database, beginTime, endTime);
            outPut(database, documents);
            offset.setOffset(end);
            mongoOperations.save(offset);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    public void bandwidth(String beginTime, String endTime, List<String> codeList) {
        DBObject match = getMatchFields(beginTime, endTime);
        DBObject flowGroup = getFlowGroupFields();
        DBObject sort = getSortFields();
        DBObject serviceGroup = getServiceGroupFields();
        // run aggregation
        AggregationOutput flowOutput = getBusinessLog().aggregate(match, flowGroup, sort);
        //流程带宽
        Map<String, Integer> bandwidthMapF = new HashMap<>();
        for (DBObject dbObject : flowOutput.results()) {
            Integer count = Integer.parseInt(StringUtil.toString(dbObject.get("count")));
            BasicDBObject id = (BasicDBObject) dbObject.get("_id");
            String code = StringUtil.toString(id.get("routeId"));
            Integer bodyLength = Integer.parseInt(StringUtil.toString(dbObject.get("bodyLength")));
            if (bandwidthMapF.containsKey(code)) {
                bodyLength = bodyLength + bandwidthMapF.get(code);
                bandwidthMapF.put(code, bodyLength);
            } else {
                bandwidthMapF.put(code, bodyLength);
            }
        }
        //服务带宽
        // run aggregation
        AggregationOutput serviceOutput = getBusinessLog().aggregate(match, serviceGroup, sort);
        Map<String, Integer> bandwidthMapS = new HashMap<>();
        for (DBObject dbObject : serviceOutput.results()) {
            BasicDBObject id = (BasicDBObject) dbObject.get("_id");
            String code = StringUtil.toString(id.get("code"));
            Integer bodyLength = Integer.parseInt(StringUtil.toString(dbObject.get("bodyLength")));
            if (bandwidthMapS.containsKey(code)) {
                bodyLength = bodyLength + bandwidthMapS.get(code);
                bandwidthMapS.put(code, bodyLength);
            } else {
                bandwidthMapS.put(code, bodyLength);
            }
        }
        for (String code : codeList) {
            BigDecimal bandwidth = BigDecimal.ZERO;
            long interval = getInterval(beginTime, endTime);
            if (!StringUtil.isEmpty(bandwidthMapF.get(code))) {
                Integer flowCalls = bandwidthMapF.get(code);
                bandwidth = NumberUtil.divideBigDecimal(BigDecimal.valueOf(flowCalls), BigDecimal.valueOf(interval));
            } else if (!StringUtil.isEmpty(bandwidthMapS.get(code))) {
                Integer serviceCalls = bandwidthMapS.get(code) / 2;
                bandwidth = NumberUtil.divideBigDecimal(BigDecimal.valueOf(serviceCalls), BigDecimal.valueOf(interval));
            }
            saveServiceMetrics(code, "bandwidth", bandwidth.toString(), endTime);
        }
    }
    public void qps(String beginTime, String endTime, List<String> codeList) {
        DBObject match = getMatchFields(beginTime, endTime);
        DBObject flowGroup = getFlowGroupFields();
        DBObject sort = getSortFields();
        DBObject serviceGroup = getServiceGroupFields();
        // run aggregation
        AggregationOutput flowOutput = getBusinessLog().aggregate(match, flowGroup, sort);
        //流程qps
        Map<String, Integer> qpsMapF = new HashMap<>();
        for (DBObject dbObject : flowOutput.results()) {
            Integer count = Integer.parseInt(StringUtil.toString(dbObject.get("count")));
            BasicDBObject id = (BasicDBObject) dbObject.get("_id");
            String code = StringUtil.toString(id.get("routeId"));
            Integer total = Integer.parseInt(StringUtil.toString(dbObject.get("total")));
            if (total == count / 2) {
                if (qpsMapF.containsKey(code)) {
                    Integer flowCalls = qpsMapF.get(code);
                    qpsMapF.put(code, ++flowCalls);
                } else {
                    qpsMapF.put(code, 1);
                }
            }
        }
        //服务带宽
        // run aggregation
        AggregationOutput serviceOutput = getBusinessLog().aggregate(match, serviceGroup, sort);
        Map<String, Integer> qpsMapS = new HashMap<>();
        for (DBObject dbObject : serviceOutput.results()) {
            BasicDBObject id = (BasicDBObject) dbObject.get("_id");
            String code = StringUtil.toString(id.get("code"));
            if (qpsMapS.containsKey(code)) {
                Integer serviceCalls = qpsMapS.get(code);
                qpsMapS.put(code, ++serviceCalls);
            } else {
                qpsMapS.put(code, 1);
            }
        }
        for (String code : codeList) {
            BigDecimal qps = BigDecimal.ZERO;
            long interval = getInterval(beginTime, endTime);
            if (!StringUtil.isEmpty(qpsMapF.get(code))) {
                Integer flowCalls = qpsMapF.get(code);
                qps = NumberUtil.divideBigDecimal(BigDecimal.valueOf(flowCalls), BigDecimal.valueOf(interval));
            } else if (!StringUtil.isEmpty(qpsMapS.get(code))) {
                Integer serviceCalls = qpsMapS.get(code) / 2;
                qps = NumberUtil.divideBigDecimal(BigDecimal.valueOf(serviceCalls), BigDecimal.valueOf(interval));
            }
            saveServiceMetrics(code, "qps", qps.toString(), endTime);
        }
    }
    public void usage(String beginTime, String endTime, List<String> codeList) throws JsonProcessingException {
        DBObject match = getMatchFields(beginTime, endTime);
        DBObject flowGroup = getFlowGroupFields();
        DBObject sort = getSortFields();
        // run aggregation
        AggregationOutput flowOutput = getBusinessLog().aggregate(match, flowGroup, sort);
        Map<String, Integer> usageMapSuccessF = new HashMap<>();
        Map<String, Integer> usageMapFailF = new HashMap<>();
        for (DBObject dbObject : flowOutput.results()) {
            Integer total = Integer.parseInt(StringUtil.toString(dbObject.get("total")));
            Integer count = Integer.parseInt(StringUtil.toString(dbObject.get("count")));
            BasicDBObject id = (BasicDBObject) dbObject.get("_id");
            String code = StringUtil.toString(id.get("routeId"));
            if (total == count / 2) {
                if (usageMapSuccessF.containsKey(code)) {
                    Integer countTemp = usageMapSuccessF.get(code);
                    usageMapSuccessF.put(code, ++countTemp);
                } else {
                    usageMapSuccessF.put(code, 1);
                }
            } else {
                if (usageMapFailF.containsKey(code)) {
                    Integer countTemp = usageMapFailF.get(code);
                    usageMapFailF.put(code, ++countTemp);
                } else {
                    usageMapFailF.put(code, 1);
                }
            }
        }
        DBCursor serviceOutput = getBusinessLog().find(getQueryObject(beginTime, endTime));
        Map<String, Integer> usageMapSuccessS = new HashMap<>();
        Map<String, Integer> usageMapFailS = new HashMap<>();
        Map<String, String> serviceMap = new HashMap<>();
        for (DBObject dbObject : serviceOutput.toArray()) {
            String code = StringUtil.toString(dbObject.get("code"));
            String order = StringUtil.toString(dbObject.get("order"));
            String breadcrumbId = StringUtil.toString(dbObject.get("breadcrumbId"));
            if (serviceMap.containsKey(code + breadcrumbId + order)) {
                serviceMap.remove(code + breadcrumbId + order);
                if (usageMapSuccessS.containsKey(code)) {
                    Integer countTemp = usageMapSuccessS.get(code);
                    usageMapSuccessS.put(code, ++countTemp);
                } else {
                    usageMapSuccessS.put(code, 1);
                }
            } else {
                serviceMap.put(code + breadcrumbId + order, code);
            }
        }
        for (String key : serviceMap.keySet()) {
            String code = serviceMap.get(key);
            if (usageMapFailS.containsKey(code)) {
                Integer count = usageMapFailF.get(code);
                usageMapFailF.put(code, ++count);
            } else {
                usageMapFailF.put(code, 1);
            }
        }
        for (String code : codeList) {
            JSONObject result = new JSONObject();
            Integer successCountF = isNull(usageMapSuccessF.get(code));
            Integer failureCountF = isNull(usageMapFailF.get(code));
            Integer successCountS = isNull(usageMapSuccessS.get(code));
            Integer failureCountS = isNull(usageMapFailS.get(code));
            if (successCountF > 0 || failureCountF > 0) {
                result.put("totalCount", successCountF + failureCountF);
                result.put("successCount", successCountF);
                result.put("failureCount", failureCountF);
            } else if (successCountS > 0 || failureCountS > 0) {
                result.put("totalCount", successCountS + failureCountS);
                result.put("successCount", successCountS);
                result.put("failureCount", failureCountS);
            } else {
                result.put("totalCount", 0);
                result.put("successCount", 0);
                result.put("failureCount", 0);
            }
            ObjectMapper objectMapper = new ObjectMapper();
            saveServiceMetrics(code, "usage", objectMapper.writeValueAsString(result.toString()), endTime);
        }
    }
    public void delay(String beginTime, String endTime, List<String> codeList) {
        DBObject match = getMatchFields(beginTime, endTime);
        DBObject flowGroup = getFlowGroupFields();
        DBObject sort = getSortFields();
        // run aggregation
        AggregationOutput flowOutput = getBusinessLog().aggregate(match, flowGroup, sort);
        Map<String, BigDecimal> delayMapF = new HashMap<>();
        for (DBObject dbObject : flowOutput.results()) {
            BasicDBObject id = (BasicDBObject) dbObject.get("_id");
            String code = StringUtil.toString(id.get("routeId"));
            String begin = StringUtil.toString(dbObject.get("beginTime"));
            String end = StringUtil.toString(dbObject.get("endTime"));
            long interval = getIntervalExact(begin, end);
            if (delayMapF.containsKey(code)) {
                BigDecimal flowDelay = delayMapF.get(code);
                delayMapF.put(code, BigDecimal.valueOf(interval).add(flowDelay));
            } else {
                delayMapF.put(code, BigDecimal.valueOf(interval));
            }
        }
        DBCursor serviceOutput = getBusinessLog().find(getQueryObject(beginTime, endTime));
        Map<String, BigDecimal> delayMapS = new HashMap<>();
        Map<String, DBObject> serviceMap = new HashMap<>();
        for (DBObject dbObject : serviceOutput.toArray()) {
            String code = StringUtil.toString(dbObject.get("code"));
            if (serviceMap.containsKey(code)) {
                DBObject dbObjectTemp = serviceMap.remove(code);
                String begin = StringUtil.toString(dbObjectTemp.get("fireTimeSource"));
                String end = StringUtil.toString(dbObject.get("fireTimeSource"));
                long interval = getIntervalExact(begin, end);
                if (delayMapS.containsKey(code)) {
                    BigDecimal delayTemp = delayMapS.get(code);
                    delayMapS.put(code, delayTemp.add(BigDecimal.valueOf(interval)));
                } else {
                    delayMapS.put(code, BigDecimal.valueOf(interval));
                }
            } else {
                serviceMap.put(code, dbObject);
            }
        }
        for (String code : codeList) {
            BigDecimal delay = BigDecimal.ZERO;
            if (delayMapF.containsKey(code)) {
                delay = delayMapF.get(code);
            } else if (delayMapS.containsKey(code)) {
                delay = delayMapS.get(code);
            }
            saveServiceMetrics(code, "delay", delay.toString(), endTime);
        }
    }
    public DBObject getMatchFields(String beginTime, String endTime) {
        return new BasicDBObject("$match", getQueryObject(beginTime, endTime));
    }
    public DBObject getQueryObject(String beginTime, String endTime) {
        BasicDBObject queryObject = new BasicDBObject().append(QueryOperators.AND,
                new BasicDBObject[]{
                        new BasicDBObject().append("fireTime",
                                new BasicDBObject().append(QueryOperators.GTE, DateUtil.toTimestamp(beginTime))),
                        new BasicDBObject().append("fireTime",
                                new BasicDBObject().append(QueryOperators.LT, DateUtil.toTimestamp(endTime)))});
        return queryObject;
    }
    public DBObject getServiceGroupFields() {
        // Now the $group operation
        DBObject groupFields = new BasicDBObject("_id",
                new BasicDBObject("id", "$_id")
                        .append("code", "$code"));
        groupFields.put("count", new BasicDBObject("$sum", 1));
        groupFields.put("bodyLength", new BasicDBObject("$sum", "$bodyLength"));
        groupFields.put("total", new BasicDBObject("$first", "$totalServers"));
        return new BasicDBObject("$group", groupFields);
    }
    public DBObject getFlowGroupFields() {
        // Now the $group operation
        DBObject groupFields = new BasicDBObject("_id",
                new BasicDBObject("breadcrumbId", "$breadcrumbId")
                        .append("routeId", "$routeId"));
        groupFields.put("count", new BasicDBObject("$sum", 1));
        groupFields.put("bodyLength", new BasicDBObject("$sum", "$bodyLength"));
        groupFields.put("total", new BasicDBObject("$first", "$totalServers"));
        groupFields.put("beginTime", new BasicDBObject("$first", "$fireTimeSource"));
        groupFields.put("endTime", new BasicDBObject("$last", "$fireTimeSource"));
        return new BasicDBObject("$group", groupFields);
    }
    public DBObject getSortFields() {
        DBObject sortFields = new BasicDBObject("_id", 1);
        return new BasicDBObject("$sort", sortFields);
    }
    public long getInterval(String beginTime, String endTime) {
        Date from = DateUtil.toTimestamp(beginTime, DateUtil.DEFAULT_YMDHMSDATE_FORMAT);
        Date to = DateUtil.toTimestamp(endTime, DateUtil.DEFAULT_YMDHMSDATE_FORMAT);
        long interval = (to.getTime() - from.getTime()) / 1000;
        return interval;
    }
    public long getIntervalExact(String beginTime, String endTime) {
        Date from = DateUtil.toTimestamp(beginTime, DateUtil.DEFAULT_TIMESTAMP_FORMAT);
        Date to = DateUtil.toTimestamp(endTime, DateUtil.DEFAULT_TIMESTAMP_FORMAT);
        long interval = (to.getTime() - from.getTime()) / 1000;
        return interval;
    private void mapReduce(MongoDatabase database, String beginTime, String endTime) {
        Document document = new Document("mapReduce", MonitorConstant.BUSINESS);
        String mapFunc = "function(){emit(this.breadcrumbId, {fireTime:this.fireTime, createTime:this.createTime, routeId:this.routeId, bodyLength:this.bodyLength, tenant:this.tenant});}";
        document = document.append("map", mapFunc);
        String reduceFunc = "function(key, values){" +
                "var len=values.length;" +
                "var delay=values[len-1].fireTime-values[0].fireTime;" +
                "if(len == 1){delay=values[0].createTime-values[0].fireTime;}" +
                "return {fireTime: values[0].fireTime, in:values[0].bodyLength, out:values[len-1].bodyLength, delay:delay, tenant:values[0].tenant}}";
        document = document.append("reduce", reduceFunc);
        String query = "{fireTime:{$gte:new Date(\"" + beginTime + "\"), $lt:new Date(\"" + endTime + "\")}}";
        document = document.append("query", Document.parse(query));
        document = document.append("out", MonitorConstant.SERVICE);
        database.runCommand(document);
    }
    public void saveServiceMetrics(String name, String type, String value, String createTime) {
        BasicDBObject document = new BasicDBObject();
        document.put("tenant", tenant);
        document.put("name", name);
        document.put("type", type);
        document.put("value", value);
        document.put("create_time", createTime);
        DBCollection terminal = mongo.getDB(MonitorConstant.MONITOR_DATABASE).getCollection(MonitorConstant.SERVICE);
        terminal.save(document);
    private AggregateIterable<Document> aggregate(MongoDatabase database, String beginTime, String endTime) {
        MongoCollection<Document> serviceCollection = database.getCollection(MonitorConstant.SERVICE);
        List<Document> pipeline = new ArrayList<>();
        Document match = new Document();
        String query = "{\"value.fireTime\":{$gte:new Date(\"" + beginTime + "\"), $lt:new Date(\"" + endTime + "\")}}";
        match.put("$match", Document.parse(query));
        pipeline.add(match);
        Document group = new Document();
        group.append("_id", Document.parse("{" +
                "year:{\"$year\":\"$value.fireTime\"}," +
                "month:{\"$month\":\"$value.fireTime\"}," +
                "day:{\"$dayOfMonth\":\"$value.fireTime\"}," +
                "hour:{\"$hour\":\"$value.fireTime\"}," +
                "minute:{\"$minute\": \"$value.fireTime\"}" +
                "routId:\"$value.routeId\"" +
                "tenant:\"$value.tenantId\"" +
                "}"));
        group.append("pv", Document.parse("{$sum:1}"));
        group.append("delay", Document.parse("{$sum:\"$value.delay\"}"));
        group.append("avgDelay", Document.parse("{$avg:\"$value.delay\"}"));
        group.append("in", Document.parse("{$sum:\"$value.in\"}"));
        group.append("out", Document.parse("{$sum:\"$value.out\"}"));
        pipeline.add(new Document("$group", group));
        pipeline.add(new Document("$sort", Document.parse("{\"_id\":1}")));
        return serviceCollection.aggregate(pipeline);
    }
    public Integer isNull(Integer count) {
        if (count == null) {
            count = 0;
    private void outPut(MongoDatabase database, AggregateIterable<Document> documents) {
        MongoCollection<Document> serviceMetrics = database.getCollection(MonitorConstant.SERVICE_METRICS);
        for (Document document : documents) {
            Document id = (Document) document.get("_id");
            String time = id.get("year").toString() + id.get("month").toString() + id.get("day").toString() +
                    id.get("hour").toString() + id.get("minute").toString();
            document.put("time", time);
            document.put("routId", id.get("routeId"));
            document.put("tenant", id.get("tenant"));
            document.putIfAbsent("avgDelay", 0);
            Document query = new Document("_id", id);
            Document update = new Document("$set", document);
            serviceMetrics.updateOne(query, update, new UpdateOptions().upsert(true));
        }
        return count;
    }
}

+ 26 - 0
hos-broker/src/test/java/com/yihu/hos/broker/services/ServiceMonitorServiceTest.java

@ -0,0 +1,26 @@
package com.yihu.hos.broker.services;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import org.springframework.test.context.web.WebAppConfiguration;
import static org.junit.Assert.*;
/**
 * Created by l4qiang on 2017-06-19.
 */
@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest
@WebAppConfiguration
public class ServiceMonitorServiceTest {
    @Autowired
    private ServiceMonitorService serviceMonitorService;
    @Test
    public void monitor() throws Exception {
        serviceMonitorService.monitor();
    }
}

+ 32 - 0
hos-camel2/src/main/java/camel/terminal/tenant/processor/AddMongoProcessor.java

@ -0,0 +1,32 @@
package camel.terminal.tenant.processor;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
/**
 * 
 * Created by Zdm on 2016/7/13.
 */
public class AddMongoProcessor implements Processor {
    private String database;
    private String collection;
    public AddMongoProcessor(String database, String collection) {
        this.database = database;
        this.collection = collection;
    }
    @Override
    public void process(Exchange exchange) throws Exception {
//        Map record = exchange.getIn().getBody(Map.class);
//        Map info = new HashMap();
//        info.put("database", database);
//        info.put("collection", collection);
//        ObjectId objectId = (ObjectId) record.get("_id");
//        record.put("_id", objectId.toString());
//        JSONArray jsonArray = new JSONArray();
//        jsonArray.put(info);
//        jsonArray.put(record);
//        exchange.getIn().setBody(jsonArray);
    }
}

+ 1 - 1
hos-broker/src/main/java/com/yihu/hos/broker/common/processor/CrawlerMongoProcessor.java

@ -1,4 +1,4 @@
package com.yihu.hos.broker.common.processor;
package camel.terminal.tenant.processor;
import com.mongodb.BasicDBObject;
import com.mongodb.QueryOperators;

+ 39 - 0
hos-camel2/src/main/java/camel/terminal/tenant/route/CrawlerMongoRouter.java

@ -0,0 +1,39 @@
package camel.terminal.tenant.route;
import camel.terminal.tenant.processor.AddMongoProcessor;
import camel.terminal.tenant.processor.CrawlerMongoProcessor;
import org.apache.camel.builder.RouteBuilder;
import org.springframework.stereotype.Component;
@Component
public class CrawlerMongoRouter extends RouteBuilder {
    @Override
    public void configure() throws Exception {
        from("quartz://myGroup/myTimerName?cron=0 0/1 * * * ? ")
            .process(new CrawlerMongoProcessor())
            .to("mongodb:mongo?database=runtime&collection=arbiterServer&operation=findOneByQuery")
            .split(simple("${body}"))
            .process(new AddMongoProcessor("runtime", "arbiterServer"))
            .to("bean:centerMongoService?method=save")
            .process(new CrawlerMongoProcessor())
            .to("mongodb:mongo?database=runtime&collection=brokerServer&operation=findOneByQuery")
            .split(simple("${body}"))
            .process(new AddMongoProcessor("runtime", "brokerServer"))
            .to("bean:centerMongoService?method=save")
            .process(new CrawlerMongoProcessor())
            .to("mongodb:mongo?database=runtime&collection=endpoint&operation=findOneByQuery")
            .split(simple("${body}"))
            .process(new AddMongoProcessor("runtime", "endpoint"))
            .to("bean:centerMongoService?method=save")
            .process(new CrawlerMongoProcessor())
            .to("mongodb:mongo?database=log&collection=server&operation=findOneByQuery")
            .split(simple("${body}"))
            .process(new AddMongoProcessor("log", "server"))
            .to("bean:centerMongoService?method=save")
            .process(new CrawlerMongoProcessor())
            .to("mongodb:mongo?database=log&collection=service&operation=findOneByQuery")
            .split(simple("${body}"))
            .process(new AddMongoProcessor("log", "service"))
            .to("bean:centerMongoService?method=save");
    }
}

+ 0 - 1
hos-logger/src/main/java/com/yihu/hos/logger/camel/processor/EHRLogProcesser.java

@ -1,7 +1,6 @@
package com.yihu.hos.logger.camel.processor;
import ch.qos.logback.classic.spi.ILoggingEvent;
import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

+ 36 - 30
src/main/java/com/yihu/hos/monitor/service/ServiceMonitorService.java

@ -7,7 +7,6 @@ import com.yihu.hos.monitor.dao.ServiceMonitorDao;
import com.yihu.hos.system.model.SystemServiceEndpoint;
import com.yihu.hos.system.model.SystemServiceFlow;
import com.yihu.hos.system.model.SystemServiceFlowConfig;
import com.yihu.hos.system.service.FlowManager;
import com.yihu.hos.tenant.model.TenantSession;
import com.yihu.hos.web.framework.constant.ServiceFlowConstant;
import com.yihu.hos.web.framework.model.Result;
@ -19,7 +18,6 @@ import org.springframework.data.mongodb.core.MongoOperations;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import javax.servlet.http.HttpSession;
import java.util.ArrayList;
import java.util.HashMap;
@ -46,14 +44,14 @@ public class ServiceMonitorService {
    @Autowired
    private ServiceMonitorDao serviceMonitorDao;
    @Resource(name = FlowManager.BEAN_ID)
    private FlowManager flowManager;
//    @Resource(name = FlowManager.BEAN_ID)
//    private FlowManager flowManager;
    public Result metrics(HttpSession session,String id, String beginTime, String endTime) throws Exception {
    public Result metrics(HttpSession session, String id, String beginTime, String endTime) throws Exception {
        String name;
        String code;
        String description;
        TenantSession tenantSession = (TenantSession)session.getAttribute(ContextAttributes.TENANT_SESSION);
        TenantSession tenantSession = (TenantSession) session.getAttribute(ContextAttributes.TENANT_SESSION);
        if (id.contains("flow")) {
            Integer flowId = Integer.parseInt(id.replace("flow", ""));
            SystemServiceFlow systemServiceFlow = serviceMonitorDao.getFlowById(flowId);
@ -61,7 +59,7 @@ public class ServiceMonitorService {
            code = systemServiceFlow.getCode();
            description = systemServiceFlow.getDescription();
        } else {
            String endpointId =id.replace("endpoint", "");
            String endpointId = id.replace("endpoint", "");
            SystemServiceEndpoint systemServiceEndpoint = serviceMonitorDao.getEndpointById(endpointId);
            name = systemServiceEndpoint.getName();
            code = systemServiceEndpoint.getCode();
@ -88,15 +86,23 @@ public class ServiceMonitorService {
        JSONArray qps = new JSONArray();
        JSONArray usage = new JSONArray();
        JSONArray delay = new JSONArray();
        while(cursor.hasNext()) {
        while (cursor.hasNext()) {
            DBObject dbObject = cursor.next();
            dbObject.removeField("_id");
            String type = dbObject.get("type").toString();
            switch (type) {
                case "bandwidth" : bandwidth.put(dbObject);break;
                case "qps" : qps.put(dbObject);break;
                case "usage" : usage.put(dbObject);break;
                case "delay" : delay.put(dbObject);break;
                case "bandwidth":
                    bandwidth.put(dbObject);
                    break;
                case "qps":
                    qps.put(dbObject);
                    break;
                case "usage":
                    usage.put(dbObject);
                    break;
                case "delay":
                    delay.put(dbObject);
                    break;
            }
        }
@ -133,23 +139,23 @@ public class ServiceMonitorService {
        }
        for (SystemServiceFlow flow : flowList) {
                TreeView rootTree = new TreeView();
                rootTree.setIschecked(false);
                rootTree.setId("flow" + flow.getId());
                rootTree.setPid("-1");
                rootTree.setText(flow.getName());
                treeList.add(rootTree);
                List<String> endpointIdList = flowEndpointMap.get(flow.getId());
                if (!CollectionUtil.isEmpty(endpointIdList)) {
                    for (String endpointId : endpointIdList) {
                        SystemServiceEndpoint endpoint = endpointMap.get(endpointId);
                        TreeView childTree = new TreeView();
                        childTree.setIschecked(false);
                        childTree.setId("endpoint" + endpoint.getId());
                        childTree.setPid("flow" + flow.getId());
                        childTree.setText(endpoint.getName());
                        treeList.add(childTree);
                    }
            TreeView rootTree = new TreeView();
            rootTree.setIschecked(false);
            rootTree.setId("flow" + flow.getId());
            rootTree.setPid("-1");
            rootTree.setText(flow.getName());
            treeList.add(rootTree);
            List<String> endpointIdList = flowEndpointMap.get(flow.getId());
            if (!CollectionUtil.isEmpty(endpointIdList)) {
                for (String endpointId : endpointIdList) {
                    SystemServiceEndpoint endpoint = endpointMap.get(endpointId);
                    TreeView childTree = new TreeView();
                    childTree.setIschecked(false);
                    childTree.setId("endpoint" + endpoint.getId());
                    childTree.setPid("flow" + flow.getId());
                    childTree.setText(endpoint.getName());
                    treeList.add(childTree);
                }
            }
        }
        JSONArray jsonArray = new JSONArray(treeList);
@ -157,7 +163,7 @@ public class ServiceMonitorService {
        return Result.success(jsonArray.toString());
    }
    public boolean serviceStatus(String routeCode){
    public boolean serviceStatus(String routeCode) {
        boolean succ = false;
//        MongoDatabase db = mongoConfig.mongoClient().getDatabase(configuration);
//        MongoCollection<Document> collection = db.getCollection(serviceFlow);