浏览代码

修改admin和Arbiter的服务流程控制,引入消息头和消息体两部分,以便能更好的进行路由。这里参考了zbus的协议设计。

Airhead 8 年之前
父节点
当前提交
efc6e56099
共有 24 个文件被更改,包括 841 次插入659 次删除
  1. 10 1
      hos-arbiter/src/main/java/com/yihu/hos/arbiter/common/constants/Constants.java
  2. 33 0
      hos-arbiter/src/main/java/com/yihu/hos/arbiter/controllers/ProxyController.java
  3. 2 1
      hos-arbiter/src/main/java/com/yihu/hos/arbiter/controllers/ServiceFlowController.java
  4. 54 0
      hos-arbiter/src/main/java/com/yihu/hos/arbiter/models/BrokerServer.java
  5. 75 33
      hos-arbiter/src/main/java/com/yihu/hos/arbiter/models/ServiceFlow.java
  6. 1 1
      hos-arbiter/src/main/java/com/yihu/hos/arbiter/routers/EndpointEventRouter.java
  7. 19 2
      hos-arbiter/src/main/java/com/yihu/hos/arbiter/routers/ServiceFlowEventRouter.java
  8. 69 3
      hos-arbiter/src/main/java/com/yihu/hos/arbiter/services/BrokerServerService.java
  9. 122 118
      hos-arbiter/src/main/java/com/yihu/hos/arbiter/services/ServiceFlowService.java
  10. 73 9
      hos-core/src/main/java/com/yihu/hos/core/http/DefaultClientImpl.java
  11. 6 0
      hos-core/src/main/java/com/yihu/hos/core/http/HTTPClient.java
  12. 12 0
      hos-core/src/main/java/com/yihu/hos/core/http/HttpClientKit.java
  13. 1 1
      src/main/java/com/yihu/hos/common/constants/Constants.java
  14. 12 2
      src/main/java/com/yihu/hos/listeners/ApplicationStart.java
  15. 93 52
      src/main/java/com/yihu/hos/services/ServiceFlowEventService.java
  16. 3 9
      src/main/java/com/yihu/hos/system/dao/FlowClassDao.java
  17. 3 6
      src/main/java/com/yihu/hos/system/dao/FlowDao.java
  18. 1 5
      src/main/java/com/yihu/hos/system/dao/FlowTempDao.java
  19. 0 18
      src/main/java/com/yihu/hos/system/dao/intf/IFlowClassDao.java
  20. 0 20
      src/main/java/com/yihu/hos/system/dao/intf/IFlowDao.java
  21. 0 19
      src/main/java/com/yihu/hos/system/dao/intf/IFlowTempDao.java
  22. 115 0
      src/main/java/com/yihu/hos/system/model/bo/ServiceFlow.java
  23. 0 152
      src/main/java/com/yihu/hos/system/model/bo/ServiceFlowEvent.java
  24. 137 207
      src/main/java/com/yihu/hos/system/service/FlowManager.java

+ 10 - 1
hos-arbiter/src/main/java/com/yihu/hos/arbiter/common/constants/Constants.java

@ -6,9 +6,14 @@ package com.yihu.hos.arbiter.common.constants;
 * Created at 2016/8/25.
 */
