Airhead пре 8 година
родитељ
комит
8e024ceb2e

+ 27 - 17
hos-arbiter/src/main/java/com/yihu/hos/arbiter/controllers/BrokerServerController.java

@ -2,9 +2,9 @@ package com.yihu.hos.arbiter.controllers;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.yihu.hos.web.framework.model.bo.BrokerServer;
import com.yihu.hos.arbiter.services.BrokerServerService;
import com.yihu.hos.core.datatype.DateUtil;
import com.yihu.hos.web.framework.model.bo.BrokerServer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
@ -21,18 +21,6 @@ public class BrokerServerController {
    @Autowired
    private BrokerServerService brokerServerService;
    @RequestMapping(method = RequestMethod.POST)
    public void save(String brokerServer) {
        try {
            ObjectMapper objectMapper = new ObjectMapper();
            BrokerServer server = objectMapper.readValue(brokerServer, BrokerServer.class);
            server.setCreateTime(DateUtil.getSysDateTime());
            brokerServerService.save(server);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    @RequestMapping(method = RequestMethod.GET)
    public String get() {
        try {
@ -50,16 +38,38 @@ public class BrokerServerController {
        return "{}";
    }
    @RequestMapping(method = RequestMethod.DELETE)
    public String delete(String brokerServer) {
    @RequestMapping(value = "login", method = RequestMethod.POST)
    public void login(String brokerServer) {
        try {
            ObjectMapper objectMapper = new ObjectMapper();
            BrokerServer server = objectMapper.readValue(brokerServer, BrokerServer.class);
            brokerServerService.delete(server);
            server.setCreateTime(DateUtil.getSysDateTime());
            brokerServerService.login(server);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
        return "{}";
    @RequestMapping(value = "heartbeat", method = RequestMethod.POST)
    public void heartbeat(String brokerServer) {
        try {
            ObjectMapper objectMapper = new ObjectMapper();
            BrokerServer server = objectMapper.readValue(brokerServer, BrokerServer.class);
            brokerServerService.heartbeat(server);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    @RequestMapping(value = "logout", method = RequestMethod.POST)
    public void logout(String brokerServer) {
        try {
            ObjectMapper objectMapper = new ObjectMapper();
            BrokerServer server = objectMapper.readValue(brokerServer, BrokerServer.class);
            brokerServerService.logout(server);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

+ 54 - 59
hos-arbiter/src/main/java/com/yihu/hos/arbiter/services/BrokerServerService.java

@ -2,11 +2,8 @@ package com.yihu.hos.arbiter.services;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.mongodb.WriteResult;
import com.yihu.hos.web.framework.model.bo.BrokerServer;
import com.yihu.hos.core.http.HTTPResponse;
import com.yihu.hos.core.http.HttpClientKit;
import com.yihu.hos.web.framework.constant.ServiceFlowConstant;
import com.yihu.hos.web.framework.model.bo.BrokerServer;
import com.yihu.hos.web.framework.model.bo.ServiceFlow;
import org.apache.camel.CamelContext;
import org.apache.camel.ProducerTemplate;
@ -40,53 +37,6 @@ public class BrokerServerService {
    @Autowired
    private ObjectMapper objectMapper;
    public void save(BrokerServer brokerServer) {
        Query query = new Query();
        query.addCriteria(Criteria.where("hostName").is(brokerServer.getHostName()));
        query.addCriteria(Criteria.where("hostAddress").is(brokerServer.getHostAddress()));
        query.addCriteria(Criteria.where("port").is(brokerServer.getPort()));
        Update update = new Update();
        update.set("tenant", brokerServer.getTenant());
        update.set("hostName", brokerServer.getHostName());
        update.set("hostAddress", brokerServer.getHostAddress());
        update.set("port", brokerServer.getPort());
        Date updateTime = brokerServer.getUpdateTime() == null ? new Date(): brokerServer.getUpdateTime();
        update.set("updateTime", updateTime);
        update.set("enable", brokerServer.isEnable());
        if (brokerServer.getServiceFlows()!=null) {
            for (BrokerServer.ServiceFlow serviceFlow : brokerServer.getServiceFlows()) {
                update.addToSet("serviceFlows", serviceFlow);
            }
        }
        WriteResult writeResult = mongoOperations.upsert(query, update, BrokerServer.class);
        if (writeResult.isUpdateOfExisting()) {
            //避免Broker重启的情况
            HTTPResponse response = HttpClientKit.get(brokerServer.getURL() + "/esb/heartbeat");
            if (response.getStatusCode() == 200 && brokerServer.isRegistered()) {
                return;
            }
            //确保Broker已经启动了流程
            brokerServer = mongoOperations.findOne(query, BrokerServer.class);
            if (brokerServer.getServiceFlows() != null) {
                return;
            }
        }
        //没有启动流程
        try {
            String msg = objectMapper.writeValueAsString(brokerServer);
            ProducerTemplate producerTemplate = createProducerTemplate();
            Map<String, Object> header = new HashMap<>();
            header.put("event", ServiceFlowConstant.BROKER_SERVER_ON);
            producerTemplate.sendBodyAndHeaders(ServiceFlowConstant.CAMEL_ENDPOINT, msg, header);
        } catch (JsonProcessingException e) {
            e.printStackTrace();
            logger.error(e.getMessage());
        }
    }
    /**
     * 暂不提供动态均衡算法,只是随机返回一个。
     *
@ -99,10 +49,6 @@ public class BrokerServerService {
        return mongoOperations.findOne(query, BrokerServer.class);
    }
    public void delete(BrokerServer brokerServer) {
        mongoOperations.remove(brokerServer);
    }
    public List<BrokerServer> get(boolean one) {
        if (one) {
            BrokerServer brokerServer = get();
@ -124,10 +70,6 @@ public class BrokerServerService {
        return mongoOperations.find(query, BrokerServer.class);
    }
    public void flowStarted(String msg) {
    }
    public void addServiceFlow(BrokerServer brokerServer, ServiceFlow serviceFlow) {
        BrokerServer.ServiceFlow flow = new BrokerServer.ServiceFlow();
        flow.setFlowId(serviceFlow.getId());
@ -167,6 +109,59 @@ public class BrokerServerService {
        mongoOperations.upsert(query, update, BrokerServer.class);
    }
    public void login(BrokerServer brokerServer) {
        Query query = new Query();
        query.addCriteria(Criteria.where("hostName").is(brokerServer.getHostName()));
        query.addCriteria(Criteria.where("hostAddress").is(brokerServer.getHostAddress()));
        query.addCriteria(Criteria.where("port").is(brokerServer.getPort()));
        Update update = new Update();
        update.set("tenant", brokerServer.getTenant());
        update.set("hostName", brokerServer.getHostName());
        update.set("hostAddress", brokerServer.getHostAddress());
        update.set("port", brokerServer.getPort());
        Date updateTime = brokerServer.getUpdateTime() == null ? new Date() : brokerServer.getUpdateTime();
        update.set("updateTime", updateTime);
        update.set("enable", brokerServer.isEnable());
        mongoOperations.upsert(query, update, BrokerServer.class);
        //没有启动流程
        try {
            String msg = objectMapper.writeValueAsString(brokerServer);
            ProducerTemplate producerTemplate = createProducerTemplate();
            Map<String, Object> header = new HashMap<>();
            header.put("event", ServiceFlowConstant.BROKER_SERVER_ON);
            producerTemplate.sendBodyAndHeaders(ServiceFlowConstant.CAMEL_ENDPOINT, msg, header);
        } catch (JsonProcessingException e) {
            e.printStackTrace();
            logger.error(e.getMessage());
        }
    }
    public void logout(BrokerServer brokerServer) {
        Query query = new Query();
        query.addCriteria(Criteria.where("hostName").is(brokerServer.getHostName()));
        query.addCriteria(Criteria.where("hostAddress").is(brokerServer.getHostAddress()));
        query.addCriteria(Criteria.where("port").is(brokerServer.getPort()));
        BrokerServer server = mongoOperations.findOne(query, BrokerServer.class);
        mongoOperations.remove(server);
    }
    public void heartbeat(BrokerServer brokerServer) {
        Query query = new Query();
        query.addCriteria(Criteria.where("hostName").is(brokerServer.getHostName()));
        query.addCriteria(Criteria.where("hostAddress").is(brokerServer.getHostAddress()));
        query.addCriteria(Criteria.where("port").is(brokerServer.getPort()));
        BrokerServer server = mongoOperations.findOne(query, BrokerServer.class);
        if (server != null) {
            server.setUpdateTime(new Date());
            mongoOperations.save(server);
        }
    }
    private ProducerTemplate createProducerTemplate() {
        if (producerTemplate == null) {
            producerTemplate = camelContext.createProducerTemplate();

+ 1 - 1
hos-broker/src/main/java/com/yihu/hos/broker/common/camelrouter/BrokerServerRouter.java

@ -17,7 +17,7 @@ public class BrokerServerRouter extends RouteBuilder {
    public void configure() throws Exception {
        if (arbiterConfiguration.getEnable()) {
            from("timer:online?period={{hos.timer.period}}")
                    .to("bean:brokerServerService?method=online");
                    .to("bean:brokerServerService?method=heartbeat");
        }
    }
}

+ 31 - 7
hos-broker/src/main/java/com/yihu/hos/broker/services/BrokerServerService.java

@ -3,7 +3,6 @@ package com.yihu.hos.broker.services;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.yihu.hos.broker.configurations.ArbiterConfiguration;
import com.yihu.hos.core.http.HTTPResponse;
import com.yihu.hos.core.http.HttpClientKit;
import com.yihu.hos.core.net.IPChoiceUtils;
import org.apache.camel.util.InetAddressUtil;
@ -26,7 +25,6 @@ import java.util.Map;
 */
@Component("brokerServerService")
public class BrokerServerService {
    private static boolean registered = false;  //区分是否第一次
    @Autowired
    private ArbiterConfiguration arbiterConfiguration;
@ -53,15 +51,41 @@ public class BrokerServerService {
            objectNode.put("hostAddress", hostAddress);
            objectNode.put("port", port);
            objectNode.put("enable", true);
            objectNode.put("registered", registered);
            String brokerServer = objectMapper.writeValueAsString(objectNode);
            Map<String, String> params = new HashMap<>();
            params.put("brokerServer", brokerServer);
            HTTPResponse httpResponse = HttpClientKit.post(arbiterConfiguration.getServer() + "/brokerServer", params);
            if (httpResponse.getStatusCode() == 200) {
                registered = true;
            HttpClientKit.post(arbiterConfiguration.getServer() + "/brokerServer/online", params);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    public void heartbeat() {
        try {
            hostName = InetAddressUtil.getLocalHostName();
            String host = System.getenv("host");
            if (host == null) {
                hostAddress = IPChoiceUtils.getSourceIP();
            } else {    //使用Docker方式时按固定的方式配置
                hostAddress = host;
            }
            port = 8099;    //目前先固定下来
            ObjectMapper objectMapper = new ObjectMapper();
            ObjectNode objectNode = objectMapper.createObjectNode();
            objectNode.put("tenant", arbiterConfiguration.getTenant());
            objectNode.put("hostName", hostName);
            objectNode.put("hostAddress", hostAddress);
            objectNode.put("port", port);
            objectNode.put("enable", true);
            String brokerServer = objectMapper.writeValueAsString(objectNode);
            Map<String, String> params = new HashMap<>();
            params.put("brokerServer", brokerServer);
            HttpClientKit.post(arbiterConfiguration.getServer() + "/brokerServer/heartbeat", params);
        } catch (IOException e) {
            e.printStackTrace();
@ -81,7 +105,7 @@ public class BrokerServerService {
            objectNode.put("enable", false);
            String brokerServer = objectMapper.writeValueAsString(objectNode);
            URI uri = new URIBuilder(arbiterConfiguration.getServer() + "/brokerServer")
            URI uri = new URIBuilder(arbiterConfiguration.getServer() + "/brokerServer/offline")
                    .addParameter("brokerServer", brokerServer)
                    .build();