package com.yihu.hos.services;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.yihu.hos.common.constants.Constants;
import com.yihu.hos.core.log.Logger;
import com.yihu.hos.core.log.LoggerFactory;
import com.yihu.hos.system.model.bo.ServiceFlow;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.stereotype.Component;

import javax.jms.Queue;
import java.util.HashMap;
import java.util.Map;

/**
 * Sends service-flow events to a JMS queue.
 *
 * <p>Each public method serializes a {@link ServiceFlow} to JSON with the injected
 * {@link ObjectMapper} and sends it to the injected {@link Queue}, with the event name
 * carried in a message header named {@code "event"}.
 *
 * @created Airhead 2016/8/2.
 */
@Component
public class ServiceFlowEventService {
    static final Logger logger = LoggerFactory.getLogger(ServiceFlowEventService.class);

    @Autowired
    private JmsMessagingTemplate jmsMessagingTemplate;

    @Autowired
    private Queue queue;

    @Autowired
    private ObjectMapper objectMapper;

    /**
     * Sends a {@code Constants.FlOW_REFRESH} event for the given flow.
     *
     * <p>Per the original note: when admin starts, a refresh event is triggered once for all
     * flows, covering the case where the whole service is restarted. It also addresses the
     * problem of multiple collection tasks being started in the Broker.
     *
     * @param serviceFlow the flow to send as the message payload
     */
    public void flowRefresh(ServiceFlow serviceFlow) {
        this.sendMsg(Constants.FlOW_REFRESH, serviceFlow);
    }

    /**
     * Fired when an external component reports that a new processor has been defined.
     *
     * @param serviceFlow code identifying the service flow affected by this processor change
     *                    (used as the flow's route code)
     * @param packageName package name of the class involved in the processor definition
     * @param className   class name of the class involved in the processor definition
     * @param path        value stored as the handle file's file path. The original note
     *                    described it as the class content, merged at the business-interface
     *                    level when stored in shards in ZooKeeper.
     *                    NOTE(review): confirm which description is accurate.
     */
    public void processorAdded(String serviceFlow, String packageName, String className, String path) {
        sendMsg(Constants.PROCESSOR_ADDED, serviceFlow, packageName, className, path);
    }

    /**
     * Sends a {@code Constants.PROCESSOR_ADDED} event for the given flow.
     *
     * @param serviceFlow the flow to send as the message payload
     */
    public void processorAdded(ServiceFlow serviceFlow) {
        this.sendMsg(Constants.PROCESSOR_ADDED, serviceFlow);
    }

    /**
     * Fired when an external component reports that the data part of an existing processor
     * has changed.
     *
     * @param routeCode   route code set on the flow
     * @param packageName package name set on the handle file
     * @param className   class name set on the handle file
     * @param path        file path set on the handle file
     */
    public void processorDataChanged(String routeCode, String packageName, String className, String path) {
        this.sendMsg(Constants.PROCESSOR_DATA_CHANGED, routeCode, packageName, className, path);
    }

    /**
     * Sends a {@code Constants.PROCESSOR_DATA_CHANGED} event for the given flow.
     *
     * @param serviceFlow the flow to send as the message payload
     */
    public void processorDataChanged(ServiceFlow serviceFlow) {
        this.sendMsg(Constants.PROCESSOR_DATA_CHANGED, serviceFlow);
    }

    /**
     * Sends a {@code Constants.PROCESSOR_DATA_DELETED} event; the handle file's file path is
     * set to {@code null}.
     *
     * @param routeCode   route code set on the flow
     * @param packageName package name set on the handle file
     * @param className   class name set on the handle file
     */
    public void processorDataDeleted(String routeCode, String packageName, String className) {
        this.sendMsg(Constants.PROCESSOR_DATA_DELETED, routeCode, packageName, className, null);
    }

    /**
     * Sends a {@code Constants.PROCESSOR_DATA_DELETED} event for the given flow.
     *
     * @param serviceFlow the flow to send as the message payload
     */
    public void processorDataDeleted(ServiceFlow serviceFlow) {
        this.sendMsg(Constants.PROCESSOR_DATA_DELETED, serviceFlow);
    }

