浏览代码

Merge branch 'master' of http://192.168.1.220:10080/esb/esb

huangzhiyong 8 年之前
父节点
当前提交
869246fb1c

+ 27 - 17
hos-arbiter/src/main/java/com/yihu/hos/arbiter/controllers/BrokerServerController.java

@ -2,9 +2,9 @@ package com.yihu.hos.arbiter.controllers;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.yihu.hos.web.framework.model.bo.BrokerServer;
import com.yihu.hos.arbiter.services.BrokerServerService;
import com.yihu.hos.core.datatype.DateUtil;
import com.yihu.hos.web.framework.model.bo.BrokerServer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
@ -21,18 +21,6 @@ public class BrokerServerController {
    @Autowired
    private BrokerServerService brokerServerService;
    @RequestMapping(method = RequestMethod.POST)
    public void save(String brokerServer) {
        try {
            ObjectMapper objectMapper = new ObjectMapper();
            BrokerServer server = objectMapper.readValue(brokerServer, BrokerServer.class);
            server.setCreateTime(DateUtil.getSysDateTime());
            brokerServerService.save(server);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    @RequestMapping(method = RequestMethod.GET)
    public String get() {
        try {
@ -50,16 +38,38 @@ public class BrokerServerController {
        return "{}";
    }
    @RequestMapping(method = RequestMethod.DELETE)
    public String delete(String brokerServer) {
    @RequestMapping(value = "login", method = RequestMethod.POST)
    public void login(String brokerServer) {
        try {
            ObjectMapper objectMapper = new ObjectMapper();
            BrokerServer server = objectMapper.readValue(brokerServer, BrokerServer.class);
            brokerServerService.delete(server);
            server.setCreateTime(DateUtil.getSysDateTime());
            brokerServerService.login(server);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
        return "{}";
    @RequestMapping(value = "heartbeat", method = RequestMethod.POST)
    public void heartbeat(String brokerServer) {
        try {
            ObjectMapper objectMapper = new ObjectMapper();
            BrokerServer server = objectMapper.readValue(brokerServer, BrokerServer.class);
            brokerServerService.heartbeat(server);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    @RequestMapping(value = "logout", method = RequestMethod.POST)
    public void logout(String brokerServer) {
        try {
            ObjectMapper objectMapper = new ObjectMapper();
            BrokerServer server = objectMapper.readValue(brokerServer, BrokerServer.class);
            brokerServerService.logout(server);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

+ 54 - 59
hos-arbiter/src/main/java/com/yihu/hos/arbiter/services/BrokerServerService.java

@ -2,11 +2,8 @@ package com.yihu.hos.arbiter.services;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.mongodb.WriteResult;
import com.yihu.hos.web.framework.model.bo.BrokerServer;
import com.yihu.hos.core.http.HTTPResponse;
import com.yihu.hos.core.http.HttpClientKit;
import com.yihu.hos.web.framework.constant.ServiceFlowConstant;
import com.yihu.hos.web.framework.model.bo.BrokerServer;
import com.yihu.hos.web.framework.model.bo.ServiceFlow;
import org.apache.camel.CamelContext;
import org.apache.camel.ProducerTemplate;
@ -40,53 +37,6 @@ public class BrokerServerService {
    @Autowired
    private ObjectMapper objectMapper;
    public void save(BrokerServer brokerServer) {
        Query query = new Query();
        query.addCriteria(Criteria.where("hostName").is(brokerServer.getHostName()));
        query.addCriteria(Criteria.where("hostAddress").is(brokerServer.getHostAddress()));
        query.addCriteria(Criteria.where("port").is(brokerServer.getPort()));
        Update update = new Update();
        update.set("tenant", brokerServer.getTenant());
        update.set("hostName", brokerServer.getHostName());
        update.set("hostAddress", brokerServer.getHostAddress());
        update.set("port", brokerServer.getPort());
        Date updateTime = brokerServer.getUpdateTime() == null ? new Date(): brokerServer.getUpdateTime();
        update.set("updateTime", updateTime);
        update.set("enable", brokerServer.isEnable());
        if (brokerServer.getServiceFlows()!=null) {
            for (BrokerServer.ServiceFlow serviceFlow : brokerServer.getServiceFlows()) {
                update.addToSet("serviceFlows", serviceFlow);
            }
        }
        WriteResult writeResult = mongoOperations.upsert(query, update, BrokerServer.class);
        if (writeResult.isUpdateOfExisting()) {
            //避免Broker重启的情况
            HTTPResponse response = HttpClientKit.get(brokerServer.getURL() + "/esb/heartbeat");
            if (response.getStatusCode() == 200 && brokerServer.isRegistered()) {
                return;
            }
            //确保Broker已经启动了流程
            brokerServer = mongoOperations.findOne(query, BrokerServer.class);
            if (brokerServer.getServiceFlows() != null) {
                return;
            }
        }
        //没有启动流程
        try {
            String msg = objectMapper.writeValueAsString(brokerServer);
            ProducerTemplate producerTemplate = createProducerTemplate();
            Map<String, Object> header = new HashMap<>();
            header.put("event", ServiceFlowConstant.BROKER_SERVER_ON);
            producerTemplate.sendBodyAndHeaders(ServiceFlowConstant.CAMEL_ENDPOINT, msg, header);
        } catch (JsonProcessingException e) {
            e.printStackTrace();
            logger.error(e.getMessage());
        }
    }
    /**
     * 暂不提供动态均衡算法,只是随机返回一个。
     *
@ -99,10 +49,6 @@ public class BrokerServerService {
        return mongoOperations.findOne(query, BrokerServer.class);
    }
    public void delete(BrokerServer brokerServer) {
        mongoOperations.remove(brokerServer);
    }
    public List<BrokerServer> get(boolean one) {
        if (one) {
            BrokerServer brokerServer = get();
@ -124,10 +70,6 @@ public class BrokerServerService {
        return mongoOperations.find(query, BrokerServer.class);
    }
    public void flowStarted(String msg) {
    }
    public void addServiceFlow(BrokerServer brokerServer, ServiceFlow serviceFlow) {
        BrokerServer.ServiceFlow flow = new BrokerServer.ServiceFlow();
        flow.setFlowId(serviceFlow.getId());
@ -167,6 +109,59 @@ public class BrokerServerService {
        mongoOperations.upsert(query, update, BrokerServer.class);
    }
    public void login(BrokerServer brokerServer) {
        Query query = new Query();
        query.addCriteria(Criteria.where("hostName").is(brokerServer.getHostName()));
        query.addCriteria(Criteria.where("hostAddress").is(brokerServer.getHostAddress()));
        query.addCriteria(Criteria.where("port").is(brokerServer.getPort()));
        Update update = new Update();
        update.set("tenant", brokerServer.getTenant());
        update.set("hostName", brokerServer.getHostName());
        update.set("hostAddress", brokerServer.getHostAddress());
        update.set("port", brokerServer.getPort());
        Date updateTime = brokerServer.getUpdateTime() == null ? new Date() : brokerServer.getUpdateTime();
        update.set("updateTime", updateTime);
        update.set("createTime", new Date());
        update.set("enable", brokerServer.isEnable());
        mongoOperations.upsert(query, update, BrokerServer.class);
        try {
            String msg = objectMapper.writeValueAsString(brokerServer);
            ProducerTemplate producerTemplate = createProducerTemplate();
            Map<String, Object> header = new HashMap<>();
            header.put("event", ServiceFlowConstant.BROKER_SERVER_ON);
            producerTemplate.sendBodyAndHeaders(ServiceFlowConstant.CAMEL_ENDPOINT, msg, header);
        } catch (JsonProcessingException e) {
            e.printStackTrace();
            logger.error(e.getMessage());
        }
    }
    public void logout(BrokerServer brokerServer) {
        Query query = new Query();
        query.addCriteria(Criteria.where("hostName").is(brokerServer.getHostName()));
        query.addCriteria(Criteria.where("hostAddress").is(brokerServer.getHostAddress()));
        query.addCriteria(Criteria.where("port").is(brokerServer.getPort()));
        BrokerServer server = mongoOperations.findOne(query, BrokerServer.class);
        mongoOperations.remove(server);
    }
    public void heartbeat(BrokerServer brokerServer) {
        Query query = new Query();
        query.addCriteria(Criteria.where("hostName").is(brokerServer.getHostName()));
        query.addCriteria(Criteria.where("hostAddress").is(brokerServer.getHostAddress()));
        query.addCriteria(Criteria.where("port").is(brokerServer.getPort()));
        BrokerServer server = mongoOperations.findOne(query, BrokerServer.class);
        if (server != null) {
            server.setUpdateTime(new Date());
            mongoOperations.save(server);
        }
    }
    private ProducerTemplate createProducerTemplate() {
        if (producerTemplate == null) {
            producerTemplate = camelContext.createProducerTemplate();

+ 1 - 1
hos-broker/src/main/java/com/yihu/hos/broker/HosBrokerApplication.java

@ -23,6 +23,7 @@ import java.util.concurrent.Executors;
public class HosBrokerApplication extends SpringBootServletInitializer implements CommandLineRunner {
    @Autowired
    private GridFsOperations operations;
    @Autowired
    private CamelStartBoot camelStartBoot;
    private ExecutorService executorService = Executors.newSingleThreadExecutor();
@ -35,7 +36,6 @@ public class HosBrokerApplication extends SpringBootServletInitializer implement
    public void run(String... strings) throws Exception {
        GridFSUtil.gridFsOperations = operations;
        executorService.execute(() -> {
            camelStartBoot = new CamelStartBoot();
            camelStartBoot.start();
        });

+ 2 - 2
hos-broker/src/main/java/com/yihu/hos/broker/common/camelrouter/BrokerServerRouter.java

@ -16,8 +16,8 @@ public class BrokerServerRouter extends RouteBuilder {
    @Override
    public void configure() throws Exception {
        if (arbiterConfiguration.getEnable()) {
            from("timer:online?period={{hos.timer.period}}")
                    .to("bean:brokerServerService?method=online");
            from("timer:heartbeat?period={{hos.timer.period}}")
                    .to("bean:brokerServerService?method=heartbeat");
        }
    }
}

+ 33 - 9
hos-broker/src/main/java/com/yihu/hos/broker/services/BrokerServerService.java

@ -3,7 +3,6 @@ package com.yihu.hos.broker.services;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.yihu.hos.broker.configurations.ArbiterConfiguration;
import com.yihu.hos.core.http.HTTPResponse;
import com.yihu.hos.core.http.HttpClientKit;
import com.yihu.hos.core.net.IPChoiceUtils;
import org.apache.camel.util.InetAddressUtil;
@ -26,7 +25,6 @@ import java.util.Map;
 */
@Component("brokerServerService")
public class BrokerServerService {
    private static boolean registered = false;  //区分是否第一次
    @Autowired
    private ArbiterConfiguration arbiterConfiguration;
@ -34,7 +32,7 @@ public class BrokerServerService {
    private String hostAddress;
    private int port;
    public void online() {
    public void login() {
        try {
            hostName = InetAddressUtil.getLocalHostName();
@ -53,15 +51,41 @@ public class BrokerServerService {
            objectNode.put("hostAddress", hostAddress);
            objectNode.put("port", port);
            objectNode.put("enable", true);
            objectNode.put("registered", registered);
            String brokerServer = objectMapper.writeValueAsString(objectNode);
            Map<String, String> params = new HashMap<>();
            params.put("brokerServer", brokerServer);
            HTTPResponse httpResponse = HttpClientKit.post(arbiterConfiguration.getServer() + "/brokerServer", params);
            if (httpResponse.getStatusCode() == 200) {
                registered = true;
            HttpClientKit.post(arbiterConfiguration.getServer() + "/brokerServer/login", params);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    public void heartbeat() {
        try {
            hostName = InetAddressUtil.getLocalHostName();
            String host = System.getenv("host");
            if (host == null) {
                hostAddress = IPChoiceUtils.getSourceIP();
            } else {    //使用Docker方式时按固定的方式配置
                hostAddress = host;
            }
            port = 8099;    //目前先固定下来
            ObjectMapper objectMapper = new ObjectMapper();
            ObjectNode objectNode = objectMapper.createObjectNode();
            objectNode.put("tenant", arbiterConfiguration.getTenant());
            objectNode.put("hostName", hostName);
            objectNode.put("hostAddress", hostAddress);
            objectNode.put("port", port);
            objectNode.put("enable", true);
            String brokerServer = objectMapper.writeValueAsString(objectNode);
            Map<String, String> params = new HashMap<>();
            params.put("brokerServer", brokerServer);
            HttpClientKit.post(arbiterConfiguration.getServer() + "/brokerServer/heartbeat", params);
        } catch (IOException e) {
            e.printStackTrace();
@ -71,7 +95,7 @@ public class BrokerServerService {
    /**
     * brokerServer下线时通知
     */
    public void offline() {
    public void logout() {
        try {
            ObjectMapper objectMapper = new ObjectMapper();
            ObjectNode objectNode = objectMapper.createObjectNode();
@ -81,7 +105,7 @@ public class BrokerServerService {
            objectNode.put("enable", false);
            String brokerServer = objectMapper.writeValueAsString(objectNode);
            URI uri = new URIBuilder(arbiterConfiguration.getServer() + "/brokerServer")
            URI uri = new URIBuilder(arbiterConfiguration.getServer() + "/brokerServer/logout")
                    .addParameter("brokerServer", brokerServer)
                    .build();

+ 19 - 10
hos-broker/src/main/java/com/yihu/hos/broker/services/camel/CamelStartBoot.java

@ -5,6 +5,7 @@ import org.apache.camel.builder.RouteBuilder;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.net.URL;
import java.net.URLClassLoader;
@ -16,6 +17,7 @@ import java.util.concurrent.SynchronousQueue;
 * @author Airhead
 * @since 2016/12/9.
 */
@Component
public class CamelStartBoot {
    private static Logger logger = LogManager.getLogger(ESBCamelService.class);
@ -41,26 +43,33 @@ public class CamelStartBoot {
            SystemCamelContext.getContext().setApplicationContextClassLoader(camelClassLoader);
            logger.info("Apache Camel Context 启动完成...");
            brokerServerService.login();
            //如果没有收到其它线程的加载请求,主线程将停止在这里,所以该while之后不能有其他业务逻辑
            SynchronousQueue<String> camelContextOperateQueue = SystemCamelContext.getQueue();
            String className = null;
            // 如果没有收到其它线程的加载请求,主线程将停止在这里
            String className;
            while ((className = camelContextOperateQueue.take()) != null) {
                Class<RouteBuilder> routeBuilderClass = (Class<RouteBuilder>) camelClassLoader.loadClass(className);
                if (routeBuilderClass != null) {
                    RouteBuilder routeBuilder = routeBuilderClass.newInstance();
                    SystemCamelContext.getContext().addRoutes(routeBuilder);
                try{
                    Class<RouteBuilder> routeBuilderClass = (Class<RouteBuilder>) camelClassLoader.loadClass(className);
                    if (routeBuilderClass != null) {
                        RouteBuilder routeBuilder = routeBuilderClass.newInstance();
                        SystemCamelContext.getContext().addRoutes(routeBuilder);
                    }
                }
                catch (ClassNotFoundException e){
                    e.printStackTrace();
                    logger.error("加载数据Class失败。");
                }
            }
            brokerServerService.online();
        } catch (Exception e) {
            e.printStackTrace();
            logger.error("Apache Camel Context 启动失败...");
            logger.error("Apache Camel Context 启动失败。");
        }
    }
    public void shutdown() {
        brokerServerService.offline();
        brokerServerService.logout();
    }
}

+ 8 - 0
hos-camel2/pom.xml

@ -73,6 +73,14 @@
            <groupId>org.apache.camel</groupId>
            <artifactId>camel-restlet</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.camel</groupId>
            <artifactId>camel-jms</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-all</artifactId>
        </dependency>
    </dependencies>

+ 15 - 0
hos-camel2/src/main/java/camel/gatewaylog/processor/LogProcesser.java

@ -0,0 +1,15 @@
package camel.gatewaylog.processor;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
/**
 * @author Airhead
 * @since 2017/3/30.
 */
public class LogProcesser implements Processor {
    @Override
    public void process(Exchange exchange) throws Exception {
    }
}

+ 27 - 0
hos-camel2/src/main/java/camel/gatewaylog/route/LogRouterBuilder.java

@ -0,0 +1,27 @@
package camel.gatewaylog.route;
import camel.gatewaylog.processor.LogProcesser;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.jms.JmsComponent;
import org.apache.camel.model.ModelCamelContext;
import javax.jms.ConnectionFactory;
/**
 * @author Airhead
 * @since 2017/3/30.
 */
public class LogRouterBuilder extends RouteBuilder {
    @Override
    public void configure() throws Exception {
        ModelCamelContext context = this.getContext();
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                "admin", "admin", "tcp://172.19.103.57:61616");
        // Note we can explicit name the component
        context.addComponent("business-log-2", JmsComponent.jmsComponentAutoAcknowledge(connectionFactory));
        from("business-log-2:topic:business.log.queue")
                .process(new LogProcesser())
                .to("file://e:/log");
    }
}

+ 5 - 9
hos-parent/pom.xml

@ -61,15 +61,6 @@
                        <showDeprecation>true</showDeprecation>
                    </configuration>
                </plugin>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-eclipse-plugin</artifactId>
                    <version>2.9</version>
                    <configuration>
                        <downloadSources>true</downloadSources>
                        <downloadJavadocs>false</downloadJavadocs>
                    </configuration>
                </plugin>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-resources-plugin</artifactId>
@ -192,6 +183,11 @@
                <artifactId>camel-jetty</artifactId>
                <version>${camel-version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.camel</groupId>
                <artifactId>camel-jms</artifactId>
                <version>${camel-version}</version>
            </dependency>
            <!-- ActiveMQ -->
            <dependency>