ServiceFlowEventService.java 3.1 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788
  1. package com.yihu.hos.services;
  2. import com.fasterxml.jackson.core.JsonProcessingException;
  3. import com.fasterxml.jackson.databind.ObjectMapper;
  4. import com.yihu.hos.common.constants.ContextAttributes;
  5. import com.yihu.hos.core.log.Logger;
  6. import com.yihu.hos.core.log.LoggerFactory;
  7. import com.yihu.hos.interceptor.LocalContext;
  8. import com.yihu.hos.web.framework.constant.ServiceFlowConstant;
  9. import com.yihu.hos.web.framework.model.bo.ServiceFlow;
  10. import org.springframework.beans.factory.annotation.Autowired;
  11. import org.springframework.jms.core.JmsMessagingTemplate;
  12. import org.springframework.stereotype.Component;
  13. import javax.jms.Queue;
  14. import java.util.HashMap;
  15. import java.util.Map;
  16. /**
  17. * @created Airhead 2016/8/2.
  18. */
  19. @Component
  20. public class ServiceFlowEventService {
  21. static final Logger logger = LoggerFactory.getLogger(ServiceFlowEventService.class);
  22. @Autowired
  23. private JmsMessagingTemplate jmsMessagingTemplate;
  24. @Autowired
  25. private Queue queue;
  26. @Autowired
  27. private ObjectMapper objectMapper;
  28. /**
  29. * admin启动时,触发一次所有流程更新事件,用于重启整个服务的情况。
  30. * 同时解决Broker中启动多个采集任务的问题。
  31. */
  32. public void serviceFlowStarted(ServiceFlow serviceFlow) {
  33. this.sendMsg(ServiceFlowConstant.SERVICE_FLOW_STARTED, serviceFlow);
  34. }
  35. public void serviceFlowStopped(ServiceFlow serviceFlow) {
  36. this.sendMsg(ServiceFlowConstant.SERVICE_FLOW_STOPPED, serviceFlow);
  37. }
  38. /**
  39. * 当外界组件通知一个新的processor处理器被定义时,该事件被触发。
  40. */
  41. public void serviceFlowAdded(ServiceFlow serviceFlow) {
  42. this.sendMsg(ServiceFlowConstant.SERVICE_FLOW_ADDED, serviceFlow);
  43. }
  44. /**
  45. * 当外界组件通知一个已有的processor处理器data部分发生变化时,该事件被触发。
  46. */
  47. public void serviceFlowModifiedAdd(ServiceFlow serviceFlow) {
  48. this.sendMsg(ServiceFlowConstant.SERVICE_FLOW_MODIFIED_ADD, serviceFlow);
  49. }
  50. public void serviceFlowModifiedReduce(ServiceFlow serviceFlow) {
  51. this.sendMsg(ServiceFlowConstant.SERVICE_FLOW_MODIFIED_REDUCE, serviceFlow);
  52. }
  53. /**
  54. * 当外界组件通知一个新的RouteDefine路由被定义时,该事件被触发
  55. */
  56. public void serviceFlowDelete(ServiceFlow serviceFlow) {
  57. this.sendMsg(ServiceFlowConstant.SERVICE_FLOW_DELETED, serviceFlow);
  58. }
  59. private void sendMsg(String event, ServiceFlow serviceFlow) {
  60. try {
  61. String msg = objectMapper.writeValueAsString(serviceFlow);
  62. Map<String, Object> header = new HashMap<>();
  63. String attachment = LocalContext.getContext().getAttachment(ContextAttributes.TENANT_NAME);
  64. serviceFlow.setTenant(attachment);
  65. header.put("tenant", attachment);
  66. header.put("event", event);
  67. this.jmsMessagingTemplate.convertAndSend(this.queue, msg, header);
  68. } catch (JsonProcessingException e) {
  69. e.printStackTrace();
  70. logger.error(e.getMessage());
  71. }
  72. }
  73. }