Forráskód Böngészése

服务编排编辑器修改

zhenglingfeng 8 éve
szülő
commit
fa61dad232

+ 19 - 0
hos-camel/src/main/java/crawler/Aggregate.java

@ -0,0 +1,19 @@
package crawler;
import org.apache.camel.Exchange;
import org.apache.camel.processor.aggregate.AggregationStrategy;
public class Aggregate implements AggregationStrategy {
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        //如果oldExchange为null,则说明是第一个分解包
        if (oldExchange == null) {
            return newExchange;
        }
        String oldBody = oldExchange.getIn().getBody(String.class);
        String newBody = newExchange.getIn().getBody(String.class);
        //将新与旧包进行合并,再设置进Message的body中
        oldExchange.getIn().setBody(oldBody + "\n" + newBody);
        return oldExchange;
    }
}

+ 11 - 0
hos-camel/src/main/java/crawler/Split.java

@ -0,0 +1,11 @@
package crawler;
import net.sf.json.JSONArray;
import org.apache.camel.Body;
public class Split {
    public JSONArray splitJsonArray(@Body String jsonArrayStr) {
        return JSONArray.fromObject(jsonArrayStr);
    }
}

+ 4 - 3
hos-camel/src/main/java/crawler/processor/CrawlerProcessor0.java

@ -7,9 +7,10 @@ import org.apache.camel.Processor;
/**
 * Created by Zdm on 2016/7/13.
 */
public class CrawlerProcessor0 implements Processor {
public class Processor1 implements Processor {
    public void process(Exchange exchange) throws Exception {
        Message outMessage = exchange.getOut();
        outMessage.setHeader(Exchange.HTTP_QUERY, "jobId=5ad5c11655d443c30155d477a6b10000");
        Message inMessage = exchange.getIn();
        outMessage.setHeader(Exchange.HTTP_QUERY, "str=" + inMessage.getBody(String.class));
    }
}
}

+ 24 - 0
hos-camel/src/main/java/crawler/processor/Processor2.java

@ -0,0 +1,24 @@
package crawler.processor;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.Processor;
/**
 * Created by Zdm on 2016/7/13.
 */
public class Processor2 implements Processor {
    private String key;
    public Processor2(String key) {
        this.key = key;
    }
    public void process(Exchange exchange) throws Exception {
        Message inMessage = exchange.getIn();
        Message outMessage = exchange.getOut();
        outMessage.setHeader("test_correlation_key", this.key);
        String body = inMessage.getBody(String.class);
        outMessage.setBody(body);
    }
}

+ 2 - 2
hos-camel/src/main/java/crawler/route/CrawlerQuartzRoute.java

@ -1,6 +1,6 @@
package crawler.route;
import crawler.processor.CrawlerProcessor0;
import crawler.processor.Processor0;
import org.apache.camel.Exchange;
import org.apache.camel.builder.RouteBuilder;
@ -9,7 +9,7 @@ import org.apache.camel.builder.RouteBuilder;
 */
public class CrawlerQuartzRoute extends RouteBuilder {
    public void configure() throws Exception {
        from("quartz://myGroup/myTimerName?cron=0/3 * * * * ?").routeId("routeId").process(new CrawlerProcessor0())
        from("quartz://myGroup/myTimerName?cron=0/3 * * * * ?").routeId("routeId").process(new Processor0())
                .setHeader(Exchange.HTTP_METHOD, constant("POST")).to("http4://localhost:8088/crawler/patientList").to("stream:out");
    }
}

+ 23 - 0
hos-camel/src/main/java/crawler/route/RouteBulider1.java

@ -0,0 +1,23 @@
package crawler.route;
import crawler.Aggregate;
import crawler.processor.Processor1;
import crawler.Split;
import crawler.processor.Processor2;
import org.apache.camel.builder.RouteBuilder;
/**
 * Created by lingfeng on 2016/7/25.
 */
public class RouteBulider1 extends RouteBuilder {
    public void configure() throws Exception {
        from("quartz://myGroup/myTimerName?cron=0/5 * * * * ? ").routeId("list")
                .to("http://localhost:9999/list")
                .split().method(Split.class, "splitJsonArray")
                .process(new Processor1()).to("http://localhost:9999/str")
                .process(new Processor2("key1"))
                .aggregate(header("test_correlation_key"), new Aggregate()).completionSize(3).to("stream:out");
    }
}

+ 2 - 2
hos-rest/src/main/resources/application.yml

@ -38,11 +38,11 @@ qlc:
    port: 2181
crawler:
  upload:
    ip: 172.19.103.89
    ip: localhost
    port: 8088
    api: crawler/patient
  storage:
    ip: 172.19.103.89
    ip: localhost
    port: 8088
    api: crawler/datapush

