123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182 |
- /*
- * redis发布订阅
- *example:
- * let channel="ryan";
- redis.pubSub.registerHandlers("ryan",msg=> console.log(msg));
- redis.pubSub.subscribe(channel);
- redis.pubSub.publish(channel,"hello from chen");
- */
- "use strict";
- let configFile = require('../../include/commons').CONFIG_FILE;
- let config = require('../../resources/config/' + configFile);
- let RedisModel = require('./../redis.model');
- let RedisSubClient = require('./redisSubClient');
- let RedisPubClient = require('./redisPubClient');
- let RedisClient = require('../../repository/redis/redis.client.js');
- let redisPubConn = RedisPubClient.redisClient().connection;
- let redisSubConn = RedisSubClient.redisClient().connection;
- let redis = RedisClient.redisClient().connection;
- let Sessions = require('../../models/sessions/sessions');
- let WechatClient = require("../client/wechat.client.js");
- let WlyySDK = require("../../util/wlyy.sdk");
- let AppClient = require("../client/app.client.js");
- let Participants = require('../sessions/participants');
- let Messages = require('../messages/messages');
- const REDIS_KEYS = require('../../include/commons').REDIS_KEYS;
- let ObjectUtil = require("../../util/object.util.js");
- let logger = require('../../util/log.js');
- class PubSub{
- constructor(){
- this.sub=redisSubConn;
- this.handlers=new Map();
- this.subAction=(channle,message)=>{
- let actions= this.handlers.get(channle)||new Set();
- for(let action of actions)
- {
- console.log("接收消息:"+message);
- message = JSON.parse(message);
- message.timestamp = new Date(message.timestamp);
- let sessionId = message.session_id;
- let participants = new Participants();
- let sessionKey = RedisModel.makeRedisKey(REDIS_KEYS.Session, sessionId);
- let sessionType =0;
- // 检查会话中是否存在此成员
- participants.existsParticipant(sessionId, message.sender_id, function (err, res) {
- if (res) {
- redis.hmgetAsync(sessionKey, ["type", "name"]).then(function (res) {
- sessionType = res[0];
- let sessionName = res[1];
- if (sessionType) {
- // 消息保存到Redis,并更新会话最后状态、用户最后消息获取时间
- let sessionKey = RedisModel.makeRedisKey(REDIS_KEYS.Session, sessionId);
- let messageKey = RedisModel.makeRedisKey(REDIS_KEYS.Messages, sessionId);
- let messageTimestampKey = RedisModel.makeRedisKey(REDIS_KEYS.MessagesByTimestamp, sessionId);
- let msgJson = {
- id: message.id,
- sender_id: message.sender_id,
- sender_name: message.sender_name,
- timestamp: message.timestamp.getTime(),
- content_type: message.content_type,
- content: message.content,
- business_type:message.business_type||1
- };
- redis.multi()
- .hset(messageKey, message.id, JSON.stringify(msgJson)) // 保存消息
- .zadd(messageTimestampKey, message.timestamp.getTime(), message.id) // 保存消息时间
- .execAsync()
- .then(function (res) {
- Messages.updateLastContent(sessionKey, sessionType, null, message);
- Messages.cleanOutRangeMessage(sessionId); // clean out range messages
- })
- .catch(function (ex) {
- logger.error("Save message to redis failed: ", ex);
- });
- Messages.updateLastContent(sessionKey, sessionType, sessionName, message);
- var score = message.timestamp.getTime() + 1;
- let participantsKey = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipants, sessionId);
- redis.zaddAsync(participantsKey, score, message.sender_id)
- .then(function (res) {
- //这个代码需要消息存入缓存后才能执行,否则用户拉取消息列表时可能缺少消息
- if(config.pubSubSwitch){//接收订阅消息处理开关,本地运行和测试库单独运行时防止用户接收消息2次
- //Sessions.getRedisPushNotification(message);这里不知为什么无法调用这个方法,提示getRedisPushNotification不是方法
- if (message.targetType=='patient') {
- if(config.environment!='local'){//pc版接收要发给居民的消息不做处理
- WechatClient.sendMessage(message.targetUserId, message.targetUserName, message);
- }
- } else {
- if(message.sessionType=="1"){
- WechatClient.sendReadDoctorByDoctorId(message.targetUserId, message);
- }
- //告知医生新消息
- WechatClient.sendSocketMessageToDoctor(message.targetUserId,message);
- if(config.environment!='local'){//pc版不推送个推
- WlyySDK.request(message.targetUserId, '', '', '', '/im/common/message/messages', 'POST', function (err, res) {
- let count = 0;
- res = JSON.parse(res)
- if (res.status == 200) {
- let data = res.data;
- count = parseInt(JSON.parse(data.imMsgCount).count) + parseInt(data.system.amount) + parseInt(data.healthIndex.amount) + parseInt(data.sign.amount);
- }
- AppClient.sendNotification(message.targetUserId, message,message.sessionType,count);
- });
- }
- //外网pcim通过socket推送
- WechatClient.sendPcImSocket(message.targetUserId,message,message.sessionType);
- }
- }
- })
- .catch(function (err) {
- logger.error("Update participant last fetch time failed: ", err);
- });
- }
- }).catch(function (err) {
- logger.error({message: "Error occurred while save message to session: " + err});
- })
- } else {
- logger.error({message: "当前会话找不到此发送者"});
- }
- });
- //action(message);
- }
- }
- this.alredyPublishs=[];
- this.subConnected=false;
- }
- publish(channel,message)
- {
- let action=()=>{
- let pub=redisPubConn;
- pub.publish(channel,message);
- console.log("发布消息:channel:"+channel+",message:"+message);
- };
- if(this.subConnected===false)
- {
- this.alredyPublishs.push(action);
- }
- else{
- action();
- }
- }
- registerHandlers(channel,action)
- {
- var actions=this.handlers.get(channel)||new Set();
- actions.add(action);
- this.handlers.set(channel,actions);
- }
- subscribe(channel)
- {
- let self=this;
- this.sub.subscribe(channel,function (err,reply) {
- if(err){
- log.error(err);
- }
- self.subConnected=true;
- for(let publish of self.alredyPublishs){
- publish();
- }
- console.log("订阅成功:"+reply);
- });
- this.sub.on("message", function (channel, message) {
- self.subAction(channel,message);
- });
- }
- tearDown()
- {
- this.sub.quit();
- }
- }
- // Expose class
- module.exports = new PubSub();
|