    /**
     * Fired when an external component reports that a new RouteDefine route has been defined.
     *
     * @param routeCode   route code set on the flow
     * @param packageName package name set on the handle file
     * @param className   class name set on the handle file
     * @param path        file path set on the handle file
     */
    public void routeDefineAdded(String routeCode, String packageName, String className, String path) {
        this.sendMsg(Constants.ROUTE_DEFINE_ADDED, routeCode, packageName, className, path);
    }

    /**
     * Sends a {@code Constants.ROUTE_DEFINE_ADDED} event for the given flow.
     *
     * @param serviceFlow the flow to send as the message payload
     */
    public void routeDefineAdded(ServiceFlow serviceFlow) {
        this.sendMsg(Constants.ROUTE_DEFINE_ADDED, serviceFlow);
    }

    /**
     * Fired when an external component reports that an existing RouteDefine route definition
     * has changed, mainly when the content of the route definition has changed.
     *
     * @param routeCode   route code set on the flow
     * @param packageName package name set on the handle file
     * @param className   class name set on the handle file
     * @param path        file path set on the handle file
     */
    public void routeDefineChanged(String routeCode, String packageName, String className, String path) {
        this.sendMsg(Constants.ROUTE_DEFINE_CHANGED, routeCode, packageName, className, path);
    }

    /**
     * Sends a {@code Constants.ROUTE_DEFINE_CHANGED} event for the given flow.
     *
     * @param serviceFlow the flow to send as the message payload
     */
    public void routeDefineChanged(ServiceFlow serviceFlow) {
        this.sendMsg(Constants.ROUTE_DEFINE_CHANGED, serviceFlow);
    }

    /**
     * Fired when an external component reports that an existing RouteDefine route definition
     * has been deleted; the handle file's file path is set to {@code null}.
     *
     * @param routeCode   route code set on the flow
     * @param packageName package name set on the handle file
     * @param className   class name set on the handle file
     */
    public void routeDefineDelete(String routeCode, String packageName, String className) {
        this.sendMsg(Constants.ROUTE_DEFINE_DELETED, routeCode, packageName, className, null);
    }

    /**
     * Sends a {@code Constants.ROUTE_DEFINE_DELETED} event for the given flow.
     *
     * @param serviceFlow the flow to send as the message payload
     */
    public void routeDefineDelete(ServiceFlow serviceFlow) {
        this.sendMsg(Constants.ROUTE_DEFINE_DELETED, serviceFlow);
    }

    /**
     * Sends a {@code Constants.ROUTE_CLASS_ADDED} event whose handle file has file type
     * {@code Constants.JAVA} and whose flow carries the given cron value.
     *
     * @param routeCode   route code set on the flow
     * @param packageName package name set on the handle file
     * @param className   class name set on the handle file
     * @param path        file path set on the handle file
     * @param cron        cron value set on the flow
     */
    public void routeClassAdded(String routeCode, String packageName, String className, String path, String cron) {
        this.sendGenMsg(Constants.ROUTE_CLASS_ADDED, routeCode, packageName, className, path, cron);
    }

    /**
     * Sends a {@code Constants.ROUTE_CLASS_ADDED} event for the given flow.
     *
     * @param serviceFlow the flow to send as the message payload
     */
    public void routeClassAdded(ServiceFlow serviceFlow) {
        this.sendMsg(Constants.ROUTE_CLASS_ADDED, serviceFlow);
    }

    /**
     * Sends a {@code Constants.ROUTE_CLASS_CHANGED} event whose handle file has file type
     * {@code Constants.JAVA} and whose flow carries the given cron value.
     *
     * @param routeCode   route code set on the flow
     * @param packageName package name set on the handle file
     * @param className   class name set on the handle file
     * @param path        file path set on the handle file
     * @param cron        cron value set on the flow
     */
    public void routeClassChanged(String routeCode, String packageName, String className, String path, String cron) {
        this.sendGenMsg(Constants.ROUTE_CLASS_CHANGED, routeCode, packageName, className, path, cron);
    }

    /**
     * Sends a {@code Constants.ROUTE_CLASS_CHANGED} event for the given flow.
     *
     * @param serviceFlow the flow to send as the message payload
     */
    public void routeClassChanged(ServiceFlow serviceFlow) {
        this.sendMsg(Constants.ROUTE_CLASS_CHANGED, serviceFlow);
    }

