Bladeren bron

调整broker启动注册逻辑

Airhead 8 jaren geleden
bovenliggende
commit
617802dafc

+ 9 - 0
hos-arbiter/src/main/java/com/yihu/hos/arbiter/models/BrokerServer.java

@ -18,6 +18,7 @@ public class BrokerServer {
    private String hostAddress;
    private int port;
    private boolean enable;
    private boolean registered;
    @Indexed(name = "updateTime_1", expireAfterSeconds = 30)
    private Date updateTime;
    private ArrayList<Flow> onFlowList;
@ -96,6 +97,14 @@ public class BrokerServer {
        this.onFlowList = onFlowList;
    }
    public boolean isRegistered() {
        return registered;
    }
    public void setRegistered(boolean registered) {
        this.registered = registered;
    }
    public class Flow {
        String routeCode;
        String type;

+ 16 - 3
hos-arbiter/src/main/java/com/yihu/hos/arbiter/services/BrokerServerService.java

@ -4,6 +4,8 @@ import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.mongodb.WriteResult;
import com.yihu.hos.arbiter.models.BrokerServer;
import com.yihu.hos.core.http.HTTPResponse;
import com.yihu.hos.core.http.HttpClientKit;
import com.yihu.hos.web.framework.constant.ServiceFlowConstant;
import org.apache.camel.CamelContext;
import org.apache.camel.ProducerTemplate;
@ -52,22 +54,33 @@ public class BrokerServerService {
        update.set("port", brokerServer.getPort());
        update.set("updateTime", brokerServer.getUpdateTime());
        update.set("enable", brokerServer.isEnable());
//        update.set("registered", true); //?
        if (brokerServer.getOnFlowList() != null) {
            update.set("onFlowList", brokerServer.getOnFlowList());
        }
        WriteResult writeResult = mongoOperations.upsert(query, update, BrokerServer.class);
        if (writeResult.isUpdateOfExisting()) {
            return;
            //避免Broker重启的情况
            HTTPResponse response = HttpClientKit.post(brokerServer.getURL() + "/esb/heartbeat");
            if (response.getStatusCode() == 200 && brokerServer.isRegistered()) {
                return;
            }
            //确保Broker已经启动了流程
            brokerServer = mongoOperations.findOne(query, BrokerServer.class);
            if (brokerServer.getOnFlowList() != null) {
                return;
            }
        }
        brokerServer = mongoOperations.findById(writeResult.getUpsertedId(), BrokerServer.class);
        //没有启动流程
        try {
            String msg = objectMapper.writeValueAsString(brokerServer);
            ProducerTemplate producerTemplate = createProducerTemplate();
            Map<String, Object> header = new HashMap<>();
            header.put("event", ServiceFlowConstant.BROKER_SERVER_ON);
            producerTemplate.sendBodyAndHeaders("service.flow.event", msg, header);
            producerTemplate.sendBodyAndHeaders("service.flow.event:queue:configuration.service.flow", msg, header);
        } catch (JsonProcessingException e) {
            e.printStackTrace();
            logger.error(e.getMessage());

+ 29 - 10
hos-arbiter/src/main/java/com/yihu/hos/arbiter/services/ServiceFlowService.java

@ -17,6 +17,7 @@ import org.springframework.data.mongodb.core.query.Update;
import org.springframework.stereotype.Service;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@ -70,6 +71,10 @@ public class ServiceFlowService {
        flowController("post", "/esb/serviceFlow/start", msg);
    }
    public void serviceFlowStarted(String msg, BrokerServer brokerServer) {
        flowController("post", "/esb/serviceFlow/start", msg, brokerServer);
    }
    public void serviceFlowStopped(String msg) {
        flowController("post", "/esb/serviceFlow/stop", msg);
    }
@ -94,10 +99,13 @@ public class ServiceFlowService {
        List<ServiceFlow> serviceFlowList = getAll();
        serviceFlowList.forEach(serviceFlow -> {
            try {
                BrokerServer brokerServer = objectMapper.readValue(msg, BrokerServer.class);
                String serviceFlowMsg = objectMapper.writeValueAsString(serviceFlow);
                serviceFlowStarted(msg);
                serviceFlowStarted(serviceFlowMsg, brokerServer);
            } catch (JsonProcessingException e) {
                e.printStackTrace();
            } catch (IOException e) {
                e.printStackTrace();
            }
        });
@ -118,6 +126,12 @@ public class ServiceFlowService {
    }
    private void flowController(String method, String path, String msg) {
        this.flowController(method, path, msg, null);
    }
    private void flowController(String method, String path, String msg, BrokerServer brokerServer) {
        try {
            ServiceFlow serviceFlow = objectMapper.readValue(msg, ServiceFlow.class);
            this.save(serviceFlow);
@ -130,23 +144,29 @@ public class ServiceFlowService {
                }
            }
            List<BrokerServer> brokerServerList = brokerServerService.get(one);
            for (BrokerServer brokerServer : brokerServerList) {
                if (brokerServer.isFlowOn(serviceFlow.getRouteCode())) {
                    continue;
                }
            List<BrokerServer> brokerServerList = new ArrayList<>();
            if (brokerServer != null) {
                brokerServerList.add(brokerServer);
            } else {
                brokerServerList = brokerServerService.get(one);
            }
            for (BrokerServer broker : brokerServerList) {
//                if (broker.isFlowOn(serviceFlow.getRouteCode())) {
//                    continue;
//                }
                switch (method) {
                    case "post":
                        HTTPResponse response = HttpClientKit.post(brokerServer.getURL() + path, msg);
                        HTTPResponse response = HttpClientKit.post(broker.getURL() + path, msg);
                        if (response.getStatusCode() == 200) {
                            String body = response.getBody();
                        }
                        break;
                    case "put":
                        HttpClientKit.put(brokerServer.getURL() + path, msg);
                        HttpClientKit.put(broker.getURL() + path, msg);
                        break;
                    case "delete":
                        HttpClientKit.delete(brokerServer.getURL() + path, msg);
                        HttpClientKit.delete(broker.getURL() + path, msg);
                        break;
                    default:
                        break;
@ -159,5 +179,4 @@ public class ServiceFlowService {
        }
    }
}

+ 3 - 4
hos-broker/src/main/java/com/yihu/hos/broker/controllers/ESBCamelController.java

@ -19,10 +19,9 @@ public class ESBCamelController {
    @Autowired
    private ESBCamelService esbCamelService;
    @RequestMapping(value = "/test", produces = "application/json;charset=UTF-8", method = RequestMethod.POST)
    @ApiOperation(value = "新增Processor处理器", produces = "application/json", notes = "当外界组件通知一个新的processor处理器被定义时,该事件被触发")
    public Result test() {
        return Result.success("test");
    @RequestMapping(value = "/heartbeat", produces = "application/json;charset=UTF-8", method = RequestMethod.POST)
    @ApiOperation(value = "测试服务器可以正常连接", produces = "application/json", notes = "测试服务器可以正常连接")
    public void heartbeat() {
    }
    @RequestMapping(value = "/serviceFlow", produces = "application/json;charset=UTF-8", method = RequestMethod.POST)

+ 1 - 4
hos-broker/src/main/java/com/yihu/hos/broker/models/SystemClassMapping.java

@ -12,12 +12,9 @@ import java.util.Map;
 * Created by lingfeng on 2016/8/4.
 */
public class SystemClassMapping {
    private static Map<String, String> mapping;
    private static Map<String, String> mapping = new HashMap<>();
    public static Map<String, String> getMapping() {
        if (mapping == null) {
            mapping = new HashMap<>();
        }
        return mapping;
    }

+ 12 - 15
hos-broker/src/main/java/com/yihu/hos/broker/services/BrokerServerService.java

@ -4,18 +4,15 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.yihu.hos.broker.configurations.ApplicationConfiguration;
import com.yihu.hos.broker.configurations.ArbiterConfiguration;
import com.yihu.hos.core.http.HTTPResponse;
import com.yihu.hos.core.http.HttpClientKit;
import com.yihu.hos.core.net.IPChoiceUtils;
import org.apache.camel.util.InetAddressUtil;
import org.apache.http.Consts;
import org.apache.http.NameValuePair;
import org.apache.http.client.entity.UrlEncodedFormEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpDelete;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.utils.URIBuilder;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.message.BasicNameValuePair;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@ -23,14 +20,15 @@ import org.springframework.stereotype.Component;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
import java.util.HashMap;
import java.util.Map;
/**
 * @created Airhead 2016/8/1.
 */
@Component("brokerServerService")
public class BrokerServerService implements DisposableBean {
    private static boolean registered = false;
    @Autowired
    private ApplicationConfiguration applicationConfiguration;
    @Autowired
@ -50,16 +48,15 @@ public class BrokerServerService implements DisposableBean {
            objectNode.put("hostAddress", hostAddress);
            objectNode.put("port", port);
            objectNode.put("enable", true);
            objectNode.put("registered", registered);
            String brokerServer = objectMapper.writeValueAsString(objectNode);
            List<NameValuePair> nameValuePairList = new ArrayList<>();
            nameValuePairList.add(new BasicNameValuePair("brokerServer", brokerServer));
            CloseableHttpClient httpclient = HttpClients.createDefault();
            HttpPost httpPost = new HttpPost(arbiterConfiguration.getServer() + "/brokerServer");
            httpPost.setEntity(new UrlEncodedFormEntity(nameValuePairList, Consts.UTF_8));
            CloseableHttpResponse response = httpclient.execute(httpPost);
            response.close();
            Map<String, String> params = new HashMap<>();
            params.put("brokerServer", brokerServer);
            HTTPResponse httpResponse = HttpClientKit.post(arbiterConfiguration.getServer() + "/brokerServer", params);
            if (httpResponse.getStatusCode() == 200) {
                registered = true;
            }
        } catch (IOException e) {
            e.printStackTrace();

+ 43 - 7
hos-broker/src/main/java/com/yihu/hos/broker/services/camel/ESBCamelService.java

@ -5,12 +5,12 @@ import com.mongodb.client.MongoDatabase;
import com.yihu.hos.broker.configurations.MongoConfiguration;
import com.yihu.hos.broker.models.SystemCamelContext;
import com.yihu.hos.broker.models.SystemClassMapping;
import com.yihu.hos.web.framework.model.bo.ServiceFlow;
import com.yihu.hos.core.constants.CoreConstant;
import com.yihu.hos.core.datatype.ClassFileUtil;
import com.yihu.hos.core.datatype.StringUtil;
import com.yihu.hos.core.encrypt.DES;
import com.yihu.hos.web.framework.model.Result;
import com.yihu.hos.web.framework.model.bo.ServiceFlow;
import com.yihu.hos.web.framework.util.GridFSUtil;
import org.apache.camel.builder.RouteBuilder;
import org.apache.log4j.LogManager;
@ -55,6 +55,7 @@ public class ESBCamelService {
            ServiceFlowValid serviceFlowValid = new ServiceFlowValid(msg).invoke();
            if (serviceFlowValid.is()) return Result.error("必要的入参数据不正确,请检查!");
            ServiceFlow serviceFlow = serviceFlowValid.getServiceFlow();
            ServiceFlow.HandleFile handleFile = serviceFlowValid.getHandleFile();
            boolean created;
            if ("java".equals(handleFile.getFileType())) {
@ -66,7 +67,8 @@ public class ESBCamelService {
                return Result.error("服务流程变更增加失败!");
            }
            this.startRouter(handleFile);
            this.addRouter(handleFile);
            this.startRouter(serviceFlow.getRouteCode());
            return Result.error("服务流程变更增加成功!");
        } catch (Exception e) {
@ -97,7 +99,8 @@ public class ESBCamelService {
            this.stopRouter(serviceFlow.getRouteCode());
            this.removeRouter(serviceFlow.getRouteCode());
            this.startRouter(handleFile);
            this.addRouter(handleFile);
            this.startRouter(serviceFlow.getRouteCode());
            return Result.error("服务流程变更增加成功!");
        } catch (Exception e) {
@ -120,7 +123,8 @@ public class ESBCamelService {
            SystemCamelContext.getContext().stopRoute(routeCode);
            SystemCamelContext.getContext().removeRoute(routeCode);
            this.deleteClassFile(handleFile);
            this.startRouter(handleFile);
            this.addRouter(handleFile);
            this.startRouter(serviceFlow.getRouteCode());
            return Result.success("服务流程变更减少成功!");
        } catch (Exception e) {
@ -167,8 +171,12 @@ public class ESBCamelService {
                if (!created) {
                    return Result.error("服务流程启动失败!");
                }
                this.addRouter(handleFile);
            }
            SystemCamelContext.getContext().startRoute(routeCode);
            this.startRouter(serviceFlow.getRouteCode());
            return Result.success("服务流程启动成功!");
        } catch (Exception e) {
            return Result.error("服务流程启动失败!");
@ -193,6 +201,9 @@ public class ESBCamelService {
    }
    private boolean createClassFile(ServiceFlow.HandleFile handleFile) throws Exception {
        if (handleFile == null) {
            return false;
        }
        SystemClassMapping.put(handleFile.getRouteCode(), handleFile.getPackageName(), handleFile.getClassName(), handleFile.getUsage());
        URL resource = SystemClassMapping.getResource(this);
        FileOutputStream outputStream = ClassFileUtil.createFile(resource, handleFile.getPackageName(), handleFile.getClassName(), ClassFileUtil.CLASS_FILE);
@ -200,10 +211,18 @@ public class ESBCamelService {
        String fileName = DES.decrypt(handleFile.getFilePath(), DES.COMMON_PASSWORD);
        MongoDatabase db = mongoConfig.mongoClient().getDatabase(dbName);
        return GridFSUtil.readFile(db, outputStream, fileName);
        boolean read = GridFSUtil.readFile(db, outputStream, fileName);
        if (!read) {
            logger.error("not mongo file, fileName:" + fileName);
        }
        return read;
    }
    private boolean generateClassFile(ServiceFlow.HandleFile handleFile) throws Exception {
        if (handleFile == null) {
            return false;
        }
        SystemClassMapping.put(handleFile.getRouteCode(), handleFile.getPackageName(), handleFile.getClassName(), handleFile.getUsage());
        URL resource = SystemClassMapping.getResource(this);
        FileOutputStream outputStream = ClassFileUtil.createFile(resource, handleFile.getPackageName(), handleFile.getClassName(), ClassFileUtil.JAVA_FILE);
@ -213,10 +232,12 @@ public class ESBCamelService {
        boolean read = GridFSUtil.readFile(db, outputStream, fileName);
        if (!read) {
            logger.error("not mongo file, fileName:" + fileName);
            return false;
        }
        String sourcePath = ClassFileUtil.getFilePath(resource, handleFile.getPackageName(), handleFile.getClassName(), ClassFileUtil.JAVA_FILE);
        logger.info(sourcePath);
        return CamelCompiler.compile(sourcePath, resource.toString());
    }
@ -237,7 +258,18 @@ public class ESBCamelService {
        logger.info("===================" + handleFile.getPackageName() + CoreConstant.DOT + className + ".class 删除过程结束");
    }
    private void startRouter(ServiceFlow.HandleFile handleFile) throws Exception {
    private void addRouter(ArrayList<ServiceFlow.HandleFile> handleFiles) throws Exception {
        handleFiles.forEach(handleFile -> {
            try {
                addRouter(handleFile);
            } catch (Exception e) {
                logger.error(e.getMessage());
                e.printStackTrace();
            }
        });
    }
    private void addRouter(ServiceFlow.HandleFile handleFile) throws Exception {
        CamelClassLoader classLoader = new CamelClassLoader(CamelClassLoader.class.getClassLoader());
        String path = ClassLoader.getSystemResource(CoreConstant.EMPTY).getPath();
        String className = SystemClassMapping.get(handleFile.getRouteCode(), handleFile.getUsage(), handleFile.getClassName());
@ -248,6 +280,10 @@ public class ESBCamelService {
        }
    }
    private void startRouter(String routeCode) throws Exception {
        SystemCamelContext.getContext().startRoute(routeCode);
    }
    private void stopRouter(String routeCode) throws Exception {
        SystemCamelContext.getContext().stopRoute(routeCode);
    }

+ 1 - 1
hos-core/src/main/java/com/yihu/hos/core/datatype/ClassFileUtil.java

@ -162,6 +162,6 @@ public class ClassFileUtil {
    public static String getFilePath(URL url, String packageName, String className, String fileType) {
        String packagePath = StringUtil.replaceStrAll(packageName, ".", "/");
        return url.getPath() + "/" + packagePath + "/" + className + fileType;
        return url.getPath() + packagePath + "/" + className + fileType;
    }
}

+ 9 - 3
hos-core/src/main/java/com/yihu/hos/core/http/DefaultClientImpl.java

@ -63,7 +63,9 @@ class DefaultClientImpl implements HTTPClient {
    public HTTPResponse post(String url, Map<String, String> params, Map<String, String> headers) {
        try {
            FormBody.Builder fromBodyBuilder = new FormBody.Builder();
            params.forEach(fromBodyBuilder::add);
            if (params != null) {
                params.forEach(fromBodyBuilder::add);
            }
            RequestBody requestBody = fromBodyBuilder
                    .build();
@ -169,7 +171,9 @@ class DefaultClientImpl implements HTTPClient {
    public HTTPResponse put(String url, Map<String, String> params, Map<String, String> headers) {
        try {
            FormBody.Builder fromBodyBuilder = new FormBody.Builder();
            params.forEach(fromBodyBuilder::add);
            if (params != null) {
                params.forEach(fromBodyBuilder::add);
            }
            RequestBody requestBody = fromBodyBuilder
                    .build();
@ -223,7 +227,9 @@ class DefaultClientImpl implements HTTPClient {
    public HTTPResponse delete(String url, Map<String, String> params, Map<String, String> headers) {
        try {
            FormBody.Builder fromBodyBuilder = new FormBody.Builder();
            params.forEach(fromBodyBuilder::add);
            if (params != null) {
                params.forEach(fromBodyBuilder::add);
            }
            RequestBody requestBody = fromBodyBuilder
                    .build();

+ 1 - 0
src/main/java/com/yihu/hos/system/service/FlowManager.java

@ -148,6 +148,7 @@ public class FlowManager implements IFlowManage {
                handleFile.setPackageName(flowClass.getPackageName());
                handleFile.setFilePath(flowClass.getClassPath());
                handleFile.setUsage(flowClass.getType());
                handleFile.setRouteCode(serviceFlow.getRouteCode());
                handleFileList.add(handleFile);
            }