LAPTOP-KB9HII50\70708 před 2 roky
rodič
revize
1011c244c4
19 změnil soubory, kde provedl 1494 přidání a 2 odebrání
  1. 22 0
      svr/svr-basic/src/main/java/com/yihu/jw/basic/config/Config.java
  2. 215 0
      svr/svr-basic/src/main/java/com/yihu/jw/basic/redis/controler/RedisMqChannelEndPoint.java
  3. 43 0
      svr/svr-basic/src/main/java/com/yihu/jw/basic/redis/controler/RedisMqOperationEndPoint.java
  4. 158 0
      svr/svr-basic/src/main/java/com/yihu/jw/basic/redis/controler/RedisMqPublisherEndPoint.java
  5. 215 0
      svr/svr-basic/src/main/java/com/yihu/jw/basic/redis/controler/RedisMqSubscriberEndPoint.java
  6. 24 0
      svr/svr-basic/src/main/java/com/yihu/jw/basic/redis/dao/RedisMqChannelDao.java
  7. 25 0
      svr/svr-basic/src/main/java/com/yihu/jw/basic/redis/dao/RedisMqMessageLogDao.java
  8. 25 0
      svr/svr-basic/src/main/java/com/yihu/jw/basic/redis/dao/RedisMqPublisherDao.java
  9. 28 0
      svr/svr-basic/src/main/java/com/yihu/jw/basic/redis/dao/RedisMqSubscriberDao.java
  10. 46 0
      svr/svr-basic/src/main/java/com/yihu/jw/basic/redis/pubsub/CustomMessageListenerAdapter.java
  11. 129 0
      svr/svr-basic/src/main/java/com/yihu/jw/basic/redis/pubsub/DefaultMessageDelegate.java
  12. 46 0
      svr/svr-basic/src/main/java/com/yihu/jw/basic/redis/pubsub/MessageCommonBiz.java
  13. 13 0
      svr/svr-basic/src/main/java/com/yihu/jw/basic/redis/pubsub/MessageDelegate.java
  14. 182 0
      svr/svr-basic/src/main/java/com/yihu/jw/basic/redis/pubsub/PubSubMessageJob.java
  15. 145 0
      svr/svr-basic/src/main/java/com/yihu/jw/basic/redis/service/RedisMqChannelService.java
  16. 43 0
      svr/svr-basic/src/main/java/com/yihu/jw/basic/redis/service/RedisMqMessageLogService.java
  17. 56 0
      svr/svr-basic/src/main/java/com/yihu/jw/basic/redis/service/RedisMqPublisherService.java
  18. 78 0
      svr/svr-basic/src/main/java/com/yihu/jw/basic/redis/service/RedisMqSubscriberService.java
  19. 1 2
      svr/svr-basic/src/main/java/com/yihu/jw/basic/standard/service/standard/StdDatasetService.java

+ 22 - 0
svr/svr-basic/src/main/java/com/yihu/jw/basic/config/Config.java

@ -0,0 +1,22 @@
package com.yihu.jw.basic.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
/**
 * @author Sand
 * @version 1.0
 * @created 2016.03.31 12:42
 */
@Configuration
public class Config {
    @Bean
    RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        return container;
    }
}

+ 215 - 0
svr/svr-basic/src/main/java/com/yihu/jw/basic/redis/controler/RedisMqChannelEndPoint.java

@ -0,0 +1,215 @@
package com.yihu.jw.basic.redis.controler;
import com.yihu.ehr.constants.ApiVersion;
import com.yihu.ehr.constants.ServiceApi;
import com.yihu.jw.basic.redis.pubsub.CustomMessageListenerAdapter;
import com.yihu.jw.basic.redis.pubsub.MessageCommonBiz;
import com.yihu.jw.basic.redis.service.RedisMqChannelService;
import com.yihu.jw.basic.redis.service.RedisMqMessageLogService;
import com.yihu.jw.basic.redis.service.RedisMqPublisherService;
import com.yihu.jw.basic.redis.service.RedisMqSubscriberService;
import com.yihu.jw.entity.ehr.redis.RedisMqChannel;
import com.yihu.jw.entity.ehr.redis.RedisMqMessageLog;
import com.yihu.jw.entity.ehr.redis.RedisMqPublisher;
import com.yihu.jw.entity.ehr.redis.RedisMqSubscriber;
import com.yihu.jw.restmodel.ehr.redis.MRedisCacheCategory;
import com.yihu.jw.restmodel.ehr.redis.MRedisMqChannel;
import com.yihu.jw.restmodel.web.Envelop;
import com.yihu.jw.restmodel.web.ObjEnvelop;
import com.yihu.jw.restmodel.web.PageEnvelop;
import com.yihu.jw.restmodel.web.endpoint.EnvelopRestEndpoint;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.web.bind.annotation.*;
import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;
/**
 * Redis消息队列 接口
 *
 * @author 张进军
 * @date 2017/11/10 11:45
 */
