ServiceFlowEventService.java 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899
  1. package com.yihu.hos.services;
  2. import com.fasterxml.jackson.databind.ObjectMapper;
  3. import com.yihu.hos.common.constants.ContextAttributes;
  4. import com.yihu.hos.core.log.Logger;
  5. import com.yihu.hos.core.log.LoggerFactory;
  6. import com.yihu.hos.web.framework.constant.ServiceFlowConstant;
  7. import com.yihu.hos.web.framework.model.bo.ServiceFlow;
  8. import com.yihu.hos.web.framework.thread.LocalContext;
  9. import org.springframework.beans.factory.annotation.Autowired;
  10. import org.springframework.stereotype.Component;
  11. import org.zbus.broker.ZbusBroker;
  12. import org.zbus.mq.Producer;
  13. import org.zbus.net.http.Message;
  14. import java.io.IOException;
  15. /**
  16. * @created Airhead 2016/8/2.
  17. */
  18. @Component
  19. public class ServiceFlowEventService {
  20. static final Logger logger = LoggerFactory.getLogger(ServiceFlowEventService.class);
  21. @Autowired
  22. private ObjectMapper objectMapper;
  23. private ZbusBroker zbusBroker;
  24. @Autowired
  25. public void setZbusBroker(ZbusBroker zbusBroker) {
  26. this.zbusBroker = zbusBroker;
  27. }
  28. /**
  29. * admin启动时,触发一次所有流程更新事件,用于重启整个服务的情况。
  30. * 同时解决Broker中启动多个采集任务的问题。
  31. */
  32. public void serviceFlowStarted(ServiceFlow serviceFlow) {
  33. this.sendMsg(ServiceFlowConstant.SERVICE_FLOW_STARTED, serviceFlow);
  34. }
  35. public void serviceFlowStopped(ServiceFlow serviceFlow) {
  36. this.sendMsg(ServiceFlowConstant.SERVICE_FLOW_STOPPED, serviceFlow);
  37. }
  38. /**
  39. * 当外界组件通知一个新的processor处理器被定义时,该事件被触发。
  40. */
  41. public void serviceFlowAdded(ServiceFlow serviceFlow) {
  42. this.sendMsg(ServiceFlowConstant.SERVICE_FLOW_ADDED, serviceFlow);
  43. }
  44. /**
  45. * 当外界组件通知一个已有的processor处理器data部分发生变化时,该事件被触发。
  46. */
  47. public void serviceFlowModifiedAdd(ServiceFlow serviceFlow) {
  48. this.sendMsg(ServiceFlowConstant.SERVICE_FLOW_MODIFIED_ADD, serviceFlow);
  49. }
  50. public void serviceFlowModifiedReduce(ServiceFlow serviceFlow) {
  51. this.sendMsg(ServiceFlowConstant.SERVICE_FLOW_MODIFIED_REDUCE, serviceFlow);
  52. }
  53. /**
  54. * 当外界组件通知一个新的RouteDefine路由被定义时,该事件被触发
  55. */
  56. public void serviceFlowDelete(ServiceFlow serviceFlow) {
  57. this.sendMsg(ServiceFlowConstant.SERVICE_FLOW_DELETED, serviceFlow);
  58. }
  59. private void sendMsg(String event, ServiceFlow serviceFlow) {
  60. if (zbusBroker == null) {
  61. logger.error("zbusBroker is null.");
  62. return;
  63. }
  64. try {
  65. String msg = objectMapper.writeValueAsString(serviceFlow);
  66. String tenant = LocalContext.getContext().getAttachment(ContextAttributes.TENANT_NAME);
  67. if (tenant!=null) {
  68. Producer producer = new Producer(zbusBroker, ServiceFlowConstant.ZBUS_MQ + "@" + tenant);
  69. producer.createMQ(); //确定为创建消息队列需要显示调用
  70. Message message = new Message();
  71. message.setHead("event", event);
  72. message.setHead("tenant", tenant);
  73. message.setMethod("POST");
  74. message.setBody(msg);
  75. producer.sendSync(message);
  76. }
  77. } catch (IOException | InterruptedException e) {
  78. logger.error(e.getMessage());
  79. e.printStackTrace();
  80. }
  81. }
  82. }