/*
 * Redis publish/subscribe.
 *
 * example:
 *   let channel = "ryan";
 *   redis.pubSub.registerHandlers("ryan", msg => console.log(msg));
 *   redis.pubSub.subscribe(channel);
 *   redis.pubSub.publish(channel, "hello from chen");
 */
"use strict";

let configFile = require('../../include/commons').CONFIG_FILE;
let config = require('../../resources/config/' + configFile);
let RedisModel = require('./../redis.model');
let RedisSubClient = require('./redisSubClient');
let RedisPubClient = require('./redisPubClient');
let RedisClient = require('../../repository/redis/redis.client.js');
let redisPubConn = RedisPubClient.redisClient().connection;
let redisSubConn = RedisSubClient.redisClient().connection;
let redis = RedisClient.redisClient().connection;
let Sessions = require('../../models/sessions/sessions');
let WechatClient = require("../client/wechat.client.js");
let WlyySDK = require("../../util/wlyy.sdk");
let AppClient = require("../client/app.client.js");
let Participants = require('../sessions/participants');
let Messages = require('../messages/messages');
const REDIS_KEYS = require('../../include/commons').REDIS_KEYS;
let ObjectUtil = require("../../util/object.util.js");
let logger = require('../../util/log.js');

/**
 * Ask the WLYY service for the target user's unread counters and then send the
 * app notification carrying the summed count.
 *
 * The notification is always sent: when the request fails or the response
 * cannot be parsed, the failure is logged and the count falls back to 0.
 *
 * @param {Object} message Parsed message (reads targetUserId and sessionType).
 */
function sendAppNotification(message) {
    WlyySDK.request(message.targetUserId, '', '', '', '/im/common/message/messages', 'POST', function (err, res) {
        let count = 0;
        if (err) {
            logger.error("Request unread message count failed: ", err);
        } else {
            try {
                let body = JSON.parse(res);
                if (body.status == 200) {
                    let data = body.data;
                    count = parseInt(JSON.parse(data.imMsgCount).count, 10)
                        + parseInt(data.system.amount, 10)
                        + parseInt(data.healthIndex.amount, 10)
                        + parseInt(data.sign.amount, 10);
                }
            } catch (ex) {
                // A malformed response must not prevent the notification itself.
                count = 0;
                logger.error("Parse unread message count failed: ", ex);
            }
        }
        AppClient.sendNotification(message.targetUserId, message, message.sessionType, count);
    });
}

/**
 * Push a stored message to its target user through the configured channels.
 *
 * Does nothing unless config.pubSubSwitch is set. That switch controls handling
 * of subscribed messages and prevents users from receiving a message twice when
 * a local instance and the test environment run side by side.
 *
 * Note: the original author could not call Sessions.getRedisPushNotification(message)
 * here (it was reported as "not a function"; the cause is unknown, possibly a
 * circular require - unconfirmed), so the push logic lives in this module.
 *
 * @param {Object} message Parsed message (reads targetType, targetUserId,
 *                         targetUserName and sessionType).
 */
function pushToTarget(message) {
    if (!config.pubSubSwitch) {
        return;
    }

    // Loose comparisons are kept on purpose: the field types are not guaranteed here.
    if (message.targetType == 'patient') {
        // The PC ("local") build does not handle messages addressed to patients.
        if (config.environment != 'local') {
            WechatClient.sendMessage(message.targetUserId, message.targetUserName, message);
        }
        return;
    }

    if (message.sessionType == "1") {
        WechatClient.sendReadDoctorByDoctorId(message.targetUserId, message);
    }
    // Tell the doctor that a new message has arrived.
    WechatClient.sendSocketMessageToDoctor(message.targetUserId, message);
    // The PC ("local") build does not send app push notifications.
    if (config.environment != 'local') {
        sendAppNotification(message);
    }
    // External-network PC IM clients are notified over the socket.
    WechatClient.sendPcImSocket(message.targetUserId, message, message.sessionType);
}

/**
 * Store a message in Redis, refresh the session state and notify the target.
 *
 * Steps, all asynchronous:
 *   1. check that the sender is a participant of the session;
 *   2. read the session "type" and "name" hash fields;
 *   3. save the message body and its timestamp index in one MULTI;
 *   4. update the session's last content;
 *   5. bump the sender's score in the participants sorted set, then push.
 *
 * @param {Object} message Parsed message; message.timestamp must be a Date.
 */
