package com.yihu.hos.services;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.yihu.hos.common.constants.ContextAttributes;
import com.yihu.hos.core.log.Logger;
import com.yihu.hos.core.log.LoggerFactory;
import com.yihu.hos.interceptor.LocalContext;
import com.yihu.hos.web.framework.constant.ServiceFlowConstant;
import com.yihu.hos.web.framework.model.bo.ServiceFlow;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.stereotype.Component;

import javax.jms.Queue;
import java.util.HashMap;
import java.util.Map;

/**
 * Publishes service-flow lifecycle events to a JMS queue.
 *
 * <p>Each public method serializes the given {@link ServiceFlow} with the injected
 * {@link ObjectMapper} and sends it to the injected {@link Queue}, tagging the message
 * with an {@code event} header (one of the {@link ServiceFlowConstant} values) and a
 * {@code tenant} header read from the current {@link LocalContext}.
 *
 * @created Airhead 2016/8/2.
 */
@Component
public class ServiceFlowEventService {
    static final Logger logger = LoggerFactory.getLogger(ServiceFlowEventService.class);

    @Autowired
    private JmsMessagingTemplate jmsMessagingTemplate;

    @Autowired
    private Queue queue;

    @Autowired
    private ObjectMapper objectMapper;

    /**
     * Sends a {@code SERVICE_FLOW_STARTED} event for the given flow.
     *
     * <p>When admin starts up, an update event is triggered once for every flow; this
     * covers the case where the whole service is restarted. It also addresses the
     * problem of multiple collection tasks being started in the Broker.
     *
     * @param serviceFlow the flow that was started; serialized as the message payload
     */
    public void serviceFlowStarted(ServiceFlow serviceFlow) {
        this.sendMsg(ServiceFlowConstant.SERVICE_FLOW_STARTED, serviceFlow);
    }

    /**
     * Sends a {@code SERVICE_FLOW_STOPPED} event for the given flow.
     *
     * @param serviceFlow the flow that was stopped; serialized as the message payload
     */
    public void serviceFlowStopped(ServiceFlow serviceFlow) {
        this.sendMsg(ServiceFlowConstant.SERVICE_FLOW_STOPPED, serviceFlow);
    }

    /**
     * Sends a {@code SERVICE_FLOW_ADDED} event for the given flow.
     *
     * <p>Triggered when an external component notifies that a new processor has been
     * defined.
     *
     * @param serviceFlow the flow that was added; serialized as the message payload
     */
    public void serviceFlowAdded(ServiceFlow serviceFlow) {
        this.sendMsg(ServiceFlowConstant.SERVICE_FLOW_ADDED, serviceFlow);
    }

    /**
     * Sends a {@code SERVICE_FLOW_MODIFIED_ADD} event for the given flow.
     *
     * <p>Triggered when an external component notifies that the data part of an
     * existing processor has changed.
     *
     * @param serviceFlow the modified flow; serialized as the message payload
     */
    public void serviceFlowModifiedAdd(ServiceFlow serviceFlow) {
        this.sendMsg(ServiceFlowConstant.SERVICE_FLOW_MODIFIED_ADD, serviceFlow);
    }

    /**
     * Sends a {@code SERVICE_FLOW_MODIFIED_REDUCE} event for the given flow.
     *
     * @param serviceFlow the modified flow; serialized as the message payload
     */
    public void serviceFlowModifiedReduce(ServiceFlow serviceFlow) {
        this.sendMsg(ServiceFlowConstant.SERVICE_FLOW_MODIFIED_REDUCE, serviceFlow);
    }

    /**
     * Sends a {@code SERVICE_FLOW_DELETED} event for the given flow.
     *
     * @param serviceFlow the flow that was deleted; serialized as the message payload
     */
    public void serviceFlowDelete(ServiceFlow serviceFlow) {
        this.sendMsg(ServiceFlowConstant.SERVICE_FLOW_DELETED, serviceFlow);
    }

    /**
     * Serializes the flow and sends it to the queue with {@code tenant} and
     * {@code event} headers.
     *
     * <p>If serialization fails, the failure is logged and nothing is sent; the
     * exception is not propagated to the caller.
     *
     * @param event       value placed in the {@code event} header
     * @param serviceFlow object serialized by the {@link ObjectMapper} as the payload
     */
    private void sendMsg(String event, ServiceFlow serviceFlow) {
        try {
            String msg = objectMapper.writeValueAsString(serviceFlow);

            // Typed map instead of a raw Map: avoids an unchecked conversion at the
            // convertAndSend call.
            Map<String, Object> header = new HashMap<>();
            String attachment = LocalContext.getContext().getAttachment(ContextAttributes.TENANT_NAME);
            header.put("tenant", attachment);
            header.put("event", event);

            this.jmsMessagingTemplate.convertAndSend(this.queue, msg, header);
        } catch (JsonProcessingException e) {
            // NOTE(review): printStackTrace is kept because only Logger.error(String) is
            // visible from this file; if the project Logger offers an overload that
            // accepts a Throwable, use it here and drop printStackTrace.
            e.printStackTrace();
            logger.error("Failed to serialize ServiceFlow for event [" + event + "]: " + e.getMessage());
        }
    }
}