pubSub.js 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114
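  /*
   * Redis publish/subscribe helper.
   *
   * Example:
   *   let channel = "ryan";
   *   redis.pubSub.registerHandlers("ryan", msg => console.log(msg));
   *   redis.pubSub.subscribe(channel);
   *   redis.pubSub.publish(channel, "hello from chen");
   *
   * Note: registered callbacks are currently not invoked when a message
   * arrives; registering one only enables the built-in delivery logic for
   * that channel.
   */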
  9. "use strict";
  10. let configFile = require('../../include/commons').CONFIG_FILE;
  11. let config = require('../../resources/config/' + configFile);
  12. let RedisModel = require('./../redis.model');
  13. let RedisSubClient = require('./redisSubClient');
  14. let RedisPubClient = require('./redisPubClient');
  15. let redisPubConn = RedisPubClient.redisClient().connection;
  16. let redisSubConn = RedisSubClient.redisClient().connection;
  17. let Sessions = require('../sessions/sessions.js');
  18. let WechatClient = require("../client/wechat.client.js");
  19. let WlyySDK = require("../../util/wlyy.sdk");
  20. let AppClient = require("../client/app.client.js");
  /**
   * Redis publish/subscribe hub.
   *
   * Uses one connection for subscribing (`redisSubConn`) and another for
   * publishing (`redisPubConn`). Messages published before the first
   * subscription has been acknowledged are queued and sent once it is.
   *
   * Incoming messages are expected to be JSON. When `config.pubSubSwitch` is
   * on and the channel has at least one registered handler, the parsed message
   * is delivered to the patient or doctor clients. The registered handler
   * functions themselves are not called; they only mark the channel as active.
   */
  class PubSub {
    constructor() {
      this.sub = redisSubConn;
      // channel name -> Set of registered handler functions
      this.handlers = new Map();

      /**
       * Entry point for every message received on the subscriber connection.
       * @param {string} channel - Channel the message arrived on.
       * @param {string} message - Raw JSON text of the message.
       */
      this.subAction = (channel, message) => {
        const actions = this.handlers.get(channel);
        if (!actions || actions.size === 0) {
          return;
        }
        console.log("接收消息:" + message);

        // Delivery switch: lets a local run and a separately running test
        // deployment coexist without users receiving each message twice.
        if (!config.pubSubSwitch) {
          return;
        }

        // Parse once per message. A malformed payload is logged and dropped
        // instead of throwing out of the event listener.
        let payload;
        try {
          payload = JSON.parse(message);
        } catch (err) {
          console.error(err);
          return;
        }
        if (payload === null || typeof payload !== "object") {
          return;
        }
        this.dispatchMessage(payload);
      };

      // Publish closures waiting for the first subscription acknowledgement.
      this.alredyPublishs = [];
      this.subConnected = false;
      // Guards against attaching the "message" listener more than once.
      this.messageListenerAttached = false;
    }

    /**
     * Routes a parsed message to the patient or doctor delivery path.
     * NOTE(review): the original code recorded that calling
     * Sessions.getRedisPushNotification(message) here failed with "not a
     * function"; possibly a circular require — confirm before re-enabling.
     * @param {object} message - Parsed message payload.
     */
    dispatchMessage(message) {
      if (message.targetType === "patient") {
        this.notifyPatient(message);
        return;
      }
      this.notifyDoctor(message);
    }

    /**
     * Delivers a patient-targeted message through WeChat.
     * The PC build ('prodPC') does not process messages meant for patients.
     * @param {object} message - Parsed message payload.
     */
    notifyPatient(message) {
      if (config.environment !== "prodPC") {
        WechatClient.sendMessage(message.targetUserId, message.targetUserName, message);
      }
    }

    /**
     * Delivers a doctor-targeted message over the socket and, outside the PC
     * build, as an app push notification.
     * @param {object} message - Parsed message payload.
     */
    notifyDoctor(message) {
      if (String(message.sessionType) === "1") {
        WechatClient.sendReadDoctorByDoctorId(message.targetUserId, message);
      }
      // Tell the doctor that a new message has arrived.
      WechatClient.sendSocketMessageToDoctor(message.targetUserId, message);
      // The PC build does not send app push (Getui) notifications.
      if (config.environment !== "prodPC") {
        this.pushAppNotification(message);
      }
    }

    /**
     * Requests the doctor's message counters and sends the app notification
     * with the resulting total. If the request fails or its response cannot
     * be read, the error is logged and the notification is sent with 0.
     * @param {object} message - Parsed message payload.
     */
    pushAppNotification(message) {
      WlyySDK.request(message.targetUserId, '', '', '', '/im/common/message/messages', 'POST', (err, res) => {
        let count = 0;
        if (err) {
          console.error(err);
        } else {
          try {
            count = PubSub.countUnread(JSON.parse(res));
          } catch (parseErr) {
            console.error(parseErr);
          }
        }
        AppClient.sendNotification(message.targetUserId, message, message.sessionType, count);
      });
    }

    /**
     * Sums the counters carried by a parsed counters response.
     * @param {object} res - Parsed response with `status` and `data`.
     * @returns {number} Sum of the IM, system, health-index and sign counters,
     *   or 0 when the status is not 200.
     */
    static countUnread(res) {
      if (Number(res.status) !== 200) {
        return 0;
      }
      const data = res.data;
      // imMsgCount is itself a JSON string holding a `count` field.
      return parseInt(JSON.parse(data.imMsgCount).count, 10) +
        parseInt(data.system.amount, 10) +
        parseInt(data.healthIndex.amount, 10) +
        parseInt(data.sign.amount, 10);
    }

    /**
     * Publishes a message, or queues it until a subscription is acknowledged.
     * @param {string} channel - Target channel.
     * @param {string} message - Message text to publish.
     */
    publish(channel, message) {
      const send = () => {
        redisPubConn.publish(channel, message);
        console.log("发布消息:channel:" + channel + ",message:" + message);
      };
      if (this.subConnected === false) {
        this.alredyPublishs.push(send);
      } else {
        send();
      }
    }

    /**
     * Registers a handler for a channel. A channel needs at least one
     * registration before its incoming messages are processed.
     * @param {string} channel - Channel name.
     * @param {Function} action - Handler to associate with the channel.
     */
    registerHandlers(channel, action) {
      const actions = this.handlers.get(channel) || new Set();
      actions.add(action);
      this.handlers.set(channel, actions);
    }

    /**
     * Subscribes to a channel and flushes queued publishes once the
     * subscribe call has been answered.
     * @param {string} channel - Channel name.
     */
    subscribe(channel) {
      // Attach the listener once only; one listener per subscribe() call
      // would process every message once per subscribed channel.
      if (!this.messageListenerAttached) {
        this.messageListenerAttached = true;
        this.sub.on("message", (incomingChannel, message) => {
          this.subAction(incomingChannel, message);
        });
      }

      this.sub.subscribe(channel, (err, reply) => {
        if (err) {
          console.error(err);
        }
        // As before, queued publishes are released even when the subscribe
        // call reports an error, so they are not held back forever.
        this.subConnected = true;
        // Drain the queue so a later subscribe() does not publish it again.
        const pending = this.alredyPublishs.splice(0);
        for (const send of pending) {
          send();
        }
        if (!err) {
          console.log("订阅成功:" + reply);
        }
      });
    }

    /**
     * Closes the subscriber connection.
     */
    tearDown() {
      this.sub.quit();
    }
  }
  105. // Export one shared PubSub instance (created when this module loads), not the class itself.
  106. module.exports = new PubSub();