function storeAndDispatch(message) {
    let sessionId = message.session_id;
    let participants = new Participants();
    let sessionKey = RedisModel.makeRedisKey(REDIS_KEYS.Session, sessionId);

    // Check whether the sender is a participant of this session.
    participants.existsParticipant(sessionId, message.sender_id, function (err, exists) {
        if (!exists) {
            logger.error({message: "当前会话找不到此发送者"});
            return;
        }

        redis.hmgetAsync(sessionKey, ["type", "name"]).then(function (fields) {
            let sessionType = fields[0];
            let sessionName = fields[1];
            if (!sessionType) {
                return;
            }

            // Save the message to Redis and update the session's last state and
            // the sender's last message fetch time.
            let messageKey = RedisModel.makeRedisKey(REDIS_KEYS.Messages, sessionId);
            let messageTimestampKey = RedisModel.makeRedisKey(REDIS_KEYS.MessagesByTimestamp, sessionId);
            let sentAt = message.timestamp.getTime();
            let msgJson = {
                id: message.id,
                sender_id: message.sender_id,
                sender_name: message.sender_name,
                timestamp: sentAt,
                content_type: message.content_type,
                content: message.content,
                business_type: message.business_type || 1
            };

            redis.multi()
                .hset(messageKey, message.id, JSON.stringify(msgJson))   // save the message
                .zadd(messageTimestampKey, sentAt, message.id)           // save the message time
                .execAsync()
                .then(function () {
                    Messages.updateLastContent(sessionKey, sessionType, null, message);
                    Messages.cleanOutRangeMessage(sessionId);            // clean out range messages
                })
                .catch(function (ex) {
                    logger.error("Save message to redis failed: ", ex);
                });

            Messages.updateLastContent(sessionKey, sessionType, sessionName, message);

            let score = sentAt + 1;
            let participantsKey = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipants, sessionId);
            redis.zaddAsync(participantsKey, score, message.sender_id)
                .then(function () {
                    // Pushing must happen after the message has been cached; otherwise
                    // the message may be missing when the user pulls the message list.
                    pushToTarget(message);
                })
                .catch(function (err) {
                    logger.error("Update participant last fetch time failed: ", err);
                });
        }).catch(function (err) {
            logger.error({message: "Error occurred while save message to session: " + err});
        });
    });
}

/**
 * Redis publish/subscribe helper. The module exports a single shared instance.
 *
 * Properties:
 *   sub            - the subscriber connection.
 *   handlers       - Map of channel name -> Set of registered handlers.
 *   subAction      - callback run for every message received on a channel.
 *   alredyPublishs - publish actions queued until the first subscription is confirmed.
 *   subConnected   - true once a subscribe callback has run.
 */
class PubSub {
    constructor() {
        this.sub = redisSubConn;
        this.handlers = new Map();

        /**
         * Handle one raw message from a channel.
         *
         * The message is processed only when at least one handler is registered
         * for the channel. The registered handlers themselves are not invoked;
         * they only switch processing on for their channel. The payload is parsed
         * and processed exactly once, regardless of how many handlers exist.
         *
         * @param {string} channle Channel the message arrived on.
         * @param {string} message JSON-encoded message.
         */
        this.subAction = (channle, message) => {
            let actions = this.handlers.get(channle) || new Set();
            if (actions.size === 0) {
                return;
            }

            console.log("接收消息:" + message);

            let parsed;
            try {
                parsed = JSON.parse(message);
            } catch (ex) {
                logger.error("Parse subscribed message failed: ", ex);
                return;
            }
            if (parsed === null || typeof parsed !== 'object') {
                logger.error("Subscribed message is not a JSON object: " + message);
                return;
            }

            parsed.timestamp = new Date(parsed.timestamp);
            storeAndDispatch(parsed);
        };

        this.alredyPublishs = [];
        this.subConnected = false;
        // Guards against attaching the "message" listener more than once.
        this.messageListenerAttached = false;
    }

    /**
     * Publish a message to a channel. Until a subscription has been confirmed
     * the publish is queued and sent when the subscribe callback runs.
     *
     * @param {string} channel Channel name.
     * @param {string} message Message payload.
     */
    publish(channel, message) {
        let action = () => {
            let pub = redisPubConn;
            pub.publish(channel, message);
            console.log("发布消息:channel:" + channel + ",message:" + message);
        };

        if (this.subConnected === false) {
            this.alredyPublishs.push(action);
        } else {
            action();
        }
    }

    /**
     * Register a handler for a channel.
     *
     * @param {string} channel Channel name.
     * @param {Function} action Handler to associate with the channel.
     */
    registerHandlers(channel, action) {
        let actions = this.handlers.get(channel) || new Set();
        actions.add(action);
        this.handlers.set(channel, actions);
    }

    /**
     * Subscribe to a channel and flush any publishes queued so far.
     *
     * The "message" listener is attached only once, because the connection
     * delivers messages for every subscribed channel to each listener.
     *
     * @param {string} channel Channel name.
     */
    subscribe(channel) {
        let self = this;

        this.sub.subscribe(channel, function (err, reply) {
            if (err) {
                logger.error(err);
            }
            self.subConnected = true;

            // Drain the queue in place so a later subscribe() cannot resend it.
            let pending = self.alredyPublishs.splice(0);
            for (let publish of pending) {
                publish();
            }
            console.log("订阅成功:" + reply);
        });

        if (!this.messageListenerAttached) {
            this.messageListenerAttached = true;
            this.sub.on("message", function (channel, message) {
                self.subAction(channel, message);
            });
        }
    }

    /**
     * Close the subscriber connection.
     */
    tearDown() {
        this.sub.quit();
    }
}

// Expose a shared instance
module.exports = new PubSub();