|  | @ -128,22 +128,43 @@ public class BrokerServerService {
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |     }
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |     public void addServiceFlow(BrokerServer broker, ServiceFlow serviceFlow) {
 | 
	
		
			
				|  |  |     public void addServiceFlow(BrokerServer brokerServer, ServiceFlow serviceFlow) {
 | 
	
		
			
				|  |  |         BrokerServer.ServiceFlow flow = new BrokerServer.ServiceFlow();
 | 
	
		
			
				|  |  |         flow.setFlowId(serviceFlow.getId());
 | 
	
		
			
				|  |  |         flow.setRouteCode(serviceFlow.getRouteCode());
 | 
	
		
			
				|  |  |         flow.setType(serviceFlow.getFlowType());
 | 
	
		
			
				|  |  |         broker.addServiceFlow(flow);
 | 
	
		
			
				|  |  |         this.save(broker);
 | 
	
		
			
				|  |  |         brokerServer.addServiceFlow(flow);
 | 
	
		
			
				|  |  |         Query query = new Query();
 | 
	
		
			
				|  |  |         query.addCriteria(Criteria.where("hostName").is(brokerServer.getHostName()));
 | 
	
		
			
				|  |  |         query.addCriteria(Criteria.where("hostAddress").is(brokerServer.getHostAddress()));
 | 
	
		
			
				|  |  |         query.addCriteria(Criteria.where("port").is(brokerServer.getPort()));
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |         Update update = new Update();
 | 
	
		
			
				|  |  |         update.set("hostName", brokerServer.getHostName());
 | 
	
		
			
				|  |  |         update.set("hostAddress", brokerServer.getHostAddress());
 | 
	
		
			
				|  |  |         update.set("port", brokerServer.getPort());
 | 
	
		
			
				|  |  |         update.set("updateTime", brokerServer.getUpdateTime());
 | 
	
		
			
				|  |  |         update.set("enable", brokerServer.isEnable());
 | 
	
		
			
				|  |  |         update.addToSet("serviceFlows", serviceFlow);
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |         mongoOperations.upsert(query, update, BrokerServer.class);
 | 
	
		
			
				|  |  |     }
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |     public void removeServiceFlow(BrokerServer broker, ServiceFlow serviceFlow) {
 | 
	
		
			
				|  |  | //        BrokerServer.ServiceFlow flow = new BrokerServer.ServiceFlow();
 | 
	
		
			
				|  |  | //        flow.setFlowId(serviceFlow.getId());
 | 
	
		
			
				|  |  | //        flow.setRouteCode(serviceFlow.getRouteCode());
 | 
	
		
			
				|  |  | //        flow.setType(serviceFlow.getFlowType());
 | 
	
		
			
				|  |  | //        broker.addServiceFlow(flow);
 | 
	
		
			
				|  |  | //        brokerServerService.save(broker);
 | 
	
		
			
				|  |  |     public void removeServiceFlow(BrokerServer brokerServer, ServiceFlow serviceFlow) {
 | 
	
		
			
				|  |  |         Query query = new Query();
 | 
	
		
			
				|  |  |         query.addCriteria(Criteria.where("hostName").is(brokerServer.getHostName()));
 | 
	
		
			
				|  |  |         query.addCriteria(Criteria.where("hostAddress").is(brokerServer.getHostAddress()));
 | 
	
		
			
				|  |  |         query.addCriteria(Criteria.where("port").is(brokerServer.getPort()));
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |         Update update = new Update();
 | 
	
		
			
				|  |  |         update.set("hostName", brokerServer.getHostName());
 | 
	
		
			
				|  |  |         update.set("hostAddress", brokerServer.getHostAddress());
 | 
	
		
			
				|  |  |         update.set("port", brokerServer.getPort());
 | 
	
		
			
				|  |  |         update.set("updateTime", brokerServer.getUpdateTime());
 | 
	
		
			
				|  |  |         update.set("enable", brokerServer.isEnable());
 | 
	
		
			
				|  |  |         update.pull("serviceFlows", serviceFlow);
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |         mongoOperations.upsert(query, update, BrokerServer.class);
 | 
	
		
			
				|  |  |     }
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |     private ProducerTemplate createProducerTemplate() {
 |