123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899 |
- package com.yihu.hos.services;
- import com.fasterxml.jackson.databind.ObjectMapper;
- import com.yihu.hos.common.constants.ContextAttributes;
- import com.yihu.hos.core.log.Logger;
- import com.yihu.hos.core.log.LoggerFactory;
- import com.yihu.hos.web.framework.constant.ServiceFlowConstant;
- import com.yihu.hos.web.framework.model.bo.ServiceFlow;
- import com.yihu.hos.web.framework.thread.LocalContext;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.stereotype.Component;
- import org.zbus.broker.ZbusBroker;
- import org.zbus.mq.Producer;
- import org.zbus.net.http.Message;
- import java.io.IOException;
- /**
- * @created Airhead 2016/8/2.
- */
- @Component
- public class ServiceFlowEventService {
- static final Logger logger = LoggerFactory.getLogger(ServiceFlowEventService.class);
- @Autowired
- private ObjectMapper objectMapper;
- private ZbusBroker zbusBroker;
- @Autowired
- public void setZbusBroker(ZbusBroker zbusBroker) {
- this.zbusBroker = zbusBroker;
- }
- /**
- * admin启动时,触发一次所有流程更新事件,用于重启整个服务的情况。
- * 同时解决Broker中启动多个采集任务的问题。
- */
- public void serviceFlowStarted(ServiceFlow serviceFlow) {
- this.sendMsg(ServiceFlowConstant.SERVICE_FLOW_STARTED, serviceFlow);
- }
- public void serviceFlowStopped(ServiceFlow serviceFlow) {
- this.sendMsg(ServiceFlowConstant.SERVICE_FLOW_STOPPED, serviceFlow);
- }
- /**
- * 当外界组件通知一个新的processor处理器被定义时,该事件被触发。
- */
- public void serviceFlowAdded(ServiceFlow serviceFlow) {
- this.sendMsg(ServiceFlowConstant.SERVICE_FLOW_ADDED, serviceFlow);
- }
- /**
- * 当外界组件通知一个已有的processor处理器data部分发生变化时,该事件被触发。
- */
- public void serviceFlowModifiedAdd(ServiceFlow serviceFlow) {
- this.sendMsg(ServiceFlowConstant.SERVICE_FLOW_MODIFIED_ADD, serviceFlow);
- }
- public void serviceFlowModifiedReduce(ServiceFlow serviceFlow) {
- this.sendMsg(ServiceFlowConstant.SERVICE_FLOW_MODIFIED_REDUCE, serviceFlow);
- }
- /**
- * 当外界组件通知一个新的RouteDefine路由被定义时,该事件被触发
- */
- public void serviceFlowDelete(ServiceFlow serviceFlow) {
- this.sendMsg(ServiceFlowConstant.SERVICE_FLOW_DELETED, serviceFlow);
- }
- private void sendMsg(String event, ServiceFlow serviceFlow) {
- if (zbusBroker == null) {
- logger.error("zbusBroker is null.");
- return;
- }
- try {
- String msg = objectMapper.writeValueAsString(serviceFlow);
- String tenant = LocalContext.getContext().getAttachment(ContextAttributes.TENANT_NAME);
- if (tenant!=null) {
- Producer producer = new Producer(zbusBroker, ServiceFlowConstant.ZBUS_MQ + "@" + tenant);
- producer.createMQ(); //确定为创建消息队列需要显示调用
- Message message = new Message();
- message.setHead("event", event);
- message.setHead("tenant", tenant);
- message.setMethod("POST");
- message.setBody(msg);
- producer.sendSync(message);
- }
- } catch (IOException | InterruptedException e) {
- logger.error(e.getMessage());
- e.printStackTrace();
- }
- }
- }
|