|
@ -0,0 +1,408 @@
|
|
|
package com.yihu.hos.arbiter.services;
|
|
|
|
|
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
|
|
import com.yihu.hos.core.http.HTTPResponse;
|
|
|
import com.yihu.hos.core.http.HttpClientKit;
|
|
|
import com.yihu.hos.core.log.Logger;
|
|
|
import com.yihu.hos.core.log.LoggerFactory;
|
|
|
import com.yihu.hos.web.framework.constant.ServiceFlowConstant;
|
|
|
import com.yihu.hos.web.framework.model.bo.BrokerServer;
|
|
|
import com.yihu.hos.web.framework.model.bo.ServiceFlow;
|
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
|
import org.springframework.data.mongodb.core.MongoOperations;
|
|
|
import org.springframework.data.mongodb.core.query.Criteria;
|
|
|
import org.springframework.data.mongodb.core.query.Query;
|
|
|
import org.springframework.data.mongodb.core.query.Update;
|
|
|
import org.springframework.stereotype.Service;
|
|
|
|
|
|
import java.io.IOException;
|
|
|
import java.util.List;
|
|
|
|
|
|
/**
|
|
|
* Broker原则上具有等同性,这样Arbiter无论选择了哪个Broker能提供的服务都是一样的。
|
|
|
* 但是因为Broker上还是会运行一些定时的采集任务,这些采集任务如果是多台Broker运行的话,可能会引起数据重复问题。
|
|
|
* 所以在事件触发时需要做一些策略的调整:
|
|
|
* 1.实时任务,通知所有的Broker进行更新路由
|
|
|
* 2.采集任务,只通知其中的一台进行更新路由
|
|
|
*
|
|
|
* @created Airhead 2016/8/16.
|
|
|
*/
|
|
|
@Service("serviceFlowService")
|
|
|
public class ServiceFlowService {
|
|
|
private static final Logger logger = LoggerFactory.getLogger(BrokerServerService.class);
|
|
|
|
|
|
@Autowired
|
|
|
private MongoOperations mongoOperations;
|
|
|
@Autowired
|
|
|
private ObjectMapper objectMapper;
|
|
|
@Autowired
|
|
|
private BrokerServerService brokerServerService;
|
|
|
|
|
|
public ServiceFlow save(ServiceFlow serviceFlow) {
|
|
|
if (serviceFlow == null) {
|
|
|
logger.error("ServiceFlow is null");
|
|
|
return null;
|
|
|
}
|
|
|
|
|
|
Query query = new Query();
|
|
|
query.addCriteria(Criteria.where("routeCode").is(serviceFlow.getRouteCode()));
|
|
|
Update update = new Update();
|
|
|
update.set("routeCode", serviceFlow.getRouteCode());
|
|
|
update.set("updated", serviceFlow.getUpdated());
|
|
|
update.set("flowType", serviceFlow.getFlowType());
|
|
|
update.set("tenant", serviceFlow.getTenant());
|
|
|
|
|
|
//删除旧记录
|
|
|
update.set("handleFiles", serviceFlow.getHandleFiles());
|
|
|
|
|
|
// update1.set("handleFiles.$.className", handleFile.getClassName());
|
|
|
// Query query1 = Query.query(new Criteria().andOperator(Criteria.where("className").is( handleFile.getClassName()),Criteria.where("handleFiles").elemMatch(Criteria.where("className").is( handleFile.getClassName()))));
|
|
|
// mongoOperations.updateFirst(query1, update1, ServiceFlow.class);
|
|
|
|
|
|
mongoOperations.upsert(query, update, ServiceFlow.class);
|
|
|
|
|
|
return mongoOperations.findOne(query, ServiceFlow.class);
|
|
|
}
|
|
|
|
|
|
public String get(String serviceName) {
|
|
|
return null;
|
|
|
}
|
|
|
|
|
|
public void delete(ServiceFlow serviceFlow) {
|
|
|
mongoOperations.remove(serviceFlow);
|
|
|
}
|
|
|
|
|
|
public List<ServiceFlow> getAll() {
|
|
|
return mongoOperations.findAll(ServiceFlow.class);
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* admin发过来的服务流程启动事件处理。
|
|
|
*
|
|
|
* @param msg serviceFlow
|
|
|
*/
|
|
|
public void serviceFlowStarted(String msg) {
|
|
|
try {
|
|
|
ServiceFlow serviceFlow = getServiceFlow(msg);
|
|
|
serviceFlow = this.save(serviceFlow);
|
|
|
|
|
|
boolean one = ServiceFlowConstant.JAVA.equals(serviceFlow.getFlowType());//java类型为采集任务
|
|
|
if (one && isStarted(serviceFlow)) {
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
List<BrokerServer> brokerServerList;
|
|
|
brokerServerList = brokerServerService.get(one);
|
|
|
for (BrokerServer broker : brokerServerList) {
|
|
|
boolean result = sendMessage(broker, "post", "/esb/serviceFlow/serverServiceFlow", msg);
|
|
|
if (!result) {
|
|
|
logger.error("sendMessage to broker start failed, broker:" + broker.getURL() + ", msg:" + msg);
|
|
|
continue;
|
|
|
}
|
|
|
|
|
|
brokerServerService.addServiceFlow(broker, serviceFlow);
|
|
|
}
|
|
|
|
|
|
} catch (IOException e) {
|
|
|
e.printStackTrace();
|
|
|
logger.error(e.getMessage());
|
|
|
}
|
|
|
}
|
|
|
|
|
|
public void serviceFlowStopped(String msg) {
|
|
|
try {
|
|
|
ServiceFlow serviceFlow = getServiceFlow(msg);
|
|
|
this.delete(serviceFlow);
|
|
|
|
|
|
List<BrokerServer> brokerServerList;
|
|
|
brokerServerList = brokerServerService.get(false);
|
|
|
for (BrokerServer broker : brokerServerList) {
|
|
|
HTTPResponse response = HttpClientKit.post(broker.getURL() + "", msg);
|
|
|
if (response.getStatusCode() == 200) {
|
|
|
String body = response.getBody();
|
|
|
logger.debug(body);
|
|
|
}
|
|
|
|
|
|
boolean result = sendMessage(broker, "post", "/esb/serviceFlow/stop", msg);
|
|
|
if (!result) {
|
|
|
logger.error("sendMessage to broker start failed, broker:" + broker.getURL() + ", msg:" + msg);
|
|
|
continue;
|
|
|
}
|
|
|
|
|
|
brokerServerService.removeServiceFlow(broker, serviceFlow);
|
|
|
}
|
|
|
|
|
|
} catch (IOException e) {
|
|
|
e.printStackTrace();
|
|
|
logger.error(e.getMessage());
|
|
|
}
|
|
|
}
|
|
|
|
|
|
public void serviceFlowAdd(String msg) {
|
|
|
try {
|
|
|
ServiceFlow serviceFlow = getServiceFlow(msg);
|
|
|
serviceFlow = this.save(serviceFlow);
|
|
|
System.out.println("流程添加serviceFlowAdd开始!");
|
|
|
boolean one = ServiceFlowConstant.JAVA.equals(serviceFlow.getFlowType());//java类型为采集任务
|
|
|
if (one) {
|
|
|
BrokerServer brokerServer = brokerServerService.get();
|
|
|
boolean result = sendMessage(brokerServer, "post", "/esb/serviceFlow", msg);
|
|
|
if (!result) {
|
|
|
logger.error("sendMessage to broker start failed, broker:" + brokerServer.getURL() + ", msg:" + msg);
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
brokerServerService.addServiceFlow(brokerServer, serviceFlow);
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
List<BrokerServer> brokerServerList;
|
|
|
brokerServerList = brokerServerService.get(one);
|
|
|
for (BrokerServer broker : brokerServerList) {
|
|
|
boolean result = sendMessage(broker, "post", "/esb/serviceFlow", msg);
|
|
|
if (!result) {
|
|
|
logger.error("sendMessage to broker start failed, broker:" + broker.getURL() + ", msg:" + msg);
|
|
|
continue;
|
|
|
}
|
|
|
|
|
|
brokerServerService.addServiceFlow(broker, serviceFlow);
|
|
|
}
|
|
|
|
|
|
} catch (IOException e) {
|
|
|
e.printStackTrace();
|
|
|
logger.error(e.getMessage());
|
|
|
}
|
|
|
}
|
|
|
|
|
|
public void serviceFlowModifyAdd(String msg) {
|
|
|
try {
|
|
|
System.out.println("arbiter'route serviceFlowModifyAdd ================ 1 ");
|
|
|
ServiceFlow serviceFlow = getServiceFlow(msg);
|
|
|
serviceFlow = this.save(serviceFlow);
|
|
|
|
|
|
boolean one = ServiceFlowConstant.JAVA.equals(serviceFlow.getFlowType());//java类型为采集任务
|
|
|
if (one) {
|
|
|
System.out.println("arbiter'route is java ================ 2, cdoe: " + serviceFlow.getRouteCode());
|
|
|
List<BrokerServer> brokerList = brokerServerService.getBrokerList(serviceFlow.getRouteCode());
|
|
|
if (brokerList == null || brokerList.size() == 0) {
|
|
|
logger.error("service flow stopped unexpected.");
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
boolean result = sendMessage(brokerList.get(0), "post", "/esb/serviceFlow/add", msg);
|
|
|
if (!result) {
|
|
|
logger.error("serviceFlowModifyAdd11 sendMessage to broker start failed, broker:" + brokerList.get(0).getURL() + ", msg:" + msg);
|
|
|
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
brokerServerService.addServiceFlow(brokerList.get(0), serviceFlow);
|
|
|
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
List<BrokerServer> brokerServerList;
|
|
|
brokerServerList = brokerServerService.get(one);
|
|
|
for (BrokerServer broker : brokerServerList) {
|
|
|
boolean result = sendMessage(broker, "post", "/esb/serviceFlow/add", msg);
|
|
|
if (!result) {
|
|
|
logger.error("serviceFlowModifyAdd22 sendMessage to broker start failed, broker:" + broker.getURL() + ", msg:" + msg);
|
|
|
|
|
|
continue;
|
|
|
} else {
|
|
|
System.out.println("arbiter'route susscess ================ 3");
|
|
|
}
|
|
|
|
|
|
brokerServerService.addServiceFlow(broker, serviceFlow);
|
|
|
}
|
|
|
|
|
|
} catch (IOException e) {
|
|
|
e.printStackTrace();
|
|
|
logger.error(e.getMessage());
|
|
|
}
|
|
|
}
|
|
|
|
|
|
public void serviceFlowModifyReduce(String msg) {
|
|
|
try {
|
|
|
ServiceFlow serviceFlow = getServiceFlow(msg);
|
|
|
serviceFlow = this.save(serviceFlow);
|
|
|
|
|
|
boolean one = ServiceFlowConstant.JAVA.equals(serviceFlow.getFlowType());//java类型为采集任务
|
|
|
if (one) {
|
|
|
List<BrokerServer> brokerList = brokerServerService.getBrokerList(serviceFlow.getRouteCode());
|
|
|
if (brokerList == null || brokerList.size() == 0) {
|
|
|
logger.error("service flow stopped unexpected.");
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
boolean result = sendMessage(brokerList.get(0), "put", "/esb/serviceFlow/reduce", msg);
|
|
|
if (!result) {
|
|
|
logger.error("sendMessage to broker start failed, broker:" + brokerList.get(0).getURL() + ", msg:" + msg);
|
|
|
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
brokerServerService.addServiceFlow(brokerList.get(0), serviceFlow);
|
|
|
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
List<BrokerServer> brokerServerList;
|
|
|
brokerServerList = brokerServerService.get(one);
|
|
|
for (BrokerServer broker : brokerServerList) {
|
|
|
boolean result = sendMessage(broker, "put", "/esb/serviceFlow/reduce", msg);
|
|
|
if (!result) {
|
|
|
logger.error("sendMessage to broker start failed, broker:" + broker.getURL() + ", msg:" + msg);
|
|
|
|
|
|
continue;
|
|
|
}
|
|
|
|
|
|
brokerServerService.addServiceFlow(broker, serviceFlow);
|
|
|
}
|
|
|
|
|
|
} catch (IOException e) {
|
|
|
e.printStackTrace();
|
|
|
logger.error(e.getMessage());
|
|
|
}
|
|
|
}
|
|
|
|
|
|
public void serviceFlowDelete(String msg) {
|
|
|
try {
|
|
|
ServiceFlow serviceFlow = getServiceFlow(msg);
|
|
|
this.delete(serviceFlow);
|
|
|
|
|
|
boolean one = ServiceFlowConstant.JAVA.equals(serviceFlow.getFlowType());//java类型为采集任务
|
|
|
if (one) {
|
|
|
List<BrokerServer> brokerList = brokerServerService.getBrokerList(serviceFlow.getRouteCode());
|
|
|
if (brokerList == null || brokerList.size() == 0) {
|
|
|
logger.error("service flow stopped unexpected.");
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
boolean result = sendMessage(brokerList.get(0), "delete", "/esb/serviceFlow", msg);
|
|
|
if (!result) {
|
|
|
logger.error("sendMessage to broker start failed, broker:" + brokerList.get(0).getURL() + ", msg:" + msg);
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
brokerServerService.removeServiceFlow(brokerList.get(0), serviceFlow);
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
List<BrokerServer> brokerServerList;
|
|
|
brokerServerList = brokerServerService.get(one);
|
|
|
for (BrokerServer broker : brokerServerList) {
|
|
|
boolean result = sendMessage(broker, "delete", "/esb/serviceFlow", msg);
|
|
|
if (!result) {
|
|
|
logger.error("sendMessage to broker start failed, broker:" + broker.getURL() + ", msg:" + msg);
|
|
|
|
|
|
continue;
|
|
|
}
|
|
|
|
|
|
brokerServerService.removeServiceFlow(broker, serviceFlow);
|
|
|
}
|
|
|
|
|
|
} catch (IOException e) {
|
|
|
e.printStackTrace();
|
|
|
logger.error(e.getMessage());
|
|
|
}
|
|
|
}
|
|
|
|
|
|
public void brokerServerOn(String msg) {
|
|
|
List<ServiceFlow> serviceFlowList = getAll();
|
|
|
serviceFlowList.forEach(serviceFlow -> {
|
|
|
try {
|
|
|
serviceFlow = this.save(serviceFlow);
|
|
|
boolean one = ServiceFlowConstant.JAVA.equals(serviceFlow.getFlowType());//java类型为采集任务
|
|
|
if (one && isStarted(serviceFlow)) {
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
BrokerServer brokerServer = objectMapper.readValue(msg, BrokerServer.class);
|
|
|
String serviceFlowMsg = objectMapper.writeValueAsString(serviceFlow);
|
|
|
boolean result = sendMessage(brokerServer, "post", "/esb/serviceFlow/start", serviceFlowMsg);
|
|
|
if (!result) {
|
|
|
logger.error("sendMessage to broker start failed, broker:" + brokerServer.getURL() + ", msg:" + serviceFlowMsg);
|
|
|
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
brokerServerService.addServiceFlow(brokerServer, serviceFlow);
|
|
|
} catch (IOException e) {
|
|
|
e.printStackTrace();
|
|
|
logger.error(e.getMessage());
|
|
|
}
|
|
|
});
|
|
|
}
|
|
|
|
|
|
public void brokerServerOff(String msg) {
|
|
|
//可以不用处理。
|
|
|
}
|
|
|
|
|
|
private boolean sendMessage(BrokerServer brokerServer, String method, String path, String msg) {
|
|
|
if (brokerServer == null) {
|
|
|
return false;
|
|
|
}
|
|
|
|
|
|
switch (method) {
|
|
|
case "post": {
|
|
|
HTTPResponse response = HttpClientKit.post(brokerServer.getURL() + path, msg);
|
|
|
if (response.getStatusCode() == 200) {
|
|
|
String body = response.getBody();
|
|
|
logger.debug(body);
|
|
|
return true;
|
|
|
}
|
|
|
|
|
|
logger.error("post error,url: " + brokerServer.getURL() + path);
|
|
|
return false;
|
|
|
}
|
|
|
|
|
|
case "put": {
|
|
|
HTTPResponse response = HttpClientKit.put(brokerServer.getURL() + path, msg);
|
|
|
if (response.getStatusCode() == 200) {
|
|
|
String body = response.getBody();
|
|
|
logger.debug(body);
|
|
|
return true;
|
|
|
}
|
|
|
|
|
|
logger.error("put error,url: " + brokerServer.getURL() + path);
|
|
|
|
|
|
return false;
|
|
|
}
|
|
|
|
|
|
|
|
|
case "delete": {
|
|
|
HTTPResponse response = HttpClientKit.delete(brokerServer.getURL() + path, msg);
|
|
|
if (response.getStatusCode() == 200) {
|
|
|
String body = response.getBody();
|
|
|
logger.debug(body);
|
|
|
return true;
|
|
|
}
|
|
|
|
|
|
logger.error("delete error,url: " + brokerServer.getURL() + path);
|
|
|
|
|
|
return false;
|
|
|
}
|
|
|
|
|
|
default:
|
|
|
break;
|
|
|
}
|
|
|
|
|
|
return false;
|
|
|
}
|
|
|
|
|
|
private ServiceFlow getServiceFlow(String msg) throws IOException {
|
|
|
return objectMapper.readValue(msg, ServiceFlow.class);
|
|
|
}
|
|
|
|
|
|
private boolean isStarted(ServiceFlow serviceFlow) {
|
|
|
List<BrokerServer> brokerList = brokerServerService.getBrokerList(serviceFlow.getRouteCode());
|
|
|
if (brokerList != null && brokerList.size() != 0) {
|
|
|
logger.debug("service flow is already started on the broker");
|
|
|
return true;
|
|
|
}
|
|
|
|
|
|
return false;
|
|
|
}
|
|
|
|
|
|
}
|