浏览代码

zbus 多个消费者问题处理

demon 8 年之前
父节点
当前提交
09c8909f45

+ 8 - 0
hos-arbiter/src/main/java/com/yihu/hos/arbiter/services/LinuxShellService.java

@ -35,6 +35,9 @@ public class LinuxShellService {
    @Autowired
    private ObjectMapper objectMapper;
    private ShellService shellService;
    /**
     * SAAS化的管理端过来的消息会被proxy进行中转,之后发送到终端的Arbiter对Broker进行实际的控制。
     *
@ -58,6 +61,7 @@ public class LinuxShellService {
            message.setBody(msg);
            message = producer.sendSync(message);
            logger.debug(message);
            shellService.start();
//            System.out.println("test");
        } catch (IOException | InterruptedException e) {
            logger.error(e.getMessage());
@ -151,4 +155,8 @@ public class LinuxShellService {
        this.zbusBroker = zbusBroker;
    }
    @Autowired
    public void setShellService(ShellService shellService) {
        this.shellService = shellService;
    }
}

+ 19 - 4
hos-arbiter/src/main/java/com/yihu/hos/arbiter/services/ShellService.java

@ -1,5 +1,6 @@
package com.yihu.hos.arbiter.services;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.yihu.hos.arbiter.configuration.ArbiterServerConfiguration;
import com.yihu.hos.core.log.Logger;
import com.yihu.hos.core.log.LoggerFactory;
@ -12,7 +13,6 @@ import org.zbus.broker.ZbusBroker;
import org.zbus.mq.Consumer;
import org.zbus.net.http.Message;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
@ -29,6 +29,8 @@ public class ShellService {
    private ArbiterServerConfiguration configuration;
    private ZbusBroker zbusBroker;
    private Consumer consumer;
    @Autowired
    private ObjectMapper objectMapper;
    public void proxy(Message message, Consumer consumer) {
@ -45,10 +47,23 @@ public class ShellService {
            return;
        }
        consumer = new Consumer(zbusBroker, ServiceFlowConstant.SHELL_REQUEST + "@" + configuration.getTenant());
        try {
            consumer.start(this::proxy);
        } catch (IOException e) {
            if (consumer != null) {
                consumer = new Consumer(zbusBroker, ServiceFlowConstant.SHELL_REQUEST + "@" + configuration.getTenant());
                String messageBodyString = consumer.queryMQ().getBodyString();
                Map<String, Object> message = objectMapper.readValue(messageBodyString, Map.class);
                Integer consumerCount = (Integer) message.get("consumerCount");
                if (consumerCount < 1) {
                    consumer.start(this::proxy);
                }
            } else {
                consumer = new Consumer(zbusBroker, ServiceFlowConstant.SHELL_REQUEST + "@" + configuration.getTenant());
                consumer.start(this::proxy);
            }
        } catch (Exception e) {
            e.printStackTrace();
            logger.error(e.getMessage());
        }

+ 1 - 1
hos-broker/src/main/java/com/yihu/hos/broker/common/shell/SSHLinuxTool.java

@ -96,7 +96,7 @@ public class SSHLinuxTool {
            outstream.flush();
            //获取命令执行的结果
            Thread.sleep(2000);
            Thread.sleep(1000);
            if (instream.available() > 0) {
                byte[] data = new byte[instream.available()];
                int nLen = instream.read(data);

+ 12 - 0
src/main/java/com/yihu/hos/common/CommonPageController.java

@ -1,6 +1,7 @@
package com.yihu.hos.common;
import com.yihu.hos.common.constants.ContextAttributes;
import com.yihu.hos.remoteManage.service.RemoteShellService;
import com.yihu.hos.system.model.SystemUser;
import com.yihu.hos.tenant.model.TenantSession;
import com.yihu.hos.tenant.service.AuthenticateService;
@ -33,6 +34,8 @@ public class CommonPageController extends BaseController {
    @Autowired
    private AuthenticateService authenticateService;
    private RemoteShellService remoteShellService;
    /*
    登录页面
     */
@ -56,6 +59,8 @@ public class CommonPageController extends BaseController {
                } finally {
                    IOUtils.closeQuietly(out);
                }
            }else {
                remoteShellService.start();
            }
        } catch (Exception e) {
            e.printStackTrace();
@ -98,6 +103,8 @@ public class CommonPageController extends BaseController {
            } finally {
                IOUtils.closeQuietly(out);
            }
        }else {
            remoteShellService.start();
        }
        } catch (Exception e) {
            e.printStackTrace();
@ -163,4 +170,9 @@ public class CommonPageController extends BaseController {
        model.addAttribute("contentPage","/common/home");
        return "partView";
    }
    @Autowired
    public void setRemoteShellService(RemoteShellService remoteShellService) {
        this.remoteShellService = remoteShellService;
    }
}

+ 1 - 1
src/main/java/com/yihu/hos/common/constants/ContextAttributes.java

