|
@ -2,18 +2,24 @@ package com.yihu.hos.services;
|
|
|
|
|
|
import com.fasterxml.jackson.core.JsonProcessingException;
|
|
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
|
|
import com.fasterxml.jackson.databind.node.ObjectNode;
|
|
|
import com.yihu.hos.common.constants.Constants;
|
|
|
import com.yihu.hos.core.log.Logger;
|
|
|
import com.yihu.hos.core.log.LoggerFactory;
|
|
|
import com.yihu.hos.system.model.bo.ServiceFlow;
|
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
|
import org.springframework.jms.core.JmsMessagingTemplate;
|
|
|
import org.springframework.stereotype.Component;
|
|
|
|
|
|
import javax.jms.Queue;
|
|
|
import java.util.HashMap;
|
|
|
import java.util.Map;
|
|
|
|
|
|
/**
|
|
|
* @created Airhead 2016/8/2.
|
|
|
*/
|
|
|
@Component
|
|
|
public class ServiceFlowEventService {
|
|
|
static final Logger logger = LoggerFactory.getLogger(ServiceFlowEventService.class);
|
|
|
@Autowired
|
|
|
private JmsMessagingTemplate jmsMessagingTemplate;
|
|
|
|
|
@ -23,6 +29,14 @@ public class ServiceFlowEventService {
|
|
|
@Autowired
|
|
|
private ObjectMapper objectMapper;
|
|
|
|
|
|
/**
|
|
|
* admin启动时,触发一次所有流程更新事件,用于重启整个服务的情况。
|
|
|
* 同时解决Broker中启动多个采集任务的问题。
|
|
|
*/
|
|
|
public void flowRefresh(ServiceFlow serviceFlow) {
|
|
|
this.sendMsg(Constants.FlOW_REFRESH, serviceFlow);
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* 当外界组件通知一个新的processor处理器被定义时,该事件被触发。
|
|
|
*
|
|
@ -32,81 +46,135 @@ public class ServiceFlowEventService {
|
|
|
* @param path processor处理器定义涉及的class内容,如果zookeeper数据结构中class分片存储,在业务级接口层面上也进行了合并
|
|
|
*/
|
|
|
public void processorAdded(String serviceFlow, String packageName, String className, String path) {
|
|
|
sendMsg("processorAdded", serviceFlow, packageName, className, path);
|
|
|
sendMsg(Constants.PROCESSOR_ADDED, serviceFlow, packageName, className, path);
|
|
|
}
|
|
|
|
|
|
public void processorAdded(ServiceFlow serviceFlow) {
|
|
|
this.sendMsg(Constants.PROCESSOR_ADDED, serviceFlow);
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* 当外界组件通知一个已有的processor处理器data部分发生变化时,该事件被触发。
|
|
|
*/
|
|
|
public void processorDataChanged(String serviceFlow, String packageName, String className, String path) {
|
|
|
this.sendMsg("processorDataChanged", serviceFlow, packageName, className, path);
|
|
|
public void processorDataChanged(String routeCode, String packageName, String className, String path) {
|
|
|
this.sendMsg(Constants.PROCESSOR_DATA_CHANGED, routeCode, packageName, className, path);
|
|
|
}
|
|
|
|
|
|
public void processorDataChanged(ServiceFlow serviceFlow) {
|
|
|
this.sendMsg(Constants.PROCESSOR_DATA_CHANGED, serviceFlow);
|
|
|
}
|
|
|
|
|
|
public void processorDataDeleted(String serviceFlow, String packageName, String className) {
|
|
|
this.sendMsg("processorDataDeleted", serviceFlow, packageName, className, null);
|
|
|
public void processorDataDeleted(String routeCode, String packageName, String className) {
|
|
|
this.sendMsg(Constants.PROCESSOR_DATA_DELETED, routeCode, packageName, className, null);
|
|
|
|
|
|
}
|
|
|
|
|
|
public void processorDataDeleted(ServiceFlow serviceFlow) {
|
|
|
this.sendMsg(Constants.PROCESSOR_DATA_DELETED, serviceFlow);
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* 当外界组件通知一个新的RouteDefine路由被定义时,该事件被触发
|
|
|
*/
|
|
|
public void routeDefineAdded(String serviceFlow, String packageName, String className, String path) {
|
|
|
this.sendMsg("routeDefineAdded", serviceFlow, packageName, className, path);
|
|
|
public void routeDefineAdded(String routeCode, String packageName, String className, String path) {
|
|
|
this.sendMsg(Constants.ROUTE_DEFINE_ADDED, routeCode, packageName, className, path);
|
|
|
}
|
|
|
|
|
|
public void routeDefineAdded(ServiceFlow serviceFlow) {
|
|
|
this.sendMsg(Constants.ROUTE_DEFINE_ADDED, serviceFlow);
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* 当外界组件通知一个已有的RouteDefine路由定义被改变时,主要就是路由定义内容被改变时,该事件被触发。
|
|
|
*/
|
|
|
public void routeDefineChanged(String serviceFlow, String packageName, String className, String path) {
|
|
|
this.sendMsg("routeDefineChanged", serviceFlow, packageName, className, path);
|
|
|
public void routeDefineChanged(String routeCode, String packageName, String className, String path) {
|
|
|
this.sendMsg(Constants.ROUTE_DEFINE_CHANGED, routeCode, packageName, className, path);
|
|
|
}
|
|
|
|
|
|
public void routeDefineChanged(ServiceFlow serviceFlow) {
|
|
|
this.sendMsg(Constants.ROUTE_DEFINE_CHANGED, serviceFlow);
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* 当外界组件通知一个已有的RouteDefine路由定义被删除时,该事件被触发。
|
|
|
*/
|
|
|
public void routeDefineDelete(String serviceFlow, String packageName, String className) {
|
|
|
this.sendMsg("routeDefineDelete", serviceFlow, packageName, className, null);
|
|
|
public void routeDefineDelete(String routeCode, String packageName, String className) {
|
|
|
this.sendMsg(Constants.ROUTE_DEFINE_DELETED, routeCode, packageName, className, null);
|
|
|
}
|
|
|
|
|
|
public void routeClassAdded(String serviceFlow, String packageName, String className, String path,String cron) {
|
|
|
this.sendGenMsg("routeClassAdded", serviceFlow, packageName, className, path, cron);
|
|
|
public void routeDefineDelete(ServiceFlow serviceFlow) {
|
|
|
this.sendMsg(Constants.ROUTE_DEFINE_DELETED, serviceFlow);
|
|
|
}
|
|
|
|
|
|
public void routeClassChanged(String serviceFlow, String packageName, String className, String path,String cron) {
|
|
|
this.sendGenMsg("routeClassChanged", serviceFlow, packageName, className, path, cron);
|
|
|
public void routeClassAdded(String routeCode, String packageName, String className, String path, String cron) {
|
|
|
this.sendGenMsg(Constants.ROUTE_CLASS_ADDED, routeCode, packageName, className, path, cron);
|
|
|
}
|
|
|
|
|
|
public void processorClassAdded(String serviceFlow, String packageName, String className, String path) {
|
|
|
this.sendMsg("processorClassAdded", serviceFlow, packageName, className, path);
|
|
|
public void routeClassAdded(ServiceFlow serviceFlow) {
|
|
|
this.sendMsg(Constants.ROUTE_CLASS_ADDED, serviceFlow);
|
|
|
}
|
|
|
|
|
|
private void sendMsg(String event, String serviceFlow, String packageName, String className, String path) {
|
|
|
ObjectNode objectNode = objectMapper.createObjectNode();
|
|
|
objectNode.put("event", event);
|
|
|
objectNode.put("serviceFlow", serviceFlow);
|
|
|
objectNode.put("packageName", packageName);
|
|
|
objectNode.put("className", className);
|
|
|
objectNode.put("path", path);
|
|
|
try {
|
|
|
String msg = objectMapper.writeValueAsString(objectNode);
|
|
|
this.jmsMessagingTemplate.convertAndSend(this.queue, msg);
|
|
|
} catch (JsonProcessingException e) {
|
|
|
e.printStackTrace();
|
|
|
}
|
|
|
public void routeClassChanged(String routeCode, String packageName, String className, String path, String cron) {
|
|
|
this.sendGenMsg(Constants.ROUTE_CLASS_CHANGED, routeCode, packageName, className, path, cron);
|
|
|
}
|
|
|
|
|
|
public void routeClassChanged(ServiceFlow serviceFlow) {
|
|
|
this.sendMsg(Constants.ROUTE_CLASS_CHANGED, serviceFlow);
|
|
|
}
|
|
|
|
|
|
public void processorClassAdded(String routeCode, String packageName, String className, String path) {
|
|
|
this.sendMsg(Constants.PROCESSOR_CLASS_ADDED, routeCode, packageName, className, path);
|
|
|
}
|
|
|
|
|
|
public void processorClassAdded(ServiceFlow serviceFlow) {
|
|
|
this.sendMsg(Constants.PROCESSOR_CLASS_ADDED, serviceFlow);
|
|
|
}
|
|
|
|
|
|
public void processorClassChanged(String routeCode, String packageName, String className, String path) {
|
|
|
this.sendMsg(Constants.PROCESSOR_CLASS_CHANGED, routeCode, packageName, className, path);
|
|
|
}
|
|
|
|
|
|
public void processorClassChanged(ServiceFlow serviceFlow) {
|
|
|
this.sendMsg(Constants.PROCESSOR_CLASS_CHANGED, serviceFlow);
|
|
|
}
|
|
|
|
|
|
private void sendMsg(String event, String routeCode, String packageName, String className, String path) {
|
|
|
ServiceFlow flow = new ServiceFlow();
|
|
|
flow.setRouteCode(routeCode);
|
|
|
ServiceFlow.HandleFile handleFile = flow.new HandleFile();
|
|
|
// handleFile.setUsage(Constants.FLOW_TYPE_ROUTE);
|
|
|
handleFile.setPackageName(packageName);
|
|
|
handleFile.setClassName(className);
|
|
|
handleFile.setFilePath(path);
|
|
|
handleFile.setFileType(Constants.CLASS);
|
|
|
flow.addHandleFile(handleFile);
|
|
|
|
|
|
this.sendMsg(event, flow);
|
|
|
}
|
|
|
|
|
|
private void sendGenMsg(String event, String routeCode, String packageName, String className, String path, String cron) {
|
|
|
ServiceFlow flow = new ServiceFlow();
|
|
|
flow.setRouteCode(routeCode);
|
|
|
ServiceFlow.HandleFile handleFile = flow.new HandleFile();
|
|
|
// handleFile.setUsage(Constants.FLOW_TYPE_ROUTE);
|
|
|
handleFile.setPackageName(packageName);
|
|
|
handleFile.setClassName(className);
|
|
|
handleFile.setFilePath(path);
|
|
|
handleFile.setFileType(Constants.JAVA);
|
|
|
flow.addHandleFile(handleFile);
|
|
|
flow.setCron(cron);
|
|
|
this.sendMsg(event, flow);
|
|
|
}
|
|
|
|
|
|
private void sendGenMsg(String event, String serviceFlow, String packageName, String className, String path,String cron) {
|
|
|
ObjectNode objectNode = objectMapper.createObjectNode();
|
|
|
objectNode.put("event", event);
|
|
|
objectNode.put("serviceFlow", serviceFlow);
|
|
|
objectNode.put("packageName", packageName);
|
|
|
objectNode.put("className", className);
|
|
|
objectNode.put("path", path);
|
|
|
objectNode.put("cron", cron);
|
|
|
private void sendMsg(String event, ServiceFlow serviceFlow) {
|
|
|
try {
|
|
|
String msg = objectMapper.writeValueAsString(objectNode);
|
|
|
this.jmsMessagingTemplate.convertAndSend(this.queue, msg);
|
|
|
String msg = objectMapper.writeValueAsString(serviceFlow);
|
|
|
Map<String, Object> header = new HashMap<>();
|
|
|
header.put("event", event);
|
|
|
this.jmsMessagingTemplate.convertAndSend(this.queue, msg, header);
|
|
|
} catch (JsonProcessingException e) {
|
|
|
e.printStackTrace();
|
|
|
logger.error(e.getMessage());
|
|
|
}
|
|
|
}
|
|
|
}
|