123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180 |
- package com.yihu.hos.services;
- import com.fasterxml.jackson.core.JsonProcessingException;
- import com.fasterxml.jackson.databind.ObjectMapper;
- import com.yihu.hos.common.constants.Constants;
- import com.yihu.hos.core.log.Logger;
- import com.yihu.hos.core.log.LoggerFactory;
- import com.yihu.hos.system.model.bo.ServiceFlow;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.jms.core.JmsMessagingTemplate;
- import org.springframework.stereotype.Component;
- import javax.jms.Queue;
- import java.util.HashMap;
- import java.util.Map;
- /**
- * @created Airhead 2016/8/2.
- */
- @Component
- public class ServiceFlowEventService {
- static final Logger logger = LoggerFactory.getLogger(ServiceFlowEventService.class);
- @Autowired
- private JmsMessagingTemplate jmsMessagingTemplate;
- @Autowired
- private Queue queue;
- @Autowired
- private ObjectMapper objectMapper;
- /**
- * admin启动时,触发一次所有流程更新事件,用于重启整个服务的情况。
- * 同时解决Broker中启动多个采集任务的问题。
- */
- public void flowRefresh(ServiceFlow serviceFlow) {
- this.sendMsg(Constants.FlOW_REFRESH, serviceFlow);
- }
- /**
- * 当外界组件通知一个新的processor处理器被定义时,该事件被触发。
- *
- * @param serviceFlow 本次processor处理器变化,所涉及的服务流程Code标识。
- * @param packageName processor处理器定义涉及的class包名
- * @param className processor处理器定义涉及的class类名
- * @param path processor处理器定义涉及的class内容,如果zookeeper数据结构中class分片存储,在业务级接口层面上也进行了合并
- */
- public void processorAdded(String serviceFlow, String packageName, String className, String path) {
- sendMsg(Constants.PROCESSOR_ADDED, serviceFlow, packageName, className, path);
- }
- public void processorAdded(ServiceFlow serviceFlow) {
- this.sendMsg(Constants.PROCESSOR_ADDED, serviceFlow);
- }
- /**
- * 当外界组件通知一个已有的processor处理器data部分发生变化时,该事件被触发。
- */
- public void processorDataChanged(String routeCode, String packageName, String className, String path) {
- this.sendMsg(Constants.PROCESSOR_DATA_CHANGED, routeCode, packageName, className, path);
- }
- public void processorDataChanged(ServiceFlow serviceFlow) {
- this.sendMsg(Constants.PROCESSOR_DATA_CHANGED, serviceFlow);
- }
- public void processorDataDeleted(String routeCode, String packageName, String className) {
- this.sendMsg(Constants.PROCESSOR_DATA_DELETED, routeCode, packageName, className, null);
- }
- public void processorDataDeleted(ServiceFlow serviceFlow) {
- this.sendMsg(Constants.PROCESSOR_DATA_DELETED, serviceFlow);
- }
- /**
- * 当外界组件通知一个新的RouteDefine路由被定义时,该事件被触发
- */
- public void routeDefineAdded(String routeCode, String packageName, String className, String path) {
- this.sendMsg(Constants.ROUTE_DEFINE_ADDED, routeCode, packageName, className, path);
- }
- public void routeDefineAdded(ServiceFlow serviceFlow) {
- this.sendMsg(Constants.ROUTE_DEFINE_ADDED, serviceFlow);
- }
- /**
- * 当外界组件通知一个已有的RouteDefine路由定义被改变时,主要就是路由定义内容被改变时,该事件被触发。
- */
- public void routeDefineChanged(String routeCode, String packageName, String className, String path) {
- this.sendMsg(Constants.ROUTE_DEFINE_CHANGED, routeCode, packageName, className, path);
- }
- public void routeDefineChanged(ServiceFlow serviceFlow) {
- this.sendMsg(Constants.ROUTE_DEFINE_CHANGED, serviceFlow);
- }
- /**
- * 当外界组件通知一个已有的RouteDefine路由定义被删除时,该事件被触发。
- */
- public void routeDefineDelete(String routeCode, String packageName, String className) {
- this.sendMsg(Constants.ROUTE_DEFINE_DELETED, routeCode, packageName, className, null);
- }
- public void routeDefineDelete(ServiceFlow serviceFlow) {
- this.sendMsg(Constants.ROUTE_DEFINE_DELETED, serviceFlow);
- }
- public void routeClassAdded(String routeCode, String packageName, String className, String path, String cron) {
- this.sendGenMsg(Constants.ROUTE_CLASS_ADDED, routeCode, packageName, className, path, cron);
- }
- public void routeClassAdded(ServiceFlow serviceFlow) {
- this.sendMsg(Constants.ROUTE_CLASS_ADDED, serviceFlow);
- }
- public void routeClassChanged(String routeCode, String packageName, String className, String path, String cron) {
- this.sendGenMsg(Constants.ROUTE_CLASS_CHANGED, routeCode, packageName, className, path, cron);
- }
- public void routeClassChanged(ServiceFlow serviceFlow) {
- this.sendMsg(Constants.ROUTE_CLASS_CHANGED, serviceFlow);
- }
- public void processorClassAdded(String routeCode, String packageName, String className, String path) {
- this.sendMsg(Constants.PROCESSOR_CLASS_ADDED, routeCode, packageName, className, path);
- }
- public void processorClassAdded(ServiceFlow serviceFlow) {
- this.sendMsg(Constants.PROCESSOR_CLASS_ADDED, serviceFlow);
- }
- public void processorClassChanged(String routeCode, String packageName, String className, String path) {
- this.sendMsg(Constants.PROCESSOR_CLASS_CHANGED, routeCode, packageName, className, path);
- }
- public void processorClassChanged(ServiceFlow serviceFlow) {
- this.sendMsg(Constants.PROCESSOR_CLASS_CHANGED, serviceFlow);
- }
- private void sendMsg(String event, String routeCode, String packageName, String className, String path) {
- ServiceFlow flow = new ServiceFlow();
- flow.setRouteCode(routeCode);
- ServiceFlow.HandleFile handleFile = flow.new HandleFile();
- // handleFile.setUsage(Constants.FLOW_TYPE_ROUTE);
- handleFile.setPackageName(packageName);
- handleFile.setClassName(className);
- handleFile.setFilePath(path);
- handleFile.setFileType(Constants.CLASS);
- flow.addHandleFile(handleFile);
- this.sendMsg(event, flow);
- }
- private void sendGenMsg(String event, String routeCode, String packageName, String className, String path, String cron) {
- ServiceFlow flow = new ServiceFlow();
- flow.setRouteCode(routeCode);
- ServiceFlow.HandleFile handleFile = flow.new HandleFile();
- // handleFile.setUsage(Constants.FLOW_TYPE_ROUTE);
- handleFile.setPackageName(packageName);
- handleFile.setClassName(className);
- handleFile.setFilePath(path);
- handleFile.setFileType(Constants.JAVA);
- flow.addHandleFile(handleFile);
- flow.setCron(cron);
- this.sendMsg(event, flow);
- }
- private void sendMsg(String event, ServiceFlow serviceFlow) {
- try {
- String msg = objectMapper.writeValueAsString(serviceFlow);
- Map<String, Object> header = new HashMap<>();
- header.put("event", event);
- this.jmsMessagingTemplate.convertAndSend(this.queue, msg, header);
- } catch (JsonProcessingException e) {
- e.printStackTrace();
- logger.error(e.getMessage());
- }
- }
- }
|