Bläddra i källkod

将zbus的服务端集成在hos-admin中,去除hos-arbiter的zbus。hos-arbiter依然承担admin的agent的功能。

Airhead 8 år sedan
förälder
incheckning
816fcfadcd

+ 27 - 10
hos-agent/pom.xml

@ -36,11 +36,11 @@
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>
        <dependency>
            <groupId>org.zbus</groupId>
            <artifactId>zbus</artifactId>
            <version>7.2.0</version>
        </dependency>
        <!--<dependency>-->
            <!--<groupId>org.zbus</groupId>-->
            <!--<artifactId>zbus</artifactId>-->
            <!--<version>7.2.0</version>-->
        <!--</dependency>-->
		<dependency>
			<groupId>com.jcraft</groupId>
			<artifactId>jsch</artifactId>
@ -48,11 +48,11 @@
		</dependency>
		<!-- https://mvnrepository.com/artifact/org.mousio/etcd4j -->
		<dependency>
			<groupId>org.mousio</groupId>
			<artifactId>etcd4j</artifactId>
			<version>2.13.0</version>
		</dependency>
		<!--<dependency>-->
			<!--<groupId>org.mousio</groupId>-->
			<!--<artifactId>etcd4j</artifactId>-->
			<!--<version>2.13.0</version>-->
		<!--</dependency>-->
        <dependency>
            <groupId>com.yihu.hos</groupId>
@ -78,6 +78,23 @@
            <groupId>org.apache.camel</groupId>
            <artifactId>camel-jms</artifactId>
            <version>${camel.version}</version> <!-- use the same version as your Camel core version -->
        </dependency>
		<dependency>
			<groupId>org.apache.camel</groupId>
			<artifactId>camel-etcd</artifactId>
			<version>${camel.version}</version> <!-- use the same version as your Camel core version -->
		</dependency>
		<dependency>
			<groupId>org.apache.camel</groupId>
			<artifactId>camel-stream</artifactId>
			<version>${camel.version}</version> <!-- use the same version as your Camel core version -->
		</dependency>
        <dependency>
            <groupId>org.apache.camel</groupId>
            <artifactId>camel-xstream</artifactId>
            <version>${camel.version}</version>
            <!-- use the same version as your Camel core version -->
        </dependency>
    </dependencies>

+ 3 - 3
hos-agent/src/main/java/com/yihu/hos/agent/HosAgentApplication.java

