Browse Source

zbus消费者创建问题修改

demon 8 năm trước cách đây
mục cha
commit
1aaf765911

+ 4 - 11
hos-arbiter/src/main/java/com/yihu/hos/arbiter/services/ShellService.java

@ -31,6 +31,8 @@ public class ShellService {
    private Consumer consumer;
    @Autowired
    private ObjectMapper objectMapper;
    private static Map<String,String> cunsumerNap = new HashMap<>();
    public void proxy(Message message, Consumer consumer) {
@ -48,19 +50,10 @@ public class ShellService {
        }
        try {
            if (consumer != null) {
                consumer = new Consumer(zbusBroker, ServiceFlowConstant.SHELL_REQUEST + "@" + configuration.getTenant());
                String messageBodyString = consumer.queryMQ().getBodyString();
                Map<String, Object> message = objectMapper.readValue(messageBodyString, Map.class);
                Integer consumerCount = (Integer) message.get("consumerCount");
                if (consumerCount < 1) {
                    consumer.start(this::proxy);
                }
            } else {
            if (!cunsumerNap.containsKey(ServiceFlowConstant.SHELL_REQUEST + "@" + configuration.getTenant())){
                consumer = new Consumer(zbusBroker, ServiceFlowConstant.SHELL_REQUEST + "@" + configuration.getTenant());
                consumer.start(this::proxy);
                cunsumerNap.put(ServiceFlowConstant.SHELL_REQUEST + "@" + configuration.getTenant(),configuration.getTenant());
            }
        } catch (Exception e) {

+ 6 - 3
hos-broker/src/main/java/com/yihu/hos/broker/common/shell/SSHLinuxTool.java

@ -96,10 +96,13 @@ public class SSHLinuxTool {
            outstream.write(shellCommand.getBytes());
            outstream.flush();
            //获取命令执行的结果
            Thread.sleep(1000);
            while (instream.available()<1){
                //获取命令执行的结果
                Thread.sleep(1000);
            }
            if (instream.available() > 0) {
                byte[] data = new byte[instream.available()];
                    byte[] data = new byte[instream.available()];
                int nLen = instream.read(data);
                if (nLen < 0) {
                    throw new Exception("network error.");

+ 4 - 6
src/main/java/com/yihu/hos/remoteManage/controller/RemoteShellController.java

@ -51,10 +51,9 @@ public class RemoteShellController extends BaseController {
            @RequestParam(value = "disCon", required = true) boolean disCon) {
        String result = "";
        try {
            //TODO 发送shell命令 消息
            System.out.println("发送shell请求:" + command);
            //发送shell命令 消息
            remoteShellService.sendShell(command, disCon);
            return Result.success("shell请求发送成功!");
            return Result.success("shell请求发送成功:"+command);
        } catch (Exception e) {
            e.printStackTrace();
        }
@ -69,16 +68,15 @@ public class RemoteShellController extends BaseController {
        String result = "";
        try {
            //TODO 如何去除等待时间,目前添加sleep是因为需要等待zbus回调方法的返回结果;
            Thread.sleep(2000);
            String attachment = LocalContext.getContext().getAttachment(ContextAttributes.TENANT_NAME);
            result = LocalContext.getContext().getAttachment(ContextAttributes.SHELL_RESPONSE + attachment);
            int count = 0;
            while (result == null) {
                count++;
                Thread.sleep(2000);
                Thread.sleep(1000);
                result = LocalContext.getContext().getAttachment(ContextAttributes.SHELL_RESPONSE + attachment);
                //获取失败时,尝试再一次获取结果
                if (count > 2) {
                if (count > 5) {
                    break;
                }
            }

+ 4 - 12
src/main/java/com/yihu/hos/remoteManage/service/RemoteShellService.java

@ -16,6 +16,7 @@ import org.zbus.net.http.Message;
import java.io.IOException;
import java.net.InetAddress;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
/**
@ -33,6 +34,7 @@ public class RemoteShellService {
    private ServiceShellEventService serviceShellEventService;
    private ZbusBroker zbusBroker;
    private Consumer consumer;
    private static Map<String,String> cunsumerNap = new HashMap<>();
    @Autowired
    private ObjectMapper objectMapper;
@ -81,21 +83,11 @@ public class RemoteShellService {
        }
        try {
            if (consumer != null) {
                //TODO  new后,原对象引用可能断开无法找到;使用静态锁map来保存
                consumer = new Consumer(zbusBroker, ServiceFlowConstant.SHELL_RESPONSE + "@" + attachment);
                String messageBodyString = consumer.queryMQ().getBodyString();
                Map<String,Object> message = objectMapper.readValue(messageBodyString,Map.class);
                Integer consumerCount = (Integer) message.get("consumerCount");
                if (consumerCount<1) {
                    consumer.start(this::handle);
                }
            } else {
            if (!cunsumerNap.containsKey(ServiceFlowConstant.SHELL_RESPONSE + "@" + attachment)){
                consumer = new Consumer(zbusBroker, ServiceFlowConstant.SHELL_RESPONSE + "@" + attachment);
                consumer.start(this::handle);
                cunsumerNap.put(ServiceFlowConstant.SHELL_RESPONSE + "@" + attachment,attachment);
            }
            System.out.println("start success~");
        } catch (Exception e) {
            e.printStackTrace();

+ 2 - 1
src/main/java/com/yihu/hos/tenant/service/TenantService.java

@ -74,7 +74,7 @@ public class TenantService {
        obj.setUpdatedUnix(0);
        String message = "";
        try {
            //TODO 建库建表操作
            // 建库建表操作
            DBInfoModel dbInfo = dbInfoService.getDBInfoById(obj.getDataSourceId());
            DBInfoModel db = new DBInfoModel();
            db.setUserName(dbInfo.getUserName());
@ -84,6 +84,7 @@ public class TenantService {
            db.setDbName(obj.getSchema());
            //建库,建表,数据复制操作
            message = mySqlImportAndExport.importSql(db, sqlFilePath);
            //TODO  mycat操作问题
            tenantDao.saveEntity(obj);
            return Result.success("保存成功");
        } catch (IOException ex) {

+ 2 - 0
src/main/resources/application.yml

@ -59,6 +59,8 @@ spring:
hos:
  zbus:
    url: 192.168.131.119:15555
    port: 15555
    store: ./store
  mysql:
    filePath: e://learn.sql   #sql文件位置