@RestController
@RequestMapping(value = ApiVersion.Version1_0)
@Api(description = "消息队列接口", tags = {"Redis消息发布订阅--消息队列接口"})
public class RedisMqChannelEndPoint extends EnvelopRestEndpoint {
    @Autowired
    private RedisMqChannelService redisMqChannelService;
    @Autowired
    private RedisMqMessageLogService redisMqMessageLogService;
    @Autowired
    private RedisMqSubscriberService redisMqSubscriberService;
    @Autowired
    private RedisMqPublisherService redisMqPublisherService;
    @Resource
    private RedisTemplate<String, Object> redisTemplate;
    @Autowired
    private RedisMessageListenerContainer redisMessageListenerContainer;
    @ApiOperation("根据ID获取消息队列")
    @RequestMapping(value = ServiceApi.Redis.MqChannel.GetById, method = RequestMethod.GET)
    public Envelop getById(
            @ApiParam(name = "id", value = "主键", required = true)
            @PathVariable(value = "id") Integer id) {
        try {
            MRedisMqChannel mRedisMqChannel = convertToModel(redisMqChannelService.getById(id), MRedisMqChannel.class);
            return ObjEnvelop.getSuccess("成功获取消息队列",mRedisMqChannel);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return failed("获取消息队列发生异常");
    }
    @ApiOperation(value = "根据条件获取消息队列")
    @RequestMapping(value = ServiceApi.Redis.MqChannel.Search, method = RequestMethod.GET)
    public Envelop search(
            @ApiParam(name = "fields", value = "返回的字段,为空则返回全部字段")
            @RequestParam(value = "fields", required = false) String fields,
            @ApiParam(name = "filters", value = "筛选条件")
            @RequestParam(value = "filters", required = false) String filters,
            @ApiParam(name = "sorts", value = "排序")
            @RequestParam(value = "sorts", required = false) String sorts,
            @ApiParam(name = "page", value = "页码", defaultValue = "1")
            @RequestParam(value = "page", required = false) int page,
            @ApiParam(name = "size", value = "分页大小", defaultValue = "15")
            @RequestParam(value = "size", required = false) int size) {
        try {
            List<RedisMqChannel> redisMqChannelList = redisMqChannelService.search(fields, filters, sorts, page, size);
            long count = redisMqChannelService.getCount(filters);
            List<MRedisMqChannel> mRedisMqChannelList = (List<MRedisMqChannel>) convertToModels(redisMqChannelList, new ArrayList<MRedisMqChannel>(), MRedisMqChannel.class, fields);
            return PageEnvelop.getSuccessListWithPage("成功获取消息队列列表",mRedisMqChannelList,page,size,count);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return failed("获取消息队列发生异常");
    }
    @ApiOperation("新增消息队列")
    @RequestMapping(value = ServiceApi.Redis.MqChannel.Save, method = RequestMethod.POST)
    public Envelop add(
            @ApiParam(value = "消息队列JSON", required = true)
            @RequestBody String entityJson) {
        try {
            RedisMqChannel newEntity = objectMapper.readValue(entityJson, RedisMqChannel.class);
            newEntity.setDequeuedNum(0);
            newEntity.setEnqueuedNum(0);
            newEntity.setPublisherNum(0);
            newEntity.setSubscriberNum(0);
            newEntity = redisMqChannelService.save(newEntity);
            // 开启该订阅者的消息队列的消息监听
            String channel = newEntity.getChannel();
            CustomMessageListenerAdapter messageListener = MessageCommonBiz.newCustomMessageListenerAdapter(channel);
            redisMessageListenerContainer.addMessageListener(messageListener, new ChannelTopic(channel));
            MRedisCacheCategory mRedisCacheCategory = convertToModel(newEntity, MRedisCacheCategory.class);
            return ObjEnvelop.getSuccess("成功新增消息队列",mRedisCacheCategory);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return failed("新增消息队列发生异常");
    }
    @ApiOperation("更新消息队列")
    @RequestMapping(value = ServiceApi.Redis.MqChannel.Save, method = RequestMethod.PUT)
    public Envelop update(
            @ApiParam(value = "消息队列JSON", required = true)
            @RequestBody String entityJson) {
        try {
            RedisMqChannel updateEntity = objectMapper.readValue(entityJson, RedisMqChannel.class);
            updateEntity = redisMqChannelService.save(updateEntity);
            MRedisMqChannel mRedisMqChannel = convertToModel(updateEntity, MRedisMqChannel.class);
            return ObjEnvelop.getSuccess("成功更新消息队列",mRedisMqChannel);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return failed("更新消息队列发生异常");
    }
    @ApiOperation("删除消息队列")
    @RequestMapping(value = ServiceApi.Redis.MqChannel.Delete, method = RequestMethod.DELETE)
    public Envelop delete(
            @ApiParam(name = "id", value = "消息队列ID", required = true)
            @RequestParam(value = "id") Integer id) {
        RedisMqChannel redisMqChannel = redisMqChannelService.getById(id);
        String channel = redisMqChannel.getChannel();
        List<RedisMqMessageLog> messageLogList = redisMqMessageLogService.findByChannel(channel);
        if (messageLogList.size() != 0) {
            return failed("该消息队列存在订阅失败的消息,不能删除。");
        }
        List<RedisMqSubscriber> subscriberList = redisMqSubscriberService.findByChannel(channel);
        if (subscriberList.size() != 0) {
            return failed("该消息队列存在订阅者,不能删除。");
        }
        List<RedisMqPublisher> publisherList = redisMqPublisherService.findByChannel(channel);
        if (publisherList.size() != 0) {
            return failed("该消息队列存在发布者,不能删除。");
        }
        redisMqChannelService.delete(id);
        // 删除该消息队列的消息监听
        CustomMessageListenerAdapter messageListener = MessageCommonBiz.newCustomMessageListenerAdapter(channel);
        redisMessageListenerContainer.removeMessageListener(messageListener, new ChannelTopic(channel));
        return Envelop.getSuccess("删除成功");
    }
    @ApiOperation("验证消息队列编码是否唯一")
    @RequestMapping(value = ServiceApi.Redis.MqChannel.IsUniqueChannel, method = RequestMethod.GET)
    public Envelop isUniqueChannel(
            @ApiParam(name = "id", value = "消息队列ID", required = true)
            @RequestParam(value = "id") Integer id,
            @ApiParam(name = "channel", value = "消息队列编码", required = true)
            @RequestParam(value = "channel") String channel) {
        try {
            boolean result = redisMqChannelService.isUniqueChannel(id, channel);
            if (!result) {
                return failed("该消息队列编码已被使用,请重新填写!");
            }
            return Envelop.getSuccess("编码是唯一");
        } catch (Exception e) {
            e.printStackTrace();
        }
        return failed("发生异常");
    }
    @ApiOperation("验证消息队列名称是否唯一")
    @RequestMapping(value = ServiceApi.Redis.MqChannel.IsUniqueChannelName, method = RequestMethod.GET)
    public Envelop isUniqueChannelName(
            @ApiParam(name = "id", value = "消息队列ID", required = true)
            @RequestParam(value = "id") Integer id,
            @ApiParam(name = "channelName", value = "消息队列名称", required = true)
            @RequestParam(value = "channelName") String channelName) {
        try {
            boolean result = redisMqChannelService.isUniqueChannelName(id, channelName);
            if (!result) {
                return failed("该消息队列名称已被使用,请重新填写!");
            }
            return Envelop.getSuccess("队列名称是唯一");
        } catch (Exception e) {
            e.printStackTrace();
        }
        return failed("发生异常");
    }
    @ApiOperation("验证消息队列是否已存在")
    @RequestMapping(value = ServiceApi.Redis.MqChannel.IsExist, method = RequestMethod.GET)
    public Envelop isExist(
            @ApiParam(name = "channel", value = "消息队列", required = true)
            @RequestParam(value = "channel") String channel) {
        return ObjEnvelop.getSuccess("验证成功",redisMqChannelService.isExist(channel));
    }
}

+ 43 - 0
svr/svr-basic/src/main/java/com/yihu/jw/basic/redis/controler/RedisMqOperationEndPoint.java

@ -0,0 +1,43 @@
package com.yihu.jw.basic.redis.controler;
import com.yihu.ehr.constants.ApiVersion;
import com.yihu.ehr.constants.ServiceApi;
import com.yihu.jw.basic.redis.service.RedisMqChannelService;
import com.yihu.jw.restmodel.web.Envelop;
import com.yihu.jw.restmodel.web.endpoint.EnvelopRestEndpoint;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
/**
 * Redis消息队列操作 接口
 *
 * @author 张进军
 * @date 2017/12/27 16:09
 */
@RestController
@RequestMapping(value = ApiVersion.Version1_0)
@Api(description = "消息队列操作接口", tags = {"Redis消息发布订阅--消息队列操作接口"})
public class RedisMqOperationEndPoint extends EnvelopRestEndpoint {
    @Autowired
    private RedisMqChannelService redisMqChannelService;
    @ApiOperation("发布消息")
    @RequestMapping(value = ServiceApi.Redis.MqChannel.SendMessage, method = RequestMethod.POST)
    public Envelop sendMessage(
            @ApiParam(name = "publisherAppId", value = "发布者应用ID", required = true)
            @RequestParam(value = "publisherAppId") String publisherAppId,
            @ApiParam(name = "channel", value = "消息队列编码", required = true)
            @RequestParam(value = "channel") String channel,
            @ApiParam(name = "message", value = "消息", required = true)
            @RequestParam(value = "message") String message) {
        return redisMqChannelService.sendMessage(publisherAppId, channel, message);
    }
}

+ 158 - 0
svr/svr-basic/src/main/java/com/yihu/jw/basic/redis/controler/RedisMqPublisherEndPoint.java

@ -0,0 +1,158 @@
package com.yihu.jw.basic.redis.controler;
import com.yihu.ehr.constants.ApiVersion;
import com.yihu.ehr.constants.ServiceApi;
import com.yihu.jw.basic.redis.service.RedisMqChannelService;
import com.yihu.jw.basic.redis.service.RedisMqPublisherService;
import com.yihu.jw.entity.ehr.redis.RedisMqChannel;
import com.yihu.jw.entity.ehr.redis.RedisMqPublisher;
import com.yihu.jw.restmodel.ehr.redis.MRedisMqPublisher;
import com.yihu.jw.restmodel.web.Envelop;
import com.yihu.jw.restmodel.web.ObjEnvelop;
import com.yihu.jw.restmodel.web.PageEnvelop;
import com.yihu.jw.restmodel.web.endpoint.EnvelopRestEndpoint;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import java.util.ArrayList;
import java.util.List;
/**
 * Redis消息发布者 接口
 *
 * @author 张进军
 * @date 2017/11/20 09:35
 */
@RestController
@RequestMapping(value = ApiVersion.Version1_0)
@Api(description = "消息发布者接口", tags = {"Redis消息发布订阅--消息发布者接口"})
public class RedisMqPublisherEndPoint extends EnvelopRestEndpoint {
    @Autowired
    private RedisMqPublisherService redisMqPublisherService;
    @Autowired
    private RedisMqChannelService redisMqChannelService;
    @ApiOperation("根据ID获取消息发布者")
    @RequestMapping(value = ServiceApi.Redis.MqPublisher.GetById, method = RequestMethod.GET)
    public Envelop getById(
            @ApiParam(name = "id", value = "主键", required = true)
            @PathVariable(value = "id") Integer id) {
        try {
            MRedisMqPublisher mRedisMqPublisher = convertToModel(redisMqPublisherService.getById(id), MRedisMqPublisher.class);
            return ObjEnvelop.getSuccess("成功获取消息发布者",mRedisMqPublisher);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return failed("获取消息发布者发生异常");
    }
    @ApiOperation(value = "根据条件获取消息发布者")
    @RequestMapping(value = ServiceApi.Redis.MqPublisher.Search, method = RequestMethod.GET)
    public Envelop search(
            @ApiParam(name = "fields", value = "返回的字段,为空则返回全部字段")
            @RequestParam(value = "fields", required = false) String fields,
            @ApiParam(name = "filters", value = "筛选条件")
            @RequestParam(value = "filters", required = false) String filters,
            @ApiParam(name = "sorts", value = "排序")
            @RequestParam(value = "sorts", required = false) String sorts,
            @ApiParam(name = "page", value = "页码", defaultValue = "1")
            @RequestParam(value = "page", required = false) int page,
            @ApiParam(name = "size", value = "分页大小", defaultValue = "15")
            @RequestParam(value = "size", required = false) int size) {
        try {
            List<RedisMqPublisher> list = redisMqPublisherService.search(fields, filters, sorts, page, size);
            long count = redisMqPublisherService.getCount(filters);
            List<MRedisMqPublisher> mList = (List<MRedisMqPublisher>) convertToModels(list, new ArrayList<MRedisMqPublisher>(), MRedisMqPublisher.class, fields);
            return PageEnvelop.getSuccessListWithPage("成功获取消息发布者列表",mList,page,size,count);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return failed("获取消息发布者发生异常");
    }
    @ApiOperation("新增消息发布者")
    @RequestMapping(value = ServiceApi.Redis.MqPublisher.Save, method = RequestMethod.POST)
    public Envelop add(
            @ApiParam(value = "消息发布者JSON", required = true)
            @RequestBody String entityJson) {
        try {
            RedisMqPublisher newEntity = objectMapper.readValue(entityJson, RedisMqPublisher.class);
            newEntity = redisMqPublisherService.save(newEntity);
            // 累计发布者数
            RedisMqChannel channel = redisMqChannelService.findByChannel(newEntity.getChannel());
            channel.setPublisherNum(channel.getPublisherNum() + 1);
            redisMqChannelService.save(channel);
            MRedisMqPublisher mRedisMqPublisher = convertToModel(newEntity, MRedisMqPublisher.class);
            return ObjEnvelop.getSuccess("成功新增消息发布者",mRedisMqPublisher);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return failed("新增消息发布者发生异常");
    }
    @ApiOperation("更新消息发布者")
    @RequestMapping(value = ServiceApi.Redis.MqPublisher.Save, method = RequestMethod.PUT)
    public Envelop update(
            @ApiParam(value = "消息发布者JSON", required = true)
            @RequestBody String entityJson) {
        try {
            RedisMqPublisher updateEntity = objectMapper.readValue(entityJson, RedisMqPublisher.class);
            updateEntity = redisMqPublisherService.save(updateEntity);
            MRedisMqPublisher mRedisMqPublisher = convertToModel(updateEntity, MRedisMqPublisher.class);
            return ObjEnvelop.getSuccess("成功更新消息发布者",mRedisMqPublisher);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return failed("更新消息发布者发生异常");
    }
    @ApiOperation("删除消息发布者")
    @RequestMapping(value = ServiceApi.Redis.MqPublisher.Delete, method = RequestMethod.DELETE)
    public Envelop delete(
            @ApiParam(name = "id", value = "消息发布者ID", required = true)
            @RequestParam(value = "id") Integer id) {
        try {
            RedisMqPublisher publisher = redisMqPublisherService.getById(id);
            redisMqPublisherService.delete(id);
            // 扣减发布者数
            RedisMqChannel channel = redisMqChannelService.findByChannel(publisher.getChannel());
            channel.setPublisherNum(channel.getPublisherNum() - 1);
            redisMqChannelService.save(channel);
            return Envelop.getSuccess("成功删除消息发布者");
        } catch (Exception e) {
            e.printStackTrace();
        }
        return failed("删除消息发布者发生异常");
    }
    @ApiOperation("验证指定队列中发布者应用ID是否唯一")
    @RequestMapping(value = ServiceApi.Redis.MqPublisher.IsUniqueAppId, method = RequestMethod.GET)
    public Envelop isUniqueAppId(
            @ApiParam(name = "id", value = "消息订阅者ID", required = true)
            @RequestParam(value = "id") Integer id,
            @ApiParam(name = "channel", value = "消息队列编码", required = true)
            @RequestParam(value = "channel") String channel,
            @ApiParam(name = "appId", value = "发布者应用ID", required = true)
            @RequestParam(value = "appId") String appId) {
        try {
            boolean result = redisMqPublisherService.isUniqueAppId(id, channel, appId);
            if (!result) {
                return failed("该指定消息队列中订阅者应用ID已被使用,请重新填写!");
            }
            return Envelop.getSuccess("应用ID是唯一");
        } catch (Exception e) {
            e.printStackTrace();
        }
        return failed("发生异常");
    }
}

+ 215 - 0
svr/svr-basic/src/main/java/com/yihu/jw/basic/redis/controler/RedisMqSubscriberEndPoint.java

@ -0,0 +1,215 @@
package com.yihu.jw.basic.redis.controler;
import com.yihu.ehr.constants.ApiVersion;
import com.yihu.ehr.constants.ServiceApi;
import com.yihu.jw.basic.redis.service.RedisMqChannelService;
import com.yihu.jw.basic.redis.service.RedisMqSubscriberService;
import com.yihu.jw.entity.ehr.redis.RedisMqChannel;
import com.yihu.jw.entity.ehr.redis.RedisMqSubscriber;
import com.yihu.jw.restmodel.ehr.redis.MRedisMqSubscriber;
import com.yihu.jw.restmodel.web.Envelop;
import com.yihu.jw.restmodel.web.ListEnvelop;
import com.yihu.jw.restmodel.web.ObjEnvelop;
import com.yihu.jw.restmodel.web.PageEnvelop;
import com.yihu.jw.restmodel.web.endpoint.EnvelopRestEndpoint;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.web.bind.annotation.*;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
/**
 * Redis消息订阅者 接口
 *
 * @author 张进军
 * @date 2017/11/13 15:14
 */
@RestController
@RequestMapping(value = ApiVersion.Version1_0)
@Api(description = "消息订阅者接口", tags = {"Redis消息发布订阅--消息订阅者接口"})
public class RedisMqSubscriberEndPoint extends EnvelopRestEndpoint {
    @Autowired
    private RedisMqSubscriberService redisMqSubscriberService;
    @Autowired
    private RedisMqChannelService redisMqChannelService;
    @Autowired
    private RedisMessageListenerContainer redisMessageListenerContainer;
    @ApiOperation("根据ID获取消息订阅者")
    @RequestMapping(value = ServiceApi.Redis.MqSubscriber.GetById, method = RequestMethod.GET)
    public Envelop getById(
            @ApiParam(name = "id", value = "主键", required = true)
            @PathVariable(value = "id") Integer id) {
        try {
            MRedisMqSubscriber mRedisMqSubscriber = convertToModel(redisMqSubscriberService.getById(id), MRedisMqSubscriber.class);
            return ObjEnvelop.getSuccess("成功获取消息订阅者",mRedisMqSubscriber);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return failed("获取消息订阅者发生异常");
    }
    @ApiOperation(value = "根据条件获取消息订阅者")
    @RequestMapping(value = ServiceApi.Redis.MqSubscriber.Search, method = RequestMethod.GET)
    public Envelop search(
            @ApiParam(name = "fields", value = "返回的字段,为空则返回全部字段")
            @RequestParam(value = "fields", required = false) String fields,
            @ApiParam(name = "filters", value = "筛选条件")
            @RequestParam(value = "filters", required = false) String filters,
            @ApiParam(name = "sorts", value = "排序")
            @RequestParam(value = "sorts", required = false) String sorts,
            @ApiParam(name = "page", value = "页码", defaultValue = "1")
            @RequestParam(value = "page", required = false) int page,
            @ApiParam(name = "size", value = "分页大小", defaultValue = "15")
            @RequestParam(value = "size", required = false) int size) {
        try {
            List<RedisMqSubscriber> list = redisMqSubscriberService.search(fields, filters, sorts, page, size);
            long count = redisMqSubscriberService.getCount(filters);
            List<MRedisMqSubscriber> mList = (List<MRedisMqSubscriber>) convertToModels(list, new ArrayList<MRedisMqSubscriber>(), MRedisMqSubscriber.class, fields);
            return PageEnvelop.getSuccessListWithPage("成功获取消息订阅者列表",mList,page,size,count);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return failed("获取消息订阅者发生异常");
    }
    @ApiOperation(value = "获取消息订阅者列表,不分页")
    @RequestMapping(value = ServiceApi.Redis.MqSubscriber.Prefix, method = RequestMethod.GET)
    public Envelop getSubscriberList(
            @ApiParam(name = "filters", value = "过滤器,为空检索所有条件", defaultValue = "")
            @RequestParam(value = "filters") String filters) throws Exception {
        List<RedisMqSubscriber> subscriberList = redisMqSubscriberService.search(filters);
        List<MRedisMqSubscriber> list = convertToModels(subscriberList, new ArrayList<>(subscriberList.size()), MRedisMqSubscriber.class, "");
        return ListEnvelop.getSuccess("查询成功",list);
    }
    @ApiOperation("新增消息订阅者")
    @RequestMapping(value = ServiceApi.Redis.MqSubscriber.Save, method = RequestMethod.POST)
    public Envelop add(
            @ApiParam(value = "消息订阅者JSON", required = true)
            @RequestBody String entityJson) {
        try {
            RedisMqSubscriber newEntity = objectMapper.readValue(entityJson, RedisMqSubscriber.class);
            newEntity = redisMqSubscriberService.save(newEntity);
            // 累计订阅者数
            RedisMqChannel channel = redisMqChannelService.findByChannel(newEntity.getChannel());
            channel.setSubscriberNum(channel.getSubscriberNum() + 1);
            redisMqChannelService.save(channel);
            MRedisMqSubscriber mRedisMqSubscriber = convertToModel(newEntity, MRedisMqSubscriber.class);
            return ObjEnvelop.getSuccess("成功新增消息订阅者",mRedisMqSubscriber);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return failed("新增消息订阅者发生异常");
    }
    @ApiOperation("更新消息订阅者")
    @RequestMapping(value = ServiceApi.Redis.MqSubscriber.Save, method = RequestMethod.PUT)
    public Envelop update(
            @ApiParam(value = "消息订阅者JSON", required = true)
            @RequestBody String entityJson) {
        try {
            RedisMqSubscriber updateEntity = objectMapper.readValue(entityJson, RedisMqSubscriber.class);
            updateEntity = redisMqSubscriberService.save(updateEntity);
            MRedisMqSubscriber mRedisMqSubscriber = convertToModel(updateEntity, MRedisMqSubscriber.class);
            return ObjEnvelop.getSuccess("成功更新消息订阅者",mRedisMqSubscriber);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return failed("更新消息订阅者发生异常");
    }
    @ApiOperation("删除消息订阅者")
    @RequestMapping(value = ServiceApi.Redis.MqSubscriber.Delete, method = RequestMethod.DELETE)
    public Envelop delete(
            @ApiParam(name = "id", value = "消息订阅者ID", required = true)
            @RequestParam(value = "id") Integer id) {
        try {
            RedisMqSubscriber subscriber = redisMqSubscriberService.getById(id);
            redisMqSubscriberService.delete(id);
            // 扣减订阅者数
            RedisMqChannel channel = redisMqChannelService.findByChannel(subscriber.getChannel());
            channel.setSubscriberNum(channel.getSubscriberNum() - 1);
            redisMqChannelService.save(channel);
            return Envelop.getSuccess("成功删除消息订阅者");
        } catch (Exception e) {
            e.printStackTrace();
        }
        return failed("删除消息订阅者发生异常");
    }
    @ApiOperation("验证指定消息队列中订阅者应用ID是否唯一")
    @RequestMapping(value = ServiceApi.Redis.MqSubscriber.IsUniqueAppId, method = RequestMethod.GET)
    public Envelop isUniqueAppId(
            @ApiParam(name = "id", value = "消息订阅者ID", required = true)
            @RequestParam(value = "id") Integer id,
            @ApiParam(name = "channel", value = "消息队列编码", required = true)
            @RequestParam(value = "channel") String channel,
            @ApiParam(name = "appId", value = "消息订阅者应用ID", required = true)
            @RequestParam(value = "appId") String appId) {
        try {
            boolean result = redisMqSubscriberService.isUniqueAppId(id, channel, appId);
            if (!result) {
                return failed("该指定消息队列中订阅者应用ID已被使用,请重新填写!");
            }
            return Envelop.getSuccess("应用ID是唯一");
        } catch (Exception e) {
            e.printStackTrace();
        }
        return failed("发生异常:");
    }
    @ApiOperation("验证指定消息队列中订阅者服务地址是否唯一")
    @RequestMapping(value = ServiceApi.Redis.MqSubscriber.IsUniqueSubscribedUrl, method = RequestMethod.GET)
    public Envelop isUniqueSubscribedUrl(
            @ApiParam(name = "id", value = "消息订阅者ID", required = true)
            @RequestParam(value = "id") Integer id,
            @ApiParam(name = "channel", value = "消息队列编码", required = true)
            @RequestParam(value = "channel") String channel,
            @ApiParam(name = "subscriberUrl", value = "消息订阅者服务地址", required = true)
            @RequestParam(value = "subscriberUrl") String subscriberUrl) {
        try {
            boolean result = redisMqSubscriberService.isUniqueSubscribedUrl(id, channel, subscriberUrl);
            if (!result) {
                return failed("该指定消息队列中订阅者服务地址已被使用,请重新填写!");
            }
            return Envelop.getSuccess("服务地址是唯一");
        } catch (Exception e) {
            e.printStackTrace();
        }
        return failed("发生异常:");
    }
    @ApiOperation("验证消息队列的订阅者服务地址是否存在")
    @RequestMapping(value = ServiceApi.Redis.MqSubscriber.IsExist, method = RequestMethod.GET)
    public Envelop isExist(
            @ApiParam(name = "channel", value = "消息队列编码", required = true)
            @RequestParam(value = "channel") String channel,
            @ApiParam(name = "subscriber", value = "消息订阅者服务地址", required = true)
            @RequestParam(value = "subscriber") String subscriber) {
        return ObjEnvelop.getSuccess("验证成功",redisMqSubscriberService.isExist(channel, subscriber));
    }
    @ApiOperation("取消队列的订阅者")
    @RequestMapping(value = ServiceApi.Redis.MqSubscriber.Unsubscribe, method = RequestMethod.POST)
    public Envelop unsubscribe(
            @ApiParam(name = "channel", value = "消息队列编码", required = true)
            @RequestParam(value = "channel") String channel,
            @ApiParam(name = "subscriber", value = "消息订阅者服务地址", required = false)
            @RequestParam(value = "subscriber", required = false) String subscriber) {
        redisMqSubscriberService.unsubscribe(channel, subscriber);
        return Envelop.getSuccess("取消成功");
    }
}

+ 24 - 0
svr/svr-basic/src/main/java/com/yihu/jw/basic/redis/dao/RedisMqChannelDao.java

@ -0,0 +1,24 @@
package com.yihu.jw.basic.redis.dao;
import com.yihu.jw.entity.ehr.redis.RedisMqChannel;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.query.Param;
/**
 * redis消息队列 DAO
 *
 * @author 张进军
 * @date 2017/11/10 11:45
 */
public interface RedisMqChannelDao extends JpaRepository<RedisMqChannel, Integer> {
    RedisMqChannel findByChannel(@Param("channel") String channel);
    @Query(" FROM RedisMqChannel rmc WHERE rmc.id <> :id AND rmc.channel = :channel ")
    RedisMqChannel isUniqueChannel(@Param("id") Integer id, @Param("channel") String channel);
    @Query(" FROM RedisMqChannel rmc WHERE rmc.id <> :id AND rmc.channelName = :channelName ")
    RedisMqChannel isUniqueChannelName(@Param("id") Integer id, @Param("channelName") String channelName);
}

+ 25 - 0
svr/svr-basic/src/main/java/com/yihu/jw/basic/redis/dao/RedisMqMessageLogDao.java

@ -0,0 +1,25 @@
package com.yihu.jw.basic.redis.dao;
import com.yihu.jw.entity.ehr.redis.RedisMqMessageLog;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.Modifying;
import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.query.Param;
import java.util.List;
/**
 * redis消息记录 DAO
 *
 * @author 张进军
 * @date 2017/11/10 11:45
 */
public interface RedisMqMessageLogDao extends JpaRepository<RedisMqMessageLog, String> {
    List<RedisMqMessageLog> findByChannel(@Param("channel") String channel);
    @Modifying
    @Query("update RedisMqMessageLog set status=:status where id=:id")
    void updateStatus(@Param("id") String id, @Param("status") Integer status);
}

+ 25 - 0
svr/svr-basic/src/main/java/com/yihu/jw/basic/redis/dao/RedisMqPublisherDao.java

@ -0,0 +1,25 @@
package com.yihu.jw.basic.redis.dao;
import com.yihu.jw.entity.ehr.redis.RedisMqPublisher;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.query.Param;
import java.util.List;
/**
 * redis消息发布者 DAO
 *
 * @author 张进军
 * @date 2017/11/20 09:35
 */
public interface RedisMqPublisherDao extends JpaRepository<RedisMqPublisher, Integer> {
    List<RedisMqPublisher> findByChannel(@Param("channel") String channel);
    RedisMqPublisher findByChannelAndAppId(@Param("channel") String channel, @Param("appId") String appId);
    @Query(" FROM RedisMqPublisher rmp WHERE rmp.id <> :id AND rmp.channel = :channel AND rmp.appId = :appId ")
    RedisMqPublisher isUniqueAppId(@Param("id") Integer id, @Param("channel") String channel, @Param("appId") String appId);
}

+ 28 - 0
svr/svr-basic/src/main/java/com/yihu/jw/basic/redis/dao/RedisMqSubscriberDao.java

@ -0,0 +1,28 @@
package com.yihu.jw.basic.redis.dao;
import com.yihu.jw.entity.ehr.redis.RedisMqSubscriber;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.query.Param;
import java.util.List;
/**
 * redis消息订阅者 DAO
 *
 * @author 张进军
 * @date 2017/11/10 11:45
 */
public interface RedisMqSubscriberDao extends JpaRepository<RedisMqSubscriber, Integer> {
    List<RedisMqSubscriber> findByChannel(@Param("channel") String channel);
    RedisMqSubscriber findByChannelAndAndSubscribedUrl(String channel, String subscribedUrl);
    @Query(" FROM RedisMqSubscriber rms WHERE rms.id <> :id AND rms.channel = :channel AND rms.appId = :appId ")
    RedisMqSubscriber isUniqueAppId(@Param("id") Integer id, @Param("channel") String channel, @Param("appId") String appId);
    @Query(" FROM RedisMqSubscriber rms WHERE rms.id <> :id AND rms.channel = :channel AND rms.subscribedUrl = :subscribedUrl ")
    RedisMqSubscriber isUniqueSubscribedUrl(@Param("id") Integer id, @Param("channel") String channel, @Param("subscribedUrl") String subscribedUrl);
}

+ 46 - 0
svr/svr-basic/src/main/java/com/yihu/jw/basic/redis/pubsub/CustomMessageListenerAdapter.java

@ -0,0 +1,46 @@
package com.yihu.jw.basic.redis.pubsub;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
/**
 * 重写父类 MessageListenerAdapter 的 equals() 方法
 *
 * @author 张进军
 * @date 2017/11/16 17:33
 */
public class CustomMessageListenerAdapter extends MessageListenerAdapter {
    // 消息队列编码
    private String channel;
    public CustomMessageListenerAdapter(DefaultMessageDelegate delegate) {
        super.setDelegate(delegate);
        this.channel = super.getDelegate().toString();
    }
    public int hashCode() {
        return this.channel.hashCode();
    }
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        } else if (obj == null) {
            return false;
        } else if (!(obj instanceof CustomMessageListenerAdapter)) {
            return false;
        } else {
            CustomMessageListenerAdapter other = (CustomMessageListenerAdapter) obj;
            if (this.channel == null) {
                if (other.channel != null) {
                    return false;
                }
            } else if (!this.channel.equals(other.channel)) {
                return false;
            }
            return true;
        }
    }
}

+ 129 - 0
svr/svr-basic/src/main/java/com/yihu/jw/basic/redis/pubsub/DefaultMessageDelegate.java

@ -0,0 +1,129 @@
package com.yihu.jw.basic.redis.pubsub;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.yihu.jw.basic.redis.service.RedisMqSubscriberService;
import com.yihu.jw.entity.ehr.redis.RedisMqMessageLog;
import com.yihu.jw.entity.ehr.redis.RedisMqSubscriber;
import com.yihu.jw.profile.queue.RedisCollection;
import org.apache.commons.lang3.StringUtils;
import org.apache.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.web.client.RestTemplate;
import javax.annotation.Resource;
import java.util.List;
import java.util.Map;
/**
 * Redis 订阅发布的消息代理
 *
 * @author 张进军
 * @date 2017/11/3 10:51
 */
public class DefaultMessageDelegate implements MessageDelegate {
    private static final Logger logger = Logger.getLogger(DefaultMessageDelegate.class);
    @Resource
    RedisTemplate<String, Object> redisTemplate;
    @Autowired
    private RestTemplate restTemplate;
    @Autowired
    private ObjectMapper objectMapper;
    @Autowired
    private RedisMqSubscriberService redisMqSubscriberService;
    @Override
    public void handleMessage(String message, String channel) {
        try {
            Map<String, Object> messageMap = objectMapper.readValue(message, Map.class);
            String messageId = messageMap.get("messageId").toString();
            String publisherAppId = messageMap.get("publisherAppId").toString();
            String messageContent = messageMap.get("messageContent").toString();
            List<RedisMqSubscriber> subscriberList = redisMqSubscriberService.findByChannel(channel);
            if (subscriberList.size() == 0) {
                // 收集订阅成功的消息,定时任务里累计出列数
                RedisMqMessageLog messageLog = MessageCommonBiz.newMessageLog(channel, publisherAppId, messageContent);
                redisTemplate.opsForList().leftPush(RedisCollection.SUB_SUCCESSFUL_MESSAGES, objectMapper.writeValueAsString(messageLog));
            } else {
                // 遍历消息队列的订阅者,并推送消息
                for (RedisMqSubscriber subscriber : subscriberList) {
                    String subscribedUrl = subscriber.getSubscribedUrl();
                    try {
                        // 推送消息到指定服务地址
                        HttpHeaders headers = new HttpHeaders();
                        headers.setContentType(MediaType.APPLICATION_JSON_UTF8);
                        HttpEntity<String> entity = new HttpEntity<>(messageContent, headers);
                        restTemplate.postForObject(subscribedUrl, entity, String.class);
                        // 收集订阅成功的消息,定时任务里累计出列数
                        RedisMqMessageLog messageLog = MessageCommonBiz.newMessageLog(channel, publisherAppId, messageContent);
                        if (!StringUtils.isEmpty(messageId)) {
                            // 首次订阅失败,但重试订阅成功场合
                            messageLog.setId(messageId);
                        }
                        redisTemplate.opsForList().leftPush(RedisCollection.SUB_SUCCESSFUL_MESSAGES, objectMapper.writeValueAsString(messageLog));
                    } catch (Exception e) {
                        e.printStackTrace();
                        // 收集订阅失败的消息
                        RedisMqMessageLog messageLog = MessageCommonBiz.newMessageLog(channel, publisherAppId, messageContent);
                        messageLog.setErrorMsg(e.toString());
                        if (!StringUtils.isEmpty(messageId)) {
                            // 非头次订阅失败
                            messageLog.setId(messageId);
                            // 通过 -1 标记为非头次订阅失败,定时任务里累计更新失败次数。
                            messageLog.setFailedNum(-1);
                        }
                        redisTemplate.opsForList().leftPush(RedisCollection.SUB_FAILED_MESSAGES, objectMapper.writeValueAsString(messageLog));
                        break;
                    }
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    // 消息队列编码
    private String channel;
    public DefaultMessageDelegate(String channel) {
        this.channel = channel;
    }
    public int hashCode() {
        return this.channel.hashCode();
    }
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        } else if (obj == null) {
            return false;
        } else if (!(obj instanceof DefaultMessageDelegate)) {
            return false;
        } else {
            DefaultMessageDelegate other = (DefaultMessageDelegate) obj;
            if (this.channel == null) {
                if (other.channel != null) {
                    return false;
                }
            } else if (!this.channel.equals(other.channel)) {
                return false;
            }
            return true;
        }
    }
    public String toString() {
        return this.channel;
    }
}

+ 46 - 0
svr/svr-basic/src/main/java/com/yihu/jw/basic/redis/pubsub/MessageCommonBiz.java

@ -0,0 +1,46 @@
package com.yihu.jw.basic.redis.pubsub;
import com.yihu.jw.entity.ehr.redis.RedisMqMessageLog;
import com.yihu.jw.lang.SpringContext;
import com.yihu.jw.util.id.UuidUtil;
/**
 * 消息发布订阅共通业务方法
 *
 * @author 张进军
 * @date 2017/11/20 17:03
 */
public class MessageCommonBiz {
    /**
     *  实例化一个 RedisMqMessageLog
     * @param channel 消息队列编码
     * @param publisherAppId 发布者应用ID
     * @param message 要发布的消息
     * @return RedisMqMessageLog
     */
    public static RedisMqMessageLog newMessageLog(String channel, String publisherAppId, String message) {
        RedisMqMessageLog redisMqMessageLog = new RedisMqMessageLog();
        redisMqMessageLog.setId(UuidUtil.randomUUID());
        redisMqMessageLog.setChannel(channel);
        redisMqMessageLog.setPublisherAppId(publisherAppId);
        redisMqMessageLog.setMessage(message);
        redisMqMessageLog.setStatus(0);
        redisMqMessageLog.setFailedNum(0);
        return redisMqMessageLog;
    }
    /**
     *  实例化一个 CustomMessageListenerAdapter
     * @param channel 消息队列编码
     * @return CustomMessageListenerAdapter
     */
    public static CustomMessageListenerAdapter newCustomMessageListenerAdapter(String channel) {
        DefaultMessageDelegate defaultMessageDelegate = new DefaultMessageDelegate(channel);
        SpringContext.autowiredBean(defaultMessageDelegate);
        CustomMessageListenerAdapter messageListener = new CustomMessageListenerAdapter(defaultMessageDelegate);
        SpringContext.autowiredBean(messageListener);
        return messageListener;
    }
}

+ 13 - 0
svr/svr-basic/src/main/java/com/yihu/jw/basic/redis/pubsub/MessageDelegate.java

@ -0,0 +1,13 @@
package com.yihu.jw.basic.redis.pubsub;
/**
 * Redis 订阅发布的消息代理接口
 *
 * @author 张进军
 * @date 2017/11/2 13:43
 */
public interface MessageDelegate {
    void handleMessage(String message, String channel);
}

+ 182 - 0
svr/svr-basic/src/main/java/com/yihu/jw/basic/redis/pubsub/PubSubMessageJob.java

@ -0,0 +1,182 @@
package com.yihu.jw.basic.redis.pubsub;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.yihu.jw.basic.redis.service.RedisMqChannelService;
import com.yihu.jw.basic.redis.service.RedisMqMessageLogService;
import com.yihu.jw.entity.ehr.redis.RedisMqChannel;
import com.yihu.jw.entity.ehr.redis.RedisMqMessageLog;
import com.yihu.jw.profile.queue.RedisCollection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.io.IOException;
import java.util.Map;
/**
 * 定时定量处理消息,包括:
 * - 发送消息。
 * - 重试订阅失败的消息。
 * - 处理订阅成功的消息。
 *
 * @author 张进军
 * @date 2018/5/7 14:01
 */
@Component
public class PubSubMessageJob {
    private Logger logger = LoggerFactory.getLogger(PubSubMessageJob.class);
    // 最大次数尝试重发订阅失败消息
    private final int maxFailedNum = 3;
    // 每次定时最多操作的消息数量
    private final int onceNum = 1000;
    @Autowired
    RedisMqChannelService redisMqChannelService;
    @Autowired
    RedisMqMessageLogService redisMqMessageLogService;
    @Resource
    RedisTemplate<String, Object> redisTemplate;
    @Autowired
    ObjectMapper objectMapper;
    /*
     * 发送消息
     */
    @Scheduled(initialDelay = 1000, fixedDelay = 5000)
    public void send() {
        try {
            int num = 0;
            while (true) {
                Object msgObj = redisTemplate.opsForList().rightPop(RedisCollection.PUB_WAITING_MESSAGES);
                if (msgObj == null) {
                    break;
                }
                Map<String, Object> messageMap = objectMapper.readValue(String.valueOf(msgObj), Map.class);
                String channel = messageMap.get("channel").toString();
                // 发布消息
                redisTemplate.convertAndSend(channel, objectMapper.writeValueAsString(messageMap));
                // 累计入列数
                RedisMqChannel afterChannel = updateChannelQueueNumber(channel, "Enqueued");
                num++;
                if (num == onceNum) {
                    break;
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    /*
     * 重试订阅失败的消息
     */
    @Scheduled(initialDelay = 1000, fixedDelay = 5000)
    public void resend() {
        try {
            int num = 0;
            while (true) {
                Object msgObj = redisTemplate.opsForList().rightPop(RedisCollection.SUB_FAILED_MESSAGES);
                if (msgObj == null) {
                    break;
                }
                boolean valid = true;
                RedisMqMessageLog cacheMessageLog = objectMapper.readValue(String.valueOf(msgObj), RedisMqMessageLog.class);
                if (cacheMessageLog.getFailedNum() == 0) {
                    // 头次订阅失败,则保存到数据库中
                    cacheMessageLog.setFailedNum(1);
                    redisMqMessageLogService.save(cacheMessageLog);
                } else if (cacheMessageLog.getFailedNum() == -1) {
                    // 累计订阅失败次数
                    RedisMqMessageLog dbMessageLog = redisMqMessageLogService.getById(cacheMessageLog.getId());
                    if (dbMessageLog.getFailedNum() >= maxFailedNum) {
                        // 大于等于最大尝试失败数,则不再重试。
                        valid = false;
                    } else {
                        dbMessageLog.setFailedNum(dbMessageLog.getFailedNum() + 1);
                        dbMessageLog.setErrorMsg(cacheMessageLog.getErrorMsg());
                        redisMqMessageLogService.save(dbMessageLog);
                    }
                }
                if (valid) {
                    // 将消息加入到待发缓存集合中
                    redisMqChannelService.addToPubWaitingMessage(cacheMessageLog.getPublisherAppId(),
                            cacheMessageLog.getChannel(), cacheMessageLog.getMessage(), cacheMessageLog.getId());
                    num++;
                    if (num == onceNum) {
                        break;
                    }
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    /*
     * 处理订阅成功的消息。
     * 累计出列数、更新重试成功的订阅失败消息的状态。
     */
    @Scheduled(initialDelay = 1000, fixedDelay = 5000)
    public void update() {
        try {
            int num = 0;
            while (true) {
                Object msgObj = redisTemplate.opsForList().rightPop(RedisCollection.SUB_SUCCESSFUL_MESSAGES);
                if (msgObj == null) {
                    break;
                }
                RedisMqMessageLog cacheMessageLog = objectMapper.readValue(String.valueOf(msgObj), RedisMqMessageLog.class);
                RedisMqMessageLog dbMessageLog = redisMqMessageLogService.getById(cacheMessageLog.getId());
                if (dbMessageLog != null && dbMessageLog.getStatus() == 0) {
                    // 更新重试成功的订阅失败消息的状态
                    redisMqMessageLogService.updateStatus(cacheMessageLog.getId(), 1);
                }
                // 累计出列数
                RedisMqChannel afterChannel = updateChannelQueueNumber(cacheMessageLog.getChannel(), "Dequeued");
                num++;
                if (num == onceNum) {
                    break;
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    /**
     * 累计 channel 的出入列数
     *
     * @param channel 队列编码
     * @param type    出入列类型标识符
     * @return RedisMqChannel
     */
    private synchronized RedisMqChannel updateChannelQueueNumber(String channel, String type) {
        RedisMqChannel mqChannelAfter = null;
        RedisMqChannel mqChannel = redisMqChannelService.findByChannel(channel);
        if ("Dequeued".equals(type)) {
            // 累计出列数
            mqChannel.setDequeuedNum(mqChannel.getDequeuedNum() + 1);
            mqChannelAfter = redisMqChannelService.save(mqChannel);
        } else if ("Enqueued".equals(type)) {
            // 累计入列数
            mqChannel.setEnqueuedNum(mqChannel.getEnqueuedNum() + 1);
            mqChannelAfter = redisMqChannelService.save(mqChannel);
        }
        return mqChannelAfter;
    }
}

+ 145 - 0
svr/svr-basic/src/main/java/com/yihu/jw/basic/redis/service/RedisMqChannelService.java

@ -0,0 +1,145 @@
package com.yihu.jw.basic.redis.service;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.yihu.jw.basic.redis.dao.RedisMqChannelDao;
import com.yihu.jw.basic.redis.dao.RedisMqPublisherDao;
import com.yihu.jw.entity.ehr.redis.RedisMqChannel;
import com.yihu.jw.entity.ehr.redis.RedisMqPublisher;
import com.yihu.jw.mysql.query.BaseJpaService;
import com.yihu.jw.profile.queue.RedisCollection;
import com.yihu.jw.restmodel.web.Envelop;
import org.apache.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;
/**
 * redis消息队列 Service
 *
 * @author 张进军
 * @date 2017/11/10 11:45
 */
@Service
public class RedisMqChannelService extends BaseJpaService<RedisMqChannel, RedisMqChannelDao> {
    private static final Logger logger = Logger.getLogger(RedisMqChannelService.class);
    @Autowired
    RedisMqChannelDao redisMqChannelDao;
    @Autowired
    RedisMqPublisherDao redisMqPublisherDao;
    @Autowired
    RedisMqMessageLogService redisMqMessageLogService;
    @Autowired
    RedisMqChannelService redisMqChannelService;
    @Resource
    RedisTemplate<String, Object> redisTemplate;
    @Autowired
    ObjectMapper objectMapper;
    public RedisMqChannel getById(Integer id) {
        return redisMqChannelDao.getOne(id);
    }
    public RedisMqChannel findByChannel(String channel) {
        return redisMqChannelDao.findByChannel(channel);
    }
    @Transactional(readOnly = false)
    public RedisMqChannel save(RedisMqChannel redisMqChannel) {
        return redisMqChannelDao.save(redisMqChannel);
    }
    @Transactional(readOnly = false)
    public void delete(Integer id) {
        redisMqChannelDao.deleteById(id);
    }
    public Boolean isUniqueChannel(Integer id, String channel) {
        RedisMqChannel redisMqChannel = redisMqChannelDao.isUniqueChannel(id, channel);
        if (redisMqChannel == null) {
            return true;
        } else {
            return false;
        }
    }
    public Boolean isUniqueChannelName(Integer id, String channelName) {
        RedisMqChannel redisMqChannel = redisMqChannelDao.isUniqueChannelName(id, channelName);
        if (redisMqChannel == null) {
            return true;
        } else {
            return false;
        }
    }
    public Boolean isExist(String channel) {
        RedisMqChannel channels = redisMqChannelDao.findByChannel(channel);
        return channels != null;
    }
    /**
     * 发布消息
     *
     * @param publisherAppId 发布者应用ID
     * @param channel        消息队列编码
     * @param message        消息
     * @return Envelop
     */
    public Envelop sendMessage(String publisherAppId, String channel, String message) {
        Envelop envelop = new Envelop();
        try {
            // 判断消息队列是否注册
            RedisMqChannel redisMqChannel = redisMqChannelDao.findByChannel(channel);
            if (redisMqChannel == null) {
                envelop.setStatus(-1);
                envelop.setMessage("消息队列 " + channel + " 还未注册,需要先注册才能往队列发布消息。");
                return envelop;
            }
            // 判断队列是否绑定发布者
            RedisMqPublisher redisMqPublisher = redisMqPublisherDao.findByChannelAndAppId(channel, publisherAppId);
            if (redisMqPublisher == null) {
                envelop.setStatus(-1);
                envelop.setMessage("消息队列 " + channel + " 中没绑定过应用ID为 " + publisherAppId + " 的发布者,需要先绑定才能发布消息。");
                return envelop;
            }
            // 将消息加入到待发缓存集合中
            addToPubWaitingMessage(publisherAppId, channel, message, "");
        } catch (Exception e) {
            e.printStackTrace();
            envelop.setStatus(-1);
            envelop.setMessage("发布消息发生异常:" + e.getMessage());
        }
        return Envelop.getSuccess("发布成功");
    }
    /**
     * 将消息加入到待发缓存集合中
     *
     * @param publisherAppId 发布者应用ID
     * @param channel        消息队列编码
     * @param message        消息
     * @param messageId      消息ID,订阅失败重发时才有值,不然为空字符串,即首次发送必为空字符串。
     * @throws JsonProcessingException
     */
    public void addToPubWaitingMessage(String publisherAppId, String channel, String message, String messageId)
            throws JsonProcessingException {
        Map<String, Object> messageMap = new HashMap<>();
        messageMap.put("messageId", messageId);
        messageMap.put("channel", channel);
        messageMap.put("publisherAppId", publisherAppId);
        messageMap.put("messageContent", message);
        redisTemplate.opsForList().leftPush(RedisCollection.PUB_WAITING_MESSAGES, objectMapper.writeValueAsString(messageMap));
    }
}

+ 43 - 0
svr/svr-basic/src/main/java/com/yihu/jw/basic/redis/service/RedisMqMessageLogService.java

@ -0,0 +1,43 @@
package com.yihu.jw.basic.redis.service;
import com.yihu.jw.basic.redis.dao.RedisMqMessageLogDao;
import com.yihu.jw.entity.ehr.redis.RedisMqMessageLog;
import com.yihu.jw.mysql.query.BaseJpaService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.util.List;
/**
 * redis消息记录 Service
 *
 * @author 张进军
 * @date 2017/11/10 11:45
 */
@Service
@Transactional
public class RedisMqMessageLogService extends BaseJpaService<RedisMqMessageLog, RedisMqMessageLogDao> {
    @Autowired
    RedisMqMessageLogDao redisMqMessageLogDao;
    public RedisMqMessageLog getById(String id) {
        return redisMqMessageLogDao.getOne(id);
    }
    public List<RedisMqMessageLog> findByChannel(String channel) {
        return redisMqMessageLogDao.findByChannel(channel);
    }
    @Transactional(readOnly = false)
    public RedisMqMessageLog save(RedisMqMessageLog redisMqMessageLog) {
        return redisMqMessageLogDao.save(redisMqMessageLog);
    }
    @Transactional(readOnly = false)
    public void updateStatus(String id, Integer status) {
        redisMqMessageLogDao.updateStatus(id, status);
    }
}

+ 56 - 0
svr/svr-basic/src/main/java/com/yihu/jw/basic/redis/service/RedisMqPublisherService.java

@ -0,0 +1,56 @@
package com.yihu.jw.basic.redis.service;
import com.yihu.jw.basic.redis.dao.RedisMqPublisherDao;
import com.yihu.jw.entity.ehr.redis.RedisMqPublisher;
import com.yihu.jw.mysql.query.BaseJpaService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.util.List;
/**
 * redis消息发布者 Service
 *
 * @author 张进军
 * @date 2017/11/20 09:35
 */
@Service
@Transactional
public class RedisMqPublisherService extends BaseJpaService<RedisMqPublisher, RedisMqPublisherDao> {
    @Autowired
    RedisMqPublisherDao redisMqPublisherDao;
    public RedisMqPublisher getById(Integer id) {
        return redisMqPublisherDao.getOne(id);
    }
    public List<RedisMqPublisher> findByChannel(String channel) {
        return redisMqPublisherDao.findByChannel(channel);
    }
    public RedisMqPublisher findByChannelAndAppId(String channel, String appId) {
        return redisMqPublisherDao.findByChannelAndAppId(channel, appId);
    }
    @Transactional(readOnly = false)
    public RedisMqPublisher save(RedisMqPublisher redisMqChannel) {
        return redisMqPublisherDao.save(redisMqChannel);
    }
    @Transactional(readOnly = false)
    public void delete(Integer id) {
        redisMqPublisherDao.deleteById(id);
    }
    public Boolean isUniqueAppId(Integer id, String channel, String appId) {
        RedisMqPublisher redisMqPublisher = redisMqPublisherDao.isUniqueAppId(id, channel, appId);
        if (redisMqPublisher == null) {
            return true;
        } else {
            return false;
        }
    }
}

+ 78 - 0
svr/svr-basic/src/main/java/com/yihu/jw/basic/redis/service/RedisMqSubscriberService.java

@ -0,0 +1,78 @@
package com.yihu.jw.basic.redis.service;
import com.yihu.jw.basic.redis.dao.RedisMqSubscriberDao;
import com.yihu.jw.entity.ehr.redis.RedisMqSubscriber;
import com.yihu.jw.mysql.query.BaseJpaService;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.util.List;
/**
 * redis消息订阅者 Service
 *
 * @author 张进军
 * @date 2017/11/10 11:45
 */
@Service
@Transactional
public class RedisMqSubscriberService extends BaseJpaService<RedisMqSubscriber, RedisMqSubscriberDao> {
    @Autowired
    RedisMqSubscriberDao redisMqSubscriberDao;
    public RedisMqSubscriber getById(Integer id) {
        return redisMqSubscriberDao.getOne(id);
    }
    public List<RedisMqSubscriber> findByChannel(String channel) {
        return redisMqSubscriberDao.findByChannel(channel);
    }
    @Transactional(readOnly = false)
    public RedisMqSubscriber save(RedisMqSubscriber redisMqChannel) {
        return redisMqSubscriberDao.save(redisMqChannel);
    }
    @Transactional(readOnly = false)
    public void delete(Integer id) {
        redisMqSubscriberDao.deleteById(id);
    }
    public Boolean isUniqueAppId(Integer id, String channel, String appId) {
        RedisMqSubscriber redisMqSubscriber = redisMqSubscriberDao.isUniqueAppId(id, channel, appId);
        if (redisMqSubscriber == null) {
            return true;
        } else {
            return false;
        }
    }
    public Boolean isUniqueSubscribedUrl(Integer id, String channel, String subscriberUrl) {
        RedisMqSubscriber redisMqSubscriber = redisMqSubscriberDao.isUniqueSubscribedUrl(id, channel, subscriberUrl);
        if (redisMqSubscriber == null) {
            return true;
        } else {
            return false;
        }
    }
    public Boolean isExist(String channel, String subscriber) {
        RedisMqSubscriber redisMqSubscriber = redisMqSubscriberDao.findByChannelAndAndSubscribedUrl(channel, subscriber);
        return redisMqSubscriber != null;
    }
    public void unsubscribe(String channel, String subscriber) {
        List<RedisMqSubscriber> subscriberList = redisMqSubscriberDao.findByChannel(channel);
        subscriberList.forEach(redisMqSubscriber -> {
            if (StringUtils.isEmpty(subscriber)) {//取消所有订阅者
                redisMqSubscriberDao.delete(redisMqSubscriber);
            } else if (subscriber.equals(redisMqSubscriber.getSubscribedUrl())) {
                redisMqSubscriberDao.delete(redisMqSubscriber);//取消指定订阅者
            }
        });
    }
}

+ 1 - 2
svr/svr-basic/src/main/java/com/yihu/jw/basic/standard/service/standard/StdDatasetService.java

@ -288,14 +288,13 @@ public class StdDatasetService extends SQLGeneralDAO {
            }
        }
        String sql = sqlCreator.selectData(tableName);
        sql = sql.replace("STD_ID,","STD_ID as standardId,");
        if (limit != null) {
            if (offset != null &&offset>0) {
                sql += " limit "+(offset - 1) * limit+","+limit;
            }else {
                sql += " limit "+limit;
            }
        }
        return jdbcTemplate.query(sql,new BeanPropertyRowMapper<>(tClass));
    }