@ -20,9 +20,9 @@ public class HosAgentApplication implements CommandLineRunner {
    @Override
    public void run(String... strings) throws Exception {
        flowService.server();
        shellService.server();
        mycatService.server();
//        flowService.server();
//        shellService.server();
//        mycatService.server();
    }
    @Autowired

+ 1 - 1
hos-agent/src/main/java/com/yihu/hos/agent/camelrouter/AdminProxyRouter.java

@ -11,7 +11,7 @@ import org.springframework.stereotype.Component;
public class AdminProxyRouter extends RouteBuilder {
    @Override
    public void configure() throws Exception {
        from("jetty://0.0.0.0:{{agent.proxy.port}}/esb?matchOnUriPrefix=true")
        from("jetty:http://0.0.0.0:{{agent.proxy.port}}/esb?matchOnUriPrefix=true")
                .to("jetty:http://127.0.0.57:8080/esb?bridgeEndpoint=true&amp;throwExceptionOnFailure=false");
    }
}

+ 12 - 16
hos-agent/src/main/java/com/yihu/hos/agent/configuration/AgentConfiguration.java

@ -1,11 +1,7 @@
package com.yihu.hos.agent.configuration;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.zbus.broker.ZbusBroker;
import java.io.IOException;
/**
 * @author Airhead
@ -18,18 +14,18 @@ public class AgentConfiguration {
    @Value("${agent.tenant.name}")
    private String tenant;
    private ZbusBroker zbusBroker;
    @Bean
    public ZbusBroker getZbusBroker() {
        try {
            zbusBroker = new ZbusBroker(this.zbusBrokerURL);
            return zbusBroker;
        } catch (IOException e) {
            e.printStackTrace();
        }
        return null;
    }
//    private ZbusBroker zbusBroker;
//    @Bean
//    public ZbusBroker getZbusBroker() {
////        try {
////            zbusBroker = new ZbusBroker(this.zbusBrokerURL);
////            return zbusBroker;
////        } catch (IOException e) {
////            e.printStackTrace();
////        }
//        return null;
//    }
    public String getZbusBrokerURL() {
        return zbusBrokerURL;

+ 33 - 42
hos-agent/src/main/java/com/yihu/hos/agent/service/FlowService.java

@ -1,20 +1,11 @@
package com.yihu.hos.agent.service;
import com.yihu.hos.agent.configuration.AgentConfiguration;
import com.yihu.hos.agent.constant.FlowConstant;
import org.apache.camel.CamelContext;
import org.apache.camel.ProducerTemplate;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.zbus.broker.ZbusBroker;
import org.zbus.mq.Consumer;
import org.zbus.net.http.Message;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
/**
 * @author Airhead
@ -24,15 +15,15 @@ import java.util.Map;
public class FlowService {
    private static final Logger logger = LogManager.getLogger(ShellService.class);
    private ZbusBroker zbusBroker;
    private Consumer consumer;
//    private ZbusBroker zbusBroker;
//    private Consumer consumer;
    private AgentConfiguration configuration;
    private CamelContext camelContext;
    @Autowired
    public void setZbusBroker(ZbusBroker zbusBroker) {
        this.zbusBroker = zbusBroker;
    }
//    @Autowired
//    public void setZbusBroker(ZbusBroker zbusBroker) {
//        this.zbusBroker = zbusBroker;
//    }
    @Autowired
    public void setConfiguration(AgentConfiguration configuration) {
@ -44,32 +35,32 @@ public class FlowService {
        this.camelContext = camelContext;
    }
    public void handle(Message message, Consumer consumer) {
        Map<String, Object> header = new HashMap<>();
        header.put("event", message.getHead("event"));
        ProducerTemplate producerTemplate = camelContext.createProducerTemplate();
        producerTemplate.sendBodyAndHeaders(FlowConstant.ENDPOINT, message.getBodyString(), header);
    }
//    public void handle(Message message, Consumer consumer) {
//        Map<String, Object> header = new HashMap<>();
//        header.put("event", message.getHead("event"));
//
//        ProducerTemplate producerTemplate = camelContext.createProducerTemplate();
//        producerTemplate.sendBodyAndHeaders(FlowConstant.ENDPOINT, message.getBodyString(), header);
//    }
    public void server() {
        if (zbusBroker == null) {
            logger.error("zbusBroker is null");
            return;
        }
        consumer = new Consumer(zbusBroker, FlowConstant.PREFIX + "@" + configuration.getTenant());
        try {
            consumer.start(this::handle);
        } catch (IOException e) {
            e.printStackTrace();
            logger.error(e.getMessage());
        }
    }
    @Override
    protected void finalize() throws Throwable {
        consumer.close();
        super.finalize();
    }
//    public void server() {
//        if (zbusBroker == null) {
//            logger.error("zbusBroker is null");
//            return;
//        }
//
//        consumer = new Consumer(zbusBroker, FlowConstant.PREFIX + "@" + configuration.getTenant());
//        try {
//            consumer.start(this::handle);
//        } catch (IOException e) {
//            e.printStackTrace();
//            logger.error(e.getMessage());
//        }
//    }
//
//    @Override
//    protected void finalize() throws Throwable {
//        consumer.close();
//        super.finalize();
//    }
}

+ 39 - 49
hos-agent/src/main/java/com/yihu/hos/agent/service/MycatService.java

@ -1,16 +1,6 @@
package com.yihu.hos.agent.service;
import com.yihu.hos.agent.configuration.AgentConfiguration;
import com.yihu.hos.agent.constant.FlowConstant;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.zbus.broker.ZbusBroker;
import org.zbus.mq.Consumer;
import org.zbus.net.http.Message;
import java.io.IOException;
/**
 * @author Airhead
@ -18,43 +8,43 @@ import java.io.IOException;
 */
@Service
public class MycatService {
    private static final Logger logger = LogManager.getLogger(ShellService.class);
    private ZbusBroker zbusBroker;
    private Consumer consumer;
    private AgentConfiguration configuration;
    @Autowired
    public void setZbusBroker(ZbusBroker zbusBroker) {
        this.zbusBroker = zbusBroker;
    }
    @Autowired
    public void setConfiguration(AgentConfiguration configuration) {
        this.configuration = configuration;
    }
    @Override
    protected void finalize() throws Throwable {
        consumer.close();
        super.finalize();
    }
    public void server(){
        if (zbusBroker == null) {
            logger.error("zbusBroker is null");
            return;
        }
        consumer = new Consumer(zbusBroker, FlowConstant.PREFIX + "@" + configuration.getTenant());
        try {
            consumer.start(this::handle);
        } catch (IOException e) {
            e.printStackTrace();
            logger.error(e.getMessage());
        }
    }
    public void handle(Message message, Consumer consumer) {
    }
//    private static final Logger logger = LogManager.getLogger(ShellService.class);
//
//    private ZbusBroker zbusBroker;
//    private Consumer consumer;
//    private AgentConfiguration configuration;
//
//    @Autowired
//    public void setZbusBroker(ZbusBroker zbusBroker) {
//        this.zbusBroker = zbusBroker;
//    }
//
//    @Autowired
//    public void setConfiguration(AgentConfiguration configuration) {
//        this.configuration = configuration;
//    }
//
//    @Override
//    protected void finalize() throws Throwable {
//        consumer.close();
//        super.finalize();
//    }
//
//    public void server(){
//        if (zbusBroker == null) {
//            logger.error("zbusBroker is null");
//            return;
//        }
//
//        consumer = new Consumer(zbusBroker, FlowConstant.PREFIX + "@" + configuration.getTenant());
//        try {
//            consumer.start(this::handle);
//        } catch (IOException e) {
//            e.printStackTrace();
//            logger.error(e.getMessage());
//        }
//    }
//
//    public void handle(Message message, Consumer consumer) {
//    }
}

+ 50 - 64
hos-agent/src/main/java/com/yihu/hos/agent/service/ShellService.java

@ -1,20 +1,6 @@
package com.yihu.hos.agent.service;
import com.yihu.hos.agent.configuration.AgentConfiguration;
import com.yihu.hos.agent.constant.ShellConstant;
import org.apache.camel.CamelContext;
import org.apache.camel.ProducerTemplate;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.zbus.broker.ZbusBroker;
import org.zbus.mq.Consumer;
import org.zbus.net.http.Message;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
/**
 * @author HZY
@ -23,54 +9,54 @@ import java.util.Map;
 */
@Service
public class ShellService {
    private static final Logger logger = LogManager.getLogger(ShellService.class);
    private ZbusBroker zbusBroker;
    private Consumer consumer;
    private AgentConfiguration configuration;
    private CamelContext camelContext;
    @Autowired
    public void setZbusBroker(ZbusBroker zbusBroker) {
        this.zbusBroker = zbusBroker;
    }
    @Autowired
    public void setConfiguration(AgentConfiguration configuration) {
        this.configuration = configuration;
    }
    @Autowired
    public void setCamelContext(CamelContext camelContext) {
        this.camelContext = camelContext;
    }
    public void handle(Message message, Consumer consumer) {
        Map<String, Object> header = new HashMap<>();
        header.put("event", message.getHead("event"));
        ProducerTemplate producerTemplate = camelContext.createProducerTemplate();
        producerTemplate.sendBodyAndHeaders(ShellConstant.ENDPOINT, message.getBodyString(), header);
    }
    public void server() {
        if (zbusBroker == null) {
            logger.error("zbusBroker is null");
            return;
        }
        consumer = new Consumer(zbusBroker, ShellConstant.PREFIX_IN + "@" + configuration.getTenant());
        try {
            consumer.start(this::handle);
        } catch (IOException e) {
            e.printStackTrace();
            logger.error(e.getMessage());
        }
    }
    @Override
    protected void finalize() throws Throwable {
        consumer.close();
        super.finalize();
    }
//    private static final Logger logger = LogManager.getLogger(ShellService.class);
//
//    private ZbusBroker zbusBroker;
//    private Consumer consumer;
//    private AgentConfiguration configuration;
//    private CamelContext camelContext;
//
//    @Autowired
//    public void setZbusBroker(ZbusBroker zbusBroker) {
//        this.zbusBroker = zbusBroker;
//    }
//
//    @Autowired
//    public void setConfiguration(AgentConfiguration configuration) {
//        this.configuration = configuration;
//    }
//
//    @Autowired
//    public void setCamelContext(CamelContext camelContext) {
//        this.camelContext = camelContext;
//    }
//
//    public void handle(Message message, Consumer consumer) {
//        Map<String, Object> header = new HashMap<>();
//        header.put("event", message.getHead("event"));
//
//        ProducerTemplate producerTemplate = camelContext.createProducerTemplate();
//        producerTemplate.sendBodyAndHeaders(ShellConstant.ENDPOINT, message.getBodyString(), header);
//    }
//
//    public void server() {
//        if (zbusBroker == null) {
//            logger.error("zbusBroker is null");
//            return;
//        }
//
//        consumer = new Consumer(zbusBroker, ShellConstant.PREFIX_IN + "@" + configuration.getTenant());
//        try {
//            consumer.start(this::handle);
//        } catch (IOException e) {
//            e.printStackTrace();
//            logger.error(e.getMessage());
//        }
//    }
//
//    @Override
//    protected void finalize() throws Throwable {
//        consumer.close();
//        super.finalize();
//    }
}

+ 4 - 4
hos-agent/src/main/java/com/yihu/hos/agent/util/SSHLinuxTool.java

@ -22,13 +22,13 @@ public class SSHLinuxTool {
    public static final String BEAN_ID = "SSHLinuxTool";
    @Value("${hos.jcraft.host}")
    @Value("${agent.jcraft.host}")
    private String host;
    @Value("${hos.jcraft.port}")
    @Value("${agent.jcraft.port}")
    private Integer port;
    @Value("${hos.jcraft.user}")
    @Value("${agent.jcraft.user}")
    private String user;
    @Value("${hos.jcraft.password}")
    @Value("${agent.jcraft.password}")
    private String password;
    public static Channel channel = null;

+ 7 - 2
hos-agent/src/main/resources/application.yml

@ -19,7 +19,12 @@ spring:
agent:
  zbus:
    url: 127.0.0.1:15555
  agent:
  tenant:
    name: jzkl
  proxy:
    port: 9090
    port: 9090
  jcraft:
    host:
    port: 22
    user:
    password:

+ 7 - 9
hos-arbiter/src/main/java/com/yihu/hos/arbiter/HosArbiterApplication.java

@ -9,8 +9,6 @@ import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
import org.springframework.boot.autoconfigure.orm.jpa.HibernateJpaAutoConfiguration;
import org.zbus.mq.server.MqServer;
import org.zbus.mq.server.MqServerConfig;
@SpringBootApplication(exclude = {DataSourceAutoConfiguration.class, HibernateJpaAutoConfiguration.class})
@ -25,13 +23,13 @@ public class HosArbiterApplication implements CommandLineRunner {
    @Override
    public void run(String... strings) throws Exception {
        if (configuration.isCentral()) {
            MqServerConfig config = new MqServerConfig();
            config.serverPort = configuration.getCentralPort();
            config.storePath = configuration.getCentralStore();
            final MqServer server = new MqServer(config);
            server.start();
        }
//        if (configuration.isCentral()) {
//            MqServerConfig config = new MqServerConfig();
//            config.serverPort = configuration.getCentralPort();
//            config.storePath = configuration.getCentralStore();
//            final MqServer server = new MqServer(config);
//            server.start();
//        }
        proxyService.start();
        shellService.start();

+ 1 - 1
hos-arbiter/src/main/java/com/yihu/hos/arbiter/listener/ApplicationStartListener.java

@ -22,6 +22,6 @@ public class ApplicationStartListener implements ApplicationListener<ContextRefr
    public void arbiterServerStart() {
        arbiterServerService.start();   //后期可以统一到Router上来处理
//        arbiterServerService.start();   //后期可以统一到Router上来处理
    }
}

+ 1 - 1
hos-arbiter/src/main/java/com/yihu/hos/arbiter/routers/ServiceFlowEventRouter.java

@ -28,7 +28,7 @@ public class ServiceFlowEventRouter extends RouteBuilder {
        context.addComponent("service.flow.event", JmsComponent.jmsComponentAutoAcknowledge(connectionFactory));
        from(ServiceFlowConstant.FLOW_EVENT_ENDPOINT)
                .choice()
                .when(header("tenant").isNotNull()).to("bean:serviceFlowService?method=proxy")
//                .when(header("tenant").isNotNull()).to("bean:serviceFlowService?method=proxy")
                .when(header("event").isEqualTo(ServiceFlowConstant.SERVICE_FLOW_STARTED)).to("bean:serviceFlowService?method=serviceFlowStarted")
                .when(header("event").isEqualTo(ServiceFlowConstant.SERVICE_FLOW_STOPPED)).to("bean:serviceFlowService?method=serviceFlowStopped")
                .when(header("event").isEqualTo(ServiceFlowConstant.SERVICE_FLOW_ADDED)).to("bean:serviceFlowService?method=serviceFlowAdd")

+ 21 - 23
hos-arbiter/src/main/java/com/yihu/hos/arbiter/services/ServiceFlowService.java

@ -19,8 +19,6 @@ import org.springframework.data.mongodb.core.query.Query;
import org.springframework.data.mongodb.core.query.Update;
import org.springframework.stereotype.Service;
import org.zbus.broker.ZbusBroker;
import org.zbus.mq.Producer;
import org.zbus.net.http.Message;
import java.io.IOException;
import java.util.List;
@ -358,27 +356,27 @@ public class ServiceFlowService {
     * @param msg    消息
     */
    public void proxy(@Headers Map<String, String> header, @Body String msg) {
        if (zbusBroker == null) {
            logger.error("zbusBroker is null.");
            return;
        }
        try {
            Producer producer = new Producer(zbusBroker, ServiceFlowConstant.SSH + "@" + header.get("tenant"));
            producer.createMQ();    //确定为创建消息队列需要显示调用
            Message message = new Message();
            message.setHead("event", header.get("event"));
            message.setHead("tenant", header.get("tenant"));
//            message.setHead(header);
            message.setMethod("POST");
            message.setBody(msg);
            message = producer.sendSync(message);
            logger.debug(message);
//            System.out.println("test");
        } catch (IOException | InterruptedException e) {
            logger.error(e.getMessage());
            e.printStackTrace();
        }
//        if (zbusBroker == null) {
//            logger.error("zbusBroker is null.");
//            return;
//        }
//
//        try {
//            Producer producer = new Producer(zbusBroker, ServiceFlowConstant.SSH + "@" + header.get("tenant"));
//            producer.createMQ();    //确定为创建消息队列需要显示调用
//            Message message = new Message();
//            message.setHead("event", header.get("event"));
//            message.setHead("tenant", header.get("tenant"));
////            message.setHead(header);
//            message.setMethod("POST");
//            message.setBody(msg);
//            message = producer.sendSync(message);
//            logger.debug(message);
////            System.out.println("test");
//        } catch (IOException | InterruptedException e) {
//            logger.error(e.getMessage());
//            e.printStackTrace();
//        }
    }

+ 28 - 11
src/main/java/com/yihu/hos/services/ServiceFlowEventService.java

@ -1,6 +1,5 @@
package com.yihu.hos.services;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.yihu.hos.common.constants.ContextAttributes;
import com.yihu.hos.core.log.Logger;
@ -11,10 +10,12 @@ import com.yihu.hos.web.framework.model.bo.ServiceFlow;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.stereotype.Component;
import org.zbus.broker.ZbusBroker;
import org.zbus.mq.Producer;
import org.zbus.net.http.Message;
import javax.jms.Queue;
import java.util.HashMap;
import java.util.Map;
import java.io.IOException;
/**
 * @created Airhead 2016/8/2.
@ -31,6 +32,13 @@ public class ServiceFlowEventService {
    @Autowired
    private ObjectMapper objectMapper;
    private ZbusBroker zbusBroker;
    @Autowired
    public void setZbusBroker(ZbusBroker zbusBroker) {
        this.zbusBroker = zbusBroker;
    }
    /**
     * admin启动时,触发一次所有流程更新事件,用于重启整个服务的情况。
     * 同时解决Broker中启动多个采集任务的问题。
@ -72,17 +80,26 @@ public class ServiceFlowEventService {
    private void sendMsg(String event, ServiceFlow serviceFlow) {
        if (zbusBroker == null) {
            logger.error("zbusBroker is null.");
            return;
        }
        try {
            String msg = objectMapper.writeValueAsString(serviceFlow);
            Map<String, Object> header = new HashMap<>();
            String attachment = LocalContext.getContext().getAttachment(ContextAttributes.TENANT_NAME);
            serviceFlow.setTenant(attachment);
            header.put("tenant", attachment);
            header.put("event", event);
                    this.jmsMessagingTemplate.convertAndSend(this.queue, msg, header);
        } catch (JsonProcessingException e) {
            e.printStackTrace();
            String tenant = LocalContext.getContext().getAttachment(ContextAttributes.TENANT_NAME);
            Producer producer = new Producer(zbusBroker, ServiceFlowConstant.SSH + "@" + tenant);
            producer.createMQ();    //确定为创建消息队列需要显示调用
            Message message = new Message();
            message.setHead("event", event);
            message.setHead("tenant", tenant);
            message.setMethod("POST");
            message.setBody(msg);
            producer.sendSync(message);
        } catch (IOException | InterruptedException e) {
            logger.error(e.getMessage());
            e.printStackTrace();
        }
    }
}

+ 30 - 13
src/main/java/com/yihu/hos/services/ServiceShellEventService.java

@ -1,6 +1,5 @@
package com.yihu.hos.services;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.yihu.hos.common.constants.ContextAttributes;
import com.yihu.hos.core.log.Logger;
@ -11,14 +10,17 @@ import com.yihu.hos.web.framework.model.bo.ServiceShell;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.stereotype.Component;
import org.zbus.broker.ZbusBroker;
import org.zbus.mq.Producer;
import org.zbus.net.http.Message;
import javax.annotation.Resource;
import javax.jms.Queue;
import java.util.HashMap;
import java.util.Map;
import java.io.IOException;
/**
 *  发送shell命令消息到MQ
 * 发送shell命令消息到MQ
 *
 * @author HZY
 * @vsrsion 1.0
 * Created at 2017/1/5.
@ -35,6 +37,13 @@ public class ServiceShellEventService {
    @Autowired
    private ObjectMapper objectMapper;
    private ZbusBroker zbusBroker;
    @Autowired
    public void setZbusBroker(ZbusBroker zbusBroker) {
        this.zbusBroker = zbusBroker;
    }
    public void serviceShellSend(ServiceShell serviceShell) {
        serviceShell.setType(ServiceFlowConstant.ARBITER_SHELL_SEND);
        this.sendMsg(ServiceFlowConstant.ARBITER_SHELL_SEND, serviceShell);
@ -47,20 +56,28 @@ public class ServiceShellEventService {
    }
    private void sendMsg(String event, ServiceShell serviceShell) {
        if (zbusBroker == null) {
            logger.error("zbusBroker is null.");
            return;
        }
        try {
            Map<String, Object> header = new HashMap<>();
            String attachment = LocalContext.getContext().getAttachment(ContextAttributes.TENANT_NAME);
            serviceShell.setTenant(attachment);
            String msg = objectMapper.writeValueAsString(serviceShell);
            header.put("tenant", attachment);
            header.put("event", event);
            this.jmsMessagingTemplate.convertAndSend(this.shellQueue, msg, header);
        } catch (JsonProcessingException e) {
            e.printStackTrace();
            String tenant = LocalContext.getContext().getAttachment(ContextAttributes.TENANT_NAME);
            Producer producer = new Producer(zbusBroker, ServiceFlowConstant.SHELL_REQUEST + "@" + tenant);
            producer.createMQ();    //确定为创建消息队列需要显示调用
            Message message = new Message();
            message.setHead("event", event);
            message.setHead("tenant", tenant);
            message.setMethod("POST");
            message.setBody(msg);
            producer.sendSync(message);
        } catch (IOException | InterruptedException e) {
            logger.error(e.getMessage());
            e.printStackTrace();
        }
    }
}