瀏覽代碼

调整Broker注册逻辑

airhead 8 年之前
父節點
當前提交
6eb4604b18

+ 1 - 1
hos-arbiter/src/main/java/com/yihu/hos/arbiter/services/BrokerServerService.java

@ -122,11 +122,11 @@ public class BrokerServerService {
        update.set("port", brokerServer.getPort());
        Date updateTime = brokerServer.getUpdateTime() == null ? new Date() : brokerServer.getUpdateTime();
        update.set("updateTime", updateTime);
        update.set("createTime", new Date());
        update.set("enable", brokerServer.isEnable());
        mongoOperations.upsert(query, update, BrokerServer.class);
        //没有启动流程
        try {
            String msg = objectMapper.writeValueAsString(brokerServer);
            ProducerTemplate producerTemplate = createProducerTemplate();

+ 1 - 1
hos-broker/src/main/java/com/yihu/hos/broker/HosBrokerApplication.java

@ -23,6 +23,7 @@ import java.util.concurrent.Executors;
public class HosBrokerApplication extends SpringBootServletInitializer implements CommandLineRunner {
    @Autowired
    private GridFsOperations operations;
    @Autowired
    private CamelStartBoot camelStartBoot;
    private ExecutorService executorService = Executors.newSingleThreadExecutor();
@ -35,7 +36,6 @@ public class HosBrokerApplication extends SpringBootServletInitializer implement
    public void run(String... strings) throws Exception {
        GridFSUtil.gridFsOperations = operations;
        executorService.execute(() -> {
            camelStartBoot = new CamelStartBoot();
            camelStartBoot.start();
        });

+ 1 - 1
hos-broker/src/main/java/com/yihu/hos/broker/common/camelrouter/BrokerServerRouter.java

@ -16,7 +16,7 @@ public class BrokerServerRouter extends RouteBuilder {
    @Override
    public void configure() throws Exception {
        if (arbiterConfiguration.getEnable()) {
            from("timer:online?period={{hos.timer.period}}")
            from("timer:heartbeat?period={{hos.timer.period}}")
                    .to("bean:brokerServerService?method=heartbeat");
        }
    }

+ 4 - 4
hos-broker/src/main/java/com/yihu/hos/broker/services/BrokerServerService.java

@ -32,7 +32,7 @@ public class BrokerServerService {
    private String hostAddress;
    private int port;
    public void online() {
    public void login() {
        try {
            hostName = InetAddressUtil.getLocalHostName();
@ -55,7 +55,7 @@ public class BrokerServerService {
            Map<String, String> params = new HashMap<>();
            params.put("brokerServer", brokerServer);
            HttpClientKit.post(arbiterConfiguration.getServer() + "/brokerServer/online", params);
            HttpClientKit.post(arbiterConfiguration.getServer() + "/brokerServer/login", params);
        } catch (IOException e) {
            e.printStackTrace();
@ -95,7 +95,7 @@ public class BrokerServerService {
    /**
     * brokerServer下线时通知
     */
    public void offline() {
    public void logout() {
        try {
            ObjectMapper objectMapper = new ObjectMapper();
            ObjectNode objectNode = objectMapper.createObjectNode();
@ -105,7 +105,7 @@ public class BrokerServerService {
            objectNode.put("enable", false);
            String brokerServer = objectMapper.writeValueAsString(objectNode);
            URI uri = new URIBuilder(arbiterConfiguration.getServer() + "/brokerServer/offline")
            URI uri = new URIBuilder(arbiterConfiguration.getServer() + "/brokerServer/logout")
                    .addParameter("brokerServer", brokerServer)
                    .build();

+ 19 - 10
hos-broker/src/main/java/com/yihu/hos/broker/services/camel/CamelStartBoot.java

@ -5,6 +5,7 @@ import org.apache.camel.builder.RouteBuilder;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.net.URL;
import java.net.URLClassLoader;
@ -16,6 +17,7 @@ import java.util.concurrent.SynchronousQueue;
 * @author Airhead
 * @since 2016/12/9.
 */
@Component
public class CamelStartBoot {
    private static Logger logger = LogManager.getLogger(ESBCamelService.class);
@ -41,26 +43,33 @@ public class CamelStartBoot {
            SystemCamelContext.getContext().setApplicationContextClassLoader(camelClassLoader);
            logger.info("Apache Camel Context 启动完成...");
            brokerServerService.login();
            //如果没有收到其它线程的加载请求,主线程将停止在这里,所以该while之后不能有其他业务逻辑
            SynchronousQueue<String> camelContextOperateQueue = SystemCamelContext.getQueue();
            String className = null;
            // 如果没有收到其它线程的加载请求,主线程将停止在这里
            String className;
            while ((className = camelContextOperateQueue.take()) != null) {
                Class<RouteBuilder> routeBuilderClass = (Class<RouteBuilder>) camelClassLoader.loadClass(className);
                if (routeBuilderClass != null) {
                    RouteBuilder routeBuilder = routeBuilderClass.newInstance();
                    SystemCamelContext.getContext().addRoutes(routeBuilder);
                try{
                    Class<RouteBuilder> routeBuilderClass = (Class<RouteBuilder>) camelClassLoader.loadClass(className);
                    if (routeBuilderClass != null) {
                        RouteBuilder routeBuilder = routeBuilderClass.newInstance();
                        SystemCamelContext.getContext().addRoutes(routeBuilder);
                    }
                }
                catch (ClassNotFoundException e){
                    e.printStackTrace();
                    logger.error("加载数据Class失败。");
                }
            }
            brokerServerService.online();
        } catch (Exception e) {
            e.printStackTrace();
            logger.error("Apache Camel Context 启动失败...");
            logger.error("Apache Camel Context 启动失败。");
        }
    }
    public void shutdown() {
        brokerServerService.offline();
        brokerServerService.logout();
    }
}