|
@ -3,13 +3,11 @@ package com.yihu.hos.arbiter.services;
|
|
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
|
|
import com.mongodb.BasicDBObject;
|
|
|
import com.mongodb.WriteResult;
|
|
|
import com.yihu.hos.arbiter.models.BrokerServer;
|
|
|
import com.yihu.hos.web.framework.model.bo.BrokerServer;
|
|
|
import com.yihu.hos.core.http.HTTPResponse;
|
|
|
import com.yihu.hos.core.http.HttpClientKit;
|
|
|
import com.yihu.hos.web.framework.constant.ServiceFlowConstant;
|
|
|
import com.yihu.hos.web.framework.model.bo.ServiceFlow;
|
|
|
import org.apache.camel.Body;
|
|
|
import org.apache.camel.Headers;
|
|
|
import org.apache.log4j.LogManager;
|
|
|
import org.apache.log4j.Logger;
|
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
@ -22,7 +20,6 @@ import org.zbus.broker.ZbusBroker;
|
|
|
|
|
|
import java.io.IOException;
|
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
|
|
|
|
/**
|
|
|
* Broker原则上具有等同性,这样Arbiter无论选择了哪个Broker能提供的服务都是一样的。
|
|
@ -108,9 +105,9 @@ public class ServiceFlowService {
|
|
|
List<BrokerServer> brokerServerList;
|
|
|
brokerServerList = brokerServerService.get(one);
|
|
|
for (BrokerServer broker : brokerServerList) {
|
|
|
boolean result = sendMessage(broker, "post", "/esb/serviceFlow/start", msg);
|
|
|
boolean result = sendMessage(broker, "post", "/esb/serviceFlow/serverServiceFlow", msg);
|
|
|
if (!result) {
|
|
|
logger.error("sendMessage to broker server failed, broker:" + broker.getURL() + ", msg:" + msg);
|
|
|
logger.error("sendMessage to broker start failed, broker:" + broker.getURL() + ", msg:" + msg);
|
|
|
continue;
|
|
|
}
|
|
|
|
|
@ -139,7 +136,7 @@ public class ServiceFlowService {
|
|
|
|
|
|
boolean result = sendMessage(broker, "post", "/esb/serviceFlow/stop", msg);
|
|
|
if (!result) {
|
|
|
logger.error("sendMessage to broker server failed, broker:" + broker.getURL() + ", msg:" + msg);
|
|
|
logger.error("sendMessage to broker start failed, broker:" + broker.getURL() + ", msg:" + msg);
|
|
|
continue;
|
|
|
}
|
|
|
|
|
@ -162,7 +159,7 @@ public class ServiceFlowService {
|
|
|
BrokerServer brokerServer = brokerServerService.get();
|
|
|
boolean result = sendMessage(brokerServer, "post", "/esb/serviceFlow", msg);
|
|
|
if (!result) {
|
|
|
logger.error("sendMessage to broker server failed, broker:" + brokerServer.getURL() + ", msg:" + msg);
|
|
|
logger.error("sendMessage to broker start failed, broker:" + brokerServer.getURL() + ", msg:" + msg);
|
|
|
return;
|
|
|
}
|
|
|
|
|
@ -175,7 +172,7 @@ public class ServiceFlowService {
|
|
|
for (BrokerServer broker : brokerServerList) {
|
|
|
boolean result = sendMessage(broker, "post", "/esb/serviceFlow", msg);
|
|
|
if (!result) {
|
|
|
logger.error("sendMessage to broker server failed, broker:" + broker.getURL() + ", msg:" + msg);
|
|
|
logger.error("sendMessage to broker start failed, broker:" + broker.getURL() + ", msg:" + msg);
|
|
|
continue;
|
|
|
}
|
|
|
|
|
@ -203,7 +200,7 @@ public class ServiceFlowService {
|
|
|
|
|
|
boolean result = sendMessage(brokerList.get(0), "put", "/esb/serviceFlow/add", msg);
|
|
|
if (!result) {
|
|
|
logger.error("sendMessage to broker server failed, broker:" + brokerList.get(0).getURL() + ", msg:" + msg);
|
|
|
logger.error("sendMessage to broker start failed, broker:" + brokerList.get(0).getURL() + ", msg:" + msg);
|
|
|
|
|
|
return;
|
|
|
}
|
|
@ -218,7 +215,7 @@ public class ServiceFlowService {
|
|
|
for (BrokerServer broker : brokerServerList) {
|
|
|
boolean result = sendMessage(broker, "put", "/esb/serviceFlow/add", msg);
|
|
|
if (!result) {
|
|
|
logger.error("sendMessage to broker server failed, broker:" + broker.getURL() + ", msg:" + msg);
|
|
|
logger.error("sendMessage to broker start failed, broker:" + broker.getURL() + ", msg:" + msg);
|
|
|
|
|
|
continue;
|
|
|
}
|
|
@ -247,7 +244,7 @@ public class ServiceFlowService {
|
|
|
|
|
|
boolean result = sendMessage(brokerList.get(0), "put", "/esb/serviceFlow/reduce", msg);
|
|
|
if (!result) {
|
|
|
logger.error("sendMessage to broker server failed, broker:" + brokerList.get(0).getURL() + ", msg:" + msg);
|
|
|
logger.error("sendMessage to broker start failed, broker:" + brokerList.get(0).getURL() + ", msg:" + msg);
|
|
|
|
|
|
return;
|
|
|
}
|
|
@ -262,7 +259,7 @@ public class ServiceFlowService {
|
|
|
for (BrokerServer broker : brokerServerList) {
|
|
|
boolean result = sendMessage(broker, "put", "/esb/serviceFlow/reduce", msg);
|
|
|
if (!result) {
|
|
|
logger.error("sendMessage to broker server failed, broker:" + broker.getURL() + ", msg:" + msg);
|
|
|
logger.error("sendMessage to broker start failed, broker:" + broker.getURL() + ", msg:" + msg);
|
|
|
|
|
|
continue;
|
|
|
}
|
|
@ -291,7 +288,7 @@ public class ServiceFlowService {
|
|
|
|
|
|
boolean result = sendMessage(brokerList.get(0), "delete", "/esb/serviceFlow", msg);
|
|
|
if (!result) {
|
|
|
logger.error("sendMessage to broker server failed, broker:" + brokerList.get(0).getURL() + ", msg:" + msg);
|
|
|
logger.error("sendMessage to broker start failed, broker:" + brokerList.get(0).getURL() + ", msg:" + msg);
|
|
|
return;
|
|
|
}
|
|
|
|
|
@ -304,7 +301,7 @@ public class ServiceFlowService {
|
|
|
for (BrokerServer broker : brokerServerList) {
|
|
|
boolean result = sendMessage(broker, "delete", "/esb/serviceFlow", msg);
|
|
|
if (!result) {
|
|
|
logger.error("sendMessage to broker server failed, broker:" + broker.getURL() + ", msg:" + msg);
|
|
|
logger.error("sendMessage to broker start failed, broker:" + broker.getURL() + ", msg:" + msg);
|
|
|
|
|
|
continue;
|
|
|
}
|
|
@ -330,9 +327,9 @@ public class ServiceFlowService {
|
|
|
|
|
|
BrokerServer brokerServer = objectMapper.readValue(msg, BrokerServer.class);
|
|
|
String serviceFlowMsg = objectMapper.writeValueAsString(serviceFlow);
|
|
|
boolean result = sendMessage(brokerServer, "post", "/esb/serviceFlow/start", serviceFlowMsg);
|
|
|
boolean result = sendMessage(brokerServer, "post", "/esb/serviceFlow/serverServiceFlow", serviceFlowMsg);
|
|
|
if (!result) {
|
|
|
logger.error("sendMessage to broker server failed, broker:" + brokerServer.getURL() + ", msg:" + serviceFlowMsg);
|
|
|
logger.error("sendMessage to broker start failed, broker:" + brokerServer.getURL() + ", msg:" + serviceFlowMsg);
|
|
|
|
|
|
return;
|
|
|
}
|
|
@ -349,37 +346,6 @@ public class ServiceFlowService {
|
|
|
//可以不用处理。
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* SAAS化的管理端过来的消息会被proxy进行中转,之后发送到终端的Arbiter对Broker进行实际的控制。
|
|
|
*
|
|
|
* @param header 消息头部信息
|
|
|
* @param msg 消息
|
|
|
*/
|
|
|
public void proxy(@Headers Map<String, String> header, @Body String msg) {
|
|
|
// if (zbusBroker == null) {
|
|
|
// logger.error("zbusBroker is null.");
|
|
|
// return;
|
|
|
// }
|
|
|
//
|
|
|
// try {
|
|
|
// Producer producer = new Producer(zbusBroker, ServiceFlowConstant.SSH + "@" + header.get("tenant"));
|
|
|
// producer.createMQ(); //确定为创建消息队列需要显示调用
|
|
|
// Message message = new Message();
|
|
|
// message.setHead("event", header.get("event"));
|
|
|
// message.setHead("tenant", header.get("tenant"));
|
|
|
//// message.setHead(header);
|
|
|
// message.setMethod("POST");
|
|
|
// message.setBody(msg);
|
|
|
// message = producer.sendSync(message);
|
|
|
// logger.debug(message);
|
|
|
//// System.out.println("test");
|
|
|
// } catch (IOException | InterruptedException e) {
|
|
|
// logger.error(e.getMessage());
|
|
|
// e.printStackTrace();
|
|
|
// }
|
|
|
|
|
|
}
|
|
|
|
|
|
@Autowired
|
|
|
public void setZbusBroker(ZbusBroker zbusBroker) {
|
|
|
this.zbusBroker = zbusBroker;
|