Bläddra i källkod

endpoint事件完成

Airhead 8 år sedan
förälder
incheckning
618dd2de28

+ 5 - 1
hos-arbiter/src/main/java/com/yihu/hos/arbiter/routers/EndpointEventRouter.java

@ -27,6 +27,10 @@ public class EndpointEventRouter extends RouteBuilder {
        // Note we can explicit name the component
        context.addComponent(EndPointConstant.CAMEL_COMPONENT, JmsComponent.jmsComponentAutoAcknowledge(connectionFactory));
        from(EndPointConstant.CAMEL_ENDPOINT)
                .to("bean:endpointService?method=addBrokerServer"); //TODO:这边可以做Message Filter,减化trigger逻辑
                .choice()
                .when(header("event").isEqualTo(EndPointConstant.ADD_ENDPOINT)).to("bean:endpointService?method=addEndpoint")
                .when(header("event").isEqualTo(EndPointConstant.DELETE_ENDPOINT)).to("bean:endpointService?method=deleteEndpoint")
                .when(header("event").isEqualTo(EndPointConstant.OFF_SERVER)).to("bean:endpointService?method=offServer")
                .endChoice();
    }
}

+ 61 - 40
hos-arbiter/src/main/java/com/yihu/hos/arbiter/services/EndpointService.java

