瀏覽代碼

1.调整arbiter中的zbus的事件处理
2.统一消息队列的命名,完成一部分

Airhead 8 年之前
父節點
當前提交
6caa72e724
共有 20 個文件被更改,包括 307 次插入494 次删除
  1. 3 26
      hos-arbiter/src/main/java/com/yihu/hos/arbiter/HosArbiterApplication.java
  2. 1 1
      hos-arbiter/src/main/java/com/yihu/hos/arbiter/routers/ArbiterServerRouter.java
  3. 5 3
      hos-arbiter/src/main/java/com/yihu/hos/arbiter/routers/EndpointEventRouter.java
  4. 1 1
      hos-arbiter/src/main/java/com/yihu/hos/arbiter/routers/MycatRouter.java
  5. 2 2
      hos-arbiter/src/main/java/com/yihu/hos/arbiter/routers/SerivceHealthRouter.java
  6. 3 3
      hos-arbiter/src/main/java/com/yihu/hos/arbiter/routers/ServiceFlowEventRouter.java
  7. 2 2
      hos-arbiter/src/main/java/com/yihu/hos/arbiter/routers/LinuxShellRouter.java
  8. 1 1
      hos-arbiter/src/main/java/com/yihu/hos/arbiter/services/BrokerServerService.java
  9. 20 4
      hos-arbiter/src/main/java/com/yihu/hos/arbiter/services/EndpointService.java
  10. 0 37
      hos-arbiter/src/main/java/com/yihu/hos/arbiter/services/HealthCheckService.java
  11. 0 162
      hos-arbiter/src/main/java/com/yihu/hos/arbiter/services/LinuxShellService.java
  12. 0 86
      hos-arbiter/src/main/java/com/yihu/hos/arbiter/services/MycatProxy.java
  13. 7 64
      hos-arbiter/src/main/java/com/yihu/hos/arbiter/services/MycatService.java
  14. 147 29
      hos-arbiter/src/main/java/com/yihu/hos/arbiter/services/ProxyService.java
  15. 13 13
      hos-arbiter/src/main/java/com/yihu/hos/arbiter/services/ServiceFlowService.java
  16. 82 49
      hos-arbiter/src/main/java/com/yihu/hos/arbiter/services/ShellService.java
  17. 12 0
      hos-web-framework/src/main/java/com/yihu/hos/web/framework/constant/EndPointConstant.java
  18. 5 8
      hos-web-framework/src/main/java/com/yihu/hos/web/framework/constant/ServiceFlowConstant.java
  19. 1 1
      src/main/java/com/yihu/hos/services/ServiceFlowEventService.java
  20. 2 2
      src/main/java/com/yihu/hos/services/ServiceMycatEventService.java

+ 3 - 26
hos-arbiter/src/main/java/com/yihu/hos/arbiter/HosArbiterApplication.java

@ -1,9 +1,6 @@
package com.yihu.hos.arbiter;
import com.yihu.hos.arbiter.configuration.ArbiterServerConfiguration;
import com.yihu.hos.arbiter.services.MycatProxy;
import com.yihu.hos.arbiter.services.ProxyService;
import com.yihu.hos.arbiter.services.ShellService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
@ -14,39 +11,19 @@ import org.springframework.boot.autoconfigure.orm.jpa.HibernateJpaAutoConfigurat
@SpringBootApplication(exclude = {DataSourceAutoConfiguration.class, HibernateJpaAutoConfiguration.class})
public class HosArbiterApplication implements CommandLineRunner {
    private ArbiterServerConfiguration configuration;
    private ProxyService proxyService;
    private ShellService shellService;
    private MycatProxy mycatProxy;
    public static void main(String[] args) {
        SpringApplication.run(HosArbiterApplication.class, args);
    }
    @Override
    public void run(String... strings) throws Exception {
        proxyService.start();
        shellService.start();
        mycatProxy.start();
    }
    @Autowired
    public void setConfiguration(ArbiterServerConfiguration configuration) {
        this.configuration = configuration;
    }
    @Autowired
    public void setProxyService(ProxyService proxyService) {
        this.proxyService = proxyService;
    }
    @Autowired
    public void setShellService(ShellService shellService) {
        this.shellService = shellService;
    }
    @Autowired
    public void setMycatProxy(MycatProxy mycatProxy) {
        this.mycatProxy = mycatProxy;
    @Override
    public void run(String... strings) throws Exception {
        proxyService.start();
    }
}

+ 1 - 1
hos-arbiter/src/main/java/com/yihu/hos/arbiter/routers/ArbiterServerRouter.java

@ -12,6 +12,6 @@ public class ArbiterServerRouter  extends RouteBuilder {
    @Override
    public void configure() throws Exception {
        from("direct:arbiterServer")
                .to("bean:arbiterServerService?method=start");
                .to("bean:arbiterServerService?method=serverServiceFlow");
    }
}

+ 5 - 3
hos-arbiter/src/main/java/com/yihu/hos/arbiter/routers/EndpointEventRouter.java

@ -1,6 +1,7 @@
package com.yihu.hos.arbiter.routers;
import com.yihu.hos.arbiter.configuration.ActivemqConfiguration;
import com.yihu.hos.web.framework.constant.EndPointConstant;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.jms.JmsComponent;
@ -17,14 +18,15 @@ import javax.jms.ConnectionFactory;
public class EndpointEventRouter extends RouteBuilder {
    @Autowired
    private ActivemqConfiguration activemqConfiguration;
    @Override
    public void configure() throws Exception {
        ModelCamelContext context = this.getContext();
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                activemqConfiguration.getUser(), activemqConfiguration.getPassword(),activemqConfiguration.getBrokerURL());
                activemqConfiguration.getUser(), activemqConfiguration.getPassword(), activemqConfiguration.getBrokerURL());
        // Note we can explicit name the component
        context.addComponent("endpoint.event", JmsComponent.jmsComponentAutoAcknowledge(connectionFactory));
        from("endpoint.event:queue:configuration.endpoint")
        context.addComponent(EndPointConstant.CAMEL_COMPONENT, JmsComponent.jmsComponentAutoAcknowledge(connectionFactory));
        from(EndPointConstant.CAMEL_ENDPOINT)
                .to("bean:endpointService?method=addBrokerServer"); //TODO:这边可以做Message Filter,减化trigger逻辑
    }
}

