123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408 |
- package com.yihu.hos.arbiter.services;
- import com.fasterxml.jackson.databind.ObjectMapper;
- import com.yihu.hos.core.http.HTTPResponse;
- import com.yihu.hos.core.http.HttpClientKit;
- import com.yihu.hos.core.log.Logger;
- import com.yihu.hos.core.log.LoggerFactory;
- import com.yihu.hos.web.framework.constant.ServiceFlowConstant;
- import com.yihu.hos.web.framework.model.bo.BrokerServer;
- import com.yihu.hos.web.framework.model.bo.ServiceFlow;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.data.mongodb.core.MongoOperations;
- import org.springframework.data.mongodb.core.query.Criteria;
- import org.springframework.data.mongodb.core.query.Query;
- import org.springframework.data.mongodb.core.query.Update;
- import org.springframework.stereotype.Service;
- import java.io.IOException;
- import java.util.List;
- /**
- * Broker原则上具有等同性,这样Arbiter无论选择了哪个Broker能提供的服务都是一样的。
- * 但是因为Broker上还是会运行一些定时的采集任务,这些采集任务如果是多台Broker运行的话,可能会引起数据重复问题。
- * 所以在事件触发时需要做一些策略的调整:
- * 1.实时任务,通知所有的Broker进行更新路由
- * 2.采集任务,只通知其中的一台进行更新路由
- *
- * @created Airhead 2016/8/16.
- */
- @Service("serviceFlowService")
- public class ServiceFlowService {
- private static final Logger logger = LoggerFactory.getLogger(BrokerServerService.class);
- @Autowired
- private MongoOperations mongoOperations;
- @Autowired
- private ObjectMapper objectMapper;
- @Autowired
- private BrokerServerService brokerServerService;
- public ServiceFlow save(ServiceFlow serviceFlow) {
- if (serviceFlow == null) {
- logger.error("ServiceFlow is null");
- return null;
- }
- Query query = new Query();
- query.addCriteria(Criteria.where("routeCode").is(serviceFlow.getRouteCode()));
- Update update = new Update();
- update.set("routeCode", serviceFlow.getRouteCode());
- update.set("updated", serviceFlow.getUpdated());
- update.set("flowType", serviceFlow.getFlowType());
- update.set("tenant", serviceFlow.getTenant());
- //删除旧记录
- update.set("handleFiles", serviceFlow.getHandleFiles());
- // update1.set("handleFiles.$.className", handleFile.getClassName());
- // Query query1 = Query.query(new Criteria().andOperator(Criteria.where("className").is( handleFile.getClassName()),Criteria.where("handleFiles").elemMatch(Criteria.where("className").is( handleFile.getClassName()))));
- // mongoOperations.updateFirst(query1, update1, ServiceFlow.class);
- mongoOperations.upsert(query, update, ServiceFlow.class);
- return mongoOperations.findOne(query, ServiceFlow.class);
- }
- public String get(String serviceName) {
- return null;
- }
- public void delete(ServiceFlow serviceFlow) {
- mongoOperations.remove(serviceFlow);
- }
- public List<ServiceFlow> getAll() {
- return mongoOperations.findAll(ServiceFlow.class);
- }
- /**
- * admin发过来的服务流程启动事件处理。
- *
- * @param msg serviceFlow
- */
- public void serviceFlowStarted(String msg) {
- try {
- ServiceFlow serviceFlow = getServiceFlow(msg);
- serviceFlow = this.save(serviceFlow);
- boolean one = ServiceFlowConstant.JAVA.equals(serviceFlow.getFlowType());//java类型为采集任务
- if (one && isStarted(serviceFlow)) {
- return;
- }
- List<BrokerServer> brokerServerList;
- brokerServerList = brokerServerService.get(one);
- for (BrokerServer broker : brokerServerList) {
- boolean result = sendMessage(broker, "post", "/esb/serviceFlow/serverServiceFlow", msg);
- if (!result) {
- logger.error("sendMessage to broker start failed, broker:" + broker.getURL() + ", msg:" + msg);
- continue;
- }
- brokerServerService.addServiceFlow(broker, serviceFlow);
- }
- } catch (IOException e) {
- e.printStackTrace();
- logger.error(e.getMessage());
- }
- }
- public void serviceFlowStopped(String msg) {
- try {
- ServiceFlow serviceFlow = getServiceFlow(msg);
- this.delete(serviceFlow);
- List<BrokerServer> brokerServerList;
- brokerServerList = brokerServerService.get(false);
- for (BrokerServer broker : brokerServerList) {
- HTTPResponse response = HttpClientKit.post(broker.getURL() + "", msg);
- if (response.getStatusCode() == 200) {
- String body = response.getBody();
- logger.debug(body);
- }
- boolean result = sendMessage(broker, "post", "/esb/serviceFlow/stop", msg);
- if (!result) {
- logger.error("sendMessage to broker start failed, broker:" + broker.getURL() + ", msg:" + msg);
- continue;
- }
- brokerServerService.removeServiceFlow(broker, serviceFlow);
- }
- } catch (IOException e) {
- e.printStackTrace();
- logger.error(e.getMessage());
- }
- }
- public void serviceFlowAdd(String msg) {
- try {
- ServiceFlow serviceFlow = getServiceFlow(msg);
- serviceFlow = this.save(serviceFlow);
- System.out.println("流程添加serviceFlowAdd开始!");
- boolean one = ServiceFlowConstant.JAVA.equals(serviceFlow.getFlowType());//java类型为采集任务
- if (one) {
- BrokerServer brokerServer = brokerServerService.get();
- boolean result = sendMessage(brokerServer, "post", "/esb/serviceFlow", msg);
- if (!result) {
- logger.error("sendMessage to broker start failed, broker:" + brokerServer.getURL() + ", msg:" + msg);
- return;
- }
- brokerServerService.addServiceFlow(brokerServer, serviceFlow);
- return;
- }
- List<BrokerServer> brokerServerList;
- brokerServerList = brokerServerService.get(one);
- for (BrokerServer broker : brokerServerList) {
- boolean result = sendMessage(broker, "post", "/esb/serviceFlow", msg);
- if (!result) {
- logger.error("sendMessage to broker start failed, broker:" + broker.getURL() + ", msg:" + msg);
- continue;
- }
- brokerServerService.addServiceFlow(broker, serviceFlow);
- }
- } catch (IOException e) {
- e.printStackTrace();
- logger.error(e.getMessage());
- }
- }
- public void serviceFlowModifyAdd(String msg) {
- try {
- System.out.println("arbiter'route serviceFlowModifyAdd ================ 1 ");
- ServiceFlow serviceFlow = getServiceFlow(msg);
- serviceFlow = this.save(serviceFlow);
- boolean one = ServiceFlowConstant.JAVA.equals(serviceFlow.getFlowType());//java类型为采集任务
- if (one) {
- System.out.println("arbiter'route is java ================ 2, cdoe: " + serviceFlow.getRouteCode());
- List<BrokerServer> brokerList = brokerServerService.getBrokerList(serviceFlow.getRouteCode());
- if (brokerList == null || brokerList.size() == 0) {
- logger.error("service flow stopped unexpected.");
- return;
- }
- boolean result = sendMessage(brokerList.get(0), "post", "/esb/serviceFlow/add", msg);
- if (!result) {
- logger.error("serviceFlowModifyAdd11 sendMessage to broker start failed, broker:" + brokerList.get(0).getURL() + ", msg:" + msg);
- return;
- }
- brokerServerService.addServiceFlow(brokerList.get(0), serviceFlow);
- return;
- }
- List<BrokerServer> brokerServerList;
- brokerServerList = brokerServerService.get(one);
- for (BrokerServer broker : brokerServerList) {
- boolean result = sendMessage(broker, "post", "/esb/serviceFlow/add", msg);
- if (!result) {
- logger.error("serviceFlowModifyAdd22 sendMessage to broker start failed, broker:" + broker.getURL() + ", msg:" + msg);
- continue;
- } else {
- System.out.println("arbiter'route susscess ================ 3");
- }
- brokerServerService.addServiceFlow(broker, serviceFlow);
- }
- } catch (IOException e) {
- e.printStackTrace();
- logger.error(e.getMessage());
- }
- }
- public void serviceFlowModifyReduce(String msg) {
- try {
- ServiceFlow serviceFlow = getServiceFlow(msg);
- serviceFlow = this.save(serviceFlow);
- boolean one = ServiceFlowConstant.JAVA.equals(serviceFlow.getFlowType());//java类型为采集任务
- if (one) {
- List<BrokerServer> brokerList = brokerServerService.getBrokerList(serviceFlow.getRouteCode());
- if (brokerList == null || brokerList.size() == 0) {
- logger.error("service flow stopped unexpected.");
- return;
- }
- boolean result = sendMessage(brokerList.get(0), "put", "/esb/serviceFlow/reduce", msg);
- if (!result) {
- logger.error("sendMessage to broker start failed, broker:" + brokerList.get(0).getURL() + ", msg:" + msg);
- return;
- }
- brokerServerService.addServiceFlow(brokerList.get(0), serviceFlow);
- return;
- }
- List<BrokerServer> brokerServerList;
- brokerServerList = brokerServerService.get(one);
- for (BrokerServer broker : brokerServerList) {
- boolean result = sendMessage(broker, "put", "/esb/serviceFlow/reduce", msg);
- if (!result) {
- logger.error("sendMessage to broker start failed, broker:" + broker.getURL() + ", msg:" + msg);
- continue;
- }
- brokerServerService.addServiceFlow(broker, serviceFlow);
- }
- } catch (IOException e) {
- e.printStackTrace();
- logger.error(e.getMessage());
- }
- }
- public void serviceFlowDelete(String msg) {
- try {
- ServiceFlow serviceFlow = getServiceFlow(msg);
- this.delete(serviceFlow);
- boolean one = ServiceFlowConstant.JAVA.equals(serviceFlow.getFlowType());//java类型为采集任务
- if (one) {
- List<BrokerServer> brokerList = brokerServerService.getBrokerList(serviceFlow.getRouteCode());
- if (brokerList == null || brokerList.size() == 0) {
- logger.error("service flow stopped unexpected.");
- return;
- }
- boolean result = sendMessage(brokerList.get(0), "delete", "/esb/serviceFlow", msg);
- if (!result) {
- logger.error("sendMessage to broker start failed, broker:" + brokerList.get(0).getURL() + ", msg:" + msg);
- return;
- }
- brokerServerService.removeServiceFlow(brokerList.get(0), serviceFlow);
- return;
- }
- List<BrokerServer> brokerServerList;
- brokerServerList = brokerServerService.get(one);
- for (BrokerServer broker : brokerServerList) {
- boolean result = sendMessage(broker, "delete", "/esb/serviceFlow", msg);
- if (!result) {
- logger.error("sendMessage to broker start failed, broker:" + broker.getURL() + ", msg:" + msg);
- continue;
- }
- brokerServerService.removeServiceFlow(broker, serviceFlow);
- }
- } catch (IOException e) {
- e.printStackTrace();
- logger.error(e.getMessage());
- }
- }
- public void brokerServerOn(String msg) {
- List<ServiceFlow> serviceFlowList = getAll();
- serviceFlowList.forEach(serviceFlow -> {
- try {
- serviceFlow = this.save(serviceFlow);
- boolean one = ServiceFlowConstant.JAVA.equals(serviceFlow.getFlowType());//java类型为采集任务
- if (one && isStarted(serviceFlow)) {
- return;
- }
- BrokerServer brokerServer = objectMapper.readValue(msg, BrokerServer.class);
- String serviceFlowMsg = objectMapper.writeValueAsString(serviceFlow);
- boolean result = sendMessage(brokerServer, "post", "/esb/serviceFlow/start", serviceFlowMsg);
- if (!result) {
- logger.error("sendMessage to broker start failed, broker:" + brokerServer.getURL() + ", msg:" + serviceFlowMsg);
- return;
- }
- brokerServerService.addServiceFlow(brokerServer, serviceFlow);
- } catch (IOException e) {
- e.printStackTrace();
- logger.error(e.getMessage());
- }
- });
- }
- public void brokerServerOff(String msg) {
- //可以不用处理。
- }
- private boolean sendMessage(BrokerServer brokerServer, String method, String path, String msg) {
- if (brokerServer == null) {
- return false;
- }
- switch (method) {
- case "post": {
- HTTPResponse response = HttpClientKit.post(brokerServer.getURL() + path, msg);
- if (response.getStatusCode() == 200) {
- String body = response.getBody();
- logger.debug(body);
- return true;
- }
- logger.error("post error,url: " + brokerServer.getURL() + path);
- return false;
- }
- case "put": {
- HTTPResponse response = HttpClientKit.put(brokerServer.getURL() + path, msg);
- if (response.getStatusCode() == 200) {
- String body = response.getBody();
- logger.debug(body);
- return true;
- }
- logger.error("put error,url: " + brokerServer.getURL() + path);
- return false;
- }
- case "delete": {
- HTTPResponse response = HttpClientKit.delete(brokerServer.getURL() + path, msg);
- if (response.getStatusCode() == 200) {
- String body = response.getBody();
- logger.debug(body);
- return true;
- }
- logger.error("delete error,url: " + brokerServer.getURL() + path);
- return false;
- }
- default:
- break;
- }
- return false;
- }
- private ServiceFlow getServiceFlow(String msg) throws IOException {
- return objectMapper.readValue(msg, ServiceFlow.class);
- }
- private boolean isStarted(ServiceFlow serviceFlow) {
- List<BrokerServer> brokerList = brokerServerService.getBrokerList(serviceFlow.getRouteCode());
- if (brokerList != null && brokerList.size() != 0) {
- logger.debug("service flow is already started on the broker");
- return true;
- }
- return false;
- }
- }
|