/* * redis发布订阅 *example: * let channel="ryan"; redis.pubSub.registerHandlers("ryan",msg=> console.log(msg)); redis.pubSub.subscribe(channel); redis.pubSub.publish(channel,"hello from chen"); */ "use strict"; let configFile = require('../../include/commons').CONFIG_FILE; let config = require('../../resources/config/' + configFile); let RedisModel = require('./../redis.model'); let RedisSubClient = require('./redisSubClient'); let RedisPubClient = require('./redisPubClient'); let RedisClient = require('../../repository/redis/redis.client.js'); let redisPubConn = RedisPubClient.redisClient().connection; let redisSubConn = RedisSubClient.redisClient().connection; let redis = RedisClient.redisClient().connection; let Sessions = require('../../models/sessions/sessions'); let WechatClient = require("../client/wechat.client.js"); let WlyySDK = require("../../util/wlyy.sdk"); let AppClient = require("../client/app.client.js"); let Participants = require('../sessions/participants'); let Messages = require('../messages/messages'); const REDIS_KEYS = require('../../include/commons').REDIS_KEYS; let ObjectUtil = require("../../util/object.util.js"); let logger = require('../../util/log.js'); let SessionRepo = require('../../repository/mysql/session.repo'); let ParticipantRepo = require('../../repository/mysql/participant.repo'); const SESSION_TYPES = require('../../include/commons').SESSION_TYPES; const SESSION_BUSINESS_TYPE = require('../../include/commons').SESSION_BUSINESS_TYPE; class PubSub{ constructor(){ this.sub=redisSubConn; this.handlers=new Map(); this.subAction=(channle,message)=>{ let actions= this.handlers.get(channle)||new Set(); for(let action of actions) { console.log("接收消息:"+message); message = JSON.parse(message); if(config.pubSubSwitch){//接收订阅消息处理开关,本地运行和测试库单独运行时防止用户接收消息2次 //Sessions.getRedisPushNotification(message);这里不知为什么无法调用这个方法,提示getRedisPushNotification不是方法 if (message.targetType=='patient') { if(config.environment!='local'){//pc版接收要发给居民的消息不做处理 WechatClient.sendMessage(message.targetUserId, message.targetUserName, message); } } else { if(message.sessionType=="1"){ WechatClient.sendReadDoctorByDoctorId(message.targetUserId, message); } //告知医生新消息 WechatClient.sendSocketMessageToDoctor(message.targetUserId,message); if(config.environment!='local'){//pc版不推送个推 WlyySDK.request(message.targetUserId, '', '', '', '/im/common/message/messages', 'POST', function (err, res) { let count = 0; res = JSON.parse(res) if (res.status == 200) { let data = res.data; count = parseInt(JSON.parse(data.imMsgCount).count) + parseInt(data.system.amount) + parseInt(data.healthIndex.amount) + parseInt(data.sign.amount); } AppClient.sendNotification(message.targetUserId, message,message.sessionType,count); }); } //外网pcim通过socket推送 WechatClient.sendPcImSocket(message.targetUserId,message,message.sessionType); } } //action(message); } } this.alredyPublishs=[]; this.subConnected=false; } publish(channel,message) { let action=()=>{ let pub=redisPubConn; pub.publish(channel,message); console.log("发布消息:channel:"+channel+",message:"+message); }; if(this.subConnected===false) { this.alredyPublishs.push(action); } else{ action(); } } registerHandlers(channel,action) { var actions=this.handlers.get(channel)||new Set(); actions.add(action); this.handlers.set(channel,actions); } subscribe(channel) { let self=this; this.sub.subscribe(channel,function (err,reply) { if(err){ log.error(err); } self.subConnected=true; for(let publish of self.alredyPublishs){ publish(); } console.log("订阅成功:"+reply); }); this.sub.on("message", function (channel, message) { self.subAction(channel,message); }); } tearDown() { this.sub.quit(); } } // Expose class module.exports = new PubSub();