| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899 | 
							- package com.yihu.hos.services;
 
- import com.fasterxml.jackson.databind.ObjectMapper;
 
- import com.yihu.hos.common.constants.ContextAttributes;
 
- import com.yihu.hos.core.log.Logger;
 
- import com.yihu.hos.core.log.LoggerFactory;
 
- import com.yihu.hos.web.framework.constant.ServiceFlowConstant;
 
- import com.yihu.hos.web.framework.model.bo.ServiceFlow;
 
- import com.yihu.hos.web.framework.thread.LocalContext;
 
- import org.springframework.beans.factory.annotation.Autowired;
 
- import org.springframework.stereotype.Component;
 
- import org.zbus.broker.ZbusBroker;
 
- import org.zbus.mq.Producer;
 
- import org.zbus.net.http.Message;
 
- import java.io.IOException;
 
- /**
 
-  * @created Airhead 2016/8/2.
 
-  */
 
- @Component
 
- public class ServiceFlowEventService {
 
-     static final Logger logger = LoggerFactory.getLogger(ServiceFlowEventService.class);
 
-     @Autowired
 
-     private ObjectMapper objectMapper;
 
-     private ZbusBroker zbusBroker;
 
-     @Autowired
 
-     public void setZbusBroker(ZbusBroker zbusBroker) {
 
-         this.zbusBroker = zbusBroker;
 
-     }
 
-     /**
 
-      * admin启动时,触发一次所有流程更新事件,用于重启整个服务的情况。
 
-      * 同时解决Broker中启动多个采集任务的问题。
 
-      */
 
-     public void serviceFlowStarted(ServiceFlow serviceFlow) {
 
-         this.sendMsg(ServiceFlowConstant.SERVICE_FLOW_STARTED, serviceFlow);
 
-     }
 
-     public void serviceFlowStopped(ServiceFlow serviceFlow) {
 
-         this.sendMsg(ServiceFlowConstant.SERVICE_FLOW_STOPPED, serviceFlow);
 
-     }
 
-     /**
 
-      * 当外界组件通知一个新的processor处理器被定义时,该事件被触发。
 
-      */
 
-     public void serviceFlowAdded(ServiceFlow serviceFlow) {
 
-         this.sendMsg(ServiceFlowConstant.SERVICE_FLOW_ADDED, serviceFlow);
 
-     }
 
-     /**
 
-      * 当外界组件通知一个已有的processor处理器data部分发生变化时,该事件被触发。
 
-      */
 
-     public void serviceFlowModifiedAdd(ServiceFlow serviceFlow) {
 
-         this.sendMsg(ServiceFlowConstant.SERVICE_FLOW_MODIFIED_ADD, serviceFlow);
 
-     }
 
-     public void serviceFlowModifiedReduce(ServiceFlow serviceFlow) {
 
-         this.sendMsg(ServiceFlowConstant.SERVICE_FLOW_MODIFIED_REDUCE, serviceFlow);
 
-     }
 
-     /**
 
-      * 当外界组件通知一个新的RouteDefine路由被定义时,该事件被触发
 
-      */
 
-     public void serviceFlowDelete(ServiceFlow serviceFlow) {
 
-         this.sendMsg(ServiceFlowConstant.SERVICE_FLOW_DELETED, serviceFlow);
 
-     }
 
-     private void sendMsg(String event, ServiceFlow serviceFlow) {
 
-         if (zbusBroker == null) {
 
-             logger.error("zbusBroker is null.");
 
-             return;
 
-         }
 
-         try {
 
-             String msg = objectMapper.writeValueAsString(serviceFlow);
 
-             String tenant = LocalContext.getContext().getAttachment(ContextAttributes.TENANT_NAME);
 
-             if (tenant!=null) {
 
-                 Producer producer = new Producer(zbusBroker, ServiceFlowConstant.ZBUS_MQ + "@" + tenant);
 
-                 producer.createMQ();    //确定为创建消息队列需要显示调用
 
-                 Message message = new Message();
 
-                 message.setHead("event", event);
 
-                 message.setHead("tenant", tenant);
 
-                 message.setMethod("POST");
 
-                 message.setBody(msg);
 
-                 producer.sendSync(message);
 
-             }
 
-         } catch (IOException | InterruptedException e) {
 
-             logger.error(e.getMessage());
 
-             e.printStackTrace();
 
-         }
 
-     }
 
- }
 
 
  |