// ServiceFlowEventService.java
  1. package com.yihu.hos.services;
  2. import com.fasterxml.jackson.core.JsonProcessingException;
  3. import com.fasterxml.jackson.databind.ObjectMapper;
  4. import com.yihu.hos.common.constants.Constants;
  5. import com.yihu.hos.core.log.Logger;
  6. import com.yihu.hos.core.log.LoggerFactory;
  7. import com.yihu.hos.system.model.bo.ServiceFlow;
  8. import org.springframework.beans.factory.annotation.Autowired;
  9. import org.springframework.jms.core.JmsMessagingTemplate;
  10. import org.springframework.stereotype.Component;
  11. import javax.jms.Queue;
  12. import java.util.HashMap;
  13. import java.util.Map;
  14. /**
  15. * @created Airhead 2016/8/2.
  16. */
  17. @Component
  18. public class ServiceFlowEventService {
  19. static final Logger logger = LoggerFactory.getLogger(ServiceFlowEventService.class);
  20. @Autowired
  21. private JmsMessagingTemplate jmsMessagingTemplate;
  22. @Autowired
  23. private Queue queue;
  24. @Autowired
  25. private ObjectMapper objectMapper;
  26. /**
  27. * admin启动时,触发一次所有流程更新事件,用于重启整个服务的情况。
  28. * 同时解决Broker中启动多个采集任务的问题。
  29. */
  30. public void flowRefresh(ServiceFlow serviceFlow) {
  31. this.sendMsg(Constants.FlOW_REFRESH, serviceFlow);
  32. }
  33. /**
  34. * 当外界组件通知一个新的processor处理器被定义时,该事件被触发。
  35. *
  36. * @param serviceFlow 本次processor处理器变化,所涉及的服务流程Code标识。
  37. * @param packageName processor处理器定义涉及的class包名
  38. * @param className processor处理器定义涉及的class类名
  39. * @param path processor处理器定义涉及的class内容,如果zookeeper数据结构中class分片存储,在业务级接口层面上也进行了合并
  40. */
  41. public void processorAdded(String serviceFlow, String packageName, String className, String path) {
  42. sendMsg(Constants.PROCESSOR_ADDED, serviceFlow, packageName, className, path);
  43. }
  44. public void processorAdded(ServiceFlow serviceFlow) {
  45. this.sendMsg(Constants.PROCESSOR_ADDED, serviceFlow);
  46. }
  47. /**
  48. * 当外界组件通知一个已有的processor处理器data部分发生变化时,该事件被触发。
  49. */
  50. public void processorDataChanged(String routeCode, String packageName, String className, String path) {
  51. this.sendMsg(Constants.PROCESSOR_DATA_CHANGED, routeCode, packageName, className, path);
  52. }
  53. public void processorDataChanged(ServiceFlow serviceFlow) {
  54. this.sendMsg(Constants.PROCESSOR_DATA_CHANGED, serviceFlow);
  55. }
  56. public void processorDataDeleted(String routeCode, String packageName, String className) {
  57. this.sendMsg(Constants.PROCESSOR_DATA_DELETED, routeCode, packageName, className, null);
  58. }
  59. public void processorDataDeleted(ServiceFlow serviceFlow) {
  60. this.sendMsg(Constants.PROCESSOR_DATA_DELETED, serviceFlow);
  61. }
  62. /**
  63. * 当外界组件通知一个新的RouteDefine路由被定义时,该事件被触发
  64. */
  65. public void routeDefineAdded(String routeCode, String packageName, String className, String path) {
  66. this.sendMsg(Constants.ROUTE_DEFINE_ADDED, routeCode, packageName, className, path);
  67. }
  68. public void routeDefineAdded(ServiceFlow serviceFlow) {
  69. this.sendMsg(Constants.ROUTE_DEFINE_ADDED, serviceFlow);
  70. }
  71. /**
  72. * 当外界组件通知一个已有的RouteDefine路由定义被改变时,主要就是路由定义内容被改变时,该事件被触发。
  73. */
  74. public void routeDefineChanged(String routeCode, String packageName, String className, String path) {
  75. this.sendMsg(Constants.ROUTE_DEFINE_CHANGED, routeCode, packageName, className, path);
  76. }
  77. public void routeDefineChanged(ServiceFlow serviceFlow) {
  78. this.sendMsg(Constants.ROUTE_DEFINE_CHANGED, serviceFlow);
  79. }
  80. /**
  81. * 当外界组件通知一个已有的RouteDefine路由定义被删除时,该事件被触发。
  82. */
  83. public void routeDefineDelete(String routeCode, String packageName, String className) {
  84. this.sendMsg(Constants.ROUTE_DEFINE_DELETED, routeCode, packageName, className, null);
  85. }
  86. public void routeDefineDelete(ServiceFlow serviceFlow) {
  87. this.sendMsg(Constants.ROUTE_DEFINE_DELETED, serviceFlow);
  88. }
  89. public void routeClassAdded(String routeCode, String packageName, String className, String path, String cron) {
  90. this.sendGenMsg(Constants.ROUTE_CLASS_ADDED, routeCode, packageName, className, path, cron);
  91. }
  92. public void routeClassAdded(ServiceFlow serviceFlow) {
  93. this.sendMsg(Constants.ROUTE_CLASS_ADDED, serviceFlow);
  94. }
  95. public void routeClassChanged(String routeCode, String packageName, String className, String path, String cron) {
  96. this.sendGenMsg(Constants.ROUTE_CLASS_CHANGED, routeCode, packageName, className, path, cron);
  97. }
  98. public void routeClassChanged(ServiceFlow serviceFlow) {
  99. this.sendMsg(Constants.ROUTE_CLASS_CHANGED, serviceFlow);
  100. }
  101. public void processorClassAdded(String routeCode, String packageName, String className, String path) {
  102. this.sendMsg(Constants.PROCESSOR_CLASS_ADDED, routeCode, packageName, className, path);
  103. }
  104. public void processorClassAdded(ServiceFlow serviceFlow) {
  105. this.sendMsg(Constants.PROCESSOR_CLASS_ADDED, serviceFlow);
  106. }
  107. public void processorClassChanged(String routeCode, String packageName, String className, String path) {
  108. this.sendMsg(Constants.PROCESSOR_CLASS_CHANGED, routeCode, packageName, className, path);
  109. }
  110. public void processorClassChanged(ServiceFlow serviceFlow) {
  111. this.sendMsg(Constants.PROCESSOR_CLASS_CHANGED, serviceFlow);
  112. }
  113. private void sendMsg(String event, String routeCode, String packageName, String className, String path) {
  114. ServiceFlow flow = new ServiceFlow();
  115. flow.setRouteCode(routeCode);
  116. ServiceFlow.HandleFile handleFile = flow.new HandleFile();
  117. // handleFile.setUsage(Constants.FLOW_TYPE_ROUTE);
  118. handleFile.setPackageName(packageName);
  119. handleFile.setClassName(className);
  120. handleFile.setFilePath(path);
  121. handleFile.setFileType(Constants.CLASS);
  122. flow.addHandleFile(handleFile);
  123. this.sendMsg(event, flow);
  124. }
  125. private void sendGenMsg(String event, String routeCode, String packageName, String className, String path, String cron) {
  126. ServiceFlow flow = new ServiceFlow();
  127. flow.setRouteCode(routeCode);
  128. ServiceFlow.HandleFile handleFile = flow.new HandleFile();
  129. // handleFile.setUsage(Constants.FLOW_TYPE_ROUTE);
  130. handleFile.setPackageName(packageName);
  131. handleFile.setClassName(className);
  132. handleFile.setFilePath(path);
  133. handleFile.setFileType(Constants.JAVA);
  134. flow.addHandleFile(handleFile);
  135. flow.setCron(cron);
  136. this.sendMsg(event, flow);
  137. }
  138. private void sendMsg(String event, ServiceFlow serviceFlow) {
  139. try {
  140. String msg = objectMapper.writeValueAsString(serviceFlow);
  141. Map<String, Object> header = new HashMap<>();
  142. header.put("event", event);
  143. this.jmsMessagingTemplate.convertAndSend(this.queue, msg, header);
  144. } catch (JsonProcessingException e) {
  145. e.printStackTrace();
  146. logger.error(e.getMessage());
  147. }
  148. }
  149. }