    /**
     * Sends a {@code Constants.PROCESSOR_CLASS_ADDED} event whose handle file has file type
     * {@code Constants.CLASS}.
     *
     * @param routeCode   route code set on the flow
     * @param packageName package name set on the handle file
     * @param className   class name set on the handle file
     * @param path        file path set on the handle file
     */
    public void processorClassAdded(String routeCode, String packageName, String className, String path) {
        this.sendMsg(Constants.PROCESSOR_CLASS_ADDED, routeCode, packageName, className, path);
    }

    /**
     * Sends a {@code Constants.PROCESSOR_CLASS_ADDED} event for the given flow.
     *
     * @param serviceFlow the flow to send as the message payload
     */
    public void processorClassAdded(ServiceFlow serviceFlow) {
        this.sendMsg(Constants.PROCESSOR_CLASS_ADDED, serviceFlow);
    }

    /**
     * Sends a {@code Constants.PROCESSOR_CLASS_CHANGED} event whose handle file has file type
     * {@code Constants.CLASS}.
     *
     * @param routeCode   route code set on the flow
     * @param packageName package name set on the handle file
     * @param className   class name set on the handle file
     * @param path        file path set on the handle file
     */
    public void processorClassChanged(String routeCode, String packageName, String className, String path) {
        this.sendMsg(Constants.PROCESSOR_CLASS_CHANGED, routeCode, packageName, className, path);
    }

    /**
     * Sends a {@code Constants.PROCESSOR_CLASS_CHANGED} event for the given flow.
     *
     * @param serviceFlow the flow to send as the message payload
     */
    public void processorClassChanged(ServiceFlow serviceFlow) {
        this.sendMsg(Constants.PROCESSOR_CLASS_CHANGED, serviceFlow);
    }

    /**
     * Builds a flow with a single handle file of type {@code Constants.CLASS} and sends it
     * under the given event name.
     */
    private void sendMsg(String event, String routeCode, String packageName, String className, String path) {
        ServiceFlow flow = new ServiceFlow();
        flow.setRouteCode(routeCode);

        ServiceFlow.HandleFile handleFile = newHandleFile(flow, packageName, className, path);
        handleFile.setFileType(Constants.CLASS);
        flow.addHandleFile(handleFile);

        this.sendMsg(event, flow);
    }

    /**
     * Builds a flow with a single handle file of type {@code Constants.JAVA} plus a cron
     * value, and sends it under the given event name.
     */
    private void sendGenMsg(String event, String routeCode, String packageName, String className, String path, String cron) {
        ServiceFlow flow = new ServiceFlow();
        flow.setRouteCode(routeCode);

        ServiceFlow.HandleFile handleFile = newHandleFile(flow, packageName, className, path);
        handleFile.setFileType(Constants.JAVA);
        flow.addHandleFile(handleFile);
        flow.setCron(cron);

        this.sendMsg(event, flow);
    }

    /**
     * Creates a handle file bound to {@code flow} with package name, class name and file path
     * populated. The caller sets the file type and attaches the handle file to the flow.
     */
    private ServiceFlow.HandleFile newHandleFile(ServiceFlow flow, String packageName, String className, String path) {
        // HandleFile is an inner (non-static) class, so it must be created from a flow instance.
        ServiceFlow.HandleFile handleFile = flow.new HandleFile();
        // NOTE: usage is not set here; a setUsage(Constants.FLOW_TYPE_ROUTE) call was
        // commented out in the original code.
        handleFile.setPackageName(packageName);
        handleFile.setClassName(className);
        handleFile.setFilePath(path);
        return handleFile;
    }

    /**
     * Serializes the flow to JSON and sends it to the queue with an {@code "event"} header.
     *
     * <p>A JSON serialization failure is logged and not rethrown, so the message is simply
     * not sent in that case.
     */
    private void sendMsg(String event, ServiceFlow serviceFlow) {
        try {
            String msg = objectMapper.writeValueAsString(serviceFlow);

            Map<String, Object> header = new HashMap<>();
            header.put("event", event);

            this.jmsMessagingTemplate.convertAndSend(this.queue, msg, header);
        } catch (JsonProcessingException e) {
            // NOTE(review): the project Logger is only seen accepting a String here; if it
            // offers an error(String, Throwable) overload, use that and drop printStackTrace.
            e.printStackTrace();
            logger.error("Failed to serialize ServiceFlow for event [" + event + "]: " + e.getMessage());
        }
    }
}