+ 1 - 1
hos-arbiter/src/main/java/com/yihu/hos/arbiter/routers/MycatRouter.java

@ -32,7 +32,7 @@ public class MycatRouter extends RouteBuilder {
        context.addComponent("service.mycat.event", JmsComponent.jmsComponentAutoAcknowledge(connectionFactory));
        from(ServiceFlowConstant.MYCAT_EVENT_SERVICE)
                .choice()
                .when(header("tenant").isNotNull()).to("bean:mycatService?method=proxy")
                .when(header("tenant").isNotNull()).to("bean:mycatService?method=handleServiceFlow")
                .when(header("event").isEqualTo(ServiceFlowConstant.EXECUTE_MYCAT)).to("bean:mycatService?method=updateMycat")
                .endChoice();
    }

+ 2 - 2
hos-arbiter/src/main/java/com/yihu/hos/arbiter/routers/SerivceHealthRouter.java

@ -10,7 +10,7 @@ import org.springframework.stereotype.Component;
public class SerivceHealthRouter extends RouteBuilder {
    @Override
    public void configure() throws Exception {
        from("timer:brokerServerOnline?period={{arbiter.timer.period}}")
                .to("bean:healthCheckService?method=check");
        from("timer:healthCheck?period={{arbiter.timer.period}}")
                .to("bean:endpointService?method=check");
    }
}

+ 3 - 3
hos-arbiter/src/main/java/com/yihu/hos/arbiter/routers/ServiceFlowEventRouter.java

@ -25,10 +25,10 @@ public class ServiceFlowEventRouter extends RouteBuilder {
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                activemqConfiguration.getUser(), activemqConfiguration.getPassword(), activemqConfiguration.getBrokerURL());
        // Note we can explicit name the component
        context.addComponent("service.flow.event", JmsComponent.jmsComponentAutoAcknowledge(connectionFactory));
        from(ServiceFlowConstant.FLOW_EVENT_ENDPOINT)
        context.addComponent(ServiceFlowConstant.CAMEL_COMPONENT, JmsComponent.jmsComponentAutoAcknowledge(connectionFactory));
        from(ServiceFlowConstant.CAMEL_ENDPOINT)
                .choice()
//                .when(header("tenant").isNotNull()).to("bean:serviceFlowService?method=proxy")
//                .when(header("tenant").isNotNull()).to("bean:serviceFlowService?method=handleServiceFlow")
                .when(header("event").isEqualTo(ServiceFlowConstant.SERVICE_FLOW_STARTED)).to("bean:serviceFlowService?method=serviceFlowStarted")
                .when(header("event").isEqualTo(ServiceFlowConstant.SERVICE_FLOW_STOPPED)).to("bean:serviceFlowService?method=serviceFlowStopped")
                .when(header("event").isEqualTo(ServiceFlowConstant.SERVICE_FLOW_ADDED)).to("bean:serviceFlowService?method=serviceFlowAdd")

+ 2 - 2
hos-arbiter/src/main/java/com/yihu/hos/arbiter/routers/LinuxShellRouter.java

@ -18,7 +18,7 @@ import javax.jms.ConnectionFactory;
 * Created at 2017/1/5.
 */