+ 1 - 0
src/main/java/com/yihu/hos/filter/SessionOutTimeFilter.java

@ -31,6 +31,7 @@ public class SessionOutTimeFilter extends OncePerRequestFilter {
                || path.indexOf(httpServletRequest.getContextPath() + "/static-dev") != -1
                || path.indexOf(httpServletRequest.getContextPath() + "/develop") != -1
                || path.indexOf(httpServletRequest.getContextPath() + "/rest") != -1
                || path.indexOf(httpServletRequest.getContextPath() + "/process") != -1
                || path.indexOf("swagger") != -1
                || path.indexOf(httpServletRequest.getContextPath() + "/v2/api-docs") != -1
                || path.indexOf("/tenant/down") != -1

+ 6 - 6
src/main/java/com/yihu/hos/system/controller/ProcessController.java

@ -8,13 +8,8 @@ import org.springframework.stereotype.Controller;
import org.springframework.ui.Model;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.ResponseBody;
/**
 *  流程管理
 * @author HZY
 * @vsrsion 1.0
 * Created at 2016/8/12.
 */
@RequestMapping("/process")
@Controller
public class ProcessController  extends BaseController {
@ -34,6 +29,7 @@ public class ProcessController  extends BaseController {
    }
    @RequestMapping(value = "/getAllApp", method = RequestMethod.GET)
    @ResponseBody
    public Result getAllApp() {
        try {
            return processManager.getAllApp();
@ -43,6 +39,7 @@ public class ProcessController  extends BaseController {
    }
    @RequestMapping(value = "/getAllAppService", method = RequestMethod.GET)
    @ResponseBody
    public Result getAllAppService() {
        try {
            return processManager.getAllAppService();
@ -52,6 +49,7 @@ public class ProcessController  extends BaseController {
    }
    @RequestMapping(value = "/getAppService", method = RequestMethod.GET)
    @ResponseBody
    public Result getAppService(String appId) {
        try {
            return processManager.getAppServiceByAppId(appId);
@ -61,6 +59,7 @@ public class ProcessController  extends BaseController {
    }
    @RequestMapping(value = "/getAllProcessor", method = RequestMethod.GET)
    @ResponseBody
    public Result getAllProcessor() {
        try {
            return processManager.getAllProcessor();
@ -70,6 +69,7 @@ public class ProcessController  extends BaseController {
    }
    @RequestMapping(value = "/json", method = RequestMethod.POST)
    @ResponseBody
    public Result formatJson(String code, String name, String positionJson, String flowJson) {
        try {
            String fileName = processManager.formatJson(flowJson);

+ 86 - 23
src/main/java/com/yihu/hos/system/service/ProcessManager.java

@ -5,9 +5,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.yihu.hos.common.graph.BFSGraph;
import com.yihu.hos.common.graph.DGraph;
import com.yihu.hos.common.graph.Edge;
import com.yihu.hos.config.MongoConfig;
import com.yihu.hos.core.datatype.StringUtil;
import com.yihu.hos.core.encrypt.DES;
import com.yihu.hos.system.dao.AppDao;
import com.yihu.hos.system.dao.AppServiceDao;
import com.yihu.hos.system.dao.FlowProcessDao;
@ -17,15 +15,14 @@ import com.yihu.hos.system.model.SystemServiceEndpoint;
import com.yihu.hos.system.model.SystemServiceFlowProcess;
import com.yihu.hos.system.model.SystemServiceFlowProcessor;
import com.yihu.hos.web.framework.model.Result;
import com.yihu.hos.web.framework.util.GridFSUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.util.*;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@Service("ProcessManager")
public class ProcessManager {
@ -44,28 +41,28 @@ public class ProcessManager {
    private ObjectMapper objectMapper = new ObjectMapper();
    public Result getAllApp() throws Exception {
        String hql = "select * from SystemApp";
        String hql = "from SystemApp";
        List<SystemApp> appList = appDao.getEntityList(SystemApp.class, hql);
        String result = objectMapper.writeValueAsString(appList);
        return Result.success(result);
    }
    public Result getAllAppService() throws Exception {
        String hql = "select * from SystemServiceEndpoint";
        String hql = "from SystemServiceEndpoint";
        List<SystemServiceEndpoint> serviceEndpointList = appServiceDao.getEntityList(SystemServiceEndpoint.class, hql);
        String result = objectMapper.writeValueAsString(serviceEndpointList);
        return Result.success(result);
    }
    public Result getAppServiceByAppId(String appId) throws Exception {
        String hql = "select * from SystemServiceEndpoint where appId = "+appId+"";
        String hql = "from SystemServiceEndpoint where appId = "+appId+"";
        List<SystemServiceEndpoint> serviceEndpointList = appServiceDao.getEntityList(SystemServiceEndpoint.class, hql);
        String result = objectMapper.writeValueAsString(serviceEndpointList);
        return Result.success(result);
    }
    public Result getAllProcessor() throws Exception {
        String hql = "select * from SystemServiceFlowProcessor";
        String hql = "from SystemServiceFlowProcessor";
        List<SystemServiceFlowProcessor> processorList = processorDao.getEntityList(SystemServiceFlowProcessor.class, hql);
        String result = objectMapper.writeValueAsString(processorList);
        return Result.success(result);
@ -80,8 +77,8 @@ public class ProcessManager {
        processDao.saveEntity(process);
    }
    public String formatJson(String flowJsonStr) throws Exception {
        flowJsonStr = "{\n" +
    public static void main(String[] args) throws Exception {
        String flowJsonStr = "{\n" +
                "    \"code\": \"cralwer\",\n" +
                "    \"nodes\": {\n" +
                "        \"node_1\": {\n" +
@ -96,6 +93,16 @@ public class ProcessManager {
                "        },\n" +
                "        \"node_3\": {\n" +
                "            \"name\": \"crawler\",\n" +
                "            \"value\": \"body().isEqualTo(false)\",\n" +
                "            \"type\": \"judgement\"\n" +
                "        },\n" +
                "        \"node_4\": {\n" +
                "            \"name\": \"crawler\",\n" +
                "            \"value\": \"http4://localhost:8088/crawler/collect\",\n" +
                "            \"type\": \"service\"\n" +
                "        },\n" +
                "        \"node_5\": {\n" +
                "            \"name\": \"crawler\",\n" +
                "            \"value\": \"http4://localhost:8088/crawler/patientList\",\n" +
                "            \"type\": \"service\"\n" +
                "        }\n" +
@ -108,11 +115,24 @@ public class ProcessManager {
                "        \"line_2\": {\n" +
                "            \"from\": \"node_2\",\n" +
                "            \"to\": \"node_3\"\n" +
                "        },\n" +
                "        \"line_3\": {\n" +
                "            \"from\": \"node_3\",\n" +
                "            \"to\": \"node_4\",\n" +
                "            \"value\": \"correct\"\n" +
                "        },\n" +
                "        \"line_4\": {\n" +
                "            \"from\": \"node_3\",\n" +
                "            \"to\": \"node_5\"\n" +
                "        }\n" +
                "    }\n" +
                "}";
        formatJson(flowJsonStr);
    }
    public static String formatJson(String flowJsonStr) throws Exception {
        String root = "";
        ObjectMapper objectMapper = new ObjectMapper();
        JsonNode flowJson = objectMapper.readValue(flowJsonStr, JsonNode.class);
        String code = flowJson.get("code").asText();
@ -130,6 +150,9 @@ public class ProcessManager {
        while (nodeIterator.hasNext()) {
            Map.Entry<String, JsonNode> map = nodeIterator.next();
            nodeMap.put(map.getKey(), map.getValue());
            if (StringUtil.isEmpty(mDG.get(0))) {
                root = map.getKey();
            }
            mDG.add(map.getKey());
        }
@ -138,14 +161,14 @@ public class ProcessManager {
            lineMap.put(map.getKey(), map.getValue());
            String nodeNameFrom = map.getValue().get("from").asText();
            String nodeNameTo = map.getValue().get("to").asText();
            mDG.add(new Edge<String>(nodeNameFrom, nodeNameTo, map.getKey()));
            mDG.add(new Edge<>(nodeNameFrom, nodeNameTo, map.getKey()));
        }
        //generate the java code
        return generate(code, lineMap, nodeMap, mDG);
        return generate(code, root, lineMap, nodeMap, mDG);
    }
    public String generate(String code, Map<String, JsonNode> lineMap, Map<String, JsonNode> nodeMap, DGraph<String> mDG) throws IOException {
    public static String generate(String code, String root, Map<String, JsonNode> lineMap, Map<String, JsonNode> nodeMap, DGraph<String> mDG) throws IOException {
        Boolean isFirstNodeFlg = true;
        StringBuilder javaBuilder = new StringBuilder();
        String javaName = toUpperCaseFirstOne(code)+"Route";
@ -153,6 +176,8 @@ public class ProcessManager {
        javaBuilder.append("package "+code+".route;\n\n");
        javaBuilder.append("import org.apache.camel.Exchange;\n");
        javaBuilder.append("import org.apache.camel.builder.RouteBuilder;\n");
        javaBuilder.append("import crawler.SplitUtil;\n");
        for (String key : nodeMap.keySet()) {
            JsonNode node = nodeMap.get(key);
            String type = node.get("type").asText();
@ -163,7 +188,7 @@ public class ProcessManager {
        javaBuilder.append("public class "+javaName+" extends RouteBuilder {\n");
        javaBuilder.append("public void configure() throws Exception {\n");
        Iterator<String> it = mDG.iterator("1");
        Iterator<String> it = mDG.iterator(root);
        while(it.hasNext()) {
            String nodeName = it.next();
            JsonNode node = nodeMap.get(nodeName);
@ -177,9 +202,26 @@ public class ProcessManager {
                isFirstNodeFlg = false;
            } else {
                if (type.equals("processor")) {
                    javaBuilder.append("\n.process(\"new "+name+"())");
                    JsonNode args = node.get("args");
                    if (args == null) {
                        javaBuilder.append("\n.process(\"new "+name+"())");
                    } else {
                        String argStr = "";
                        String[] argArr = args.asText().split(",");
                        for (String arg : argArr) {
                            argStr += "\"" + arg + "\",";
                        }
                        argStr = StringUtil.substring(argStr, 0, argStr.length() - 1);
                        javaBuilder.append("\n.process(\"new "+name+"("+argStr+"))");
                    }
                } else if (type.equals("judgement")) {
                    judgement(javaBuilder, value, nodeName, mDG, it, lineMap, nodeMap);
                } else if (type.equals("circle")) {
                    split(javaBuilder, value);
                } else if (type.equals("aggregate")) {
                    aggregate(javaBuilder);
                } else if (type.equals("multicast")) {
                    mulitcast(javaBuilder, value, nodeName, mDG, it, lineMap, nodeMap);
                } else {
                    javaBuilder.append("\n.setHeader(Exchange.HTTP_METHOD, constant(\"POST\"))");
                    javaBuilder.append("\n.to(\"");
@ -215,24 +257,25 @@ public class ProcessManager {
    }
    //首字母转大写
    public String toUpperCaseFirstOne(String s) {
    public static String toUpperCaseFirstOne(String s) {
        if(Character.isUpperCase(s.charAt(0)))
            return s;
        else
            return (new StringBuilder()).append(Character.toUpperCase(s.charAt(0))).append(s.substring(1)).toString();
    }
    public void judgement(StringBuilder javaBuilder, String value, String nodeName, DGraph<String> mDG, Iterator<String> it,  Map<String, JsonNode> lineMap, Map<String, JsonNode> nodeMap) {
    public static void judgement(StringBuilder javaBuilder, String value, String nodeName, DGraph<String> mDG, Iterator<String> it,  Map<String, JsonNode> lineMap, Map<String, JsonNode> nodeMap) {
        javaBuilder.append("\n.when("+value+")");
        List<Edge<String>> edgeList = mDG.getEdgeList(nodeName);
        String trueNodeName = "";
        String falseNodeName = "";
        for (Edge<String> edge : edgeList) {
            it.next();
            String nextNodeName = edge.getDest();
            String nextLineName = edge.getName();
            JsonNode nextLine = lineMap.get(nextLineName);
            if (nextLine.get("value").asText().equals("right")) {
            if (nextLine.get("value") != null &&nextLine.get("value").asText().equals("correct")) {
                trueNodeName = nextNodeName;
            } else {
                falseNodeName = nextNodeName;
@ -250,7 +293,27 @@ public class ProcessManager {
        javaBuilder.append("\n.setHeader(Exchange.HTTP_METHOD, constant(\"POST\"))");
        javaBuilder.append("\n.to(\"");
        javaBuilder.append(secondValue + "\")");
        it.next();
        it.next();
    }
    public static void split(StringBuilder javaBuilder, String value) {
        javaBuilder.append("\n.split().method(Split.class, \""+value+"\")");
    }
    public static void aggregate(StringBuilder javaBuilder) {
        javaBuilder.append("\n.aggregate(header(\"test_correlation_key\"), new Aggregate()).completionSize(3)");
    }
    public static void mulitcast(StringBuilder javaBuilder, String value, String nodeName, DGraph<String> mDG, Iterator<String> it,  Map<String, JsonNode> lineMap, Map<String, JsonNode> nodeMap) {
        List<Edge<String>> edgeList = mDG.getEdgeList(nodeName);
        String endpoints = "";
        for (Edge<String> edge : edgeList) {
            String nextNodeName = edge.getDest();
            JsonNode node = nodeMap.get(nextNodeName);
            endpoints += "\"" + node.get("value") + "\",";
            it.next();
        }
        endpoints = StringUtil.substring(endpoints, 0, endpoints.length() - 1);
        javaBuilder.append("\n.multicast().stopOnException().to("+endpoints+").end()");
    }
}