12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788 |
- package com.yihu.hos.services;
- import com.fasterxml.jackson.core.JsonProcessingException;
- import com.fasterxml.jackson.databind.ObjectMapper;
- import com.yihu.hos.common.constants.ContextAttributes;
- import com.yihu.hos.core.log.Logger;
- import com.yihu.hos.core.log.LoggerFactory;
- import com.yihu.hos.interceptor.LocalContext;
- import com.yihu.hos.web.framework.constant.ServiceFlowConstant;
- import com.yihu.hos.web.framework.model.bo.ServiceFlow;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.jms.core.JmsMessagingTemplate;
- import org.springframework.stereotype.Component;
- import javax.jms.Queue;
- import java.util.HashMap;
- import java.util.Map;
- /**
- * @created Airhead 2016/8/2.
- */
- @Component
- public class ServiceFlowEventService {
- static final Logger logger = LoggerFactory.getLogger(ServiceFlowEventService.class);
- @Autowired
- private JmsMessagingTemplate jmsMessagingTemplate;
- @Autowired
- private Queue queue;
- @Autowired
- private ObjectMapper objectMapper;
- /**
- * admin启动时,触发一次所有流程更新事件,用于重启整个服务的情况。
- * 同时解决Broker中启动多个采集任务的问题。
- */
- public void serviceFlowStarted(ServiceFlow serviceFlow) {
- this.sendMsg(ServiceFlowConstant.SERVICE_FLOW_STARTED, serviceFlow);
- }
- public void serviceFlowStopped(ServiceFlow serviceFlow) {
- this.sendMsg(ServiceFlowConstant.SERVICE_FLOW_STOPPED, serviceFlow);
- }
- /**
- * 当外界组件通知一个新的processor处理器被定义时,该事件被触发。
- */
- public void serviceFlowAdded(ServiceFlow serviceFlow) {
- this.sendMsg(ServiceFlowConstant.SERVICE_FLOW_ADDED, serviceFlow);
- }
- /**
- * 当外界组件通知一个已有的processor处理器data部分发生变化时,该事件被触发。
- */
- public void serviceFlowModifiedAdd(ServiceFlow serviceFlow) {
- this.sendMsg(ServiceFlowConstant.SERVICE_FLOW_MODIFIED_ADD, serviceFlow);
- }
- public void serviceFlowModifiedReduce(ServiceFlow serviceFlow) {
- this.sendMsg(ServiceFlowConstant.SERVICE_FLOW_MODIFIED_REDUCE, serviceFlow);
- }
- /**
- * 当外界组件通知一个新的RouteDefine路由被定义时,该事件被触发
- */
- public void serviceFlowDelete(ServiceFlow serviceFlow) {
- this.sendMsg(ServiceFlowConstant.SERVICE_FLOW_DELETED, serviceFlow);
- }
- private void sendMsg(String event, ServiceFlow serviceFlow) {
- try {
- String msg = objectMapper.writeValueAsString(serviceFlow);
- Map<String, Object> header = new HashMap<>();
- String attachment = LocalContext.getContext().getAttachment(ContextAttributes.TENANT_NAME);
- serviceFlow.setTenant(attachment);
- header.put("tenant", attachment);
- header.put("event", event);
- this.jmsMessagingTemplate.convertAndSend(this.queue, msg, header);
- } catch (JsonProcessingException e) {
- e.printStackTrace();
- logger.error(e.getMessage());
- }
- }
- }
|