@Component
public class LinuxShellRouter extends RouteBuilder {
public class ShellRouter extends RouteBuilder {
    @Autowired
    private ActivemqConfiguration activemqConfiguration;
@ -32,7 +32,7 @@ public class LinuxShellRouter extends RouteBuilder {
        context.addComponent("service.shell.event", JmsComponent.jmsComponentAutoAcknowledge(connectionFactory));
        from(ServiceFlowConstant.SHELL_EVENT_SERVICE)
                .choice()
                .when(header("tenant").isNotNull()).to("bean:linuxShellService?method=proxy")
                .when(header("tenant").isNotNull()).to("bean:linuxShellService?method=handleServiceFlow")
                .when(header("event").isEqualTo(ServiceFlowConstant.ARBITER_SHELL_SEND)).to("bean:linuxShellService?method=sendShell")
//                .when(header("event").isEqualTo(ServiceFlowConstant.ARBITER_SHELL_ACEPT)).to("bean:linuxShellService?method=shellBack")
                .endChoice();

+ 1 - 1
hos-arbiter/src/main/java/com/yihu/hos/arbiter/services/BrokerServerService.java

@ -82,7 +82,7 @@ public class BrokerServerService {
            ProducerTemplate producerTemplate = createProducerTemplate();
            Map<String, Object> header = new HashMap<>();
            header.put("event", ServiceFlowConstant.BROKER_SERVER_ON);
            producerTemplate.sendBodyAndHeaders(ServiceFlowConstant.FLOW_EVENT_ENDPOINT, msg, header);
            producerTemplate.sendBodyAndHeaders(ServiceFlowConstant.CAMEL_ENDPOINT, msg, header);
        } catch (JsonProcessingException e) {
            e.printStackTrace();
            logger.error(e.getMessage());

+ 20 - 4
hos-arbiter/src/main/java/com/yihu/hos/arbiter/services/EndpointService.java

@ -3,8 +3,6 @@ package com.yihu.hos.arbiter.services;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.yihu.hos.web.framework.model.bo.BrokerServer;
import com.yihu.hos.web.framework.model.bo.Endpoint;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
@ -13,6 +11,7 @@ import org.springframework.data.mongodb.core.query.Query;
import org.springframework.stereotype.Service;
import java.io.IOException;
import java.util.List;
/**
 * @created Airhead 2016/7/27.
@ -41,14 +40,13 @@ public class EndpointService {
        System.out.println(msg);
        BrokerServer brokerServer = brokerServerService.get();
        if (brokerServer == null) {
            logger.trace("can not find a valid broker server.");
            logger.trace("can not find a valid broker start.");
            return;
        }
        ObjectMapper objectMapper = new ObjectMapper();
        try {
            Endpoint endpoint = objectMapper.readValue(msg, Endpoint.class);
            CloseableHttpClient httpclient = HttpClients.createDefault();
            switch (endpoint.getEvent()) {
                case "endpointRegister": {
                    this.save(endpoint);
@ -73,4 +71,22 @@ public class EndpointService {
        }
    }
    public List<Endpoint> getEndpointList() {
        return mongoOperations.findAll(Endpoint.class);
    }
    public void check() {
        List<Endpoint> endpointList = getEndpointList();
        endpointList.forEach(this::remoteCheck);
    }
    /**
     * 调用实际的检查地址
     *
     * @param endpoint
     */
    private void remoteCheck(Endpoint endpoint) {
        //TODO:
    }
}

+ 0 - 37
hos-arbiter/src/main/java/com/yihu/hos/arbiter/services/HealthCheckService.java

@ -1,37 +0,0 @@
package com.yihu.hos.arbiter.services;
import com.yihu.hos.web.framework.model.bo.Endpoint;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.List;
/**
 * @created Airhead 2016/8/23.
 */
@Service("healthCheckService")
public class HealthCheckService {
    public void check() {
        List<Endpoint> endpointList = getEndpointList();
        endpointList.forEach(this::remoteCheck);
    }
    /**
     * 返回所有拉取方式进行健康检测的Endpoint列表
     *
     * @return
     */
    private List<Endpoint> getEndpointList() {
        //TODO:
        return new ArrayList<>();
    }
    /**
     * 调用实际的检查地址
     *
     * @param endpoint
     */
    private void remoteCheck(Endpoint endpoint) {
        //TODO:
    }
}

+ 0 - 162
hos-arbiter/src/main/java/com/yihu/hos/arbiter/services/LinuxShellService.java

@ -1,162 +0,0 @@
package com.yihu.hos.arbiter.services;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.yihu.hos.web.framework.model.bo.BrokerServer;
import com.yihu.hos.core.http.HTTPResponse;
import com.yihu.hos.core.http.HttpClientKit;
import com.yihu.hos.web.framework.constant.ServiceFlowConstant;
import com.yihu.hos.web.framework.model.bo.ServiceShell;
import org.apache.camel.Body;
import org.apache.camel.Headers;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.zbus.broker.ZbusBroker;
import org.zbus.mq.Producer;
import org.zbus.net.http.Message;
import java.io.IOException;
import java.util.List;
import java.util.Map;
/**
 * @author HZY
 * @vsrsion 1.0
 * Created at 2017/1/5.
 */
@Service("linuxShellService")
public class LinuxShellService {
    private static final Logger logger = LogManager.getLogger(LinuxShellService.class);
    @Autowired
    private BrokerServerService brokerServerService;
    private ZbusBroker zbusBroker;
    @Autowired
    private ObjectMapper objectMapper;
    private ShellService shellService;
    /**
     * SAAS化的管理端过来的消息会被proxy进行中转,之后发送到终端的Arbiter对Broker进行实际的控制。
     *
     * @param header 消息头部信息
     * @param msg    消息
     */
    public void proxy(@Headers Map<String, String> header, @Body String msg) {
        if (zbusBroker == null) {
            logger.error("zbusBroker is null.");
            return;
        }
        try {
            //TODO 设置shell发起用户
            Producer producer = new Producer(zbusBroker, ServiceFlowConstant.SHELL_REQUEST + "@" + header.get("tenant"));
            producer.createMQ();    //确定为创建消息队列需要显示调用
            Message message = new Message();
            message.setHead("event", header.get("event"));
            message.setHead("tenant", header.get("tenant"));
            message.setMethod("POST");
            message.setBody(msg);
            message = producer.sendSync(message);
            logger.debug(message);
            shellService.start();
//            System.out.println("test");
        } catch (IOException | InterruptedException e) {
            logger.error(e.getMessage());
            e.printStackTrace();
        }
    }
    /**
     * 发送shell命令请求到broker
     * @param msg    命令消息内容
     */
    public void sendShell(String msg) {
        try {
            List<BrokerServer> brokerServerList  = brokerServerService.get(false);
            for (BrokerServer broker : brokerServerList) {
                String result = sendMessage(broker, "post", "/esb/serviceShell/send", msg);
                if (result==null) {
                    logger.error("sendMessage to broker server failed, broker:" + broker.getURL() + ", msg:" + msg);
                    continue;
                }else {
                    logger.debug("发送shell请求到broker成功");
                    // shell执行成功,见执行结果返回到中心zbus显示。
                    ServiceShell serviceShell = objectMapper.readValue(msg,ServiceShell.class);
                    Producer producer = new Producer(zbusBroker, ServiceFlowConstant.SHELL_RESPONSE + "@" + serviceShell.getTenant());
                    producer.createMQ();    //确定为创建消息队列需要显示调用
                    Message message = new Message();
                    message.setHead("event", serviceShell.getType());
                    message.setHead("tenant", serviceShell.getTenant());
                    message.setMethod("POST");
                    message.setBody(result);
                    message = producer.sendSync(message);
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
            logger.error(e.getMessage());
        }
    }
    private String sendMessage(BrokerServer brokerServer, String method, String path, String msg) {
        if (brokerServer == null) {
            return null;
        }
        switch (method) {
            case "post": {
                HTTPResponse response = HttpClientKit.post(brokerServer.getURL() + path, msg);
                if (response.getStatusCode() == 200) {
                    String body = response.getBody();
                    return body;
                }
                return null;
            }
            case "put": {
                HTTPResponse response = HttpClientKit.put(brokerServer.getURL() + path, msg);
                if (response.getStatusCode() == 200) {
                    String body = response.getBody();
                    logger.debug(body);
                    return body;
                }
                return null;
            }
            case "delete": {
                HTTPResponse response = HttpClientKit.delete(brokerServer.getURL() + path, msg);
                if (response.getStatusCode() == 200) {
                    String body = response.getBody();
                    logger.debug(body);
                    return body;
                }
                return null;
            }
            default:
                break;
        }
        return null;
    }
    @Autowired
    public void setZbusBroker(ZbusBroker zbusBroker) {
        this.zbusBroker = zbusBroker;
    }
    @Autowired
    public void setShellService(ShellService shellService) {
        this.shellService = shellService;
    }
}

+ 0 - 86
hos-arbiter/src/main/java/com/yihu/hos/arbiter/services/MycatProxy.java

@ -1,86 +0,0 @@
package com.yihu.hos.arbiter.services;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.yihu.hos.arbiter.configuration.ArbiterServerConfiguration;
import com.yihu.hos.core.log.Logger;
import com.yihu.hos.core.log.LoggerFactory;
import com.yihu.hos.web.framework.constant.ServiceFlowConstant;
import org.apache.camel.CamelContext;
import org.apache.camel.ProducerTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.zbus.broker.ZbusBroker;
import org.zbus.mq.Consumer;
import org.zbus.net.http.Message;
import java.util.HashMap;
import java.util.Map;
/**
 * @author HZY
 * @vsrsion 1.0
 * Created at 2017/1/6.
 */
@Service
public class MycatProxy {
    private static final Logger logger = LoggerFactory.getLogger(MycatProxy.class);
    private CamelContext camelContext;
    private ArbiterServerConfiguration configuration;
    private ZbusBroker zbusBroker;
    private Consumer consumer;
    @Autowired
    private ObjectMapper objectMapper;
    private static Map<String,String> cunsumerNap = new HashMap<>();
    public void proxy(Message message, Consumer consumer) {
        Map<String, Object> header = new HashMap<>();
        header.put("event", message.getHead("event"));
        ProducerTemplate producerTemplate = camelContext.createProducerTemplate();
        producerTemplate.sendBodyAndHeaders(ServiceFlowConstant.MYCAT_EVENT_SERVICE, message.getBodyString(), header);
    }
    public void start() {
        if (zbusBroker == null) {
            logger.error("zbusBroker is null");
            return;
        }
        try {
            if (!cunsumerNap.containsKey(ServiceFlowConstant.MYCAT_UPDATE + "@" + configuration.getMycatName())){
                consumer = new Consumer(zbusBroker, ServiceFlowConstant.MYCAT_UPDATE + "@" + configuration.getMycatName());
                consumer.start(this::proxy);
                cunsumerNap.put(ServiceFlowConstant.MYCAT_UPDATE + "@" + configuration.getMycatName(),configuration.getMycatName());
            }
        } catch (Exception e) {
            e.printStackTrace();
            logger.error(e.getMessage());
        }
    }
    @Autowired
    public void setZbusBroker(ZbusBroker zbusBroker) {
        this.zbusBroker = zbusBroker;
    }
    @Autowired
    public void setCamelContext(CamelContext camelContext) {
        this.camelContext = camelContext;
    }
    @Autowired
    public void setConfiguration(ArbiterServerConfiguration configuration) {
        this.configuration = configuration;
    }
    @Override
    protected void finalize() throws Throwable {
        consumer.close();
        super.finalize();
    }
}

+ 7 - 64
hos-arbiter/src/main/java/com/yihu/hos/arbiter/services/MycatService.java

@ -1,23 +1,14 @@
package com.yihu.hos.arbiter.services;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.yihu.hos.web.framework.model.bo.BrokerServer;
import com.yihu.hos.core.http.HTTPResponse;
import com.yihu.hos.core.http.HttpClientKit;
import com.yihu.hos.web.framework.constant.ServiceFlowConstant;
import org.apache.camel.Body;
import org.apache.camel.Headers;
import com.yihu.hos.web.framework.model.bo.BrokerServer;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.zbus.broker.ZbusBroker;
import org.zbus.mq.Producer;
import org.zbus.net.http.Message;
import java.io.IOException;
import java.util.List;
import java.util.Map;
/**
 * @author HZY
@ -30,58 +21,21 @@ public class MycatService {
    @Autowired
    private BrokerServerService brokerServerService;
    private ZbusBroker zbusBroker;
    @Autowired
    private ObjectMapper objectMapper;
    private MycatProxy mycatProxy;
    /**
     * SAAS化的管理端过来的消息会被proxy进行中转,之后发送到终端的Arbiter对Broker进行实际的控制。
     *
     * @param header 消息头部信息
     * @param msg    消息
     */
    public void proxy(@Headers Map<String, String> header, @Body String msg) {
        if (zbusBroker == null) {
            logger.error("zbusBroker is null.");
            return;
        }
        try {
            // 设置mycat 配置修改的 消息生产者
            Producer producer = new Producer(zbusBroker, ServiceFlowConstant.MYCAT_UPDATE + "@" + header.get("tenant"));
            producer.createMQ();    //确定为创建消息队列需要显示调用
            Message message = new Message();
            message.setHead("event", header.get("event"));
            message.setHead("tenant", header.get("tenant"));
            message.setMethod("POST");
            message.setBody(msg);
            message = producer.sendSync(message);
            logger.debug(message);
            mycatProxy.start();
//            System.out.println("test");
        } catch (IOException | InterruptedException e) {
            logger.error(e.getMessage());
            e.printStackTrace();
        }
    }
    /**
     * 发送shell命令请求到broker
     * @param msg    命令消息内容
     *
     * @param msg 命令消息内容
     */
    public void updateMycat(String msg) {
        try {
            List<BrokerServer> brokerServerList  = brokerServerService.get(false);
            List<BrokerServer> brokerServerList = brokerServerService.get(false);
            for (BrokerServer broker : brokerServerList) {
                String result = sendMessage(broker, "post", "/esb/serviceMycat/update", msg);
                if (result==null) {
                    logger.error("sendMessage to broker server failed, broker:" + broker.getURL() + ", msg:" + msg);
                if (result == null) {
                    logger.error("sendMessage to broker start failed, broker:" + broker.getURL() + ", msg:" + msg);
                    continue;
                }else {
                } else {
                    logger.debug("发送mycat修改请求到broker成功");
                }
@ -132,15 +86,4 @@ public class MycatService {
        return null;
    }
    @Autowired
    public void setZbusBroker(ZbusBroker zbusBroker) {
        this.zbusBroker = zbusBroker;
    }
    @Autowired
    public void setMycatProxy(MycatProxy mycatProxy) {
        this.mycatProxy = mycatProxy;
    }
}

+ 147 - 29
hos-arbiter/src/main/java/com/yihu/hos/arbiter/services/ProxyService.java

@ -3,6 +3,7 @@ package com.yihu.hos.arbiter.services;
import com.yihu.hos.arbiter.configuration.ArbiterServerConfiguration;
import com.yihu.hos.core.log.Logger;
import com.yihu.hos.core.log.LoggerFactory;
import com.yihu.hos.web.framework.constant.EndPointConstant;
import com.yihu.hos.web.framework.constant.ServiceFlowConstant;
import org.apache.camel.CamelContext;
import org.apache.camel.ProducerTemplate;
@ -27,51 +28,168 @@ public class ProxyService {
    private CamelContext camelContext;
    private ArbiterServerConfiguration configuration;
    private ZbusBroker zbusBroker;
    private Consumer consumer;
    @Autowired
    public void setZbusBroker(ZbusBroker zbusBroker) {
        this.zbusBroker = zbusBroker;
    }
    public void proxy(Message message, Consumer consumer) {
        Map<String, Object> header = new HashMap<>();
        header.put("event", message.getHead("event"));
    @Autowired
    public void setCamelContext(CamelContext camelContext) {
        this.camelContext = camelContext;
    }
        ProducerTemplate producerTemplate = camelContext.createProducerTemplate();
        producerTemplate.sendBodyAndHeaders(ServiceFlowConstant.FLOW_EVENT_ENDPOINT, message.getBodyString(), header);
    @Autowired
    public void setConfiguration(ArbiterServerConfiguration configuration) {
        this.configuration = configuration;
    }
    public void start() {
        if (zbusBroker == null) {
            logger.error("zbusBroker is null");
            return;
        new ServiceFlow().server();
        new Mycat().start();
        new Shell().start();
    }
    public class ServiceFlow {
        private Consumer consumer;
        public void handle(Message message, Consumer consumer) {
            Map<String, Object> header = new HashMap<>();
            header.put("event", message.getHead("event"));
            ProducerTemplate producerTemplate = camelContext.createProducerTemplate();
            producerTemplate.sendBodyAndHeaders(ServiceFlowConstant.CAMEL_ENDPOINT, message.getBodyString(), header);
        }
        consumer = new Consumer(zbusBroker, ServiceFlowConstant.SSH + "@" + configuration.getTenant());
        try {
            consumer.start(this::proxy);
        } catch (IOException e) {
            e.printStackTrace();
            logger.error(e.getMessage());
        public void server() {
            if (zbusBroker == null) {
                logger.error("zbusBroker is null");
                return;
            }
            try {
                consumer = new Consumer(zbusBroker, ServiceFlowConstant.ZBUS_MQ + "@" + configuration.getTenant());
                consumer.start(this::handle);
            } catch (IOException e) {
                e.printStackTrace();
                logger.error(e.getMessage());
            }
        }
    }
        @Override
        protected void finalize() throws Throwable {
            if (consumer != null) {
                consumer.close();
            }
    @Autowired
    public void setZbusBroker(ZbusBroker zbusBroker) {
        this.zbusBroker = zbusBroker;
            super.finalize();
        }
    }
    @Autowired
    public void setCamelContext(CamelContext camelContext) {
        this.camelContext = camelContext;
    public class Mycat {
        private Consumer consumer;
        public void handle(Message message, Consumer consumer) {
            Map<String, Object> header = new HashMap<>();
            header.put("event", message.getHead("event"));
            ProducerTemplate producerTemplate = camelContext.createProducerTemplate();
            producerTemplate.sendBodyAndHeaders(ServiceFlowConstant.MYCAT_EVENT_SERVICE, message.getBodyString(), header);
        }
        public void start() {
            if (zbusBroker == null) {
                logger.error("zbusBroker is null");
                return;
            }
            try {
                consumer = new Consumer(zbusBroker, ServiceFlowConstant.MYCAT_UPDATE + "@" + configuration.getMycatName());
                consumer.start(this::handle);
            } catch (IOException e) {
                e.printStackTrace();
                logger.error(e.getMessage());
            }
        }
        @Override
        protected void finalize() throws Throwable {
            if (consumer != null) {
                consumer.close();
            }
            super.finalize();
        }
    }
    @Autowired
    public void setConfiguration(ArbiterServerConfiguration configuration) {
        this.configuration = configuration;
    public class Shell {
        private Consumer consumer;
        public void handle(Message message, Consumer consumer) {
            Map<String, Object> header = new HashMap<>();
            header.put("event", message.getHead("event"));
            ProducerTemplate producerTemplate = camelContext.createProducerTemplate();
            producerTemplate.sendBodyAndHeaders(ServiceFlowConstant.SHELL_EVENT_SERVICE, message.getBodyString(), header);
        }
        public void start() {
            if (zbusBroker == null) {
                logger.error("zbusBroker is null");
                return;
            }
            try {
                consumer = new Consumer(zbusBroker, ServiceFlowConstant.SHELL_REQUEST + "@" + configuration.getTenant());
                consumer.start(this::handle);
            } catch (IOException e) {
                e.printStackTrace();
                logger.error(e.getMessage());
            }
        }
        @Override
        protected void finalize() throws Throwable {
            if (consumer != null) {
                consumer.close();
            }
            super.finalize();
        }
    }
    @Override
    protected void finalize() throws Throwable {
        consumer.close();
        super.finalize();
    public class EndPoint {
        private Consumer consumer;
        public void handle(Message message, Consumer consumer) {
            Map<String, Object> header = new HashMap<>();
            header.put("event", message.getHead("event"));
            ProducerTemplate producerTemplate = camelContext.createProducerTemplate();
            producerTemplate.sendBodyAndHeaders(ServiceFlowConstant.SHELL_EVENT_SERVICE, message.getBodyString(), header);
        }
        public void start() {
            if (zbusBroker == null) {
                logger.error("zbusBroker is null");
                return;
            }
            try {
                consumer = new Consumer(zbusBroker, EndPointConstant.ZBUS_MQ + "@" + configuration.getTenant());
                consumer.start(this::handle);
            } catch (IOException e) {
                e.printStackTrace();
                logger.error(e.getMessage());
            }
        }
        @Override
        protected void finalize() throws Throwable {
            if (consumer != null) {
                consumer.close();
            }
            super.finalize();
        }
    }
}

+ 13 - 13
hos-arbiter/src/main/java/com/yihu/hos/arbiter/services/ServiceFlowService.java

@ -105,9 +105,9 @@ public class ServiceFlowService {
            List<BrokerServer> brokerServerList;
            brokerServerList = brokerServerService.get(one);
            for (BrokerServer broker : brokerServerList) {
                boolean result = sendMessage(broker, "post", "/esb/serviceFlow/start", msg);
                boolean result = sendMessage(broker, "post", "/esb/serviceFlow/serverServiceFlow", msg);
                if (!result) {
                    logger.error("sendMessage to broker server failed, broker:" + broker.getURL() + ", msg:" + msg);
                    logger.error("sendMessage to broker start failed, broker:" + broker.getURL() + ", msg:" + msg);
                    continue;
                }
@ -136,7 +136,7 @@ public class ServiceFlowService {
                boolean result = sendMessage(broker, "post", "/esb/serviceFlow/stop", msg);
                if (!result) {
                    logger.error("sendMessage to broker server failed, broker:" + broker.getURL() + ", msg:" + msg);
                    logger.error("sendMessage to broker start failed, broker:" + broker.getURL() + ", msg:" + msg);
                    continue;
                }
@ -159,7 +159,7 @@ public class ServiceFlowService {
                BrokerServer brokerServer = brokerServerService.get();
                boolean result = sendMessage(brokerServer, "post", "/esb/serviceFlow", msg);
                if (!result) {
                    logger.error("sendMessage to broker server failed, broker:" + brokerServer.getURL() + ", msg:" + msg);
                    logger.error("sendMessage to broker start failed, broker:" + brokerServer.getURL() + ", msg:" + msg);
                    return;
                }
@ -172,7 +172,7 @@ public class ServiceFlowService {
            for (BrokerServer broker : brokerServerList) {
                boolean result = sendMessage(broker, "post", "/esb/serviceFlow", msg);
                if (!result) {
                    logger.error("sendMessage to broker server failed, broker:" + broker.getURL() + ", msg:" + msg);
                    logger.error("sendMessage to broker start failed, broker:" + broker.getURL() + ", msg:" + msg);
                    continue;
                }
@ -200,7 +200,7 @@ public class ServiceFlowService {
                boolean result = sendMessage(brokerList.get(0), "put", "/esb/serviceFlow/add", msg);
                if (!result) {
                    logger.error("sendMessage to broker server failed, broker:" + brokerList.get(0).getURL() + ", msg:" + msg);
                    logger.error("sendMessage to broker start failed, broker:" + brokerList.get(0).getURL() + ", msg:" + msg);
                    return;
                }
@ -215,7 +215,7 @@ public class ServiceFlowService {
            for (BrokerServer broker : brokerServerList) {
                boolean result = sendMessage(broker, "put", "/esb/serviceFlow/add", msg);
                if (!result) {
                    logger.error("sendMessage to broker server failed, broker:" + broker.getURL() + ", msg:" + msg);
                    logger.error("sendMessage to broker start failed, broker:" + broker.getURL() + ", msg:" + msg);
                    continue;
                }
@ -244,7 +244,7 @@ public class ServiceFlowService {
                boolean result = sendMessage(brokerList.get(0), "put", "/esb/serviceFlow/reduce", msg);
                if (!result) {
                    logger.error("sendMessage to broker server failed, broker:" + brokerList.get(0).getURL() + ", msg:" + msg);
                    logger.error("sendMessage to broker start failed, broker:" + brokerList.get(0).getURL() + ", msg:" + msg);
                    return;
                }
@ -259,7 +259,7 @@ public class ServiceFlowService {
            for (BrokerServer broker : brokerServerList) {
                boolean result = sendMessage(broker, "put", "/esb/serviceFlow/reduce", msg);
                if (!result) {
                    logger.error("sendMessage to broker server failed, broker:" + broker.getURL() + ", msg:" + msg);
                    logger.error("sendMessage to broker start failed, broker:" + broker.getURL() + ", msg:" + msg);
                    continue;
                }
@ -288,7 +288,7 @@ public class ServiceFlowService {
                boolean result = sendMessage(brokerList.get(0), "delete", "/esb/serviceFlow", msg);
                if (!result) {
                    logger.error("sendMessage to broker server failed, broker:" + brokerList.get(0).getURL() + ", msg:" + msg);
                    logger.error("sendMessage to broker start failed, broker:" + brokerList.get(0).getURL() + ", msg:" + msg);
                    return;
                }
@ -301,7 +301,7 @@ public class ServiceFlowService {
            for (BrokerServer broker : brokerServerList) {
                boolean result = sendMessage(broker, "delete", "/esb/serviceFlow", msg);
                if (!result) {
                    logger.error("sendMessage to broker server failed, broker:" + broker.getURL() + ", msg:" + msg);
                    logger.error("sendMessage to broker start failed, broker:" + broker.getURL() + ", msg:" + msg);
                    continue;
                }
@ -327,9 +327,9 @@ public class ServiceFlowService {
                BrokerServer brokerServer = objectMapper.readValue(msg, BrokerServer.class);
                String serviceFlowMsg = objectMapper.writeValueAsString(serviceFlow);
                boolean result = sendMessage(brokerServer, "post", "/esb/serviceFlow/start", serviceFlowMsg);
                boolean result = sendMessage(brokerServer, "post", "/esb/serviceFlow/serverServiceFlow", serviceFlowMsg);
                if (!result) {
                    logger.error("sendMessage to broker server failed, broker:" + brokerServer.getURL() + ", msg:" + serviceFlowMsg);
                    logger.error("sendMessage to broker start failed, broker:" + brokerServer.getURL() + ", msg:" + serviceFlowMsg);
                    return;
                }

+ 82 - 49
hos-arbiter/src/main/java/com/yihu/hos/arbiter/services/ShellService.java

@ -1,65 +1,113 @@
package com.yihu.hos.arbiter.services;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.yihu.hos.arbiter.configuration.ArbiterServerConfiguration;
import com.yihu.hos.core.log.Logger;
import com.yihu.hos.core.log.LoggerFactory;
import com.yihu.hos.core.http.HTTPResponse;
import com.yihu.hos.core.http.HttpClientKit;
import com.yihu.hos.web.framework.constant.ServiceFlowConstant;
import org.apache.camel.CamelContext;
import org.apache.camel.ProducerTemplate;
import com.yihu.hos.web.framework.model.bo.BrokerServer;
import com.yihu.hos.web.framework.model.bo.ServiceShell;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.zbus.broker.ZbusBroker;
import org.zbus.mq.Consumer;
import org.zbus.mq.Producer;
import org.zbus.net.http.Message;
import java.util.HashMap;
import java.util.Map;
import java.util.List;
/**
 * @author HZY
 * @vsrsion 1.0
 * Created at 2017/1/6.
 * Created at 2017/1/5.
 */
@Service
@Service("linuxShellService")
public class ShellService {
    private static final Logger logger = LoggerFactory.getLogger(ShellService.class);
    private static final Logger logger = LogManager.getLogger(ShellService.class);
    private CamelContext camelContext;
    private ArbiterServerConfiguration configuration;
    @Autowired
    private BrokerServerService brokerServerService;
    private ZbusBroker zbusBroker;
    private Consumer consumer;
    @Autowired
    private ObjectMapper objectMapper;
    private static Map<String,String> cunsumerNap = new HashMap<>();
    /**
     * 发送shell命令请求到broker
     * @param msg    命令消息内容
     */
    public void sendShell(String msg) {
        try {
            List<BrokerServer> brokerServerList  = brokerServerService.get(false);
            for (BrokerServer broker : brokerServerList) {
                String result = sendMessage(broker, "post", "/esb/serviceShell/send", msg);
                if (result==null) {
                    logger.error("sendMessage to broker start failed, broker:" + broker.getURL() + ", msg:" + msg);
                    continue;
                }else {
                    logger.debug("发送shell请求到broker成功");
                    // shell执行成功,见执行结果返回到中心zbus显示。
                    ServiceShell serviceShell = objectMapper.readValue(msg,ServiceShell.class);
                    Producer producer = new Producer(zbusBroker, ServiceFlowConstant.SHELL_RESPONSE + "@" + serviceShell.getTenant());
                    producer.createMQ();    //确定为创建消息队列需要显示调用
                    Message message = new Message();
                    message.setHead("event", serviceShell.getType());
                    message.setHead("tenant", serviceShell.getTenant());
                    message.setMethod("POST");
                    message.setBody(result);
                    message = producer.sendSync(message);
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
            logger.error(e.getMessage());
        }
    }
    public void proxy(Message message, Consumer consumer) {
        Map<String, Object> header = new HashMap<>();
        header.put("event", message.getHead("event"));
    private String sendMessage(BrokerServer brokerServer, String method, String path, String msg) {
        if (brokerServer == null) {
            return null;
        }
        ProducerTemplate producerTemplate = camelContext.createProducerTemplate();
        producerTemplate.sendBodyAndHeaders(ServiceFlowConstant.SHELL_EVENT_SERVICE, message.getBodyString(), header);
    }
        switch (method) {
            case "post": {
                HTTPResponse response = HttpClientKit.post(brokerServer.getURL() + path, msg);
                if (response.getStatusCode() == 200) {
                    String body = response.getBody();
                    return body;
                }
    public void start() {
        if (zbusBroker == null) {
            logger.error("zbusBroker is null");
            return;
        }
                return null;
            }
        try {
            if (!cunsumerNap.containsKey(ServiceFlowConstant.SHELL_REQUEST + "@" + configuration.getTenant())){
                consumer = new Consumer(zbusBroker, ServiceFlowConstant.SHELL_REQUEST + "@" + configuration.getTenant());
                consumer.start(this::proxy);
                cunsumerNap.put(ServiceFlowConstant.SHELL_REQUEST + "@" + configuration.getTenant(),configuration.getTenant());
            case "put": {
                HTTPResponse response = HttpClientKit.put(brokerServer.getURL() + path, msg);
                if (response.getStatusCode() == 200) {
                    String body = response.getBody();
                    logger.debug(body);
                    return body;
                }
                return null;
            }
        } catch (Exception e) {
            e.printStackTrace();
            logger.error(e.getMessage());
            case "delete": {
                HTTPResponse response = HttpClientKit.delete(brokerServer.getURL() + path, msg);
                if (response.getStatusCode() == 200) {
                    String body = response.getBody();
                    logger.debug(body);
                    return body;
                }
                return null;
            }
            default:
                break;
        }
        return null;
    }
@ -68,19 +116,4 @@ public class ShellService {
        this.zbusBroker = zbusBroker;
    }
    @Autowired
    public void setCamelContext(CamelContext camelContext) {
        this.camelContext = camelContext;
    }
    @Autowired
    public void setConfiguration(ArbiterServerConfiguration configuration) {
        this.configuration = configuration;
    }
    @Override
    protected void finalize() throws Throwable {
        consumer.close();
        super.finalize();
    }
}

+ 12 - 0
hos-web-framework/src/main/java/com/yihu/hos/web/framework/constant/EndPointConstant.java

@ -0,0 +1,12 @@
package com.yihu.hos.web.framework.constant;
/**
 * @author Airhead
 * @since 2017/2/6.
 */
public interface EndPointConstant {
    String ZBUS_MQ = "endpoint";        //zbus消息,admin到arbiter上的通讯
    String ACTIVE_MQ = "runtime.endpoint";   //active-mq消息,arbiter到broker的通讯
    String CAMEL_COMPONENT = "event.endpoint";  //camel组件命名
    String CAMEL_ENDPOINT = CAMEL_COMPONENT + ":queue:" + ACTIVE_MQ;   //camel的Endpoint
}

+ 5 - 8
hos-web-framework/src/main/java/com/yihu/hos/web/framework/constant/ServiceFlowConstant.java

@ -5,10 +5,11 @@ package com.yihu.hos.web.framework.constant;
 * @since 2016/12/6.
 */
public interface ServiceFlowConstant {
    //流程-队列名称
    String FLOW_EVENT_QUEUE = "configuration.service.flow";
    String FLOW_EVENT_ENDPOINT = "service.flow.event:queue:configuration.service.flow";
    String ZBUS_MQ = "serviceFlow"; //zbus消息,admin到arbiter上的通讯
    String ACTIVE_MQ = "runtime.serviceFlow";   //active-mq消息,arbiter到broker的通讯
    String CAMEL_COMPONENT = "event.serviceFlow";  //camel组件命名
    String CAMEL_ENDPOINT = CAMEL_COMPONENT + ":queue:" + ACTIVE_MQ;   //camel的Endpoint
    
    //流程-模板类型
    String JAVA = "java";
    String CLASS = "class";
@ -37,10 +38,6 @@ public interface ServiceFlowConstant {
    String BROKER_SERVER_ON = "brokerServerOn"; //Broker启动
    String BROKER_SERVER_OFF = "brokerServerOff";//Broker停止
    //ArbiterServer MQ
    String SSH = "ssh";
    /* *******   shell相关  ******  */
    // shell 请求命令,对列名称
    String SHELL_EVENT_QUEUE = "configuration.service.shell";

+ 1 - 1
src/main/java/com/yihu/hos/services/ServiceFlowEventService.java

@ -82,7 +82,7 @@ public class ServiceFlowEventService {
            String msg = objectMapper.writeValueAsString(serviceFlow);
            String tenant = LocalContext.getContext().getAttachment(ContextAttributes.TENANT_NAME);
            Producer producer = new Producer(zbusBroker, ServiceFlowConstant.SSH + "@" + tenant);
            Producer producer = new Producer(zbusBroker, ServiceFlowConstant.ZBUS_MQ + "@" + tenant);
            producer.createMQ();    //确定为创建消息队列需要显示调用
            Message message = new Message();
            message.setHead("event", event);

+ 2 - 2
src/main/java/com/yihu/hos/services/ServiceMycatEventService.java

@ -44,11 +44,11 @@ public class ServiceMycatEventService {
            logger.error("zbusBroker is null.");
            return;
        }
        
        try {
            String msg = objectMapper.writeValueAsString(servviceMycat);
            Producer producer = new Producer(zbusBroker, ServiceFlowConstant.SSH + "@" + tenant);
            Producer producer = new Producer(zbusBroker, ServiceFlowConstant.MYCAT_UPDATE + "@" + tenant);
            producer.createMQ();    //确定为创建消息队列需要显示调用
            Message message = new Message();
            message.setHead("event", event);