@ -1,13 +1,16 @@
package com.yihu.hos.arbiter.services;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.yihu.hos.web.framework.model.bo.BrokerServer;
import com.yihu.hos.core.http.HTTPResponse;
import com.yihu.hos.core.http.HttpClientKit;
import com.yihu.hos.web.framework.model.bo.Endpoint;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.mongodb.core.MongoOperations;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.data.mongodb.core.query.Update;
import org.springframework.stereotype.Service;
import java.io.IOException;
@ -19,74 +22,92 @@ import java.util.List;
@Service("endpointService")
public class EndpointService {
    private static final Logger logger = LogManager.getLogger(BrokerServerService.class);
    @Autowired
    private BrokerServerService brokerServerService;
    @Autowired
    private MongoOperations mongoOperations;
    @Autowired
    private ObjectMapper objectMapper;
    public void save(Endpoint endpoint) {
        mongoOperations.save(endpoint);
        Query query = new Query();
        query.addCriteria(Criteria.where("code").is(endpoint.getCode()));
        Update update = new Update();
        update.set("code", endpoint.getCode());
        update.set("createTime", endpoint.getCreateTime());
        update.set("endpoint", endpoint.getEndpoint());
        update.set("event", endpoint.getEvent());
        update.set("healthCheckType", endpoint.getHealthCheckType());
        update.set("healthCheckURL", endpoint.getHealthCheckURL());
        update.set("metricsType", endpoint.getMetricsType());
        update.set("metricsURL", endpoint.getMetricsURL());
        update.set("name", endpoint.getName());
        update.set("updateTime", endpoint.getUpdateTime());
        update.set("active", endpoint.isActive());
        mongoOperations.upsert(query, update, Endpoint.class);
    }
    public void delete(Endpoint endpoint) {
        mongoOperations.remove(endpoint);
    }
    public Endpoint get(String serviceName) {
        return mongoOperations.findOne(new Query(), Endpoint.class);
    }
    public void update(Endpoint endpoint) {
    public List<Endpoint> getEndpointList() {
        return mongoOperations.findAll(Endpoint.class);
    }
    public void trigger(String msg) {
        System.out.println(msg);
        BrokerServer brokerServer = brokerServerService.get();
        if (brokerServer == null) {
            logger.trace("can not find a valid broker start.");
            return;
        }
    public void check() {
        List<Endpoint> endpointList = getEndpointList();
        endpointList.forEach(this::remoteCheck);
    }
        ObjectMapper objectMapper = new ObjectMapper();
    public void addEndpoint(String msg) {
        try {
            Endpoint endpoint = objectMapper.readValue(msg, Endpoint.class);
            switch (endpoint.getEvent()) {
                case "endpointRegister": {
                    this.save(endpoint);
                    break;
                }
                case "endpointOnService": {
                    this.save(endpoint);
                    break;
                }
                case "endpointOffService": {
                    this.update(endpoint);
                    break;
                }
                default:
                    break;
            }
            endpoint.setActive(true);
            this.save(endpoint);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    public List<Endpoint> getEndpointList() {
        return mongoOperations.findAll(Endpoint.class);
    public void deleteEndpoint(String msg) {
        try {
            Endpoint endpoint = objectMapper.readValue(msg, Endpoint.class);
            this.delete(endpoint);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    public void check() {
        List<Endpoint> endpointList = getEndpointList();
        endpointList.forEach(this::remoteCheck);
    public void offServer(String msg) {
        try {
            Endpoint endpoint = objectMapper.readValue(msg, Endpoint.class);
            endpoint.setActive(false);
            this.save(endpoint);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    /**
     * 调用实际的检查地址
     *
     * @param endpoint
     * @param endpoint 通讯点信息
     */
    private void remoteCheck(Endpoint endpoint) {
        //TODO:
        String url = endpoint.getHealthCheckURL();
        HTTPResponse response = HttpClientKit.get(url);
        if (response.getStatusCode() == 200) {
            return;
        }
        endpoint.setActive(false);
        this.save(endpoint);
    }
}

+ 2 - 1
hos-arbiter/src/main/java/com/yihu/hos/arbiter/services/ProxyService.java

@ -48,6 +48,7 @@ public class ProxyService {
        new ServiceFlow().server();
        new Mycat().start();
        new Shell().start();
        new EndPoint().start();
    }
    public class ServiceFlow {
@ -166,7 +167,7 @@ public class ProxyService {
            header.put("event", message.getHead("event"));
            ProducerTemplate producerTemplate = camelContext.createProducerTemplate();
            producerTemplate.sendBodyAndHeaders(ServiceFlowConstant.SHELL_EVENT_SERVICE, message.getBodyString(), header);
            producerTemplate.sendBodyAndHeaders(EndPointConstant.CAMEL_ENDPOINT, message.getBodyString(), header);
        }
        public void start() {

+ 0 - 8
hos-arbiter/src/main/resources/application.yml

@ -23,8 +23,6 @@ arbiter:
    period: 10000
  central:
    url:
    port: 15555
    store: ./store
  terminal:
    url: 192.168.131.119:15555
  tenant:
@ -51,8 +49,6 @@ arbiter:
    period: 10000
  central:
    url:
    port: 15555
    store: ./store
  terminal:
    url: 192.168.131.119:15555
  tenant:
@ -79,8 +75,6 @@ arbiter:
      period: 10000
  central:
    url:
    port: 15555
    store: ./store
  terminal:
    url: 192.168.131.38:15555
  tenant:
@ -105,8 +99,6 @@ arbiter:
      period: 10000
  central:
    url: 192.168.131.119:15555
    port: 15555
    store: ./store
  terminal:
    url: 192.168.131.38:15555
  tenant:

+ 4 - 0
hos-web-framework/src/main/java/com/yihu/hos/web/framework/constant/EndPointConstant.java

@ -9,4 +9,8 @@ public interface EndPointConstant {
    String ACTIVE_MQ = "runtime.endpoint";   //active-mq消息,arbiter到broker的通讯
    String CAMEL_COMPONENT = "event.endpoint";  //camel组件命名
    String CAMEL_ENDPOINT = CAMEL_COMPONENT + ":queue:" + ACTIVE_MQ;   //camel的Endpoint
    String ADD_ENDPOINT = "addEndpoint";
    String DELETE_ENDPOINT = "deleteEndpoint";
    String OFF_SERVER = "offServer";
}

+ 11 - 2
hos-web-framework/src/main/java/com/yihu/hos/web/framework/model/bo/Endpoint.java

@ -14,11 +14,11 @@ import java.util.Date;
public class Endpoint {
    @Id
    private String id;
    @Indexed
    @Indexed(unique = true)
    private String code;
    private String name;
    private String endpoint;
//    @Indexed(name = "updateTime_1", expireAfterSeconds = 30)
    //    @Indexed(name = "updateTime_1", expireAfterSeconds = 30)
    private Date updateTime;
    private String event;
    private Integer healthCheckType;
@ -26,6 +26,7 @@ public class Endpoint {
    private Integer metricsType;
    private String metricsURL;
    private Date createTime;
    private boolean active;
    public Endpoint() {
        this.createTime = DateUtil.getSysDateTime();
@ -118,4 +119,12 @@ public class Endpoint {
    public void setCode(String code) {
        this.code = code;
    }
    public boolean isActive() {
        return active;
    }
    public void setActive(boolean active) {
        this.active = active;
    }
}

+ 57 - 14
src/main/java/com/yihu/hos/system/service/AppManager.java

@ -1,20 +1,30 @@
package com.yihu.hos.system.service;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.yihu.hos.common.constants.ContextAttributes;
import com.yihu.hos.config.MongoConfig;
import com.yihu.hos.core.datatype.StringUtil;
import com.yihu.hos.core.encrypt.DES;
import com.yihu.hos.core.log.Logger;
import com.yihu.hos.core.log.LoggerFactory;
import com.yihu.hos.interceptor.LocalContext;
import com.yihu.hos.system.dao.AppDao;
import com.yihu.hos.system.dao.AppServiceDao;
import com.yihu.hos.system.model.SystemApp;
import com.yihu.hos.system.model.SystemServiceEndpoint;
import com.yihu.hos.web.framework.constant.EndPointConstant;
import com.yihu.hos.web.framework.model.Result;
import com.yihu.hos.web.framework.util.GridFSUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.multipart.commons.CommonsMultipartFile;
import org.zbus.broker.ZbusBroker;
import org.zbus.mq.Producer;
import org.zbus.net.http.Message;
import javax.annotation.Resource;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Map;
import java.util.UUID;
@ -26,7 +36,7 @@ import java.util.UUID;
@Service("appManager")
public class AppManager {
    public static final String BEAN_ID = "appManager";
    static final Logger logger = LoggerFactory.getLogger(AppManager.class);
    @Resource(name = AppDao.BEAN_ID)
    private AppDao appDao;
@ -36,14 +46,23 @@ public class AppManager {
    @Autowired
    private MongoConfig mongoConfig;
   
    @Autowired
    private ObjectMapper objectMapper;
    private ZbusBroker zbusBroker;
    public void setZbusBroker(ZbusBroker zbusBroker) {
        this.zbusBroker = zbusBroker;
    }
    public Result getAppList(Map<String, Object> params) throws Exception {
        return appDao.getAppList(params);
    }
   
    public SystemApp getAppById(String id) throws Exception {
        return appDao.getEntity(SystemApp.class,id);
        return appDao.getEntity(SystemApp.class, id);
    }
    @Transactional
@ -79,14 +98,13 @@ public class AppManager {
    /* ==============================服务模块=================================  */
   
    public Result getAppServiceList(Map<String, Object> params) throws Exception {
        return appServiceDao.getAppServiceList(params);
    }
   
    public SystemServiceEndpoint getAppServiceById(String id) throws Exception {
        SystemServiceEndpoint serviceEndpoint = appServiceDao.getEntity(SystemServiceEndpoint.class,id);
        SystemServiceEndpoint serviceEndpoint = appServiceDao.getEntity(SystemServiceEndpoint.class, id);
        serviceEndpoint.setRequesModule(serviceEndpoint.getRequesModule().replaceAll("\n", "<br>"));
        serviceEndpoint.setResponeResult(serviceEndpoint.getResponeResult().replaceAll("\n", "<br>"));
        serviceEndpoint.setResponeError(serviceEndpoint.getResponeError().replaceAll("\n", "<br>"));
@ -95,8 +113,9 @@ public class AppManager {
    }
    @Transactional
    public Result addAppService(SystemServiceEndpoint obj) throws Exception {
        appServiceDao.saveEntity(obj);
    public Result addAppService(SystemServiceEndpoint endpoint) throws Exception {
        appServiceDao.saveEntity(endpoint);
        this.sendMsg(EndPointConstant.ADD_ENDPOINT, endpoint);
        return Result.success("保存成功");
    }
@ -126,18 +145,18 @@ public class AppManager {
    @Transactional
    public Result deleteAppService(String id) throws Exception {
        SystemServiceEndpoint systemApp = appServiceDao.getEntity(SystemServiceEndpoint.class, id);
        appServiceDao.deleteEntity(systemApp);
        SystemServiceEndpoint endpoint = appServiceDao.getEntity(SystemServiceEndpoint.class, id);
        appServiceDao.deleteEntity(endpoint);
        this.sendMsg(EndPointConstant.DELETE_ENDPOINT, endpoint);
        return Result.success("删除成功");
    }
   
    public Result uploadFile(CommonsMultipartFile file) {
        String newFileName;
        try {
            String fileName = UUID.randomUUID() + file.getFileItem().getName();
            newFileName = GridFSUtil.uploadFile(file.getInputStream() ,fileName,null);
            newFileName = GridFSUtil.uploadFile(file.getInputStream(), fileName, null);
            if (!StringUtil.isEmpty(newFileName)) {
                return Result.success(DES.encrypt(newFileName, DES.COMMON_PASSWORD));
            }
@ -147,7 +166,7 @@ public class AppManager {
        return Result.error("上传失败");
    }
   
    public Result readFile(OutputStream os, String fileName) {
        String dbName = "upload";
        try {
@ -159,4 +178,28 @@ public class AppManager {
        }
        return Result.error("读取失败");
    }
    private void sendMsg(String event, SystemServiceEndpoint endpoint) {
        if (zbusBroker == null) {
            logger.error("zbusBroker is null.");
            return;
        }
        try {
            String msg = objectMapper.writeValueAsString(endpoint);
            String tenant = LocalContext.getContext().getAttachment(ContextAttributes.TENANT_NAME);
            Producer producer = new Producer(zbusBroker, EndPointConstant.ZBUS_MQ + "@" + tenant);
            producer.createMQ();    //确定为创建消息队列需要显示调用
            Message message = new Message();
            message.setHead("event", event);
            message.setHead("tenant", tenant);
            message.setMethod("POST");
            message.setBody(msg);
            producer.sendSync(message);
        } catch (IOException | InterruptedException e) {
            logger.error(e.getMessage());
            e.printStackTrace();
        }
    }
}