public interface Constants {
    //流程-模板类型
    String JAVA = "java";
    String CLASS = "class";
    //流程-操作消息
    String FlOW_REFRESH = "flowRefresh";
    String PROCESSOR_ADD = "processorAdded";
    String FLOW_STARTED = "flowStarted";
    String PROCESSOR_ADDED = "processorAdded";
    String PROCESSOR_DATA_CHANGED = "processorDataChanged";
    String PROCESSOR_DATA_DELETED = "processorDataDeleted";
    String ROUTE_DEFINE_ADDED = "routeDefineAdded";
@ -18,4 +23,8 @@ public interface Constants {
    String ROUTE_CLASS_CHANGED = "routeClassChanged";//java类型的路由修改
    String PROCESSOR_CLASS_ADDED = "processorClassAdded"; //java类型的处理器添加
    String PROCESSOR_CLASS_CHANGED = "processorClassChanged"; //java类型的处理器修改
    //BrokerServer
    String BROKER_SERVER_ON = "brokerServerOn";
    String BROKER_SERVER_OFF = "brokerServerOff";
}

+ 33 - 0
hos-arbiter/src/main/java/com/yihu/hos/arbiter/controllers/ProxyController.java

@ -0,0 +1,33 @@
package com.yihu.hos.arbiter.controllers;
import org.apache.camel.CamelContext;
import org.apache.camel.ProducerTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestHeader;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.HashMap;
import java.util.Map;
/**
 * @author Airhead
 * @since 2016/12/2.
 */
@RestController
@RequestMapping("/proxy")
public class ProxyController {
    @Autowired
    private CamelContext camelContext;
    @RequestMapping()
    public void proxy(@RequestHeader Map<String, String> headers,
                      @RequestBody String body) {
        Map<String, Object> header = new HashMap<>();
        header.put("event", headers.get("event"));
        ProducerTemplate producerTemplate = camelContext.createProducerTemplate();
        producerTemplate.sendBodyAndHeader("service.flow.event", body, header);
    }
}

+ 2 - 1
hos-arbiter/src/main/java/com/yihu/hos/arbiter/controllers/ServiceFlowController.java

@ -18,7 +18,8 @@ public class ServiceFlowController {
    @RequestMapping(method = RequestMethod.POST)
    public void save(String serviceFlow) {
        serviceFlowService.save(serviceFlow);
        serviceFlowService.save(null);
    }
    @RequestMapping(method = RequestMethod.GET)

+ 54 - 0
hos-arbiter/src/main/java/com/yihu/hos/arbiter/models/BrokerServer.java

@ -4,6 +4,7 @@ import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.index.Indexed;
import org.springframework.data.mongodb.core.mapping.Document;
import java.util.ArrayList;
import java.util.Date;
/**
@ -19,6 +20,21 @@ public class BrokerServer {
    private boolean enable;
    @Indexed(name = "updateTime_1", expireAfterSeconds = 30)
    private Date updateTime;
    private ArrayList<Flow> onFlowList;
    public boolean isFlowOn(String routeCode) {
        if (onFlowList == null) {
            return false;
        }
        for (Flow flow : onFlowList) {
            if (flow.getRouteCode().equals(routeCode)) {
                return true;
            }
        }
        return false;
    }
    public String getId() {
        return id;
@ -71,4 +87,42 @@ public class BrokerServer {
    public void setEnable(boolean enable) {
        this.enable = enable;
    }
    public ArrayList<Flow> getOnFlowList() {
        return onFlowList;
    }
    public void setOnFlowList(ArrayList<Flow> onFlowList) {
        this.onFlowList = onFlowList;
    }
    public class Flow {
        String routeCode;
        String type;
        Date updated;
        public String getRouteCode() {
            return routeCode;
        }
        public void setRouteCode(String routeCode) {
            this.routeCode = routeCode;
        }
        public String getType() {
            return type;
        }
        public void setType(String type) {
            this.type = type;
        }
        public Date getUpdated() {
            return updated;
        }
        public void setUpdated(Date updated) {
            this.updated = updated;
        }
    }
}

+ 75 - 33
hos-arbiter/src/main/java/com/yihu/hos/arbiter/models/ServiceFlow.java

@ -2,6 +2,7 @@ package com.yihu.hos.arbiter.models;
import org.springframework.data.mongodb.core.mapping.Document;
import java.util.ArrayList;
import java.util.Date;
/**
@ -9,13 +10,11 @@ import java.util.Date;
 */
@Document
public class ServiceFlow {
    private String event;
    private String serviceFlow;
    private String packageName;
    private String className;
    private String path;
    private Date updateTime;
    private String cron;
    private String routeCode;
    private ArrayList<HandleFile> handleFiles;
    private Date updated;
    private String flowType;    //pull or push?
    private String cron;    //采集任务时使用
    public String getCron() {
        return cron;
@ -25,51 +24,94 @@ public class ServiceFlow {
        this.cron = cron;
    }
    public String getEvent() {
        return event;
    public String getRouteCode() {
        return routeCode;
    }
    public void setEvent(String event) {
        this.event = event;
    public void setRouteCode(String routeCode) {
        this.routeCode = routeCode;
    }
    public String getServiceFlow() {
        return serviceFlow;
    public Date getUpdated() {
        return updated;
    }
    public void setServiceFlow(String serviceFlow) {
        this.serviceFlow = serviceFlow;
    public void setUpdated(Date updated) {
        this.updated = updated;
    }
    public String getPackageName() {
        return packageName;
    public ArrayList<HandleFile> getHandleFiles() {
        return handleFiles;
    }
    public void setPackageName(String packageName) {
        this.packageName = packageName;
    public void setHandleFiles(ArrayList<HandleFile> handleFiles) {
        this.handleFiles = handleFiles;
    }
    public String getClassName() {
        return className;
    }
    public void addHandleFile(HandleFile handleFile) {
        if (handleFiles == null) {
            handleFiles = new ArrayList<>();
        }
    public void setClassName(String className) {
        this.className = className;
        handleFiles.add(handleFile);
    }
    public String getPath() {
        return path;
    public String getFlowType() {
        return flowType;
    }
    public void setPath(String path) {
        this.path = path;
    public void setFlowType(String flowType) {
        this.flowType = flowType;
    }
    public Date getUpdateTime() {
        return updateTime;
    }
    public class HandleFile {
        private String usage;   //router or processor
        private String packageName;
        private String className;
        private String filePath;
        private String fileType;    //java or class
        public HandleFile() {
        }
        public String getPackageName() {
            return packageName;
        }
        public void setPackageName(String packageName) {
            this.packageName = packageName;
        }
        public String getClassName() {
            return className;
        }
        public void setClassName(String className) {
            this.className = className;
        }
        public String getFilePath() {
            return filePath;
        }
        public void setFilePath(String filePath) {
            this.filePath = filePath;
        }
        public String getFileType() {
            return fileType;
        }
        public void setFileType(String fileType) {
            this.fileType = fileType;
        }
        public String getUsage() {
            return usage;
        }
    public void setUpdateTime(Date updateTime) {
        this.updateTime = updateTime;
        public void setUsage(String usage) {
            this.usage = usage;
        }
    }
}

+ 1 - 1
hos-arbiter/src/main/java/com/yihu/hos/arbiter/routers/EndpointEventRouter.java

@ -25,6 +25,6 @@ public class EndpointEventRouter extends RouteBuilder {
        // Note we can explicit name the component
        context.addComponent("endpoint.event", JmsComponent.jmsComponentAutoAcknowledge(connectionFactory));
        from("endpoint.event:queue:configuration.endpoint")
                .to("bean:endpointService?method=trigger"); //TODO:这边可以做Message Filter,减化trigger逻辑
                .to("bean:endpointService?method=addBrokerServer"); //TODO:这边可以做Message Filter,减化trigger逻辑
    }
}

+ 19 - 2
hos-arbiter/src/main/java/com/yihu/hos/arbiter/routers/ServiceFlowEventRouter.java

@ -1,5 +1,6 @@
package com.yihu.hos.arbiter.routers;
import com.yihu.hos.arbiter.common.constants.Constants;
import com.yihu.hos.arbiter.configuration.ActivemqConfiguration;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.camel.builder.RouteBuilder;
@ -17,14 +18,30 @@ import javax.jms.ConnectionFactory;
public class ServiceFlowEventRouter extends RouteBuilder {
    @Autowired
    private ActivemqConfiguration activemqConfiguration;
    @Override
    public void configure() throws Exception {
        ModelCamelContext context = this.getContext();
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                activemqConfiguration.getUser(), activemqConfiguration.getPassword(),activemqConfiguration.getBrokerURL());
                activemqConfiguration.getUser(), activemqConfiguration.getPassword(), activemqConfiguration.getBrokerURL());
        // Note we can explicit name the component
        context.addComponent("service.flow.event", JmsComponent.jmsComponentAutoAcknowledge(connectionFactory));
        from("service.flow.event:queue:configuration.service.flow")
                .to("bean:serviceFlowService?method=trigger"); //TODO:这边可以做Message Filter,减化trigger逻辑
                .choice()
                .when(header("tenant").isNotNull()).to("bean:serviceFlowService?method=proxy")
                .when(header("event").isEqualTo(Constants.FlOW_REFRESH)).to("bean:serviceFlowService?method=flowRefresh")
                .when(header("event").isEqualTo(Constants.PROCESSOR_ADDED)).to("bean:serviceFlowService?method=processorAdded")
                .when(header("event").isEqualTo(Constants.PROCESSOR_DATA_CHANGED)).to("bean:serviceFlowService?method=processorDataChanged")
                .when(header("event").isEqualTo(Constants.PROCESSOR_DATA_DELETED)).to("bean:serviceFlowService?method=processorDataDeleted")
                .when(header("event").isEqualTo(Constants.ROUTE_DEFINE_ADDED)).to("bean:serviceFlowService?method=routeDefineAdded")
                .when(header("event").isEqualTo(Constants.ROUTE_DEFINE_CHANGED)).to("bean:serviceFlowService?method=routeDefineChanged")
                .when(header("event").isEqualTo(Constants.ROUTE_DEFINE_DELETED)).to("bean:serviceFlowService?method=routeDefineDeleted")
                .when(header("event").isEqualTo(Constants.ROUTE_CLASS_ADDED)).to("bean:serviceFlowService?method=routeClassAdded")
                .when(header("event").isEqualTo(Constants.ROUTE_CLASS_CHANGED)).to("bean:serviceFlowService?method=routeClassChanged")
                .when(header("event").isEqualTo(Constants.PROCESSOR_CLASS_ADDED)).to("bean:serviceFlowService?method=processorClassAdded")
                .when(header("event").isEqualTo(Constants.PROCESSOR_CLASS_CHANGED)).to("bean:serviceFlowService?method=processorClassChanged")
                .when(header("event").isEqualTo(Constants.BROKER_SERVER_ON)).to("bean:serviceFlowService?method=brokerServerOn")
                .when(header("event").isEqualTo(Constants.FLOW_STARTED)).to("bean:brokerServerService?method=flowStarted")
                .endChoice();
    }
}

+ 69 - 3
hos-arbiter/src/main/java/com/yihu/hos/arbiter/services/BrokerServerService.java

@ -1,7 +1,12 @@
package com.yihu.hos.arbiter.services;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.mongodb.WriteResult;
import com.yihu.hos.arbiter.common.constants.Constants;
import com.yihu.hos.arbiter.models.BrokerServer;
import org.apache.camel.CamelContext;
import org.apache.camel.ProducerTemplate;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
@ -13,12 +18,14 @@ import org.springframework.data.mongodb.core.query.Update;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
 * @created Airhead 2016/7/27.
 */
@Service
@Service("brokerServerService")
public class BrokerServerService {
    private static final Logger logger = LogManager.getLogger(BrokerServerService.class);
@ -26,12 +33,18 @@ public class BrokerServerService {
    private MongoOperations mongoOperations;
    @Autowired
    private ServiceFlowService serviceFlowService;
    private CamelContext camelContext;
    @Autowired
    private ProducerTemplate producerTemplate;
    @Autowired
    private ObjectMapper objectMapper;
    public void save(BrokerServer brokerServer) {
        Query query = new Query();
        query.addCriteria(Criteria.where("hostName").is(brokerServer.getHostName()));
        query.addCriteria(Criteria.where("hostAddress").is(brokerServer.getHostAddress()));
        query.addCriteria(Criteria.where("port").is(brokerServer.getPort()));
        Update update = new Update();
        update.set("hostName", brokerServer.getHostName());
@ -39,6 +52,9 @@ public class BrokerServerService {
        update.set("port", brokerServer.getPort());
        update.set("updateTime", brokerServer.getUpdateTime());
        update.set("enable", brokerServer.isEnable());
        if (brokerServer.getOnFlowList() != null) {
            update.set("onFlowList", brokerServer.getOnFlowList());
        }
        WriteResult writeResult = mongoOperations.upsert(query, update, BrokerServer.class);
        if (writeResult.isUpdateOfExisting()) {
@ -46,7 +62,39 @@ public class BrokerServerService {
        }
        brokerServer = mongoOperations.findById(writeResult.getUpsertedId(), BrokerServer.class);
        serviceFlowService.trigger(brokerServer); //TODO:异步?
        try {
            String msg = objectMapper.writeValueAsString(brokerServer);
            ProducerTemplate producerTemplate = createProducerTemplate();
            Map<String, Object> header = new HashMap<>();
            header.put("event", Constants.BROKER_SERVER_ON);
            producerTemplate.sendBodyAndHeaders("service.flow.event", msg, header);
        } catch (JsonProcessingException e) {
            e.printStackTrace();
            logger.error(e.getMessage());
        }
    }
    public void updateFlowOn(BrokerServer brokerServer, BrokerServer.Flow flow) {
        Query query = new Query();
        query.addCriteria(Criteria.where("hostName").is(brokerServer.getHostName()));
        query.addCriteria(Criteria.where("hostAddress").is(brokerServer.getHostAddress()));
        query.addCriteria(Criteria.where("port").is(brokerServer.getPort()));
        BrokerServer broker = mongoOperations.findOne(query, BrokerServer.class);
        if (broker == null) {
            return;
        }
        ArrayList<BrokerServer.Flow> onFlowList = broker.getOnFlowList();
        if (onFlowList == null) {
            onFlowList = new ArrayList<>();
        }
        onFlowList.add(flow);
        Update update = new Update();
        update.set("onFlowList", onFlowList);
        mongoOperations.updateFirst(query, update, BrokerServer.class);
        //可以用ExecCommand的方法。
    }
    /**
@ -78,4 +126,22 @@ public class BrokerServerService {
        query.with(new Sort(new Sort.Order(Sort.Direction.DESC, "updateTime")));
        return mongoOperations.find(query, BrokerServer.class);
    }
    public List<BrokerServer> getFlowOnBroker(String routeCode) {
        Query query = new Query();
        query.addCriteria(Criteria.where("onFlowList.routeCode").is(routeCode));
        return mongoOperations.find(query, BrokerServer.class);
    }
    public void flowStarted(String msg) {
    }
    private ProducerTemplate createProducerTemplate() {
        if (producerTemplate == null) {
            producerTemplate = camelContext.createProducerTemplate();
        }
        return producerTemplate;
    }
}

+ 122 - 118
hos-arbiter/src/main/java/com/yihu/hos/arbiter/services/ServiceFlowService.java

@ -1,172 +1,176 @@
package com.yihu.hos.arbiter.services;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.yihu.hos.arbiter.common.constants.Constants;
import com.yihu.hos.arbiter.models.BrokerServer;
import com.yihu.hos.arbiter.models.ServiceFlow;
import com.yihu.hos.core.datatype.StringUtil;
import org.apache.http.Consts;
import org.apache.http.NameValuePair;
import org.apache.http.client.entity.UrlEncodedFormEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpDelete;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.client.utils.URIBuilder;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.message.BasicNameValuePair;
import com.yihu.hos.core.http.HttpClientKit;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.mongodb.core.MongoOperations;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.data.mongodb.core.query.Update;
import org.springframework.stereotype.Service;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
/**
 * Broker原则上具有等同性,这样Arbiter无论选择了哪个Broker能提供的服务都是一样的。
 * 但是因为Broker上还是会运行一些定时的采集任务,这些采集任务如果是多台Broker运行的话,可能会引起数据重复问题。
 * 所以在事件触发时需要做一些策略的调整:
 * 1.实时任务,通知所有的Broker进行更新路由
 * 2.采集任务,只通知其中的一台进行更新路由
 *
 * @created Airhead 2016/8/16.
 */
@Service("serviceFlowService")
public class ServiceFlowService {
    private static final Logger logger = LogManager.getLogger(BrokerServerService.class);
    @Autowired
    private MongoOperations mongoOperations;
    @Autowired
    private ObjectMapper objectMapper;
    @Autowired
    private BrokerServerService brokerServerService;
    public void save(String serviceFlow) {
    public void save(ServiceFlow serviceFlow) {
        Query query = new Query();
        query.addCriteria(Criteria.where("routeCode").is(serviceFlow.getRouteCode()));
        Update update = new Update();
        update.set("routeCode", serviceFlow.getRouteCode());
        update.set("handleFiles", serviceFlow.getHandleFiles());
        update.set("updated", serviceFlow.getUpdated());
        update.set("flowType", serviceFlow.getFlowType());
        update.set("cron", serviceFlow.getCron());
        mongoOperations.upsert(query, update, BrokerServer.class);
    }
    public String get(String serviceName) {
        return null;
    }
    public List<ServiceFlow> getAll() {
        return mongoOperations.findAll(ServiceFlow.class);
    }
    public String put(String serviceName, String ClientInfo) {
        return null;
    }
    public void flowRefresh(String msg) {
        flowController("post", "/esb/serviceFlow", msg);
    }
    public void processorAdded(String msg) {
        flowController("post", "/esb/processor", msg);
    }
    public void processorDataChanged(String msg) {
        flowController("put", "/esb/processor", msg);
    }
    public void processorDataDeleted(String msg) {
        flowController("delete", "/esb/processor", msg);
    }
    public void routeDefineAdded(String msg) {
        flowController("post", "/esb/route", msg);
    }
    public void routeDefineChanged(String msg) {
        flowController("put", "/esb/route", msg);
    }
    public void routeDefineDeleted(String msg) {
        flowController("delete", "/esb/route", msg);
    }
    public void routeClassAdded(String msg) {
        flowController("post", "/esb/genRoute", msg);
    }
    public void routeClassChanged(String msg) {
        flowController("put", "/esb/updateRoute", msg);
    }
    public void processorClassAdded(String msg) {
        flowController("post", "/esb/genProcessor", msg);
    }
    public void processorClassChanged(String msg) {
        flowController("put", "/esb/genProcessor", msg);
    }
    public void brokerServerOn(String msg) {
        List<ServiceFlow> serviceFlowList = getAll();
        serviceFlowList.forEach(serviceFlow -> {
            try {
                String serviceFlowMsg = objectMapper.writeValueAsString(serviceFlow);
                flowController("post", "/esb/serviceFlow", serviceFlowMsg);
            } catch (JsonProcessingException e) {
                e.printStackTrace();
            }
        });
    }
    /**
     * Broker原则上具有等同性,这样Arbiter无论选择了哪个Broker能提供的服务都是一样的。
     * 但是因为Broker上还是会运行一些定时的采集任务,这些采集任务如果是多台Broker运行的话,可能引起数据问题。
     * 所以在事件触发时需要做一些策略的调整:
     * 1.实时任务,通知所有的Broker进行更新路由
     * 2.采集任务,只通知其中的一台进行更新路由
     * TODO:遗留BUG多Broker启动时,采集任务会在多个Broker中被启动。
     * SAAS化的管理端过来的消息会被proxy进行中转,之后发送到终端的Arbiter对Broker进行实际的控制。
     *
     * @param msg 数据流
     * @param header 消息头部信息
     * @param msg    消息
     */
    public void trigger(String msg) {
        System.out.println(msg);
    public void proxy(Map<String, Object> header, String msg) {
    }
        ObjectMapper objectMapper = new ObjectMapper();
    private void flowController(String method, String path, String msg) {
        try {
            ServiceFlow serviceFlow = objectMapper.readValue(msg, ServiceFlow.class);
            List<NameValuePair> nameValuePairList = new ArrayList<>();
            nameValuePairList.add(new BasicNameValuePair("serviceFlow", serviceFlow.getServiceFlow()));
            nameValuePairList.add(new BasicNameValuePair("packageName", serviceFlow.getPackageName()));
            nameValuePairList.add(new BasicNameValuePair("className", serviceFlow.getClassName()));
            nameValuePairList.add(new BasicNameValuePair("path", serviceFlow.getPath()));
            nameValuePairList.add(new BasicNameValuePair("cron", serviceFlow.getCron()));
            boolean one = !StringUtil.isEmpty(serviceFlow.getCron());   //有cron表达式,就是采集任务。
            List<BrokerServer> brokerServerList = brokerServerService.get(one);
            for (BrokerServer brokerServer : brokerServerList) {
                CloseableHttpClient httpclient = HttpClients.createDefault();
                switch (serviceFlow.getEvent()) {
                    case Constants.FlOW_REFRESH: {
                        HttpPost httpPost = new HttpPost(brokerServer.getURL() + "/esb/serviceFlow");
                        httpPost.setEntity(new UrlEncodedFormEntity(nameValuePairList, Consts.UTF_8));
                        CloseableHttpResponse response = httpclient.execute(httpPost);
                        response.close();
                        break;
                    }
                    case "processorAdded": {
                        HttpPost httpPost = new HttpPost(brokerServer.getURL() + "/esb/processor");
                        httpPost.setEntity(new UrlEncodedFormEntity(nameValuePairList, Consts.UTF_8));
                        CloseableHttpResponse response = httpclient.execute(httpPost);
                        response.close();
                        break;
                    }
                    case "processorDataChanged": {
                        HttpPut httpPut = new HttpPut(brokerServer.getURL() + "/esb/processor");
                        httpPut.setEntity(new UrlEncodedFormEntity(nameValuePairList, Consts.UTF_8));
                        CloseableHttpResponse response = httpclient.execute(httpPut);
                        response.close();
                        break;
                    }
            this.save(serviceFlow);
                    case "routeDefineAdded": {
                        HttpPost httpPost = new HttpPost(brokerServer.getURL() + "/esb/route");
                        httpPost.setEntity(new UrlEncodedFormEntity(nameValuePairList, Consts.UTF_8));
                        CloseableHttpResponse response = httpclient.execute(httpPost);
                        response.close();
                        break;
                    }
                    case "routeDefineChanged": {
                        HttpPut httpPut = new HttpPut(brokerServer.getURL() + "/esb/route");
                        httpPut.setEntity(new UrlEncodedFormEntity(nameValuePairList, Consts.UTF_8));
                        CloseableHttpResponse response = httpclient.execute(httpPut);
                        response.close();
                        break;
                    }
                    case "routeDefineDelete": {
                        try {
                            URI uri = new URIBuilder()
                                    .setScheme("http")
                                    .setHost(brokerServer.getHostAddress() + ":" + brokerServer.getPort())
                                    .setPath("/esb/route")
                                    .setParameter("serviceFlow", serviceFlow.getServiceFlow())
                                    .setParameter("packageName", serviceFlow.getPackageName())
                                    .setParameter("className", serviceFlow.getClassName())
                                    .build();
                            HttpDelete httpDelete = new HttpDelete(uri);
                            CloseableHttpResponse response = httpclient.execute(httpDelete);
                            response.close();
                        } catch (URISyntaxException e) {
                            e.printStackTrace();
                        }
            boolean one = serviceFlow.getFlowType().equals(Constants.JAVA);   //有cron表达式,就是采集任务。
            if (one) {
                List<BrokerServer> flowOnBroker = brokerServerService.getFlowOnBroker(serviceFlow.getRouteCode());
                if (flowOnBroker != null && flowOnBroker.size() != 0) {
                    return;
                }
            }
            List<BrokerServer> brokerServerList = brokerServerService.get(one);
            for (BrokerServer brokerServer : brokerServerList) {
                if (brokerServer.isFlowOn(serviceFlow.getRouteCode())) {
                    continue;
                }
                switch (method) {
                    case "post":
                        HttpClientKit.post(brokerServer.getURL() + path, msg);
                        break;
                    }
                    case "routeClassAdded": {
                        HttpPost httpPost = new HttpPost(brokerServer.getURL() + "/esb/genRoute");
                        httpPost.setEntity(new UrlEncodedFormEntity(nameValuePairList, Consts.UTF_8));
                        CloseableHttpResponse response = httpclient.execute(httpPost);
                        response.close();
                        break;
                    }
                    case "routeClassChanged": {
                        HttpPut httpPut = new HttpPut(brokerServer.getURL() + "/esb/updateRoute");
                        httpPut.setEntity(new UrlEncodedFormEntity(nameValuePairList, Consts.UTF_8));
                        CloseableHttpResponse response = httpclient.execute(httpPut);
                        response.close();
                    case "put":
                        HttpClientKit.post(brokerServer.getURL() + path, msg);
                        break;
                    }
                    case "processorClassAdded": {
                        HttpPost httpPost = new HttpPost(brokerServer.getURL() + "/esb/genProcessor");
                        httpPost.setEntity(new UrlEncodedFormEntity(nameValuePairList, Consts.UTF_8));
                        CloseableHttpResponse response = httpclient.execute(httpPost);
                        response.close();
                    case "delete":
                        HttpClientKit.post(brokerServer.getURL() + path, msg);
                        break;
                    }
                    default:
                        break;
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
            logger.error(e.getMessage());
        }
    }
    public void trigger(BrokerServer brokerServer) {
        //TODO:
    }
}

+ 73 - 9
hos-core/src/main/java/com/yihu/hos/core/http/DefaultClientImpl.java

@ -12,7 +12,8 @@ import java.util.Map;
import java.util.concurrent.TimeUnit;
/**
 * @created Airhead 2016/8/24.
 * @author Airhead
 * @since 2016/8/24.
 */
class DefaultClientImpl implements HTTPClient {
    private static final Log log = LogFactory.getLog(DefaultClientImpl.class);
@ -49,11 +50,11 @@ class DefaultClientImpl implements HTTPClient {
            log.error(ex.getMessage());
        }
        return new HTTPResponse(500, "");
        return new HTTPResponse(417, "");
    }
    public HTTPResponse post(String url) {
        return post(url, null);
        return post(url, (Map<String, String>) null);
    }
    public HTTPResponse post(String url, Map<String, String> params) {
@ -84,7 +85,28 @@ class DefaultClientImpl implements HTTPClient {
            log.error(ex.getMessage());
        }
        return new HTTPResponse(500, "");
        return new HTTPResponse(417, "");
    }
    @Override
    public HTTPResponse post(String url, String json) {
        try {
            final MediaType JSON = MediaType.parse("application/json; charset=utf-8");
            RequestBody body = RequestBody.create(JSON, json);
            Request request = new Request.Builder()
                    .url(url)
                    .post(body)
                    .build();
            Response response = httpClient.newCall(request).execute();
            if (response.isSuccessful()) {
                return new HTTPResponse(response.code(), response.body().string());
            }
        } catch (IOException ex) {
            log.error(ex.getMessage());
        }
        return new HTTPResponse(417, "");
    }
    public HTTPResponse postFile(String url, String path) {
@ -117,17 +139,38 @@ class DefaultClientImpl implements HTTPClient {
            log.error(ex.getMessage());
        }
        return new HTTPResponse(500, "");
        return new HTTPResponse(417, "");
    }
    public HTTPResponse put(String url) {
        return put(url, null);
        return put(url, (Map<String, String>) null);
    }
    public HTTPResponse put(String url, Map<String, String> params) {
        return put(url, params, null);
    }
    @Override
    public HTTPResponse put(String url, String json) {
        try {
            final MediaType JSON = MediaType.parse("application/json; charset=utf-8");
            RequestBody body = RequestBody.create(JSON, json);
            Request request = new Request.Builder()
                    .url(url)
                    .put(body)
                    .build();
            Response response = httpClient.newCall(request).execute();
            if (response.isSuccessful()) {
                return new HTTPResponse(response.code(), response.body().string());
            }
        } catch (IOException ex) {
            log.error(ex.getMessage());
        }
        return new HTTPResponse(417, "");
    }
    public HTTPResponse put(String url, Map<String, String> params, Map<String, String> headers) {
        try {
            FormBody.Builder fromBodyBuilder = new FormBody.Builder();
@ -152,17 +195,38 @@ class DefaultClientImpl implements HTTPClient {
            log.error(ex.getMessage());
        }
        return new HTTPResponse(500, "");
        return new HTTPResponse(417, "");
    }
    public HTTPResponse delete(String url) {
        return delete(url, null);
        return delete(url, (Map<String, String>) null);
    }
    public HTTPResponse delete(String url, Map<String, String> params) {
        return delete(url, params, null);
    }
    @Override
    public HTTPResponse delete(String url, String json) {
        try {
            final MediaType JSON = MediaType.parse("application/json; charset=utf-8");
            RequestBody body = RequestBody.create(JSON, json);
            Request request = new Request.Builder()
                    .url(url)
                    .delete(body)
                    .build();
            Response response = httpClient.newCall(request).execute();
            if (response.isSuccessful()) {
                return new HTTPResponse(response.code(), response.body().string());
            }
        } catch (IOException ex) {
            log.error(ex.getMessage());
        }
        return new HTTPResponse(417, "");
    }
    public HTTPResponse delete(String url, Map<String, String> params, Map<String, String> headers) {
        try {
            FormBody.Builder fromBodyBuilder = new FormBody.Builder();
@ -187,7 +251,7 @@ class DefaultClientImpl implements HTTPClient {
            log.error(ex.getMessage());
        }
        return new HTTPResponse(500, "");
        return new HTTPResponse(417, "");
    }
    public HTTPResponse request(String method, String url, Map<String, String> params, Map<String, String> headers) {

+ 6 - 0
hos-core/src/main/java/com/yihu/hos/core/http/HTTPClient.java

@ -24,6 +24,8 @@ public interface HTTPClient {
    HTTPResponse post(String url, Map<String, String> params, Map<String, String> headers);
    HTTPResponse post(String url, String json);
    HTTPResponse postFile(String url, String path);
    HTTPResponse postFile(String url, String path, Map<String, String> params);
@ -34,12 +36,16 @@ public interface HTTPClient {
    HTTPResponse put(String url, Map<String, String> params);
    HTTPResponse put(String url, String json);
    HTTPResponse put(String url, Map<String, String> params, Map<String, String> headers);
    HTTPResponse delete(String url);
    HTTPResponse delete(String url, Map<String, String> params);
    HTTPResponse delete(String url, String json);
    HTTPResponse delete(String url, Map<String, String> params, Map<String, String> headers);
    HTTPResponse request(String method, String url, Map<String, String> params, Map<String, String> headers);

+ 12 - 0
hos-core/src/main/java/com/yihu/hos/core/http/HttpClientKit.java

@ -66,6 +66,10 @@ public class HttpClientKit {
        return use().post(url, params);
    }
    public static HTTPResponse post(String url, String json) {
        return use().post(url, json);
    }
    public static HTTPResponse post(String url, Map<String, String> params, Map<String, String> headers) {
        return use().post(url, params, headers);
    }
@ -90,6 +94,10 @@ public class HttpClientKit {
        return use().put(url, params);
    }
    public static HTTPResponse put(String url, String json) {
        return use().put(url, json);
    }
    public static HTTPResponse put(String url, Map<String, String> params, Map<String, String> headers) {
        return use().put(url, params, headers);
    }
@ -102,6 +110,10 @@ public class HttpClientKit {
        return use().delete(url, params);
    }
    public static HTTPResponse delete(String url, String json) {
        return use().delete(url, json);
    }
    public static HTTPResponse delete(String url, Map<String, String> params, Map<String, String> headers) {
        return use().delete(url, params, headers);
    }

+ 1 - 1
src/main/java/com/yihu/hos/common/constants/Constants.java

@ -24,7 +24,7 @@ public interface Constants {
    //流程-操作消息
    String FlOW_REFRESH = "flowRefresh";
    String PROCESSOR_ADD = "processorAdded";
    String PROCESSOR_ADDED = "processorAdded";
    String PROCESSOR_DATA_CHANGED = "processorDataChanged";
    String PROCESSOR_DATA_DELETED = "processorDataDeleted";
    String ROUTE_DEFINE_ADDED = "routeDefineAdded";

+ 12 - 2
src/main/java/com/yihu/hos/listeners/ApplicationStart.java

@ -3,6 +3,8 @@ package com.yihu.hos.listeners;
import com.yihu.hos.core.log.Logger;
import com.yihu.hos.core.log.LoggerFactory;
import com.yihu.hos.services.ServiceFlowEventService;
import com.yihu.hos.system.model.bo.ServiceFlow;
import com.yihu.hos.system.service.FlowManager;
import com.yihu.hos.web.framework.constrant.DateConvert;
import org.apache.commons.beanutils.ConvertUtils;
import org.springframework.beans.factory.annotation.Autowired;
@ -10,6 +12,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import javax.servlet.ServletContextEvent;
import javax.servlet.ServletContextListener;
import java.util.Date;
import java.util.List;
/**
 * 项目启动执行
@ -19,7 +22,9 @@ public class ApplicationStart implements ServletContextListener {
    static private final Logger logger = LoggerFactory.getLogger(ApplicationStart.class);
    @Autowired
    ServiceFlowEventService serviceFlowEventService;
    private ServiceFlowEventService serviceFlowEventService;
    @Autowired
    private FlowManager flowManager;
    @Override
@ -38,7 +43,12 @@ public class ApplicationStart implements ServletContextListener {
     * 同时解决Broker中启动多个采集任务的问题。
     */
    private void flowRefresh() {
        serviceFlowEventService.flowRefresh();
        try {
            List<ServiceFlow> serviceFlowList = flowManager.getServiceFlowList();
            serviceFlowList.forEach(serviceFlow -> serviceFlowEventService.flowRefresh(serviceFlow));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

+ 93 - 52
src/main/java/com/yihu/hos/services/ServiceFlowEventService.java

@ -2,16 +2,17 @@ package com.yihu.hos.services;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.yihu.hos.common.constants.Constants;
import com.yihu.hos.core.log.Logger;
import com.yihu.hos.core.log.LoggerFactory;
import com.yihu.hos.system.model.bo.ServiceFlowEvent;
import com.yihu.hos.system.model.bo.ServiceFlow;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.stereotype.Component;
import javax.jms.Queue;
import java.util.HashMap;
import java.util.Map;
/**
 * @created Airhead 2016/8/2.
@ -32,7 +33,8 @@ public class ServiceFlowEventService {
     * admin启动时,触发一次所有流程更新事件,用于重启整个服务的情况。
     * 同时解决Broker中启动多个采集任务的问题。
     */
    public void flowRefresh() {
    public void flowRefresh(ServiceFlow serviceFlow) {
        this.sendMsg(Constants.FlOW_REFRESH, serviceFlow);
    }
    /**
@ -44,92 +46,131 @@ public class ServiceFlowEventService {
     * @param path        processor处理器定义涉及的class内容,如果zookeeper数据结构中class分片存储,在业务级接口层面上也进行了合并
     */
    public void processorAdded(String serviceFlow, String packageName, String className, String path) {
        sendMsg(Constants.PROCESSOR_ADD, serviceFlow, packageName, className, path);
        sendMsg(Constants.PROCESSOR_ADDED, serviceFlow, packageName, className, path);
    }
    public void processorAdded(ServiceFlow serviceFlow) {
        this.sendMsg(Constants.PROCESSOR_ADDED, serviceFlow);
    }
    /**
     * 当外界组件通知一个已有的processor处理器data部分发生变化时,该事件被触发。
     */
    public void processorDataChanged(String serviceFlow, String packageName, String className, String path) {
        this.sendMsg(Constants.PROCESSOR_DATA_CHANGED, serviceFlow, packageName, className, path);
    public void processorDataChanged(String routeCode, String packageName, String className, String path) {
        this.sendMsg(Constants.PROCESSOR_DATA_CHANGED, routeCode, packageName, className, path);
    }
    public void processorDataChanged(ServiceFlow serviceFlow) {
        this.sendMsg(Constants.PROCESSOR_DATA_CHANGED, serviceFlow);
    }
    public void processorDataDeleted(String serviceFlow, String packageName, String className, String path) {
        this.sendMsg(Constants.PROCESSOR_DATA_DELETED, serviceFlow, packageName, className, path);
    public void processorDataDeleted(String routeCode, String packageName, String className, String path) {
        this.sendMsg(Constants.PROCESSOR_DATA_DELETED, routeCode, packageName, className, path);
    }
    public void processorDataDeleted(ServiceFlow serviceFlow) {
        this.sendMsg(Constants.PROCESSOR_DATA_DELETED, serviceFlow);
    }
    /**
     * 当外界组件通知一个新的RouteDefine路由被定义时,该事件被触发
     */
    public void routeDefineAdded(String serviceFlow, String packageName, String className, String path) {
        this.sendMsg(Constants.ROUTE_DEFINE_ADDED, serviceFlow, packageName, className, path);
    public void routeDefineAdded(String routeCode, String packageName, String className, String path) {
        this.sendMsg(Constants.ROUTE_DEFINE_ADDED, routeCode, packageName, className, path);
    }
    public void routeDefineAdded(ServiceFlow serviceFlow) {
        this.sendMsg(Constants.ROUTE_DEFINE_ADDED, serviceFlow);
    }
    /**
     * 当外界组件通知一个已有的RouteDefine路由定义被改变时,主要就是路由定义内容被改变时,该事件被触发。
     */
    public void routeDefineChanged(String serviceFlow, String packageName, String className, String path) {
        this.sendMsg(Constants.ROUTE_DEFINE_CHANGED, serviceFlow, packageName, className, path);
    public void routeDefineChanged(String routeCode, String packageName, String className, String path) {
        this.sendMsg(Constants.ROUTE_DEFINE_CHANGED, routeCode, packageName, className, path);
    }
    public void routeDefineChanged(ServiceFlow serviceFlow) {
        this.sendMsg(Constants.ROUTE_DEFINE_CHANGED, serviceFlow);
    }
    /**
     * 当外界组件通知一个已有的RouteDefine路由定义被删除时,该事件被触发。
     */
    public void routeDefineDelete(String serviceFlow, String packageName, String className) {
        this.sendMsg(Constants.ROUTE_DEFINE_DELETED, serviceFlow, packageName, className, null);
    public void routeDefineDelete(String routeCode, String packageName, String className) {
        this.sendMsg(Constants.ROUTE_DEFINE_DELETED, routeCode, packageName, className, null);
    }
    public void routeClassAdded(String serviceFlow, String packageName, String className, String path, String cron) {
        this.sendGenMsg(Constants.ROUTE_CLASS_ADDED, serviceFlow, packageName, className, path, cron);
    public void routeDefineDelete(ServiceFlow serviceFlow) {
        this.sendMsg(Constants.ROUTE_DEFINE_DELETED, serviceFlow);
    }
    public void routeClassChanged(String serviceFlow, String packageName, String className, String path, String cron) {
        this.sendGenMsg(Constants.ROUTE_CLASS_CHANGED, serviceFlow, packageName, className, path, cron);
    public void routeClassAdded(String routeCode, String packageName, String className, String path, String cron) {
        this.sendGenMsg(Constants.ROUTE_CLASS_ADDED, routeCode, packageName, className, path, cron);
    }
    public void processorClassAdded(String serviceFlow, String packageName, String className, String path) {
        this.sendMsg(Constants.PROCESSOR_CLASS_ADDED, serviceFlow, packageName, className, path);
    public void routeClassAdded(ServiceFlow serviceFlow) {
        this.sendMsg(Constants.ROUTE_CLASS_ADDED, serviceFlow);
    }
    public void processorClassChanged(String serviceFlow, String packageName, String className, String path) {
        this.sendMsg(Constants.PROCESSOR_CLASS_CHANGED, serviceFlow, packageName, className, path);
    public void routeClassChanged(String routeCode, String packageName, String className, String path, String cron) {
        this.sendGenMsg(Constants.ROUTE_CLASS_CHANGED, routeCode, packageName, className, path, cron);
    }
    private void sendMsg(String event, String serviceFlow, String packageName, String className, String path) {
        ObjectNode objectNode = objectMapper.createObjectNode();
        objectNode.put("event", event);
        objectNode.put("serviceFlow", serviceFlow);
        objectNode.put("packageName", packageName);
        objectNode.put("className", className);
        objectNode.put("path", path);
        try {
            String msg = objectMapper.writeValueAsString(objectNode);
            this.jmsMessagingTemplate.convertAndSend(this.queue, msg);
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }
    public void routeClassChanged(ServiceFlow serviceFlow) {
        this.sendMsg(Constants.ROUTE_CLASS_CHANGED, serviceFlow);
    }
    private void sendGenMsg(String event, String serviceFlow, String packageName, String className, String path, String cron) {
        ObjectNode objectNode = objectMapper.createObjectNode();
        objectNode.put("event", event);
        objectNode.put("serviceFlow", serviceFlow);
        objectNode.put("packageName", packageName);
        objectNode.put("className", className);
        objectNode.put("path", path);
        objectNode.put("cron", cron);
        try {
            String msg = objectMapper.writeValueAsString(objectNode);
            this.jmsMessagingTemplate.convertAndSend(this.queue, msg);
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }
    public void processorClassAdded(String routeCode, String packageName, String className, String path) {
        this.sendMsg(Constants.PROCESSOR_CLASS_ADDED, routeCode, packageName, className, path);
    }
    public void processorClassAdded(ServiceFlow serviceFlow) {
        this.sendMsg(Constants.PROCESSOR_CLASS_ADDED, serviceFlow);
    }
    public void processorClassChanged(String routeCode, String packageName, String className, String path) {
        this.sendMsg(Constants.PROCESSOR_CLASS_CHANGED, routeCode, packageName, className, path);
    }
    public void processorClassChanged(ServiceFlow serviceFlow) {
        this.sendMsg(Constants.PROCESSOR_CLASS_CHANGED, serviceFlow);
    }
    private void sendMsg(String event, String routeCode, String packageName, String className, String path) {
        ServiceFlow flow = new ServiceFlow();
        flow.setRouteCode(routeCode);
        ServiceFlow.HandleFile handleFile = flow.new HandleFile();
//        handleFile.setUsage(Constants.FLOW_TYPE_ROUTE);
        handleFile.setPackageName(packageName);
        handleFile.setClassName(className);
        handleFile.setFilePath(path);
        handleFile.setFileType(Constants.CLASS);
        flow.addHandleFile(handleFile);
        this.sendMsg(event, flow);
    }
    private void sendGenMsg(String event, String routeCode, String packageName, String className, String path, String cron) {
        ServiceFlow flow = new ServiceFlow();
        flow.setRouteCode(routeCode);
        ServiceFlow.HandleFile handleFile = flow.new HandleFile();
//        handleFile.setUsage(Constants.FLOW_TYPE_ROUTE);
        handleFile.setPackageName(packageName);
        handleFile.setClassName(className);
        handleFile.setFilePath(path);
        handleFile.setFileType(Constants.JAVA);
        flow.addHandleFile(handleFile);
        flow.setCron(cron);
        this.sendMsg(event, flow);
    }
    private void sendMsg(ServiceFlowEvent serviceFlowEvent) {
    private void sendMsg(String event, ServiceFlow serviceFlow) {
        try {
            String msg = objectMapper.writeValueAsString(serviceFlowEvent);
            this.jmsMessagingTemplate.convertAndSend(this.queue, msg);
            String msg = objectMapper.writeValueAsString(serviceFlow);
            Map<String, Object> header = new HashMap<>();
            header.put("event", event);
            this.jmsMessagingTemplate.convertAndSend(this.queue, msg, header);
        } catch (JsonProcessingException e) {
            e.printStackTrace();
            logger.error(e.getMessage());

+ 3 - 9
src/main/java/com/yihu/hos/system/dao/FlowClassDao.java

@ -1,6 +1,5 @@
package com.yihu.hos.system.dao;
import com.yihu.hos.system.dao.intf.IFlowClassDao;
import com.yihu.hos.system.model.SystemServiceFlowClass;
import com.yihu.hos.web.framework.dao.SQLGeneralDAO;
import org.hibernate.Query;
@ -16,12 +15,10 @@ import java.util.List;
 * Created at 2016/8/19.
 */
@Repository("flowClassDao")
public class FlowClassDao extends SQLGeneralDAO implements IFlowClassDao {
public class FlowClassDao extends SQLGeneralDAO {
    public static final String BEAN_ID = "flowClassDao";
    @Override
    public List<SystemServiceFlowClass> getFlowClassByFlowId(Integer flowId) throws Exception {
        List<SystemServiceFlowClass> flowClasses = (List<SystemServiceFlowClass>) super.hibernateTemplate.find("from SystemServiceFlowClass s where s.flowId=? ", flowId);
        if (flowClasses != null && flowClasses.size() > 0) {
@ -30,16 +27,14 @@ public class FlowClassDao extends SQLGeneralDAO implements IFlowClassDao {
        return new ArrayList<>();
    }
    @Override
    public List<SystemServiceFlowClass> getFlowClass(Integer flowId, String type) throws Exception {
        List<SystemServiceFlowClass> flowClasses = (List<SystemServiceFlowClass>) super.hibernateTemplate.find("from SystemServiceFlowClass s where s.flowId=? and type=? ", flowId,type);
        List<SystemServiceFlowClass> flowClasses = (List<SystemServiceFlowClass>) super.hibernateTemplate.find("from SystemServiceFlowClass s where s.flowId=? and type=? ", flowId, type);
        if (flowClasses != null && flowClasses.size() > 0) {
            return flowClasses;
        }
        return new ArrayList<>();
    }
    @Override
    public boolean deleteFlowClassByFlowId(Integer flowId) {
        try {
            Session session = getCurrentSession();
@ -48,14 +43,13 @@ public class FlowClassDao extends SQLGeneralDAO implements IFlowClassDao {
            query.setInteger("flowId", flowId);
            query.executeUpdate();
            return true;
        }catch (Exception e){
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }
    @Override
    public List<Integer> getFlowClassIds(Integer flowId) {
        Session session = getCurrentSession();
        String sql = "SELECT id from system_service_flow_class where flow_id = :flowId";

+ 3 - 6
src/main/java/com/yihu/hos/system/dao/FlowDao.java

@ -1,6 +1,5 @@
package com.yihu.hos.system.dao;
import com.yihu.hos.system.dao.intf.IFlowDao;
import com.yihu.hos.system.model.SystemServiceFlow;
import com.yihu.hos.web.framework.dao.SQLGeneralDAO;
import com.yihu.hos.web.framework.model.Result;
@ -16,8 +15,7 @@ import java.util.Map;
 * Created at 2016/8/19.
 */
@Repository("flowDao")
public class FlowDao  extends SQLGeneralDAO implements IFlowDao {
    @Override
public class FlowDao extends SQLGeneralDAO {
    public Result getFlowList(Map<String, Object> params) throws Exception {
        StringBuilder sb = new StringBuilder("from SystemServiceFlow t where 1=1 ");
        if (!StringUtils.isEmpty(params.get("valid"))) //是否有效
@ -32,10 +30,9 @@ public class FlowDao  extends SQLGeneralDAO implements IFlowDao {
        return super.getDataGridResult(sb.toString(), Integer.valueOf(params.get("page").toString()), Integer.valueOf(params.get("rows").toString()));
    }
    @Override
    public List<SystemServiceFlow> getFlowList(String type) throws Exception {
        String sql = "select * from system_service_flow where valid = 1 and file_type= '"+type+"' order by create_date";
        List<SystemServiceFlow> list = super.queryListBySql(sql,SystemServiceFlow.class);
        String sql = "select * from system_service_flow where valid = 1 and file_type= '" + type + "' order by create_date";
        List<SystemServiceFlow> list = super.queryListBySql(sql, SystemServiceFlow.class);
        return list;
    }

+ 1 - 5
src/main/java/com/yihu/hos/system/dao/FlowTempDao.java

@ -1,6 +1,5 @@
package com.yihu.hos.system.dao;
import com.yihu.hos.system.dao.intf.IFlowTempDao;
import com.yihu.hos.system.model.SystemServiceFlowTemp;
import com.yihu.hos.web.framework.dao.SQLGeneralDAO;
import org.hibernate.Query;
@ -16,11 +15,10 @@ import java.util.List;
 * Created at 2016/8/19.
 */
@Repository("flowTempDao")
public class FlowTempDao extends SQLGeneralDAO implements IFlowTempDao {
public class FlowTempDao extends SQLGeneralDAO {
    public static final String BEAN_ID = "flowTempDao";
    @Override
    public List<SystemServiceFlowTemp> getFlowTempByFlowId(Integer flowId) throws Exception {
        List<SystemServiceFlowTemp> flowTemps = (List<SystemServiceFlowTemp>) super.hibernateTemplate.find("from SystemServiceFlowTemp s where s.flowId=? ", flowId);
        if (flowTemps != null && flowTemps.size() > 0) {
@ -29,7 +27,6 @@ public class FlowTempDao extends SQLGeneralDAO implements IFlowTempDao {
        return new ArrayList<>();
    }
    @Override
    public List<SystemServiceFlowTemp> getFlowTemps(Integer flowId, String type) throws Exception {
        List<SystemServiceFlowTemp> flowTemps = (List<SystemServiceFlowTemp>) super.hibernateTemplate.find("from SystemServiceFlowTemp s where s.flowId=? and type=? ", flowId,type);
        if (flowTemps != null && flowTemps.size() > 0) {
@ -38,7 +35,6 @@ public class FlowTempDao extends SQLGeneralDAO implements IFlowTempDao {
        return new ArrayList<>();
    }
    @Override
    public boolean deleteFlowTempByFlowId(Integer flowId) {
        try {
            Session session = getCurrentSession();

+ 0 - 18
src/main/java/com/yihu/hos/system/dao/intf/IFlowClassDao.java

@ -1,18 +0,0 @@
package com.yihu.hos.system.dao.intf;
import com.yihu.hos.system.model.SystemServiceFlowClass;
import com.yihu.hos.web.framework.dao.XSQLGeneralDAO;
import java.util.List;
/**
 * @author HZY
 * @vsrsion 1.0
 * Created at 2016/8/19.
 */
public interface IFlowClassDao extends XSQLGeneralDAO {
    List<SystemServiceFlowClass> getFlowClassByFlowId(Integer flowId) throws Exception;
    List<SystemServiceFlowClass> getFlowClass(Integer flowId,String type) throws Exception;
    boolean deleteFlowClassByFlowId(Integer flowId);
    List<Integer> getFlowClassIds(Integer flowId);
}

+ 0 - 20
src/main/java/com/yihu/hos/system/dao/intf/IFlowDao.java

@ -1,20 +0,0 @@
package com.yihu.hos.system.dao.intf;
import com.yihu.hos.system.model.SystemServiceFlow;
import com.yihu.hos.web.framework.dao.XSQLGeneralDAO;
import com.yihu.hos.web.framework.model.Result;
import java.util.List;
import java.util.Map;
/**
 * @author HZY
 * @vsrsion 1.0
 * Created at 2016/8/19.
 */
public interface IFlowDao extends XSQLGeneralDAO {
    Result getFlowList(Map<String, Object> params) throws Exception;
    List<SystemServiceFlow> getFlowList(String type) throws Exception;
}

+ 0 - 19
src/main/java/com/yihu/hos/system/dao/intf/IFlowTempDao.java

@ -1,19 +0,0 @@
package com.yihu.hos.system.dao.intf;
import com.yihu.hos.system.model.SystemServiceFlowTemp;
import com.yihu.hos.web.framework.dao.XSQLGeneralDAO;
import java.util.List;
/**
 * @author HZY
 * @vsrsion 1.0
 * Created at 2016/8/19.
 */
public interface IFlowTempDao extends XSQLGeneralDAO {
    List<SystemServiceFlowTemp> getFlowTempByFlowId(Integer flowId) throws Exception;
    List<SystemServiceFlowTemp> getFlowTemps(Integer flowId,String type) throws Exception;
    boolean deleteFlowTempByFlowId(Integer flowId);
}

+ 115 - 0
src/main/java/com/yihu/hos/system/model/bo/ServiceFlow.java

@ -0,0 +1,115 @@
package com.yihu.hos.system.model.bo;
import java.util.ArrayList;
import java.util.Date;
/**
 * @author Airhead
 * @since 2016/8/4.
 */
public class ServiceFlow {
    private String routeCode;
    private ArrayList<HandleFile> handleFiles;
    private Date updated;
    private String flowType;    //pull or push?
    private String cron;    //采集任务时使用
    public String getCron() {
        return cron;
    }
    public void setCron(String cron) {
        this.cron = cron;
    }
    public String getRouteCode() {
        return routeCode;
    }
    public void setRouteCode(String routeCode) {
        this.routeCode = routeCode;
    }
    public Date getUpdated() {
        return updated;
    }
    public void setUpdated(Date updated) {
        this.updated = updated;
    }
    public ArrayList<HandleFile> getHandleFiles() {
        return handleFiles;
    }
    public void setHandleFiles(ArrayList<HandleFile> handleFiles) {
        this.handleFiles = handleFiles;
    }
    public void addHandleFile(HandleFile handleFile) {
        if (handleFiles == null) {
            handleFiles = new ArrayList<>();
        }
        handleFiles.add(handleFile);
    }
    public String getFlowType() {
        return flowType;
    }
    public void setFlowType(String flowType) {
        this.flowType = flowType;
    }
    public class HandleFile {
        private String usage;   //router or processor
        private String packageName;
        private String className;
        private String filePath;
        private String fileType;    //java or class
        public HandleFile() {
        }
        public String getPackageName() {
            return packageName;
        }
        public void setPackageName(String packageName) {
            this.packageName = packageName;
        }
        public String getClassName() {
            return className;
        }
        public void setClassName(String className) {
            this.className = className;
        }
        public String getFilePath() {
            return filePath;
        }
        public void setFilePath(String filePath) {
            this.filePath = filePath;
        }
        public String getFileType() {
            return fileType;
        }
        public void setFileType(String fileType) {
            this.fileType = fileType;
        }
        public String getUsage() {
            return usage;
        }
        public void setUsage(String usage) {
            this.usage = usage;
        }
    }
}

+ 0 - 152
src/main/java/com/yihu/hos/system/model/bo/ServiceFlowEvent.java

@ -1,152 +0,0 @@
package com.yihu.hos.system.model.bo;
import java.util.ArrayList;
import java.util.Date;
/**
 * @author  Airhead
 * @since 2016/8/4.
 */
public class ServiceFlowEvent {
    private String event;
    private String serviceFlow;
    private ArrayList<Router> routers;
    private ArrayList<Processor> processors;
    private Date updateTime;
    private String flowType;
    private String cron;    //采集任务时使用
    public String getCron() {
        return cron;
    }
    public void setCron(String cron) {
        this.cron = cron;
    }
    public String getEvent() {
        return event;
    }
    public void setEvent(String event) {
        this.event = event;
    }
    public String getServiceFlow() {
        return serviceFlow;
    }
    public void setServiceFlow(String serviceFlow) {
        this.serviceFlow = serviceFlow;
    }
    public Date getUpdateTime() {
        return updateTime;
    }
    public void setUpdateTime(Date updateTime) {
        this.updateTime = updateTime;
    }
    public ArrayList<Router> getRouters() {
        return routers;
    }
    public void setRouters(ArrayList<Router> routers) {
        this.routers = routers;
    }
    public ArrayList<Processor> getProcessors() {
        return processors;
    }
    public void setProcessors(ArrayList<Processor> processors) {
        this.processors = processors;
    }
    public String getFlowType() {
        return flowType;
    }
    public void setFlowType(String flowType) {
        this.flowType = flowType;
    }
    public class Router {
        private String packageName;
        private String className;
        private String filePath;
        private String fileType;
        public String getPackageName() {
            return packageName;
        }
        public void setPackageName(String packageName) {
            this.packageName = packageName;
        }
        public String getClassName() {
            return className;
        }
        public void setClassName(String className) {
            this.className = className;
        }
        public String getFilePath() {
            return filePath;
        }
        public void setFilePath(String filePath) {
            this.filePath = filePath;
        }
        public String getFileType() {
            return fileType;
        }
        public void setFileType(String fileType) {
            this.fileType = fileType;
        }
    }
    public class Processor {
        private String packageName;
        private String className;
        private String filePath;
        private String fileType;
        public String getPackageName() {
            return packageName;
        }
        public void setPackageName(String packageName) {
            this.packageName = packageName;
        }
        public String getClassName() {
            return className;
        }
        public void setClassName(String className) {
            this.className = className;
        }
        public String getFilePath() {
            return filePath;
        }
        public void setFilePath(String filePath) {
            this.filePath = filePath;
        }
        public String getFileType() {
            return fileType;
        }
        public void setFileType(String fileType) {
            this.fileType = fileType;
        }
    }
}

+ 137 - 207
src/main/java/com/yihu/hos/system/service/FlowManager.java

@ -3,23 +3,19 @@ package com.yihu.hos.system.service;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.yihu.hos.common.constants.Constants;
import com.yihu.hos.core.file.FileUtil;
import com.yihu.hos.core.http.HTTPResponse;
import com.yihu.hos.core.http.HttpClientKit;
import com.yihu.hos.services.ServiceFlowEventService;
import com.yihu.hos.system.dao.FlowClassDao;
import com.yihu.hos.system.dao.FlowDao;
import com.yihu.hos.system.dao.FlowTempDao;
import com.yihu.hos.system.dao.intf.IFlowClassDao;
import com.yihu.hos.system.dao.intf.IFlowDao;
import com.yihu.hos.system.dao.intf.IFlowTempDao;
import com.yihu.hos.system.model.SystemServiceFlow;
import com.yihu.hos.system.model.SystemServiceFlowClass;
import com.yihu.hos.system.model.SystemServiceFlowTemp;
import com.yihu.hos.system.model.bo.ServiceFlow;
import com.yihu.hos.system.service.intf.IFlowManage;
import com.yihu.hos.web.framework.model.ActionResult;
import com.yihu.hos.web.framework.model.DictItem;
import com.yihu.hos.web.framework.model.Result;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.multipart.MultipartFile;
@ -27,7 +23,10 @@ import org.springframework.web.multipart.MultipartFile;
import javax.annotation.Resource;
import java.io.File;
import java.io.IOException;
import java.util.*;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
/**
 * 系统流程管理业务类
@ -40,26 +39,76 @@ import java.util.*;
public class FlowManager implements IFlowManage {
    public static final String BEAN_ID = "flowManager";
    @Value("${esb.genCamelUrl}")
    private String genCamelUrl;
    @Autowired
    ServiceFlowEventService serviceFlowEventService;
    @Resource(name = "flowDao")
    private IFlowDao flowDao;
    private FlowDao flowDao;
    @Resource(name = FlowClassDao.BEAN_ID)
    private IFlowClassDao flowClassDao;
    private FlowClassDao flowClassDao;
    @Resource(name = FlowTempDao.BEAN_ID)
    private IFlowTempDao flowTempDao;
    @Autowired
    ServiceFlowEventService serviceFlowEventService;
    private FlowTempDao flowTempDao;
    @Override
    public Result getFlowList(Map<String, Object> params) throws Exception {
        return flowDao.getFlowList(params);
    }
    /**
     * @return List<ServiceFlow> 返回所有可运行流程
     * @throws Exception ...
     */
    public List<ServiceFlow> getServiceFlowList() throws Exception {
        List<ServiceFlow> serviceFlowList = new ArrayList<>();
        List<SystemServiceFlow> classFlowList = flowDao.getFlowList(Constants.CLASS);
        for (SystemServiceFlow systemServiceFlow : classFlowList) {
            ServiceFlow serviceFlow = new ServiceFlow();
            serviceFlow.setRouteCode(systemServiceFlow.getCode());
            serviceFlow.setFlowType(systemServiceFlow.getFileType());
            List<SystemServiceFlowClass> classList = flowClassDao.getFlowClassByFlowId(systemServiceFlow.getId());
            ArrayList<ServiceFlow.HandleFile> handleFileList = new ArrayList<>();
            for (SystemServiceFlowClass flowClass : classList) {
                ServiceFlow.HandleFile handleFile = serviceFlow.new HandleFile();
                handleFile.setFileType(Constants.CLASS);
                handleFile.setClassName(flowClass.getClassName());
                handleFile.setPackageName(flowClass.getPackageName());
                handleFile.setFilePath(flowClass.getClassPath());
                handleFile.setUsage(flowClass.getType());
                handleFileList.add(handleFile);
            }
            serviceFlow.setHandleFiles(handleFileList);
            serviceFlowList.add(serviceFlow);
        }
        List<SystemServiceFlow> javaFlowList = flowDao.getFlowList(Constants.JAVA);
        for (SystemServiceFlow systemServiceFlow : javaFlowList) {
            ServiceFlow serviceFlow = new ServiceFlow();
            serviceFlow.setRouteCode(systemServiceFlow.getCode());
            List<SystemServiceFlowTemp> tempList = flowTempDao.getFlowTempByFlowId(systemServiceFlow.getId());
            ArrayList<ServiceFlow.HandleFile> handleFileList = new ArrayList<>();
            for (SystemServiceFlowTemp flowTemp : tempList) {
                ServiceFlow.HandleFile handleFile = serviceFlow.new HandleFile();
                handleFile.setFileType(Constants.JAVA);
                handleFile.setClassName(flowTemp.getClassName());
                handleFile.setPackageName(flowTemp.getPackageName());
                handleFile.setFilePath(flowTemp.getClassPath());
                handleFile.setUsage(flowTemp.getType());
                handleFileList.add(handleFile);
            }
            serviceFlow.setHandleFiles(handleFileList);
            serviceFlowList.add(serviceFlow);
        }
        return serviceFlowList;
    }
    @Override
    public SystemServiceFlow getFlowById(Integer id) throws Exception {
        return flowDao.getEntity(SystemServiceFlow.class, id);
@ -70,17 +119,17 @@ public class FlowManager implements IFlowManage {
        obj.setCreateDate(new Date());
        flowDao.saveEntity(obj);
        if (Constants.CLASS.equals(obj.getFileType())){
        if (Constants.CLASS.equals(obj.getFileType())) {
            List<SystemServiceFlowClass> flowClassList = obj.getFlowClassArray();
            for (SystemServiceFlowClass flowClass:flowClassList){
            for (SystemServiceFlowClass flowClass : flowClassList) {
                flowClass.setFlowId(obj.getId());
                flowDao.saveEntity(flowClass);
                //发送消息到MQ对列
                sendUpdateMessage(obj.getCode(), flowClass, Constants.FLOW_OP_ADD);
            }
        }else if (Constants.JAVA.equals(obj.getFileType())){
        } else if (Constants.JAVA.equals(obj.getFileType())) {
            List<SystemServiceFlowTemp> flowTempList = obj.getFlowTempArray();
            for (SystemServiceFlowTemp flowTemp:flowTempList){
            for (SystemServiceFlowTemp flowTemp : flowTempList) {
                flowTemp.setFlowId(obj.getId());
                flowDao.saveEntity(flowTemp);
            }
@ -100,21 +149,21 @@ public class FlowManager implements IFlowManage {
        flow.setFileType(obj.getFileType());
        if (Constants.JAVA.equals(flow.getFileType())){
        if (Constants.JAVA.equals(flow.getFileType())) {
            List<SystemServiceFlowTemp> flowTempList = obj.getFlowTempArray();
            boolean succ = flowTempDao.deleteFlowTempByFlowId(obj.getId());
            if (succ){
                for (SystemServiceFlowTemp flowTemp:flowTempList){
            if (succ) {
                for (SystemServiceFlowTemp flowTemp : flowTempList) {
                    flowTempDao.saveEntity(flowTemp);
                }
            }
        }else if (Constants.CLASS.equals(flow.getFileType())){
        } else if (Constants.CLASS.equals(flow.getFileType())) {
            List<Integer> classIds = flowClassDao.getFlowClassIds(obj.getId());//原flowclass集合
            List<SystemServiceFlowClass> flowClassList = obj.getFlowClassArray();
            SystemServiceFlowClass flowClassRoute = null;
            String oper = "";
            for (SystemServiceFlowClass flowClass:flowClassList){
                if (flowClass.getId()!=null) {
            for (SystemServiceFlowClass flowClass : flowClassList) {
                if (flowClass.getId() != null) {
                    classIds.remove(flowClass.getId());
                    flowClassDao.updateEntity(flowClass);
                    if (!flowClass.getType().equals(Constants.FLOW_TYPE_ROUTE)) {
@ -123,7 +172,7 @@ public class FlowManager implements IFlowManage {
                        flowClassRoute = flowClass;
                        oper = Constants.FLOW_OP_UPDATE;
                    }
                }else {
                } else {
                    if (!flowClass.getType().equals(Constants.FLOW_TYPE_ROUTE)) {
                        flowClassDao.saveEntity(flowClass);
                        sendUpdateMessage(flow.getCode(), flowClass, Constants.FLOW_OP_ADD);
@ -134,8 +183,8 @@ public class FlowManager implements IFlowManage {
                }
            }
            //删除判断
            if (classIds !=null && classIds.size()>0){
                for (Integer id:classIds){
            if (classIds != null && classIds.size() > 0) {
                for (Integer id : classIds) {
                    SystemServiceFlowClass flowClass = getFlowClassById(id);
                    flowClassDao.deleteEntity(flowClass);
                    if (!flowClass.getType().equals(Constants.FLOW_TYPE_ROUTE)) {
@ -155,7 +204,6 @@ public class FlowManager implements IFlowManage {
        flowDao.updateEntity(flow);
        return Result.success("更新成功");
    }
@ -165,7 +213,7 @@ public class FlowManager implements IFlowManage {
        List<SystemServiceFlowClass> flowClassList = flowClassDao.getFlowClassByFlowId(id);
        SystemServiceFlowClass flowClassRoute = null;
        String oper = "";
        for (SystemServiceFlowClass flowClass:flowClassList){
        for (SystemServiceFlowClass flowClass : flowClassList) {
            flowClassDao.deleteEntity(flowClass);
            //发送消息到MQ对列
            if (!flowClass.getType().equals(Constants.FLOW_TYPE_ROUTE)) {
@ -179,7 +227,7 @@ public class FlowManager implements IFlowManage {
        if (flowClassRoute != null) {
            sendUpdateMessage(flow.getCode(), flowClassRoute, oper);
        }
        if (Constants.JAVA.equals(flow.getFileType())){
        if (Constants.JAVA.equals(flow.getFileType())) {
            flowTempDao.deleteFlowTempByFlowId(id);
        }
        flowDao.deleteEntity(flow);
@ -217,7 +265,7 @@ public class FlowManager implements IFlowManage {
    @Transactional
    public boolean deleteFlowClassByFlowId(Integer flowId) {
        boolean succ =flowClassDao.deleteFlowClassByFlowId(flowId);
        boolean succ = flowClassDao.deleteFlowClassByFlowId(flowId);
        return succ;
    }
@ -239,12 +287,12 @@ public class FlowManager implements IFlowManage {
    @Override
    public String uploadFile(MultipartFile file, String baseSavePath) {
        String fileName =  file.getOriginalFilename();
        String fileName = file.getOriginalFilename();
        boolean succ = false;
        try {
            succ = FileUtil.writeFile(baseSavePath + File.separator + fileName, file.getBytes(), "utf-8");
            if (succ){
                return  fileName;
            if (succ) {
                return fileName;
            }
        } catch (IOException e) {
            e.printStackTrace();
@ -254,45 +302,60 @@ public class FlowManager implements IFlowManage {
    /**
     * 发送MQ消息-更新路由
     *
     * @param flowCode  服务流程Code标识
     * @param flowClass
     * @param operate
     */
    public  void sendUpdateMessage(String flowCode,SystemServiceFlowClass flowClass,String operate){
    public void sendUpdateMessage(String flowCode, SystemServiceFlowClass flowClass, String operate) {
        //发送消息到MQ对列
        if ("1".equals(flowClass.getIsUpdate()) && Constants.FLOW_TYPE_ROUTE.equals(flowClass.getType())) {
            //route
            switch (operate){
                case "add" : serviceFlowEventService.routeDefineAdded(flowCode, flowClass.getPackageName(), flowClass.getClassName(), flowClass.getClassPath()); break;
                case "delete" : serviceFlowEventService.routeDefineDelete(flowCode, flowClass.getPackageName(), flowClass.getClassName()); break;
                case "update" : serviceFlowEventService.routeDefineChanged(flowCode, flowClass.getPackageName(), flowClass.getClassName(), flowClass.getClassPath()); break;
                default : break;
            switch (operate) {
                case "add":
                    serviceFlowEventService.routeDefineAdded(flowCode, flowClass.getPackageName(), flowClass.getClassName(), flowClass.getClassPath());
                    break;
                case "delete":
                    serviceFlowEventService.routeDefineDelete(flowCode, flowClass.getPackageName(), flowClass.getClassName());
                    break;
                case "update":
                    serviceFlowEventService.routeDefineChanged(flowCode, flowClass.getPackageName(), flowClass.getClassName(), flowClass.getClassPath());
                    break;
                default:
                    break;
            }
        } else if ("1".equals(flowClass.getIsUpdate()) && Constants.FLOW_TYPE_PROCESSOR.equals(flowClass.getType())) {
            //processor
            switch (operate){
                case "add" : serviceFlowEventService.processorAdded(flowCode, flowClass.getPackageName(), flowClass.getClassName(), flowClass.getClassPath()); break;
                case "delete" : serviceFlowEventService.processorDataDeleted(flowCode, flowClass.getPackageName(), flowClass.getClassName(), flowClass.getClassPath()); break;
                case "update" : serviceFlowEventService.processorDataChanged(flowCode, flowClass.getPackageName(), flowClass.getClassName(), flowClass.getClassPath()); break;
                default : break;
            switch (operate) {
                case "add":
                    serviceFlowEventService.processorAdded(flowCode, flowClass.getPackageName(), flowClass.getClassName(), flowClass.getClassPath());
                    break;
                case "delete":
                    serviceFlowEventService.processorDataDeleted(flowCode, flowClass.getPackageName(), flowClass.getClassName(), flowClass.getClassPath());
                    break;
                case "update":
                    serviceFlowEventService.processorDataChanged(flowCode, flowClass.getPackageName(), flowClass.getClassName(), flowClass.getClassPath());
                    break;
                default:
                    break;
            }
        }
    }
    /**
     * 获取流程列表
     *
     * @param type 流程的文件类型
     * @return
     * @throws Exception
     */
    @Override
    public  ActionResult getFlowList(String type) throws Exception {
    public ActionResult getFlowList(String type) throws Exception {
        List<SystemServiceFlow> flowList = flowDao.getFlowList(type);
        ActionResult re = new ActionResult();
        if(flowList!=null&&flowList.size()>0)
        {
        if (flowList != null && flowList.size() > 0) {
            List<DictItem> dictList = new ArrayList<>();
            for(SystemServiceFlow item:flowList){
            for (SystemServiceFlow item : flowList) {
                DictItem dict = new DictItem();
                dict.setCode(item.getId().toString());
                dict.setValue(item.getName());
@ -307,6 +370,7 @@ public class FlowManager implements IFlowManage {
    /**
     * TODO 调用broker接口生成camel相关文件
     *
     * @param flowTempId
     * @param newCron
     * @throws Exception
@ -314,186 +378,53 @@ public class FlowManager implements IFlowManage {
    public Integer genCamelFile(Integer flowTempId, String newCron) throws Exception {
        Long timestamp = System.currentTimeMillis();
        Integer newFlowId = sendAddProcessore(flowTempId, timestamp);
        if (newFlowId != null){
        if (newFlowId != null) {
            newFlowId = sendAddRoute(flowTempId, newFlowId, newCron, timestamp);
            if (newFlowId !=null){
            if (newFlowId != null) {
                return newFlowId;
            }else {
            } else {
                System.out.println("生成route文件失败");
                return null;
            }
        }else {
        } else {
            System.out.println("生成processor文件失败");
            return null;
        }
    }
    public Integer addRouteFile(Integer tempId,Integer flowId, String newCron ,Long timestamp) throws Exception {
        ObjectMapper objectMapper = new ObjectMapper();
        List<SystemServiceFlowTemp> flowTempRouters = flowTempDao.getFlowTemps(tempId, Constants.FLOW_TYPE_ROUTE);
        SystemServiceFlow newFlow = getFlowById(flowId);
        //route模板文件记录是否存在。不存在就返回。
        if (!flowTempRouters.isEmpty()){
            Map<String,String> params = null;
            SystemServiceFlowTemp flowTemp =flowTempRouters.get(0);
            StringBuilder basePath = new StringBuilder();;
            if (flowTemp.getPackageName()!=null){
                String packagePath[] = flowTemp.getPackageName().split("\\.");
                for (int i=0;i<packagePath.length;i++){
                    basePath.append(packagePath[i]).append("/");
                }
            }
            //新增processor记录
                String newClassName = flowTemp.getClassName()+timestamp;
                String newRoutePath =null;
                params = new HashMap<>();
                params.put("routeId", newFlow.getCode());
                params.put("type",Constants.FLOW_TYPE_ROUTE);
                params.put("filePath", flowTemp.getClassPath());
                params.put("packageName", basePath.toString());
                params.put("oldClassName", flowTemp.getClassName());
                params.put("newClassName",newClassName);//原文件名加当前时间戳
                params.put("newCron",newCron);
                HTTPResponse response  = HttpClientKit.post(genCamelUrl, params);
                if (response.getStatusCode()==200 ){
                    Map<String,Object> body = objectMapper.readValue(response.getBody(),Map.class);
                    boolean succ = (boolean) body.get("successFlg");
                    if (succ){
                        newRoutePath = body.get("message").toString();
                    }else {
                        return null;
                    }
                    System.out.println(response.getBody());
                    SystemServiceFlowClass newFlowClass = new SystemServiceFlowClass();
                    newFlowClass.setPackageName(flowTemp.getPackageName());
                    newFlowClass.setClassName(newClassName);
                    newFlowClass.setClassPath(newRoutePath);
                    newFlowClass.setFlowId(newFlow.getId());
                    newFlowClass.setType(Constants.FLOW_TYPE_ROUTE);
                    flowClassDao.saveEntity(newFlowClass);
                    newFlowClass.setIsUpdate("1");
                    sendUpdateMessage(newFlow.getCode(), newFlowClass, Constants.FLOW_OP_ADD);
                }else {
                    return null;
                }
            return newFlow.getId();
        }
        return null;
    }
    public Integer addProcessorFile(Integer flowId, String newCron,Long timestamp) throws Exception {
        ObjectMapper objectMapper = new ObjectMapper();
        List<SystemServiceFlowTemp> flowClassRouters = flowTempDao.getFlowTemps(flowId, Constants.FLOW_TYPE_ROUTE);
        List<SystemServiceFlowTemp> flowClassProces = flowTempDao.getFlowTemps(flowId, Constants.FLOW_TYPE_PROCESSOR);
        SystemServiceFlow oldFlow = getFlowById(flowId);
        //route模板文件记录是否存在。不存在就返回。
        if (!flowClassRouters.isEmpty()){
            Map<String,String> params = null;
            SystemServiceFlowTemp flowTemp =flowClassRouters.get(0);
            StringBuilder basePath = new StringBuilder();;
            if (flowTemp.getPackageName()!=null){
                String packagePath[] = flowTemp.getPackageName().split("\\.");
                for (int i=0;i<packagePath.length;i++){
                    basePath.append(packagePath[i]).append("/");
                }
            }
            //成功生成文件后,添加flow和flowclass记录
            //生成新流程
            SystemServiceFlow newFlow = new SystemServiceFlow();
            newFlow.setName(oldFlow.getName()+timestamp);
            newFlow.setCode(oldFlow.getCode()+timestamp);
            newFlow.setChart(oldFlow.getChart());
            newFlow.setValid(1);
            newFlow.setCreateDate(new Date());
            newFlow.setFileType(Constants.CLASS);
            flowDao.saveEntity(newFlow);
            //新增processor记录
            for (SystemServiceFlowTemp process:flowClassProces){
//                String newProcessName = process.getClassName()+timestamp;
                String newProcessPath = null;
                StringBuilder proPath =  new StringBuilder( );;
                if (process.getPackageName()!=null){
                    String packagePath[] = process.getPackageName().split("\\.");
                    for (int i=0;i<packagePath.length;i++){
                        proPath.append(packagePath[i]).append("/");
                    }
                }
                params = new HashMap<>();
                params.put("routeId", newFlow.getCode());
                params.put("type",Constants.FLOW_TYPE_PROCESSOR);
                params.put("filePath", process.getClassPath());
                params.put("packageName", proPath.toString());
                params.put("newClassName",process.getClassName());//原文件名加当前时间戳
                params.put("oldClassName", process.getClassName());
                params.put("newCron",newCron);
                HTTPResponse response = HttpClientKit.post(genCamelUrl, params);
                if (response.getStatusCode()==200 ){
                    Map<String,Object> body = objectMapper.readValue(response.getBody(),Map.class);
                    boolean succ = (boolean) body.get("successFlg");
                    if (succ){
                        newProcessPath = body.get("message").toString();
                        System.out.println(response.getBody());
                        SystemServiceFlowClass processClass = new SystemServiceFlowClass();
                        processClass.setPackageName(process.getPackageName());
                        processClass.setClassName(process.getClassName());
                        processClass.setClassPath(newProcessPath);
                        processClass.setFlowId(newFlow.getId());
                        processClass.setType(Constants.FLOW_TYPE_PROCESSOR);
                        flowClassDao.saveEntity(processClass);
                        processClass.setIsUpdate("1");
                        sendUpdateMessage(newFlow.getCode(), processClass, Constants.FLOW_OP_ADD);
                    }else {
                        return null;
                    }
                }else {
                    return null;
                }
            }
            return newFlow.getId();
        }
        return null;
    }
    /**
     * 修改camel相关文件
     * @param flowId 流程ID
     * @param newCron  新cron
     *
     * @param flowId  流程ID
     * @param newCron 新cron
     * @return
     * @throws Exception
     */
    @Override
    public Integer updateCamelFile(Integer flowTempId,Integer flowId, String newCron) throws Exception {
    public Integer updateCamelFile(Integer flowTempId, Integer flowId, String newCron) throws Exception {
        Long timestamp = System.currentTimeMillis();
        ObjectMapper objectMapper = new ObjectMapper();
        List<SystemServiceFlowTemp> flowTempRouters = flowTempDao.getFlowTemps(flowTempId, Constants.FLOW_TYPE_ROUTE);
        List<SystemServiceFlowClass> flowClassRouters = flowClassDao.getFlowClass(flowId, Constants.FLOW_TYPE_ROUTE);
//        SystemServiceFlow oldFlow = getFlowById(flowId);
        SystemServiceFlow flow = flowDao.getEntity(SystemServiceFlow.class,flowId);
        SystemServiceFlow flow = flowDao.getEntity(SystemServiceFlow.class, flowId);
        //route模板文件记录是否存在。不存在就返回。
        if (!flowTempRouters.isEmpty()){
            SystemServiceFlowTemp flowTemp =flowTempRouters.get(0);
            SystemServiceFlowClass flowClass =flowClassRouters.get(0);
        if (!flowTempRouters.isEmpty()) {
            SystemServiceFlowTemp flowTemp = flowTempRouters.get(0);
            SystemServiceFlowClass flowClass = flowClassRouters.get(0);
            StringBuilder basePath = new StringBuilder();;
            if (flowTemp.getPackageName()!=null){
            StringBuilder basePath = new StringBuilder();
            ;
            if (flowTemp.getPackageName() != null) {
                String packagePath[] = flowTemp.getPackageName().split("\\.");
                for (int i=0;i<packagePath.length;i++){
                for (int i = 0; i < packagePath.length; i++) {
                    basePath.append(packagePath[i]).append("/");
                }
            }
                    //route文件生成成功,发送消息
            //route文件生成成功,发送消息
//                    flowClass.setIsUpdate("1");
//                    sendUpdateMessage(flow.getCode(), flowClass, Constants.FLOW_OP_UPDATE);
            serviceFlowEventService.routeClassChanged(flow.getCode(),basePath.toString(), flowTemp.getClassName(), flowTemp.getClassPath(),newCron);
            serviceFlowEventService.routeClassChanged(flow.getCode(), basePath.toString(), flowTemp.getClassName(), flowTemp.getClassPath(), newCron);
            return flowId;
@ -505,7 +436,6 @@ public class FlowManager implements IFlowManage {
    }
    /* *********************       发送消息方式生成文件   ********************************/
    public Integer sendAddRoute(Integer tempId, Integer flowId, String newCron, Long timestamp) throws Exception {
        List<SystemServiceFlowTemp> flowTempRouters = flowTempDao.getFlowTemps(tempId, Constants.FLOW_TYPE_ROUTE);
@ -526,7 +456,7 @@ public class FlowManager implements IFlowManage {
            //新增processor记录
            String newClassName = flowTemp.getClassName() + newFlow.getCode();
            String newRoutePath = flowTemp.getClassPath().replace(".java",".class");
            String newRoutePath = flowTemp.getClassPath().replace(".java", ".class");
            SystemServiceFlowClass newFlowClass = new SystemServiceFlowClass();
            newFlowClass.setPackageName(flowTemp.getPackageName());
            newFlowClass.setClassName(newClassName);
@ -535,7 +465,7 @@ public class FlowManager implements IFlowManage {
            newFlowClass.setType(Constants.FLOW_TYPE_ROUTE);
            flowClassDao.saveEntity(newFlowClass);
            newFlowClass.setIsUpdate("1");
            serviceFlowEventService.routeClassAdded(newFlow.getCode(), basePath.toString(),  flowTemp.getClassName(), flowTemp.getClassPath(),newCron);
            serviceFlowEventService.routeClassAdded(newFlow.getCode(), basePath.toString(), flowTemp.getClassName(), flowTemp.getClassPath(), newCron);
            return newFlow.getId();
        }
@ -583,7 +513,7 @@ public class FlowManager implements IFlowManage {
                processClass.setType(Constants.FLOW_TYPE_PROCESSOR);
                processClass.setIsUpdate("1");
//                sendUpdateMessage(newFlow.getCode(), processClass, Constants.FLOW_OP_ADD);
                serviceFlowEventService.processorClassAdded(newFlow.getCode(),proPath.toString(), processClass.getClassName(), process.getClassPath());
                serviceFlowEventService.processorClassAdded(newFlow.getCode(), proPath.toString(), processClass.getClassName(), process.getClassPath());
                flowClassDao.saveEntity(processClass);