MessageManager.java 1.9 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152
  1. package com.yihu.ehr.kafka;
  2. import com.yihu.ehr.redis.pubsub.entity.RedisMqChannel;
  3. import com.yihu.ehr.redis.pubsub.entity.RedisMqSubscriber;
  4. import com.yihu.ehr.redis.pubsub.service.RedisMqChannelService;
  5. import com.yihu.ehr.redis.pubsub.service.RedisMqSubscriberService;
  6. import org.springframework.beans.factory.annotation.Autowired;
  7. import org.springframework.stereotype.Component;
  8. import java.util.*;
  9. import java.util.concurrent.ExecutorService;
  10. import java.util.concurrent.Executors;
  11. /**
  12. * Created by progr1mmer on 2018/8/2.
  13. */
  14. //@Component
  15. public class MessageManager {
  16. private ExecutorService executorService;
  17. @Autowired
  18. private RedisMqChannelService redisMqChannelService;
  19. @Autowired
  20. private RedisMqSubscriberService redisMqSubscriberService;
  21. public void initConsumer () {
  22. try {
  23. Map<String, Set<String>> groups = new HashMap<>();
  24. List<RedisMqChannel> channels = redisMqChannelService.search("");
  25. for (RedisMqChannel channel : channels) {
  26. List<RedisMqSubscriber> redisMqSubscribers = redisMqSubscriberService.search("channel=" + channel.channel);
  27. redisMqSubscribers.forEach(item -> {
  28. if (groups.containsKey(item.getAppId())) {
  29. groups.get(item.getAppId()).add(item.getChannel());
  30. } else {
  31. Set<String> topics = new HashSet<>();
  32. topics.add(item.getChannel());
  33. groups.put(item.getAppId(), topics);
  34. }
  35. });
  36. }
  37. executorService = Executors.newFixedThreadPool(groups.size());
  38. for (String key : groups.keySet()) {
  39. executorService.execute(new ConsumerRunner(key, groups.get(key)));
  40. }
  41. } catch (Exception e) {
  42. e.printStackTrace();
  43. System.exit(-1);
  44. }
  45. }
  46. }