|
@ -17,8 +17,6 @@ import org.springframework.data.mongodb.core.query.Update;
|
|
|
import org.springframework.stereotype.Service;
|
|
|
|
|
|
import java.io.IOException;
|
|
|
import java.util.ArrayList;
|
|
|
import java.util.HashSet;
|
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
|
|
|
@ -45,36 +43,35 @@ public class ServiceFlowService {
|
|
|
private ArbiterServerService arbiterServerService;
|
|
|
|
|
|
|
|
|
public void save(ServiceFlow serviceFlow) {
|
|
|
public ServiceFlow save(ServiceFlow serviceFlow) {
|
|
|
if (serviceFlow == null) {
|
|
|
logger.error("ServiceFlow is null");
|
|
|
return;
|
|
|
return null;
|
|
|
}
|
|
|
|
|
|
Query query = new Query();
|
|
|
query.addCriteria(Criteria.where("routeCode").is(serviceFlow.getRouteCode()));
|
|
|
ServiceFlow flow = mongoOperations.findOne(query, ServiceFlow.class);
|
|
|
Update update = new Update();
|
|
|
update.set("routeCode", serviceFlow.getRouteCode());
|
|
|
update.set("updated", serviceFlow.getUpdated());
|
|
|
update.set("flowType", serviceFlow.getFlowType());
|
|
|
if (flow != null) {
|
|
|
HashSet<ServiceFlow.HandleFile> flowSets = new HashSet<>(flow.getHandleFiles());
|
|
|
HashSet<ServiceFlow.HandleFile> serviceFlowSets = new HashSet<>(serviceFlow.getHandleFiles());
|
|
|
flowSets.addAll(serviceFlowSets);
|
|
|
ArrayList<ServiceFlow.HandleFile> handleFiles = new ArrayList<>(flowSets);
|
|
|
update.set("handleFiles", handleFiles); //没有用原生语法比较复杂
|
|
|
} else {
|
|
|
update.set("handleFiles", serviceFlow.getHandleFiles());
|
|
|
for (ServiceFlow.HandleFile handleFile : serviceFlow.getHandleFiles()) {
|
|
|
update.addToSet("handleFiles", handleFile);
|
|
|
}
|
|
|
|
|
|
mongoOperations.upsert(query, update, ServiceFlow.class);
|
|
|
|
|
|
return mongoOperations.findOne(query, ServiceFlow.class);
|
|
|
}
|
|
|
|
|
|
public String get(String serviceName) {
|
|
|
return null;
|
|
|
}
|
|
|
|
|
|
public void delete(ServiceFlow serviceFlow) {
|
|
|
mongoOperations.remove(serviceFlow);
|
|
|
}
|
|
|
|
|
|
public List<ServiceFlow> getAll() {
|
|
|
return mongoOperations.findAll(ServiceFlow.class);
|
|
|
}
|
|
@ -85,58 +82,259 @@ public class ServiceFlowService {
|
|
|
* @param msg serviceFlow
|
|
|
*/
|
|
|
public void serviceFlowStarted(String msg) {
|
|
|
flowController("post", "/esb/serviceFlow/start", msg);
|
|
|
}
|
|
|
try {
|
|
|
ServiceFlow serviceFlow = getServiceFlow(msg);
|
|
|
serviceFlow = this.save(serviceFlow);
|
|
|
|
|
|
/**
|
|
|
* 没有使用重载,是因为Camel在判断路由时会产生歧义,而无法路由。
|
|
|
*
|
|
|
* @param msg serviceFlow
|
|
|
* @param brokerServer brokerServer Info
|
|
|
*/
|
|
|
public void serviceFlowStart(String msg, BrokerServer brokerServer) {
|
|
|
flowController("post", "/esb/serviceFlow/start", msg, brokerServer);
|
|
|
boolean one = ServiceFlowConstant.JAVA.equals(serviceFlow.getFlowType());//java类型为采集任务
|
|
|
if (one && isStarted(serviceFlow)) {
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
List<BrokerServer> brokerServerList;
|
|
|
brokerServerList = brokerServerService.get(one);
|
|
|
for (BrokerServer broker : brokerServerList) {
|
|
|
boolean result = sendMessage(broker, "post", "/esb/serviceFlow/start", msg);
|
|
|
if (!result) {
|
|
|
logger.error("sendMessage to broker server failed, broker:" + broker.getURL() + ", msg:" + msg);
|
|
|
continue;
|
|
|
}
|
|
|
|
|
|
brokerServerService.addServiceFlow(broker, serviceFlow);
|
|
|
}
|
|
|
|
|
|
} catch (IOException e) {
|
|
|
e.printStackTrace();
|
|
|
logger.error(e.getMessage());
|
|
|
}
|
|
|
}
|
|
|
|
|
|
public void serviceFlowStopped(String msg) {
|
|
|
flowController("post", "/esb/serviceFlow/stop", msg);
|
|
|
try {
|
|
|
ServiceFlow serviceFlow = getServiceFlow(msg);
|
|
|
this.delete(serviceFlow);
|
|
|
|
|
|
List<BrokerServer> brokerServerList;
|
|
|
brokerServerList = brokerServerService.get(false);
|
|
|
for (BrokerServer broker : brokerServerList) {
|
|
|
HTTPResponse response = HttpClientKit.post(broker.getURL() + "", msg);
|
|
|
if (response.getStatusCode() == 200) {
|
|
|
String body = response.getBody();
|
|
|
logger.debug(body);
|
|
|
}
|
|
|
|
|
|
boolean result = sendMessage(broker, "post", "/esb/serviceFlow/stop", msg);
|
|
|
if (!result) {
|
|
|
logger.error("sendMessage to broker server failed, broker:" + broker.getURL() + ", msg:" + msg);
|
|
|
continue;
|
|
|
}
|
|
|
|
|
|
brokerServerService.removeServiceFlow(broker, serviceFlow);
|
|
|
}
|
|
|
|
|
|
} catch (IOException e) {
|
|
|
e.printStackTrace();
|
|
|
logger.error(e.getMessage());
|
|
|
}
|
|
|
}
|
|
|
|
|
|
public void serviceFlowAdd(String msg) {
|
|
|
flowController("post", "/esb/serviceFlow", msg);
|
|
|
try {
|
|
|
ServiceFlow serviceFlow = getServiceFlow(msg);
|
|
|
serviceFlow = this.save(serviceFlow);
|
|
|
|
|
|
boolean one = ServiceFlowConstant.JAVA.equals(serviceFlow.getFlowType());//java类型为采集任务
|
|
|
if (one) {
|
|
|
BrokerServer brokerServer = brokerServerService.get();
|
|
|
boolean result = sendMessage(brokerServer, "post", "/esb/serviceFlow", msg);
|
|
|
if (!result) {
|
|
|
logger.error("sendMessage to broker server failed, broker:" + brokerServer.getURL() + ", msg:" + msg);
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
brokerServerService.addServiceFlow(brokerServer, serviceFlow);
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
List<BrokerServer> brokerServerList;
|
|
|
brokerServerList = brokerServerService.get(one);
|
|
|
for (BrokerServer broker : brokerServerList) {
|
|
|
boolean result = sendMessage(broker, "post", "/esb/serviceFlow", msg);
|
|
|
if (!result) {
|
|
|
logger.error("sendMessage to broker server failed, broker:" + broker.getURL() + ", msg:" + msg);
|
|
|
continue;
|
|
|
}
|
|
|
|
|
|
brokerServerService.addServiceFlow(broker, serviceFlow);
|
|
|
}
|
|
|
|
|
|
} catch (IOException e) {
|
|
|
e.printStackTrace();
|
|
|
logger.error(e.getMessage());
|
|
|
}
|
|
|
}
|
|
|
|
|
|
public void serviceFlowModifyAdd(String msg) {
|
|
|
flowController("put", "/esb/serviceFlow/add", msg);
|
|
|
try {
|
|
|
ServiceFlow serviceFlow = getServiceFlow(msg);
|
|
|
serviceFlow = this.save(serviceFlow);
|
|
|
|
|
|
boolean one = ServiceFlowConstant.JAVA.equals(serviceFlow.getFlowType());//java类型为采集任务
|
|
|
if (one) {
|
|
|
List<BrokerServer> brokerList = brokerServerService.getBrokerList(serviceFlow.getRouteCode());
|
|
|
if (brokerList == null || brokerList.size() == 0) {
|
|
|
logger.error("service flow stopped unexpected.");
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
boolean result = sendMessage(brokerList.get(0), "put", "/esb/serviceFlow/add", msg);
|
|
|
if (!result) {
|
|
|
logger.error("sendMessage to broker server failed, broker:" + brokerList.get(0).getURL() + ", msg:" + msg);
|
|
|
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
brokerServerService.addServiceFlow(brokerList.get(0), serviceFlow);
|
|
|
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
List<BrokerServer> brokerServerList;
|
|
|
brokerServerList = brokerServerService.get(one);
|
|
|
for (BrokerServer broker : brokerServerList) {
|
|
|
boolean result = sendMessage(broker, "put", "/esb/serviceFlow/add", msg);
|
|
|
if (!result) {
|
|
|
logger.error("sendMessage to broker server failed, broker:" + broker.getURL() + ", msg:" + msg);
|
|
|
|
|
|
continue;
|
|
|
}
|
|
|
|
|
|
brokerServerService.addServiceFlow(broker, serviceFlow);
|
|
|
}
|
|
|
|
|
|
} catch (IOException e) {
|
|
|
e.printStackTrace();
|
|
|
logger.error(e.getMessage());
|
|
|
}
|
|
|
}
|
|
|
|
|
|
public void serviceFlowModifyReduce(String msg) {
|
|
|
flowController("put", "/esb/serviceFlow/reduce", msg);
|
|
|
try {
|
|
|
ServiceFlow serviceFlow = getServiceFlow(msg);
|
|
|
serviceFlow = this.save(serviceFlow);
|
|
|
|
|
|
boolean one = ServiceFlowConstant.JAVA.equals(serviceFlow.getFlowType());//java类型为采集任务
|
|
|
if (one) {
|
|
|
List<BrokerServer> brokerList = brokerServerService.getBrokerList(serviceFlow.getRouteCode());
|
|
|
if (brokerList == null || brokerList.size() == 0) {
|
|
|
logger.error("service flow stopped unexpected.");
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
boolean result = sendMessage(brokerList.get(0), "put", "/esb/serviceFlow/reduce", msg);
|
|
|
if (!result) {
|
|
|
logger.error("sendMessage to broker server failed, broker:" + brokerList.get(0).getURL() + ", msg:" + msg);
|
|
|
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
brokerServerService.addServiceFlow(brokerList.get(0), serviceFlow);
|
|
|
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
List<BrokerServer> brokerServerList;
|
|
|
brokerServerList = brokerServerService.get(one);
|
|
|
for (BrokerServer broker : brokerServerList) {
|
|
|
boolean result = sendMessage(broker, "put", "/esb/serviceFlow/reduce", msg);
|
|
|
if (!result) {
|
|
|
logger.error("sendMessage to broker server failed, broker:" + broker.getURL() + ", msg:" + msg);
|
|
|
|
|
|
continue;
|
|
|
}
|
|
|
|
|
|
brokerServerService.addServiceFlow(broker, serviceFlow);
|
|
|
}
|
|
|
|
|
|
} catch (IOException e) {
|
|
|
e.printStackTrace();
|
|
|
logger.error(e.getMessage());
|
|
|
}
|
|
|
}
|
|
|
|
|
|
public void serviceFlowDelete(String msg) {
|
|
|
flowController("delete", "/esb/serviceFlow", msg);
|
|
|
try {
|
|
|
ServiceFlow serviceFlow = getServiceFlow(msg);
|
|
|
serviceFlow = this.save(serviceFlow);
|
|
|
|
|
|
boolean one = ServiceFlowConstant.JAVA.equals(serviceFlow.getFlowType());//java类型为采集任务
|
|
|
if (one) {
|
|
|
List<BrokerServer> brokerList = brokerServerService.getBrokerList(serviceFlow.getRouteCode());
|
|
|
if (brokerList == null || brokerList.size() == 0) {
|
|
|
logger.error("service flow stopped unexpected.");
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
boolean result = sendMessage(brokerList.get(0), "delete", "/esb/serviceFlow", msg);
|
|
|
if (!result) {
|
|
|
logger.error("sendMessage to broker server failed, broker:" + brokerList.get(0).getURL() + ", msg:" + msg);
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
brokerServerService.removeServiceFlow(brokerList.get(0), serviceFlow);
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
List<BrokerServer> brokerServerList;
|
|
|
brokerServerList = brokerServerService.get(one);
|
|
|
for (BrokerServer broker : brokerServerList) {
|
|
|
boolean result = sendMessage(broker, "delete", "/esb/serviceFlow", msg);
|
|
|
if (!result) {
|
|
|
logger.error("sendMessage to broker server failed, broker:" + broker.getURL() + ", msg:" + msg);
|
|
|
|
|
|
continue;
|
|
|
}
|
|
|
|
|
|
brokerServerService.removeServiceFlow(broker, serviceFlow);
|
|
|
}
|
|
|
|
|
|
} catch (IOException e) {
|
|
|
e.printStackTrace();
|
|
|
logger.error(e.getMessage());
|
|
|
}
|
|
|
}
|
|
|
|
|
|
public void brokerServerOn(String msg) {
|
|
|
List<ServiceFlow> serviceFlowList = getAll();
|
|
|
serviceFlowList.forEach(serviceFlow -> {
|
|
|
try {
|
|
|
serviceFlow = this.save(serviceFlow);
|
|
|
boolean one = ServiceFlowConstant.JAVA.equals(serviceFlow.getFlowType());//java类型为采集任务
|
|
|
if (one && isStarted(serviceFlow)) {
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
BrokerServer brokerServer = objectMapper.readValue(msg, BrokerServer.class);
|
|
|
String serviceFlowMsg = objectMapper.writeValueAsString(serviceFlow);
|
|
|
serviceFlowStart(serviceFlowMsg, brokerServer);
|
|
|
boolean result = sendMessage(brokerServer, "post", "/esb/serviceFlow/start", serviceFlowMsg);
|
|
|
if (!result) {
|
|
|
logger.error("sendMessage to broker server failed, broker:" + brokerServer.getURL() + ", msg:" + serviceFlowMsg);
|
|
|
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
brokerServerService.addServiceFlow(brokerServer, serviceFlow);
|
|
|
} catch (IOException e) {
|
|
|
e.printStackTrace();
|
|
|
logger.error(e.getMessage());
|
|
|
}
|
|
|
});
|
|
|
|
|
|
}
|
|
|
|
|
|
public void brokerServerOff(String msg) {
|
|
|
//下先Broker就可以了
|
|
|
//可以不用处理。
|
|
|
}
|
|
|
|
|
|
|
|
|
/**
|
|
|
* SAAS化的管理端过来的消息会被proxy进行中转,之后发送到终端的Arbiter对Broker进行实际的控制。
|
|
|
*
|
|
@ -150,61 +348,65 @@ public class ServiceFlowService {
|
|
|
HttpClientKit.post(arbiterServer.getUrl(), msg, header);
|
|
|
}
|
|
|
|
|
|
private void flowController(String method, String path, String msg) {
|
|
|
this.flowController(method, path, msg, null);
|
|
|
}
|
|
|
private boolean sendMessage(BrokerServer brokerServer, String method, String path, String msg) {
|
|
|
if (brokerServer == null) {
|
|
|
return false;
|
|
|
}
|
|
|
|
|
|
switch (method) {
|
|
|
case "post": {
|
|
|
HTTPResponse response = HttpClientKit.post(brokerServer.getURL() + path, msg);
|
|
|
if (response.getStatusCode() == 200) {
|
|
|
String body = response.getBody();
|
|
|
logger.debug(body);
|
|
|
return true;
|
|
|
}
|
|
|
|
|
|
private void flowController(String method, String path, String msg, BrokerServer brokerServer) {
|
|
|
try {
|
|
|
ServiceFlow serviceFlow = getServiceFlow(msg);
|
|
|
// this.save(serviceFlow); //需要改造??
|
|
|
return false;
|
|
|
}
|
|
|
|
|
|
boolean one = ServiceFlowConstant.JAVA.equals(serviceFlow.getFlowType()); //有cron表达式,就是采集任务。
|
|
|
if (one) {
|
|
|
List<BrokerServer> flowOnBroker = brokerServerService.getFlowOnBroker(serviceFlow.getRouteCode());
|
|
|
if (flowOnBroker != null && flowOnBroker.size() != 0) {
|
|
|
return;
|
|
|
case "put": {
|
|
|
HTTPResponse response = HttpClientKit.put(brokerServer.getURL() + path, msg);
|
|
|
if (response.getStatusCode() == 200) {
|
|
|
String body = response.getBody();
|
|
|
logger.debug(body);
|
|
|
return true;
|
|
|
}
|
|
|
}
|
|
|
|
|
|
List<BrokerServer> brokerServerList = new ArrayList<>();
|
|
|
if (brokerServer != null) {
|
|
|
brokerServerList.add(brokerServer);
|
|
|
} else {
|
|
|
brokerServerList = brokerServerService.get(one);
|
|
|
return false;
|
|
|
}
|
|
|
|
|
|
for (BrokerServer broker : brokerServerList) {
|
|
|
// if (broker.isFlowOn(serviceFlow.getRouteCode())) {
|
|
|
// continue;
|
|
|
// }
|
|
|
switch (method) {
|
|
|
case "post":
|
|
|
HTTPResponse response = HttpClientKit.post(broker.getURL() + path, msg);
|
|
|
if (response.getStatusCode() == 200) {
|
|
|
String body = response.getBody();
|
|
|
}
|
|
|
break;
|
|
|
case "put":
|
|
|
HttpClientKit.put(broker.getURL() + path, msg);
|
|
|
break;
|
|
|
case "delete":
|
|
|
HttpClientKit.delete(broker.getURL() + path, msg);
|
|
|
break;
|
|
|
default:
|
|
|
break;
|
|
|
|
|
|
case "delete": {
|
|
|
HTTPResponse response = HttpClientKit.delete(brokerServer.getURL() + path, msg);
|
|
|
if (response.getStatusCode() == 200) {
|
|
|
String body = response.getBody();
|
|
|
logger.debug(body);
|
|
|
return true;
|
|
|
}
|
|
|
|
|
|
return false;
|
|
|
}
|
|
|
|
|
|
} catch (IOException e) {
|
|
|
e.printStackTrace();
|
|
|
logger.error(e.getMessage());
|
|
|
default:
|
|
|
break;
|
|
|
}
|
|
|
|
|
|
return false;
|
|
|
}
|
|
|
|
|
|
private ServiceFlow getServiceFlow(String msg) throws IOException {
|
|
|
return objectMapper.readValue(msg, ServiceFlow.class);
|
|
|
}
|
|
|
|
|
|
private boolean isStarted(ServiceFlow serviceFlow) {
|
|
|
List<BrokerServer> brokerList = brokerServerService.getBrokerList(serviceFlow.getRouteCode());
|
|
|
if (brokerList != null && brokerList.size() != 0) {
|
|
|
logger.debug("service flow is already started on the broker");
|
|
|
return true;
|
|
|
}
|
|
|
|
|
|
return false;
|
|
|
}
|
|
|
|
|
|
}
|