Преглед изворни кода

Merge branch 'master' of http://192.168.1.220:10080/esb/esb

# Conflicts:
#	src/main/java/com/yihu/hos/system/service/FlowManager.java
Airhead пре 8 година
родитељ
комит
6cc4954664

+ 22 - 8
hos-arbiter/src/main/java/com/yihu/hos/arbiter/services/ServiceFlowService.java

@ -1,6 +1,5 @@
package com.yihu.hos.arbiter.services;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.yihu.hos.arbiter.models.BrokerServer;
import com.yihu.hos.core.http.HTTPResponse;
@ -18,6 +17,7 @@ import org.springframework.stereotype.Service;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@ -43,14 +43,27 @@ public class ServiceFlowService {
    public void save(ServiceFlow serviceFlow) {
        if (serviceFlow == null) {
            logger.error("ServiceFlow is null");
            return;
        }
        Query query = new Query();
        query.addCriteria(Criteria.where("routeCode").is(serviceFlow.getRouteCode()));
        ServiceFlow flow = mongoOperations.findOne(query, ServiceFlow.class);
        Update update = new Update();
        update.set("routeCode", serviceFlow.getRouteCode());
        update.set("handleFiles", serviceFlow.getHandleFiles());
        update.set("updated", serviceFlow.getUpdated());
        update.set("flowType", serviceFlow.getFlowType());
        if (flow != null) {
            HashSet<ServiceFlow.HandleFile> flowSets = new HashSet<>(flow.getHandleFiles());
            HashSet<ServiceFlow.HandleFile> serviceFlowSets = new HashSet<>(serviceFlow.getHandleFiles());
            flowSets.addAll(serviceFlowSets);
            ArrayList<ServiceFlow.HandleFile> handleFiles = new ArrayList<>(flowSets);
            update.set("handleFiles", handleFiles); //没有用原生语法比较复杂
        } else {
            update.set("handleFiles", serviceFlow.getHandleFiles());
        }
        mongoOperations.upsert(query, update, ServiceFlow.class);
    }
@ -102,8 +115,6 @@ public class ServiceFlowService {
                BrokerServer brokerServer = objectMapper.readValue(msg, BrokerServer.class);
                String serviceFlowMsg = objectMapper.writeValueAsString(serviceFlow);
                serviceFlowStarted(serviceFlowMsg, brokerServer);
            } catch (JsonProcessingException e) {
                e.printStackTrace();
            } catch (IOException e) {
                e.printStackTrace();
            }
@ -131,10 +142,9 @@ public class ServiceFlowService {
    private void flowController(String method, String path, String msg, BrokerServer brokerServer) {
        try {
            ServiceFlow serviceFlow = objectMapper.readValue(msg, ServiceFlow.class);
            this.save(serviceFlow);
            ServiceFlow serviceFlow = getServiceFlow(msg);
            this.save(serviceFlow); //需要改造??
            boolean one = ServiceFlowConstant.JAVA.equals(serviceFlow.getFlowType());   //有cron表达式,就是采集任务。
            if (one) {
@ -179,4 +189,8 @@ public class ServiceFlowService {
        }
    }
    private ServiceFlow getServiceFlow(String msg) throws IOException {
        return objectMapper.readValue(msg, ServiceFlow.class);
    }
}

+ 28 - 0
hos-arbiter/src/main/resources/log4j2.xml

@ -0,0 +1,28 @@
<?xml version="1.0" encoding="UTF-8"?>
<configuration status ="ALL"  monitorInterval="1800">
    <Appenders>
        <Console name="Console" target="SYSTEM_OUT">
            <PatternLayout charset="UTF-8" pattern="%d{HH:mm:ss}[%-5p][%t][%c]: %m%n"/>
        </Console>
        <RollingFile name="RollingFile" filename="/usr/local/esb/log/CrunchifyTest.log"
                     filepattern="/usr/local/esb/log/rolling/%d{yyyyMMddHHmmss}-fargo.log">
            <PatternLayout charset="UTF-8" pattern="%d{yyyy-MM-dd HH:mm:ss} [%t] %-5p %c{1}:%L - %msg%n"/>
            <Policies>
                <SizeBasedTriggeringPolicy size="100 MB"/>
            </Policies>
            <DefaultRolloverStrategy max="20"/>
        </RollingFile>
    </Appenders>
    <Loggers>
        <Root level="debug">
            <AppenderRef ref="Console"/>
            <AppenderRef ref="RollingFile" />
        </Root>
        <Logger name="org.hibernate" level="warn" additivity="false">
        </Logger>
    </Loggers>
</configuration>

+ 3 - 3
hos-broker/src/main/java/com/yihu/hos/broker/models/SystemClassMapping.java

@ -18,12 +18,12 @@ public class SystemClassMapping {
        return mapping;
    }
    public static void put(String routeCode, String packageName, String className, String type) {
        mapping.put(routeCode + type + className, packageName + CoreConstant.DOT + className);
    public static void put(String routeCode, String packageName, String type, String className) {
        mapping.put(routeCode + "|" + type + "|" + className, packageName + CoreConstant.DOT + className);
    }
    public static String get(String routeCode, String className, String type) {
        return mapping.get(routeCode + type + className);
        return mapping.get(routeCode + "|" + type + "|" + className);
    }
    public static URL getResource(Object o) {

+ 9 - 3
hos-broker/src/main/java/com/yihu/hos/broker/services/camel/ESBCamelService.java

@ -9,6 +9,7 @@ import com.yihu.hos.core.constants.CoreConstant;
import com.yihu.hos.core.datatype.ClassFileUtil;
import com.yihu.hos.core.datatype.StringUtil;
import com.yihu.hos.core.encrypt.DES;
import com.yihu.hos.web.framework.constant.ServiceFlowConstant;
import com.yihu.hos.web.framework.model.Result;
import com.yihu.hos.web.framework.model.bo.ServiceFlow;
import com.yihu.hos.web.framework.util.GridFSUtil;
@ -179,6 +180,7 @@ public class ESBCamelService {
            return Result.success("服务流程启动成功!");
        } catch (Exception e) {
            logger.error(e.getMessage());
            return Result.error("服务流程启动失败!");
        }
    }
@ -204,7 +206,7 @@ public class ESBCamelService {
        if (handleFile == null) {
            return false;
        }
        SystemClassMapping.put(handleFile.getRouteCode(), handleFile.getPackageName(), handleFile.getClassName(), handleFile.getUsage());
        SystemClassMapping.put(handleFile.getRouteCode(), handleFile.getPackageName(), handleFile.getUsage(), handleFile.getClassName());
        URL resource = SystemClassMapping.getResource(this);
        FileOutputStream outputStream = ClassFileUtil.createFile(resource, handleFile.getPackageName(), handleFile.getClassName(), ClassFileUtil.CLASS_FILE);
@ -223,7 +225,7 @@ public class ESBCamelService {
        if (handleFile == null) {
            return false;
        }
        SystemClassMapping.put(handleFile.getRouteCode(), handleFile.getPackageName(), handleFile.getClassName(), handleFile.getUsage());
        SystemClassMapping.put(handleFile.getRouteCode(), handleFile.getPackageName(), handleFile.getUsage(), handleFile.getClassName());
        URL resource = SystemClassMapping.getResource(this);
        FileOutputStream outputStream = ClassFileUtil.createFile(resource, handleFile.getPackageName(), handleFile.getClassName(), ClassFileUtil.JAVA_FILE);
@ -270,9 +272,13 @@ public class ESBCamelService {
    }
    private void addRouter(ServiceFlow.HandleFile handleFile) throws Exception {
        if (handleFile.getUsage().equals(ServiceFlowConstant.FLOW_TYPE_PROCESSOR)) {
            return;
        }
        CamelClassLoader classLoader = new CamelClassLoader(CamelClassLoader.class.getClassLoader());
        String path = ClassLoader.getSystemResource(CoreConstant.EMPTY).getPath();
        String className = SystemClassMapping.get(handleFile.getRouteCode(), handleFile.getUsage(), handleFile.getClassName());
        String className = SystemClassMapping.get(handleFile.getRouteCode(), handleFile.getClassName(), handleFile.getUsage());
        Class<RouteBuilder> routeBuilderClass = (Class<RouteBuilder>) classLoader.loadClass(path, className);
        if (routeBuilderClass != null) {
            RouteBuilder routeBuilder = routeBuilderClass.newInstance();

+ 2 - 99
src/main/java/com/yihu/hos/services/ServiceFlowEventService.java

@ -43,15 +43,7 @@ public class ServiceFlowEventService {
    /**
     * 当外界组件通知一个新的processor处理器被定义时,该事件被触发。
     *
     * @param serviceFlow 本次processor处理器变化,所涉及的服务流程Code标识。
     * @param packageName processor处理器定义涉及的class包名
     * @param className   processor处理器定义涉及的class类名
     * @param path        processor处理器定义涉及的class内容,如果zookeeper数据结构中class分片存储,在业务级接口层面上也进行了合并
     */
//    public void serviceFlowAdded(String serviceFlow, String packageName, String className, String path) {
//        sendMsg(ServiceFlowConstant.SERVICE_FLOW_ADDED, serviceFlow, packageName, className, path);
//    }
    public void serviceFlowAdded(ServiceFlow serviceFlow) {
        this.sendMsg(ServiceFlowConstant.SERVICE_FLOW_ADDED, serviceFlow);
    }
@ -59,17 +51,10 @@ public class ServiceFlowEventService {
    /**
     * 当外界组件通知一个已有的processor处理器data部分发生变化时,该事件被触发。
     */
//    public void serviceFlowModifiedAdd(String routeCode, String packageName, String className, String path) {
//        this.sendMsg(ServiceFlowConstant.SERVICE_FLOW_MODIFIED_ADD, routeCode, packageName, className, path);
//    }
    public void serviceFlowModifiedAdd(ServiceFlow serviceFlow) {
        this.sendMsg(ServiceFlowConstant.SERVICE_FLOW_MODIFIED_ADD, serviceFlow);
    }
//    public void serviceFlowModifiedReduce(String routeCode, String packageName, String className) {
//        this.sendMsg(ServiceFlowConstant.SERVICE_FLOW_MODIFIED_REDUCE, routeCode, packageName, className, null);
//
//    }
    public void serviceFlowModifiedReduce(ServiceFlow serviceFlow) {
        this.sendMsg(ServiceFlowConstant.SERVICE_FLOW_MODIFIED_REDUCE, serviceFlow);
@ -78,93 +63,11 @@ public class ServiceFlowEventService {
    /**
     * 当外界组件通知一个新的RouteDefine路由被定义时,该事件被触发
     */
//    public void serviceFlowDelete(String routeCode, String packageName, String className, String path) {
//        this.sendMsg(ServiceFlowConstant.SERVICE_FLOW_DELETED, routeCode, packageName, className, path);
//    }
    public void serviceFlowDelete(ServiceFlow serviceFlow) {
        this.sendMsg(ServiceFlowConstant.SERVICE_FLOW_DELETED, serviceFlow);
    }
//
//    /**
//     * 当外界组件通知一个已有的RouteDefine路由定义被改变时,主要就是路由定义内容被改变时,该事件被触发。
//     */
//    public void routeDefineChanged(String routeCode, String packageName, String className, String path) {
//        this.sendMsg(ServiceFlowConstant.ROUTE_DEFINE_CHANGED, routeCode, packageName, className, path);
//    }
//
//    public void routeDefineChanged(ServiceFlow serviceFlow) {
//        this.sendMsg(ServiceFlowConstant.ROUTE_DEFINE_CHANGED, serviceFlow);
//    }
//
//    /**
//     * 当外界组件通知一个已有的RouteDefine路由定义被删除时,该事件被触发。
//     */
//    public void routeDefineDelete(String routeCode, String packageName, String className) {
//        this.sendMsg(ServiceFlowConstant.ROUTE_DEFINE_DELETED, routeCode, packageName, className, null);
//    }
//
//    public void routeDefineDelete(ServiceFlow serviceFlow) {
//        this.sendMsg(ServiceFlowConstant.ROUTE_DEFINE_DELETED, serviceFlow);
//    }
//
//    public void routeClassAdded(String routeCode, String packageName, String className, String path, String cron) {
//        this.sendGenMsg(ServiceFlowConstant.ROUTE_CLASS_ADDED, routeCode, packageName, className, path, cron);
//    }
//
//    public void routeClassAdded(ServiceFlow serviceFlow) {
//        this.sendMsg(ServiceFlowConstant.ROUTE_CLASS_ADDED, serviceFlow);
//    }
//
//    public void routeClassChanged(String routeCode, String packageName, String className, String path, String cron) {
//        this.sendGenMsg(ServiceFlowConstant.ROUTE_CLASS_CHANGED, routeCode, packageName, className, path, cron);
//    }
//
//    public void routeClassChanged(ServiceFlow serviceFlow) {
//        this.sendMsg(ServiceFlowConstant.ROUTE_CLASS_CHANGED, serviceFlow);
//    }
//
//    public void processorClassAdded(String routeCode, String packageName, String className, String path) {
//        this.sendMsg(ServiceFlowConstant.PROCESSOR_CLASS_ADDED, routeCode, packageName, className, path);
//    }
//
//    public void processorClassAdded(ServiceFlow serviceFlow) {
//        this.sendMsg(ServiceFlowConstant.PROCESSOR_CLASS_ADDED, serviceFlow);
//    }
//
//    public void processorClassChanged(String routeCode, String packageName, String className, String path) {
//        this.sendMsg(ServiceFlowConstant.PROCESSOR_CLASS_CHANGED, routeCode, packageName, className, path);
//    }
//
//    public void processorClassChanged(ServiceFlow serviceFlow) {
//        this.sendMsg(ServiceFlowConstant.PROCESSOR_CLASS_CHANGED, serviceFlow);
//    }
//
//    private void sendMsg(String event, String routeCode, String packageName, String className, String path) {
//        ServiceFlow flow = new ServiceFlow();
//        flow.setRouteCode(routeCode);
//        ServiceFlow.HandleFile handleFile = new ServiceFlow.HandleFile();
//        handleFile.setPackageName(packageName);
//        handleFile.setClassName(className);
//        handleFile.setFilePath(path);
//        handleFile.setFileType(ServiceFlowConstant.CLASS);
//        handleFile.setRouteCode(routeCode);
//        flow.addHandleFile(handleFile);
//
//        this.sendMsg(event, flow);
//    }
//
//    private void sendGenMsg(String event, String routeCode, String packageName, String className, String path, String cron) {
//        ServiceFlow flow = new ServiceFlow();
//        flow.setRouteCode(routeCode);
//        ServiceFlow.HandleFile handleFile = new ServiceFlow.HandleFile();
//        handleFile.setPackageName(packageName);
//        handleFile.setClassName(className);
//        handleFile.setFilePath(path);
//        handleFile.setFileType(ServiceFlowConstant.JAVA);
//        handleFile.setRouteCode(routeCode);
//        flow.addHandleFile(handleFile);
//        this.sendMsg(event, flow);
//    }
    private void sendMsg(String event, ServiceFlow serviceFlow) {
        try {

+ 41 - 16
src/main/java/com/yihu/hos/system/service/FlowManager.java

@ -225,18 +225,49 @@ public class FlowManager implements IFlowManage {
        if (ServiceFlowConstant.CLASS.equals(obj.getFileType())) {
            List<SystemServiceFlowClass> flowClassList = obj.getFlowClassArray();
            ServiceFlow serviceFlow = new ServiceFlow();
            serviceFlow.setRouteCode(obj.getCode());
            serviceFlow.setFlowType(ServiceFlowConstant.CLASS);
            for (SystemServiceFlowClass flowClass : flowClassList) {
                flowClass.setFlowId(obj.getId());
                flowDao.saveEntity(flowClass);
                //发送消息到MQ对列
                sendUpdateMessage(obj.getCode(), flowClass, ServiceFlowConstant.FLOW_OP_ADD);
                ServiceFlow.HandleFile handleFile = new ServiceFlow.HandleFile();
                handleFile.setRouteCode(obj.getCode());
                handleFile.setFileType(ServiceFlowConstant.CLASS);
                handleFile.setPackageName(flowClass.getPackageName());
                handleFile.setClassName(flowClass.getClassName());
                handleFile.setFilePath(flowClass.getClassPath());
                handleFile.setUsage(flowClass.getType());
                serviceFlow.addHandleFile(handleFile);
            }
            serviceFlowEventService.serviceFlowAdded(serviceFlow);
        } else if (ServiceFlowConstant.JAVA.equals(obj.getFileType())) {
            List<SystemServiceFlowTemp> flowTempList = obj.getFlowTempArray();
//            ServiceFlow serviceFlow = new ServiceFlow();
//            serviceFlow.setRouteCode(obj.getCode());
//            serviceFlow.setFlowType(ServiceFlowConstant.JAVA);
            for (SystemServiceFlowTemp flowTemp : flowTempList) {
                flowTemp.setFlowId(obj.getId());
                flowDao.saveEntity(flowTemp);
//                ServiceFlow.HandleFile handleFile = new ServiceFlow.HandleFile();
//                handleFile.setRouteCode(obj.getCode());
//                handleFile.setFileType(ServiceFlowConstant.JAVA);
//                handleFile.setPackageName(flowTemp.getPackageName());
//                handleFile.setClassName(flowTemp.getClassName());
//                handleFile.setFilePath(flowTemp.getClassPath());
//                handleFile.setUsage(flowTemp.getType());
//                serviceFlow.addHandleFile(handleFile);
            }
//            serviceFlowEventService.serviceFlowAdded(serviceFlow);
        }
        return Result.success("保存成功");
@ -315,23 +346,17 @@ public class FlowManager implements IFlowManage {
    public Result deleteFlow(Integer id) throws Exception {
        SystemServiceFlow flow = flowDao.getEntity(SystemServiceFlow.class, id);
        List<SystemServiceFlowClass> flowClassList = flowClassDao.getFlowClassByFlowId(id);
        List<SystemServiceFlowClass> processorFlowClassList = new ArrayList<>();
        if (ServiceFlowConstant.JAVA.equals(flow.getFileType())) {
            flowTempDao.deleteFlowTempByFlowId(id);
        } else {
            for (SystemServiceFlowClass flowClass : flowClassList) {
                flowClassDao.deleteEntity(flowClass);
                flowClass.setIsUpdate("1");
                //发送消息到MQ对列
                if (flowClass.getType().equals(ServiceFlowConstant.FLOW_TYPE_ROUTE)) {
                    sendUpdateMessage(flow.getCode(), flowClass, ServiceFlowConstant.FLOW_OP_DELETE);
                } else {
                    processorFlowClassList.add(flowClass);
                }
            }
            for (SystemServiceFlowClass serviceFlowClass : processorFlowClassList) {
                sendUpdateMessage(flow.getCode(), serviceFlowClass, ServiceFlowConstant.FLOW_OP_DELETE);
            }
            ServiceFlow serviceFlow = new ServiceFlow();
            serviceFlow.setRouteCode(flow.getCode());
            serviceFlow.setFlowType(ServiceFlowConstant.CLASS);
            serviceFlowEventService.serviceFlowDelete(serviceFlow);
        }
        flowDao.deleteEntity(flow);
@ -435,11 +460,11 @@ public class FlowManager implements IFlowManage {
            serviceFlow.addHandleFile(handleFile);
            switch (operate) {
                case "add":
                case "update":
                case ServiceFlowConstant.FLOW_OP_ADD:
                case ServiceFlowConstant.FLOW_OP_UPDATE:
                    serviceFlowEventService.serviceFlowModifiedAdd(serviceFlow);
                    break;
                case "delete":
                case ServiceFlowConstant.FLOW_OP_DELETE:
                    serviceFlowEventService.serviceFlowModifiedReduce(serviceFlow);
                    break;
                default: