zhenglingfeng 8 anni fa
parent
commit
c0a0abd36e

+ 19 - 0
hos-arbiter/src/main/java/com/yihu/hos/arbiter/services/ServiceFlowService.java

@ -146,7 +146,26 @@ public class ServiceFlowService {
                        response.close();
                        break;
                    }
                    case "processorDataDeleted": {
                        try {
                            URI uri = new URIBuilder()
                                    .setScheme("http")
                                    .setHost(brokerServer.getHostAddress() + ":" + brokerServer.getPort())
                                    .setPath("/esb/processor")
                                    .setParameter("serviceFlow", serviceFlow.getServiceFlow())
                                    .setParameter("packageName", serviceFlow.getPackageName())
                                    .setParameter("className", serviceFlow.getClassName())
                                    .build();
                            HttpDelete httpDelete = new HttpDelete(uri);
                            CloseableHttpResponse response = httpclient.execute(httpDelete);
                            response.close();
                        } catch (URISyntaxException e) {
                            e.printStackTrace();
                        }
                        break;
                    }
                    default:
                        break;
                }

+ 2 - 7
hos-broker/src/main/java/com/yihu/hos/services/ESBCamelService.java

@ -57,11 +57,6 @@ public class ESBCamelService {
            String fileName = DES.decrypt(path, DES.COMMON_PASSWORD);
            MongoDatabase db = mongoConfig.mongoClient().getDatabase(dbName);
            if (GridFSUtil.readFile(db, out, fileName)) {
                File packageFile = new File(serviceFlow + StringUtil.replaceStrAll(packageName, ".", "/"));
                ClassLoader currentClassLoader = Thread.currentThread().getContextClassLoader();
                ClassLoader camelESBClassLoader = new URLClassLoader(new URL[]{packageFile.toURI().toURL()}, currentClassLoader);
                Thread.currentThread().setContextClassLoader(camelESBClassLoader);
                return Result.success("新增处理器成功!");
            } else {
                return Result.error("新增处理器失败!");
@ -142,8 +137,8 @@ public class ESBCamelService {
            MongoDatabase db = mongoConfig.mongoClient().getDatabase(dbName);
            if (GridFSUtil.readFile(db, out, fileName)) {
                // 3、===============加载到CamelContext中
                ClassLoader currentClassLoader = Thread.currentThread().getContextClassLoader();
                Class<RouteBuilder> routeBuilderClass = (Class<RouteBuilder>) currentClassLoader.loadClass(SystemClassMapping.getSystemClassNameMapping().get(serviceFlow + BrokerConstant.ROUTE + className));
                DynamicClassLoader classLoader = new DynamicClassLoader(DynamicClassLoader.class.getClassLoader());
                Class<RouteBuilder> routeBuilderClass = (Class<RouteBuilder>) classLoader.loadClass(SystemClassMapping.getSystemClassNameMapping().get(serviceFlow + BrokerConstant.ROUTE + className));
                if(routeBuilderClass != null) {
                    RouteBuilder routeBuilder = routeBuilderClass.newInstance();
                    SystemCamelContext.getDefaultCamelContext().addRoutes(routeBuilder);

+ 0 - 44
hos-camel/src/main/java/api1/processor/ApiProcessor.java

@ -1,44 +0,0 @@
package api1.processor;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.http.common.HttpMessage;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
/**
 * @author Airhead
 * @since 2016-11-13
 */
public class ApiProcessor implements Processor {
    public void process(Exchange exchange) throws Exception {
        Message outMessage = exchange.getOut();
        outMessage.setBody("hello,api1");
    }
    /**
     * 从stream中分析字符串内容
     *
     * @param bodyStream
     * @return
     */
    private String analysisMessage(InputStream bodyStream) throws IOException {
        ByteArrayOutputStream outStream = new ByteArrayOutputStream();
        byte[] contextBytes = new byte[4096];
        int realLen;
        while ((realLen = bodyStream.read(contextBytes, 0, 4096)) != -1) {
            outStream.write(contextBytes, 0, realLen);
        }
        // 返回从Stream中读取的字串
        try {
            return new String(outStream.toByteArray(), "UTF-8");
        } finally {
            outStream.close();
        }
    }
}

+ 0 - 16
hos-camel/src/main/java/api1/route/ApiRouteBulider.java

@ -1,16 +0,0 @@
package api1.route;
import api1.processor.ApiProcessor;
import org.apache.camel.builder.RouteBuilder;
/**
 * @author Airhead
 * @since 2016-11-13
 */
public class ApiRouteBulider extends RouteBuilder {
    @Override
    public void configure() throws Exception {
        from("jetty:http://localhost:9091/api/v1").routeId("api1")
                .process(new ApiProcessor()).to("stream:out");
    }
}

+ 0 - 44
hos-camel/src/main/java/api2/processor/ApiProcessor.java

@ -1,44 +0,0 @@
package api2.processor;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.http.common.HttpMessage;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
/**
 * @author Airhead
 * @since 2016-11-13
 */
public class ApiProcessor implements Processor {
    public void process(Exchange exchange) throws Exception {
        Message outMessage = exchange.getOut();
        outMessage.setBody("hello,api2");
    }
    /**
     * 从stream中分析字符串内容
     *
     * @param bodyStream
     * @return
     */
    private String analysisMessage(InputStream bodyStream) throws IOException {
        ByteArrayOutputStream outStream = new ByteArrayOutputStream();
        byte[] contextBytes = new byte[4096];
        int realLen;
        while ((realLen = bodyStream.read(contextBytes, 0, 4096)) != -1) {
            outStream.write(contextBytes, 0, realLen);
        }
        // 返回从Stream中读取的字串
        try {
            return new String(outStream.toByteArray(), "UTF-8");
        } finally {
            outStream.close();
        }
    }
}

+ 0 - 16
hos-camel/src/main/java/api2/route/ApiRouteBulider.java

@ -1,16 +0,0 @@
package api2.route;
import api2.processor.ApiProcessor;
import org.apache.camel.builder.RouteBuilder;
/**
 * @author Airhead
 * @since 2016-11-13
 */
public class ApiRouteBulider extends RouteBuilder {
    @Override
    public void configure() throws Exception {
        from("jetty:http://localhost:9092/api/v1").routeId("api2")
                .process(new ApiProcessor());
    }
}

+ 2 - 9
hos-camel/src/main/java/api3/processor/ApiProcessor.java

@ -1,20 +1,13 @@
package api3.processor;
package api7.processor;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.Processor;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
/**
 * @author Airhead
 * @since 2016-11-13
 */
public class ApiProcessor implements Processor {
    public void process(Exchange exchange) throws Exception {
        Message outMessage = exchange.getOut();
        outMessage.setBody("hello,api3");
        outMessage.setBody("hello,api7");
    }
}

+ 4 - 7
hos-camel/src/main/java/api3/route/ApiRouteBulider.java

@ -1,16 +1,13 @@
package api3.route;
package api7.route;
import api3.processor.ApiProcessor;
import api7.processor.ApiProcessor;
import org.apache.camel.builder.RouteBuilder;
/**
 * @author Airhead
 * @since 2016-11-13
 */
public class ApiRouteBulider extends RouteBuilder {
    @Override
    public void configure() throws Exception {
        from("jetty:http://localhost:9093/api/v1").routeId("api3").log("servers: 0,code: ,order: 0")
        from("jetty:http://localhost:9097/api/v1").routeId("api7").log("servers: 0,code: ,order: 0")
                .process(new ApiProcessor()).log("servers: 0,code: ,order: 0");
    }
}

+ 16 - 16
src/main/java/com/yihu/hos/system/service/FlowManager.java

@ -162,25 +162,25 @@ public class FlowManager implements IFlowManage {
    public Result deleteFlow(Integer id) throws Exception {
        SystemServiceFlow flow = flowDao.getEntity(SystemServiceFlow.class, id);
        List<SystemServiceFlowClass> flowClassList = flowClassDao.getFlowClassByFlowId(id);
        SystemServiceFlowClass flowClassRoute = null;
        String oper = "";
        for (SystemServiceFlowClass flowClass:flowClassList){
            flowClassDao.deleteEntity(flowClass);
            //发送消息到MQ对列
            if (!flowClass.getType().equals(Constants.FLOW_TYPE_ROUTE)) {
                sendUpdateMessage(flow.getCode(), flowClass, Constants.FLOW_OP_DELETE);
            } else {
                flowClassRoute = flowClass;
                flowClassRoute.setIsUpdate("1");
                oper = Constants.FLOW_OP_DELETE;
            }
        }
        if (flowClassRoute != null) {
            sendUpdateMessage(flow.getCode(), flowClassRoute, oper);
        }
        List<SystemServiceFlowClass> processorFlowClassList = new ArrayList<>();
        if (Constants.JAVA.equals(flow.getFileType())){
            flowTempDao.deleteFlowTempByFlowId(id);
        } else {
            for (SystemServiceFlowClass flowClass:flowClassList){
                flowClassDao.deleteEntity(flowClass);
                flowClass.setIsUpdate("1");
                //发送消息到MQ对列
                if (flowClass.getType().equals(Constants.FLOW_TYPE_ROUTE)) {
                    sendUpdateMessage(flow.getCode(), flowClass, Constants.FLOW_OP_DELETE);
                } else {
                    processorFlowClassList.add(flowClass);
                }
            }
            for (SystemServiceFlowClass serviceFlowClass : processorFlowClassList) {
                sendUpdateMessage(flow.getCode(), serviceFlowClass, Constants.FLOW_OP_DELETE);
            }
        }
        flowDao.deleteEntity(flow);
        return Result.success("删除成功");