Bläddra i källkod

arbiter之间使用zbus进行通讯,这样可以处理终端无对外IP问题

Airhead 8 år sedan
förälder
incheckning
d48b6808af

+ 7 - 0
hos-arbiter/pom.xml

@ -40,6 +40,13 @@
            <artifactId>hos-web-framework</artifactId>
            <version>1.3.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.zbus/zbus -->
        <dependency>
            <groupId>org.zbus</groupId>
            <artifactId>zbus</artifactId>
            <version>7.2.0</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>

+ 25 - 0
hos-arbiter/src/main/java/com/yihu/hos/arbiter/HosArbiterApplication.java

@ -1,21 +1,46 @@
package com.yihu.hos.arbiter;
import com.yihu.hos.arbiter.configuration.ArbiterServerConfiguration;
import com.yihu.hos.arbiter.services.ProxyService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
import org.springframework.boot.autoconfigure.orm.jpa.HibernateJpaAutoConfiguration;
import org.zbus.mq.server.MqServer;
import org.zbus.mq.server.MqServerConfig;
@SpringBootApplication(exclude = {DataSourceAutoConfiguration.class, HibernateJpaAutoConfiguration.class})
public class HosArbiterApplication implements CommandLineRunner {
    private ArbiterServerConfiguration configuration;
    private ProxyService proxyService;
    public static void main(String[] args) {
        SpringApplication.run(HosArbiterApplication.class, args);
    }
    @Override
    public void run(String... strings) throws Exception {
        if (configuration.isCentral()) {
            MqServerConfig config = new MqServerConfig();
            config.serverPort = configuration.getCentralPort();
            config.storePath = configuration.getCentralStore();
            final MqServer server = new MqServer(config);
            server.start();
        } else {
            proxyService.start();
        }
    }
    @Autowired
    public void setConfiguration(ArbiterServerConfiguration configuration) {
        this.configuration = configuration;
    }
    @Autowired
    public void setProxyService(ProxyService proxyService) {
        this.proxyService = proxyService;
    }
}

+ 37 - 0
hos-arbiter/src/main/java/com/yihu/hos/arbiter/configuration/ArbiterServerConfiguration.java

@ -2,7 +2,11 @@ package com.yihu.hos.arbiter.configuration;
import com.yihu.hos.core.datatype.StringUtil;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.zbus.broker.ZbusBroker;
import java.io.IOException;
/**
 * @author Airhead
@ -12,11 +16,16 @@ import org.springframework.context.annotation.Configuration;
public class ArbiterServerConfiguration {
    @Value("${arbiter.central.url}")
    private String centralUrl;
    @Value("${arbiter.central.port}")
    private Integer centralPort;
    @Value("${arbiter.central.store}")
    private String centralStore;
    @Value("${arbiter.terminal.url}")
    private String terminalUrl;
    @Value("${arbiter.tenant.name}")
    private String tenant;
    private ZbusBroker zbusBroker;
    public String getCentralUrl() {
        return centralUrl;
@ -34,6 +43,34 @@ public class ArbiterServerConfiguration {
        return StringUtil.isEmpty(centralUrl);
    }
    public Integer getCentralPort() {
        return centralPort;
    }
    public String getCentralStore() {
        return centralStore;
    }
    @Bean
    public ZbusBroker getZbusBroker() {
        try {
            zbusBroker = new ZbusBroker(this.centralUrl);
            return zbusBroker;
        } catch (IOException e) {
            e.printStackTrace();
        }
        return null;
    }
    @Override
    protected void finalize() throws Throwable {
        if (zbusBroker != null) {
            zbusBroker.close();
        }
        super.finalize();
    }
//    public boolean isTerminal() {
//        return !isCentral();
//    }

+ 0 - 34
hos-arbiter/src/main/java/com/yihu/hos/arbiter/controllers/ProxyController.java

@ -1,34 +0,0 @@
package com.yihu.hos.arbiter.controllers;
import com.yihu.hos.web.framework.constant.ServiceFlowConstant;
import org.apache.camel.CamelContext;
import org.apache.camel.ProducerTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestHeader;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.HashMap;
import java.util.Map;
/**
 * @author Airhead
 * @since 2016/12/2.
 */
@RestController
@RequestMapping("/proxy")
public class ProxyController {
    @Autowired
    private CamelContext camelContext;
    @RequestMapping()
    public void proxy(@RequestHeader Map<String, String> headers,
                      @RequestBody String body) {
        Map<String, Object> header = new HashMap<>();
        header.put("event", headers.get("event"));
        ProducerTemplate producerTemplate = camelContext.createProducerTemplate();
        producerTemplate.sendBodyAndHeaders(ServiceFlowConstant.FLOW_EVENT_ENDPOINT, body, header);
    }
}

+ 2 - 20
hos-arbiter/src/main/java/com/yihu/hos/arbiter/services/ArbiterServerService.java

@ -1,11 +1,8 @@
package com.yihu.hos.arbiter.services;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.yihu.hos.arbiter.configuration.ArbiterServerConfiguration;
import com.yihu.hos.arbiter.models.ArbiterServer;
import com.yihu.hos.core.http.HTTPResponse;
import com.yihu.hos.core.http.HttpClientKit;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
@ -36,29 +33,14 @@ public class ArbiterServerService {
    public void start() {
        //中心Arbiter服务器直接注册
        if (configuration.isCentral()) {
            ArbiterServer arbiterServer = new ArbiterServer();
            arbiterServer.setTenant(configuration.getTenant());
            arbiterServer.setUrl(configuration.getTerminalUrl());
            this.save(arbiterServer);
            return;
        }
        //终端Arbiter服务器调用中心Arbiter服务进行注册
        ArbiterServer arbiterServer = new ArbiterServer();
        arbiterServer.setTenant(configuration.getTenant());
        arbiterServer.setUrl(configuration.getTerminalUrl());
        try {
            String jsonBody = objectMapper.writeValueAsString(arbiterServer);
            HTTPResponse response = HttpClientKit.post(configuration.getCentralUrl() + "/arbiter", jsonBody);
            if (response.getStatusCode() != 200) {
                logger.error("register terminal arbiter server failed");
            }
        } catch (JsonProcessingException e) {
            e.printStackTrace();
            logger.error(e.getMessage());
        }
        this.save(arbiterServer);
    }
    public void save(ArbiterServer arbiterServer) {

+ 66 - 0
hos-arbiter/src/main/java/com/yihu/hos/arbiter/services/ProxyService.java

@ -0,0 +1,66 @@
package com.yihu.hos.arbiter.services;
import com.yihu.hos.core.log.Logger;
import com.yihu.hos.core.log.LoggerFactory;
import com.yihu.hos.web.framework.constant.ServiceFlowConstant;
import org.apache.camel.CamelContext;
import org.apache.camel.ProducerTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.zbus.broker.ZbusBroker;
import org.zbus.mq.Consumer;
import org.zbus.net.http.Message;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
/**
 * @author Airhead
 * @since 2016/12/22.
 */
@Service
public class ProxyService {
    private static final Logger logger = LoggerFactory.getLogger(ProxyService.class);
    @Autowired
    private CamelContext camelContext;
    private ZbusBroker zbusBroker;
    private Consumer consumer;
    public void proxy(Message message, Consumer consumer) {
        Map<String, Object> header = new HashMap<>();
        header.put("event", message.getHead("event"));
        ProducerTemplate producerTemplate = camelContext.createProducerTemplate();
        producerTemplate.sendBodyAndHeaders(ServiceFlowConstant.FLOW_EVENT_ENDPOINT, message.getBodyString(), header);
    }
    public void start() {
        if (zbusBroker == null) {
            logger.error("zbusBroker is null");
            return;
        }
        consumer = new Consumer(zbusBroker, ServiceFlowConstant.SSH);
        try {
            consumer.start(this::proxy);
        } catch (IOException e) {
            e.printStackTrace();
            logger.error(e.getMessage());
        }
    }
    @Autowired
    public void setZbusBroker(ZbusBroker zbusBroker) {
        this.zbusBroker = zbusBroker;
    }
    @Override
    protected void finalize() throws Throwable {
        consumer.close();
        super.finalize();
    }
}

+ 30 - 9
hos-arbiter/src/main/java/com/yihu/hos/arbiter/services/ServiceFlowService.java

@ -1,7 +1,6 @@
package com.yihu.hos.arbiter.services;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.yihu.hos.arbiter.models.ArbiterServer;
import com.yihu.hos.arbiter.models.BrokerServer;
import com.yihu.hos.core.http.HTTPResponse;
import com.yihu.hos.core.http.HttpClientKit;
@ -17,9 +16,11 @@ import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.data.mongodb.core.query.Update;
import org.springframework.stereotype.Service;
import org.zbus.broker.ZbusBroker;
import org.zbus.mq.Producer;
import org.zbus.net.http.Message;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -45,6 +46,8 @@ public class ServiceFlowService {
    @Autowired
    private ArbiterServerService arbiterServerService;
    private ZbusBroker zbusBroker;
    public ServiceFlow save(ServiceFlow serviceFlow) {
        if (serviceFlow == null) {
@ -345,13 +348,31 @@ public class ServiceFlowService {
     * @param msg    消息
     */
    public void proxy(@Headers Map<String, String> header, @Body String msg) {
        String tenant = header.get("tenant");
        String event = header.get("event");
        ArbiterServer arbiterServer = arbiterServerService.get(tenant);
        header.remove("tenant");
        header = new HashMap<>();
        header.put("event",event);
        HttpClientKit.post(arbiterServer.getUrl() + "/proxy", msg, header);
        if (zbusBroker == null) {
            logger.error("zbusBroker is null.");
            return;
        }
        try {
            Producer producer = new Producer(zbusBroker, ServiceFlowConstant.SSH);
            producer.createMQ();    //确定为创建消息队列需要显示调用
            Message message = new Message();
            message.setHead(header);
            message.setMethod("POST");
            message.setBody(msg);
            message = producer.sendSync(message);
            logger.debug(message);
        } catch (IOException | InterruptedException e) {
            logger.error(e.getMessage());
            e.printStackTrace();
        }
    }
    @Autowired
    public void setZbusBroker(ZbusBroker zbusBroker) {
        this.zbusBroker = zbusBroker;
    }
    private boolean sendMessage(BrokerServer brokerServer, String method, String path, String msg) {

+ 15 - 8
hos-arbiter/src/main/resources/application.yml

@ -3,9 +3,6 @@ server:
spring:
  application:
    name: ArbiterServer
  central:
    url: http://127.0.0.1:10135
---
spring:
  profiles: dev
@ -49,9 +46,11 @@ arbiter:
  timer:
      period: 10000
  central:
    url: http://127.0.0.1:10135
    url:
    port: 15555
    store: ./store
  terminal:
    url: http://127.0.0.1:10135
    url: 127.0.0.1:15555
  tenant:
    name: jkzl
---
@ -73,7 +72,13 @@ arbiter:
  timer:
      period: 10000
  central:
    url: http://127.0.0.1:10135
    url:
    port: 15555
    store: ./store
  terminal:
    url: 192.168.131.38:15555
  tenant:
    name: yichang
---
spring:
  profiles: lfq
@ -93,8 +98,10 @@ arbiter:
  timer:
      period: 10000
  central:
    url: http://192.168.131.119:10135
    url: 192.168.131.119:15555
    port: 15555
    store: ./store
  terminal:
    url: http://192.168.131.38:10135
    url: 192.168.131.38:15555
  tenant:
    name: yichang

+ 22 - 0
hos-arbiter/src/test/java/com/yihu/hos/arbiter/services/ProxyServiceTest.java

@ -0,0 +1,22 @@
package com.yihu.hos.arbiter.services;
import org.junit.Test;
import org.zbus.broker.ZbusBroker;
import org.zbus.mq.Producer;
import org.zbus.net.http.Message;
/**
 * @author Airhead
 * @since 2016/12/22.
 */
public class ProxyServiceTest {
    @Test
    public void proxy() throws Exception {
        Producer producer = new Producer(new ZbusBroker("192.168.131.38:15555"), "RemoteControl");
        producer.createMQ();//确定为创建消息队列需要显示调用
        Message msg = new Message();
        msg.setBody("hello world"); //消息体底层是byte[]
        msg = producer.sendSync(msg);
    }
}

+ 3 - 0
hos-web-framework/src/main/java/com/yihu/hos/web/framework/constant/ServiceFlowConstant.java

@ -36,4 +36,7 @@ public interface ServiceFlowConstant {
    //BrokerServer
    String BROKER_SERVER_ON = "brokerServerOn"; //Broker启动
    String BROKER_SERVER_OFF = "brokerServerOff";//Broker停止
    //ArbiterServer MQ
    String SSH = "ssh";
}