@ -10,7 +10,7 @@ public interface ContextAttributes {
    String TENANT_SESSION = "tenantSession";
    String GLOBAL_DB = "global_db";//平台管理中心库
    String SHELL_RESPONSE = "shell_repsonse";
    String SHELL_RESPONSE = "shell_repsonse.";
}

+ 3 - 4
src/main/java/com/yihu/hos/remoteManage/controller/RemoteShellController.java

@ -56,8 +56,6 @@ public class RemoteShellController extends BaseController {
            //TODO 发送shell命令 消息
            System.out.println("发送shell请求:"+command);
            remoteShellService.sendShell(command, disCon);
            //TODO 接收shell命令执行结果 消息
            remoteShellService.start();
            return Result.success("shell请求发送成功!");
        } catch (Exception e) {
            e.printStackTrace();
@ -73,8 +71,9 @@ public class RemoteShellController extends BaseController {
        String result = "";
        try {
            //TODO 如何去除等待时间,目前添加sleep是因为需要等待zbus回调方法的返回结果;
            Thread.sleep(3000);
            result = LocalContext.getContext().getAttachment(ContextAttributes.SHELL_RESPONSE);
            Thread.sleep(2000);
            String attachment = LocalContext.getContext().getAttachment(ContextAttributes.TENANT_NAME);
            result = LocalContext.getContext().getAttachment(ContextAttributes.SHELL_RESPONSE+attachment);
            System.out.println("接口返回结果:"+result);
        } catch (InterruptedException e) {
            e.printStackTrace();

+ 27 - 9
src/main/java/com/yihu/hos/remoteManage/service/RemoteShellService.java

@ -1,5 +1,6 @@
package com.yihu.hos.remoteManage.service;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.yihu.hos.common.constants.ContextAttributes;
import com.yihu.hos.interceptor.LocalContext;
import com.yihu.hos.services.ServiceShellEventService;
@ -15,6 +16,7 @@ import org.zbus.net.http.Message;
import java.io.IOException;
import java.net.InetAddress;
import java.util.Date;
import java.util.Map;
/**
 * @author HZY
@ -32,13 +34,16 @@ public class RemoteShellService {
    private ZbusBroker zbusBroker;
    private Consumer consumer;
    @Autowired
    private ObjectMapper objectMapper;
    /**
     * 发送shell命令消息
     *
     * @param command
     * @param disConnection
     * @return
     */
    public boolean sendShell(String command,boolean disConnection){
    public boolean sendShell(String command, boolean disConnection) {
        InetAddress addr = null;
        try {
            addr = InetAddress.getLocalHost();
@ -57,11 +62,11 @@ public class RemoteShellService {
    }
    public void proxy(Message message, Consumer consumer) {
    public void handle(Message message, Consumer consumer) throws IOException {
        String bodyString = message.getBodyString();
        LocalContext.getContext().setAttachment(ContextAttributes.SHELL_RESPONSE,bodyString);
        String attachment = LocalContext.getContext().getAttachment(ContextAttributes.TENANT_NAME);
        LocalContext.getContext().setAttachment(ContextAttributes.SHELL_RESPONSE + attachment, bodyString);
        System.out.println("回调返回:" + bodyString);
    }
    public void start() {
@ -71,16 +76,29 @@ public class RemoteShellService {
        }
        String attachment = LocalContext.getContext().getAttachment(ContextAttributes.TENANT_NAME);
        if (attachment==null){
        if (attachment == null) {
            return;
        }
        consumer = new Consumer(zbusBroker, ServiceFlowConstant.SHELL_RESPONSE + "@" + attachment);
        try {
            consumer.start(this::proxy);
        } catch (IOException e) {
            if (consumer != null) {
                consumer = new Consumer(zbusBroker, ServiceFlowConstant.SHELL_RESPONSE + "@" + attachment);
                String messageBodyString = consumer.queryMQ().getBodyString();
                Map<String,Object> message = objectMapper.readValue(messageBodyString,Map.class);
                Integer consumerCount = (Integer) message.get("consumerCount");
                if (consumerCount<1) {
                    consumer.start(this::handle);
                }
            } else {
                consumer = new Consumer(zbusBroker, ServiceFlowConstant.SHELL_RESPONSE + "@" + attachment);
                consumer.start(this::handle);
            }
            System.out.println("start success~");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

+ 2 - 2
src/main/webapp/WEB-INF/ehr/jsp/monitor/shell/shell.jsp

@ -4,7 +4,7 @@
<!--######用户管理页面Title设置######-->
<style >
    #main_text{
        width: 60%;
        width: 50%;
        background: black;
        color: white;
        height: 500px;
@ -19,7 +19,7 @@
        <div class="m-form-group">
            <div class="m-form-control left" >
                <label>输入shell命令:</label>
                <label style="font-size: 16px;font-weight: bold;">终端服务器控制台</label>
            </div>
        </div>
    </div>