Browse Source

1.hos-arbiter增加endpoint实现
2.hos-broker增加日志实现
3.hos-client修改请求参数

Airhead 8 years ago
parent
commit
ef6b114321

+ 12 - 0
hos-arbiter/ReadMe.md

@ -0,0 +1,12 @@
#说明
hos-arbiter的主要功能就是实现服务的发现
* BrokerServer的服务注册与发现,无通知机制(客户端缓存和定时更新机制)
* Endpoint的服务注册与发现,有通知机制,会将服务状态发送给BrokerServer,
以便BrokerServer进行实际的调度策略的实现
* ServiceFlow的服务注册与发现,有通知机制,会将服务状态发送给BrokerServer,
以便BrokerServer实现流程的动态加载与运行。
因为hos-arbiter没有使用zookeeper等相关软件来做事件推送,做了个折中。
尽量使用现有的功能组件来
* 通过ActiveMQ来解耦hos-admin的依赖
* 利用了MongoDB的TTL来做服务的过期
* 直接调用broker进行数据更新

+ 9 - 1
hos-arbiter/src/main/java/com/yihu/hos/arbiter/models/BrokerServer.java

@ -16,7 +16,7 @@ public class BrokerServer {
    private String hostName;
    private String hostAddress;
    private int port;
    private boolean enable;
    @Indexed(name = "updateTime_1", expireAfterSeconds = 30)
    private Date updateTime;
@ -63,4 +63,12 @@ public class BrokerServer {
    public String getURL() {
        return "http://" + hostAddress + ":" + port;
    }
    public boolean isEnable() {
        return enable;
    }
    public void setEnable(boolean enable) {
        this.enable = enable;
    }
}

+ 18 - 5
hos-arbiter/src/main/java/com/yihu/hos/arbiter/models/Endpoint.java

@ -1,18 +1,23 @@
package com.yihu.hos.arbiter.models;
import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.index.Indexed;
import org.springframework.data.mongodb.core.mapping.Document;
import java.util.Date;
/**
 * @created Airhead 2016/7/27.
 */
@Document
public class Endpoint {
    @Id
    private String id;
    private String serviceName;
    private String endpoint;
    private Date updatTime;
    @Indexed(name = "updateTime_1", expireAfterSeconds = 30)
    private Date updateTime;
    private String event;
    public String getId() {
        return id;
@ -38,11 +43,19 @@ public class Endpoint {
        this.endpoint = endpoint;
    }
    public Date getUpdatTime() {
        return updatTime;
    public Date getUpdateTime() {
        return updateTime;
    }
    public void setUpdatTime(Date updatTime) {
        this.updatTime = updatTime;
    public void setUpdateTime(Date updateTime) {
        this.updateTime = updateTime;
    }
    public String getEvent() {
        return event;
    }
    public void setEvent(String event) {
        this.event = event;
    }
}

+ 15 - 1
hos-arbiter/src/main/java/com/yihu/hos/arbiter/models/ServiceFlowEvent.java

@ -1,14 +1,20 @@
package com.yihu.hos.arbiter.models;
import org.springframework.data.mongodb.core.mapping.Document;
import java.util.Date;
/**
 * @created Airhead 2016/8/4.
 */
public class ServiceFlowEvent {
@Document
public class ServiceFlow {
    private String event;
    private String serviceFlow;
    private String packageName;
    private String className;
    private String path;
    private Date updateTime;
    public String getEvent() {
        return event;
@ -49,4 +55,12 @@ public class ServiceFlowEvent {
    public void setPath(String path) {
        this.path = path;
    }
    public Date getUpdateTime() {
        return updateTime;
    }
    public void setUpdateTime(Date updateTime) {
        this.updateTime = updateTime;
    }
}

+ 30 - 0
hos-arbiter/src/main/java/com/yihu/hos/arbiter/routers/EndpointEventRouter.java

@ -0,0 +1,30 @@
package com.yihu.hos.arbiter.routers;
import com.yihu.hos.arbiter.configuration.ActivemqConfiguration;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.jms.JmsComponent;
import org.apache.camel.model.ModelCamelContext;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.jms.ConnectionFactory;
/**
 * @created Airhead 2016/8/1.
 */
@Component
public class EndpointEventRouter extends RouteBuilder {
    @Autowired
    private ActivemqConfiguration activemqConfiguration;
    @Override
    public void configure() throws Exception {
        ModelCamelContext context = this.getContext();
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                activemqConfiguration.getUser(), activemqConfiguration.getPassword(),activemqConfiguration.getBrokerURL());
        // Note we can explicit name the component
        context.addComponent("service-event", JmsComponent.jmsComponentAutoAcknowledge(connectionFactory));
        from("service-event:queue:configuration.endpoint")
                .to("bean:endpointService?method=trigger"); //TODO:这边可以做Message Filter,减化trigger逻辑
    }
}

+ 3 - 3
hos-arbiter/src/main/java/com/yihu/hos/arbiter/services/ServiceFlowEventRouter.java

@ -1,4 +1,4 @@
package com.yihu.hos.arbiter.services;
package com.yihu.hos.arbiter.routers;
import com.yihu.hos.arbiter.configuration.ActivemqConfiguration;
import org.apache.activemq.ActiveMQConnectionFactory;
@ -24,7 +24,7 @@ public class ServiceFlowEventRouter extends RouteBuilder {
                activemqConfiguration.getUser(), activemqConfiguration.getPassword(),activemqConfiguration.getBrokerURL());
        // Note we can explicit name the component
        context.addComponent("service-event", JmsComponent.jmsComponentAutoAcknowledge(connectionFactory));
        from("service-event:queue:configuration.queue")
                .to("bean:serviceFlowEventService?method=trigger"); //TODO:这边可以做Message Filter,减化trigger逻辑
        from("service-event:queue:configuration.service.flow")
                .to("bean:serviceFlowService?method=trigger"); //TODO:这边可以做Message Filter,减化trigger逻辑
    }
}

+ 1 - 0
hos-arbiter/src/main/java/com/yihu/hos/arbiter/services/BrokerServerService.java

@ -42,6 +42,7 @@ public class BrokerServerService {
     */
    public BrokerServer get() {
        Query query = new Query();
        query.addCriteria(Criteria.where("enable").is(true));
        query.with(new Sort(new Sort.Order(Sort.Direction.DESC, "updateTime")));
        return mongoOperations.findOne(query, BrokerServer.class);
    }

+ 51 - 3
hos-arbiter/src/main/java/com/yihu/hos/arbiter/services/EndpointService.java

@ -1,6 +1,10 @@
package com.yihu.hos.arbiter.services;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.yihu.hos.arbiter.models.BrokerServer;
import com.yihu.hos.arbiter.models.Endpoint;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
@ -8,13 +12,16 @@ import org.springframework.data.mongodb.core.MongoOperations;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.stereotype.Service;
import java.io.IOException;
/**
 * @created Airhead 2016/7/27.
 */
@Service
@Service("endpointService")
public class EndpointService {
    private static final Logger logger = LogManager.getLogger(BrokerServerService.class);
    @Autowired
    private BrokerServerService brokerServerService;
    @Autowired
    private MongoOperations mongoOperations;
@ -22,7 +29,48 @@ public class EndpointService {
        mongoOperations.save(endpoint);
    }
    public Endpoint get(String serviceName){
    public Endpoint get(String serviceName) {
        return mongoOperations.findOne(new Query(), Endpoint.class);
    }
    public void update(Endpoint endpoint) {
    }
    public void trigger(String msg) {
        System.out.println(msg);
        BrokerServer brokerServer = brokerServerService.get();
        if (brokerServer == null) {
            logger.trace("can not find a valid broker server.");
            return;
        }
        ObjectMapper objectMapper = new ObjectMapper();
        try {
            Endpoint endpoint = objectMapper.readValue(msg, Endpoint.class);
            CloseableHttpClient httpclient = HttpClients.createDefault();
            switch (endpoint.getEvent()) {
                case "endpointRegister": {
                    this.save(endpoint);
                    break;
                }
                case "endpointOnService": {
                    this.save(endpoint);
                    break;
                }
                case "endpointOffService": {
                    this.update(endpoint);
                    break;
                }
                default:
                    break;
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

+ 0 - 115
hos-arbiter/src/main/java/com/yihu/hos/arbiter/services/ServiceFlowEventService.java

@ -1,115 +0,0 @@
package com.yihu.hos.arbiter.services;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.yihu.hos.arbiter.models.BrokerServer;
import com.yihu.hos.arbiter.models.ServiceFlowEvent;
import org.apache.http.Consts;
import org.apache.http.NameValuePair;
import org.apache.http.client.entity.UrlEncodedFormEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpDelete;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.client.utils.URIBuilder;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.message.BasicNameValuePair;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
/**
 * @created Airhead 2016/8/4.
 */
@Service("serviceFlowEventService")
public class ServiceFlowEventService {
    @Autowired
    private BrokerServerService brokerServerService;
    public void trigger(String msg) {
        System.out.println(msg);
        BrokerServer brokerServer = brokerServerService.get();
        if (brokerServer == null) {
            return; //TODO: log
        }
        ObjectMapper objectMapper = new ObjectMapper();
        try {
            ServiceFlowEvent serviceFlowEvent = objectMapper.readValue(msg, ServiceFlowEvent.class);
            List<NameValuePair> nameValuePairList = new ArrayList<>();
            nameValuePairList.add(new BasicNameValuePair("serviceName", serviceFlowEvent.getServiceFlow()));
            nameValuePairList.add(new BasicNameValuePair("packageName", serviceFlowEvent.getPackageName()));
            nameValuePairList.add(new BasicNameValuePair("className", serviceFlowEvent.getClassName()));
            nameValuePairList.add(new BasicNameValuePair("path", serviceFlowEvent.getPath()));
            CloseableHttpClient httpclient = HttpClients.createDefault();
            switch (serviceFlowEvent.getEvent()) {
                case "processorAdded": {
                    HttpPost httpPost = new HttpPost(brokerServer.getURL() + "/esb/processor");
                    httpPost.setEntity(new UrlEncodedFormEntity(nameValuePairList, Consts.UTF_8));
                    CloseableHttpResponse response = httpclient.execute(httpPost);
                    response.close();
                    break;
                }
                case "processorDataChanged": {
                    HttpPut httpPut = new HttpPut(brokerServer.getURL() + "/esb/processor");
                    httpPut.setEntity(new UrlEncodedFormEntity(nameValuePairList, Consts.UTF_8));
                    CloseableHttpResponse response = httpclient.execute(httpPut);
                    response.close();
                    break;
                }
                case "routeDefineAdded": {
                    HttpPost httpPost = new HttpPost(brokerServer.getURL() + "/esb/processor");
                    httpPost.setEntity(new UrlEncodedFormEntity(nameValuePairList, Consts.UTF_8));
                    CloseableHttpResponse response = httpclient.execute(httpPost);
                    response.close();
                    break;
                }
                case "routeDefineChanged": {
                    HttpPut httpPut = new HttpPut(brokerServer.getURL() + "/esb/route");
                    httpPut.setEntity(new UrlEncodedFormEntity(nameValuePairList, Consts.UTF_8));
                    CloseableHttpResponse response = httpclient.execute(httpPut);
                    response.close();
                    break;
                }
                case "routeDefineDelete": {
                    try {
                        URI uri = new URIBuilder()
                                .setScheme("http")
                                .setHost(brokerServer.getHostAddress() + ":" + brokerServer.getPort())
                                .setPath("/route")
                                .setParameter("serviceName", serviceFlowEvent.getServiceFlow())
                                .setParameter("packageName", serviceFlowEvent.getPackageName())
                                .setParameter("className", serviceFlowEvent.getClassName())
                                .setParameter("path", serviceFlowEvent.getPath())
                                .build();
                        HttpDelete httpDelete = new HttpDelete(uri);
                        CloseableHttpResponse response = httpclient.execute(httpDelete);
                        response.close();
                    } catch (URISyntaxException e) {
                        e.printStackTrace();
                    }
                    break;
                }
                default:
                    break;
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

+ 110 - 1
hos-arbiter/src/main/java/com/yihu/hos/arbiter/services/ServiceFlowService.java

@ -1,12 +1,38 @@
package com.yihu.hos.arbiter.services;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.yihu.hos.arbiter.models.BrokerServer;
import com.yihu.hos.arbiter.models.ServiceFlow;
import org.apache.http.Consts;
import org.apache.http.NameValuePair;
import org.apache.http.client.entity.UrlEncodedFormEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpDelete;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.client.utils.URIBuilder;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.message.BasicNameValuePair;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
/**
 * @created Airhead 2016/8/16.
 */
@Service
@Service("serviceFlowService")
public class ServiceFlowService {
    private static final Logger logger = LogManager.getLogger(BrokerServerService.class);
    @Autowired
    private BrokerServerService brokerServerService;
    public void save(String serviceFlow) {
    }
@ -18,4 +44,87 @@ public class ServiceFlowService {
    public String put(String serviceName, String ClientInfo) {
        return null;
    }
    public void trigger(String msg) {
        System.out.println(msg);
        BrokerServer brokerServer = brokerServerService.get();
        if (brokerServer == null) {
            logger.trace("can not find a valid broker server.");
            return;
        }
        ObjectMapper objectMapper = new ObjectMapper();
        try {
            ServiceFlow serviceFlow = objectMapper.readValue(msg, ServiceFlow.class);
            List<NameValuePair> nameValuePairList = new ArrayList<>();
            nameValuePairList.add(new BasicNameValuePair("serviceName", serviceFlow.getServiceFlow()));
            nameValuePairList.add(new BasicNameValuePair("packageName", serviceFlow.getPackageName()));
            nameValuePairList.add(new BasicNameValuePair("className", serviceFlow.getClassName()));
            nameValuePairList.add(new BasicNameValuePair("path", serviceFlow.getPath()));
            CloseableHttpClient httpclient = HttpClients.createDefault();
            switch (serviceFlow.getEvent()) {
                case "processorAdded": {
                    HttpPost httpPost = new HttpPost(brokerServer.getURL() + "/esb/processor");
                    httpPost.setEntity(new UrlEncodedFormEntity(nameValuePairList, Consts.UTF_8));
                    CloseableHttpResponse response = httpclient.execute(httpPost);
                    response.close();
                    break;
                }
                case "processorDataChanged": {
                    HttpPut httpPut = new HttpPut(brokerServer.getURL() + "/esb/processor");
                    httpPut.setEntity(new UrlEncodedFormEntity(nameValuePairList, Consts.UTF_8));
                    CloseableHttpResponse response = httpclient.execute(httpPut);
                    response.close();
                    break;
                }
                case "routeDefineAdded": {
                    HttpPost httpPost = new HttpPost(brokerServer.getURL() + "/esb/processor");
                    httpPost.setEntity(new UrlEncodedFormEntity(nameValuePairList, Consts.UTF_8));
                    CloseableHttpResponse response = httpclient.execute(httpPost);
                    response.close();
                    break;
                }
                case "routeDefineChanged": {
                    HttpPut httpPut = new HttpPut(brokerServer.getURL() + "/esb/route");
                    httpPut.setEntity(new UrlEncodedFormEntity(nameValuePairList, Consts.UTF_8));
                    CloseableHttpResponse response = httpclient.execute(httpPut);
                    response.close();
                    break;
                }
                case "routeDefineDelete": {
                    try {
                        URI uri = new URIBuilder()
                                .setScheme("http")
                                .setHost(brokerServer.getHostAddress() + ":" + brokerServer.getPort())
                                .setPath("/route")
                                .setParameter("serviceName", serviceFlow.getServiceFlow())
                                .setParameter("packageName", serviceFlow.getPackageName())
                                .setParameter("className", serviceFlow.getClassName())
                                .setParameter("path", serviceFlow.getPath())
                                .build();
                        HttpDelete httpDelete = new HttpDelete(uri);
                        CloseableHttpResponse response = httpclient.execute(httpDelete);
                        response.close();
                    } catch (URISyntaxException e) {
                        e.printStackTrace();
                    }
                    break;
                }
                default:
                    break;
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

+ 52 - 0
hos-broker/src/main/java/com/yihu/hos/common/configuration/MongoConfiguration.java

@ -0,0 +1,52 @@
package com.yihu.hos.common.configuration;
import com.mongodb.Mongo;
import com.mongodb.MongoClient;
import com.mongodb.MongoCredential;
import com.mongodb.ServerAddress;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.mongodb.config.AbstractMongoConfiguration;
import org.springframework.data.mongodb.core.MongoTemplate;
import static java.util.Collections.singletonList;
/**
 * @created Airhead 2016/7/27.
 */
@Configuration
public class MongoConfiguration extends AbstractMongoConfiguration {
//    @Value("${spring.data.mongodb.uri}")
//    private String uri;
    @Value("${spring.data.mongodb.host}")
    private String host;
    @Value("${spring.data.mongodb.port}")
    private int port;
    @Value("${spring.data.mongodb.username}")
    private String username;
    @Value("${spring.data.mongodb.password}")
    private String password;
    @Value("${spring.data.mongodb.authenticationDatabase}")
    private String authenticationDatabase;
    @Value("${spring.data.mongodb.database}")
    private String database;
    @Override
    public String getDatabaseName() {
        return database;
    }
    @Override
    @Bean
    public Mongo mongo() throws Exception {
        return new MongoClient(singletonList(new ServerAddress(host, port)),
                singletonList(MongoCredential.createCredential(username, authenticationDatabase, password.toCharArray())));
    }
    public MongoTemplate mongoTemplate() throws Exception {
        return new MongoTemplate(mongo(), getDatabaseName());
    }
}

+ 21 - 3
hos-broker/src/main/java/com/yihu/hos/services/BusinessLogService.java

@ -1,14 +1,32 @@
package com.yihu.hos.services;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.yihu.hos.core.log.Logger;
import com.yihu.hos.core.log.LoggerFactory;
import com.yihu.hos.models.BusinessLog;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.mongodb.core.MongoOperations;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
 * @created Airhead 2016/8/8.
 */
@Component("businessLogService")
public class BusinessLogService {
    public void log(String msg){
        //TODO:转换成BusinessLog Save to Mongodb
        System.out.println(msg);
    private static final Logger logger = LoggerFactory.getLogger(BusinessLogService.class);
    @Autowired
    private MongoOperations mongoOperations;
    public void log(String msg) {
        try {
            ObjectMapper objectMapper = new ObjectMapper();
            BusinessLog businessLog = objectMapper.readValue(msg, BusinessLog.class);
            mongoOperations.save(businessLog);
        } catch (IOException e) {
            e.printStackTrace();
            logger.error(e.getMessage());
        }
    }
}

+ 9 - 0
hos-broker/src/main/resources/application.yml

@ -37,6 +37,15 @@ spring:
      broker-url: tcp://172.19.103.86:61616
      user: admin
      password: admin
  data:
    mongodb:
      host: 172.19.103.86
      port: 27017
      username: esb
      password: esb
      authenticationDatabase: admin
      database: log
security:
  basic:
    enabled: false

+ 9 - 0
sdk/java/hos-client/src/main/java/com/yihu/hos/client/Request.java

@ -7,6 +7,7 @@ import java.util.Map;
 * @created Airhead 2016/8/5.
 */
public class Request {
    private String module;
    private String method;
    private Map<String, String> args;
@ -33,4 +34,12 @@ public class Request {
        args.put(name, value);
    }
    public String getModule() {
        return module;
    }
    public void setModule(String module) {
        this.module = module;
    }
}