Browse Source

代码修改

yeshijie 7 years ago
parent
commit
97aae5210a

+ 134 - 75
src/server/models/redis/pubSub.js

@ -28,6 +28,11 @@ let Messages = require('../messages/messages');
const REDIS_KEYS = require('../../include/commons').REDIS_KEYS;
let ObjectUtil = require("../../util/object.util.js");
let logger = require('../../util/log.js');
let SessionRepo = require('../../repository/mysql/session.repo');
let ParticipantRepo = require('../../repository/mysql/participant.repo');
const SESSION_TYPES = require('../../include/commons').SESSION_TYPES;
const SESSION_BUSINESS_TYPE = require('../../include/commons').SESSION_BUSINESS_TYPE;
class PubSub{
    constructor(){
@ -42,94 +47,148 @@ class PubSub{
                message = JSON.parse(message);
                message.timestamp = new Date(message.timestamp);
                let sessionId = message.session_id;
                let participants = new Participants();
                let sessionKey = RedisModel.makeRedisKey(REDIS_KEYS.Session, sessionId);
                let sessionType =0;
                // 检查会话中是否存在此成员
                participants.existsParticipant(sessionId, message.sender_id, function (err, res) {
                    if (res) {
                        redis.hmgetAsync(sessionKey, ["type", "name"]).then(function (res) {
                            sessionType = res[0];
                            let sessionName = res[1];
                            if (sessionType) {
                                // 消息保存到Redis,并更新会话最后状态、用户最后消息获取时间
                                let sessionKey = RedisModel.makeRedisKey(REDIS_KEYS.Session, sessionId);
                                let messageKey = RedisModel.makeRedisKey(REDIS_KEYS.Messages, sessionId);
                                let messageTimestampKey = RedisModel.makeRedisKey(REDIS_KEYS.MessagesByTimestamp, sessionId);
                                let msgJson = {
                                    id: message.id,
                                    sender_id: message.sender_id,
                                    sender_name: message.sender_name,
                                    timestamp: message.timestamp.getTime(),
                                    content_type: message.content_type,
                                    content: message.content,
                                    business_type:message.business_type||1
                redis.hexistsAsync(sessionKey, sessionId).then(function(res){
                    if(res==0){
                        ////新增session时 要先把session缓存到redis
                        SessionRepo.findOne(sessionId, function (err, res) {
                            if (res.length > 0) {//已经存在
                                let type =  res[0].type;
                                let createDate = res[0].create_date;
                                let session = {
                                    id: sessionId,
                                    name: res[0].name,
                                    type: type,
                                    create_date: createDate.getTime(),
                                    business_type: res[0].business_type
                                };
                                redis.hmsetAsync(sessionKey, session).then(function () {
                                    ParticipantRepo.findParricipantBySessionId(sessionId,function (err,res) {
                                        // 构造会话,成员及成员角色zset, hash所需要的数据
                                        let userSessions = {};
                                        let sessionParticipants = [];
                                        let sessionParticipantsRoles = [];
                                        res.forEach(function (item) {
                                            let participant_id = item.participant_id;
                                            let participant_role = item.participant_role;
                                            userSessions[RedisModel.makeRedisKey(REDIS_KEYS.UserSessions, participant_id)] = [createDate.getTime(), sessionId];
                                            sessionParticipants.push(createDate.getTime());
                                            sessionParticipants.push(participant_id);
                                            sessionParticipantsRoles.push(participant_id, participant_role);
                                        });
                                        // 向会话成员、会话成员角色集合中添加数据
                                        let sessionParticipantsKey = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipants, sessionId);
                                        let sessionParticipantsRoleKey = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipantsRole, sessionId);
                                        let multi = redis.multi()
                                            .zadd(sessionParticipantsKey, sessionParticipants)
                                            .hmset(sessionParticipantsRoleKey, sessionParticipantsRoles);
                                redis.multi()
                                    .hset(messageKey, message.id, JSON.stringify(msgJson))               // 保存消息
                                    .zadd(messageTimestampKey, message.timestamp.getTime(), message.id)  // 保存消息时间
                                    .execAsync()
                                    .then(function (res) {
                                        Messages.updateLastContent(sessionKey, sessionType, null, message);
                                        Messages.cleanOutRangeMessage(sessionId); // clean out range messages
                                    })
                                    .catch(function (ex) {
                                        logger.error("Save message to redis failed: ", ex);
                                    });
                                Messages.updateLastContent(sessionKey, sessionType, sessionName, message);
                                var score = message.timestamp.getTime() + 1;
                                let participantsKey = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipants, sessionId);
                                redis.zaddAsync(participantsKey, score, message.sender_id)
                                    .then(function (res) {
                                        //这个代码需要消息存入缓存后才能执行,否则用户拉取消息列表时可能缺少消息
                                        if(config.pubSubSwitch){//接收订阅消息处理开关,本地运行和测试库单独运行时防止用户接收消息2次
                                            //Sessions.getRedisPushNotification(message);这里不知为什么无法调用这个方法,提示getRedisPushNotification不是方法
                                            if (message.targetType=='patient') {
                                                if(config.environment!='local'){//pc版接收要发给居民的消息不做处理
                                                    WechatClient.sendMessage(message.targetUserId, message.targetUserName, message);
                                                }
                                            } else {
                                                if(message.sessionType=="1"){
                                                    WechatClient.sendReadDoctorByDoctorId(message.targetUserId, message);
                                                }
                                                //告知医生新消息
                                                WechatClient.sendSocketMessageToDoctor(message.targetUserId,message);
                                                if(config.environment!='local'){//pc版不推送个推
                                                    WlyySDK.request(message.targetUserId, '', '', '', '/im/common/message/messages', 'POST', function (err, res) {
                                                        let count = 0;
                                                        res =  JSON.parse(res)
                                                        if (res.status == 200) {
                                                            let data = res.data;
                                                            count = parseInt(JSON.parse(data.imMsgCount).count) + parseInt(data.system.amount) + parseInt(data.healthIndex.amount) + parseInt(data.sign.amount);
                                                        }
                                                        AppClient.sendNotification(message.targetUserId, message,message.sessionType,count);
                                                    });
                                                }
                                                //外网pcim通过socket推送
                                                WechatClient.sendPcImSocket(message.targetUserId,message,message.sessionType);
                                            }
                                        // 更新用户参与的会话列表
                                        for (let key in userSessions) {
                                            multi = multi.zadd(key, userSessions[key]);
                                        }
                                    })
                                    .catch(function (err) {
                                        logger.error("Update participant last fetch time failed: ", err);
                                        multi.execAsync().then(function (res) {
                                            sendMessage(sessionId,message,sessionKey);
                                        }).catch(function (ex) {
                                            logger.error("Save participants to redis failed: ", ex);
                                        });
                                    });
                                })
                            }
                        }).catch(function (err) {
                            logger.error({message: "Error occurred while save message to session: " + err});
                        })
                    } else {
                        logger.error({message: "当前会话找不到此发送者"});
                        });
                    }else {
                        sendMessage(sessionId,message,sessionKey);
                    }
                });
                })
                //action(message);
            }
        }
        this.alredyPublishs=[];
        this.subConnected=false;
        
        function sendMessage(sessionId,message,sessionKey) {
            let participants = new Participants();
            // 检查会话中是否存在此成员
            participants.existsParticipant(sessionId, message.sender_id, function (err, res) {
                if (res) {
                    redis.hmgetAsync(sessionKey, ["type", "name"]).then(function (res) {
                        let sessionType = res[0];
                        let sessionName = res[1];
                        if (sessionType) {
                            // 消息保存到Redis,并更新会话最后状态、用户最后消息获取时间
                            let sessionKey = RedisModel.makeRedisKey(REDIS_KEYS.Session, sessionId);
                            let messageKey = RedisModel.makeRedisKey(REDIS_KEYS.Messages, sessionId);
                            let messageTimestampKey = RedisModel.makeRedisKey(REDIS_KEYS.MessagesByTimestamp, sessionId);
                            let msgJson = {
                                id: message.id,
                                sender_id: message.sender_id,
                                sender_name: message.sender_name,
                                timestamp: message.timestamp.getTime(),
                                content_type: message.content_type,
                                content: message.content,
                                business_type:message.business_type||1
                            };
                            redis.multi()
                                .hset(messageKey, message.id, JSON.stringify(msgJson))               // 保存消息
                                .zadd(messageTimestampKey, message.timestamp.getTime(), message.id)  // 保存消息时间
                                .execAsync()
                                .then(function (res) {
                                    Messages.updateLastContent(sessionKey, sessionType, null, message);
                                    Messages.cleanOutRangeMessage(sessionId); // clean out range messages
                                })
                                .catch(function (ex) {
                                    logger.error("Save message to redis failed: ", ex);
                                });
                            Messages.updateLastContent(sessionKey, sessionType, sessionName, message);
                            var score = message.timestamp.getTime() + 1;
                            let participantsKey = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipants, sessionId);
                            redis.zaddAsync(participantsKey, score, message.sender_id)
                                .then(function (res) {
                                    //这个代码需要消息存入缓存后才能执行,否则用户拉取消息列表时可能缺少消息
                                    if(config.pubSubSwitch){//接收订阅消息处理开关,本地运行和测试库单独运行时防止用户接收消息2次
                                        //Sessions.getRedisPushNotification(message);这里不知为什么无法调用这个方法,提示getRedisPushNotification不是方法
                                        if (message.targetType=='patient') {
                                            if(config.environment!='local'){//pc版接收要发给居民的消息不做处理
                                                WechatClient.sendMessage(message.targetUserId, message.targetUserName, message);
                                            }
                                        } else {
                                            if(message.sessionType=="1"){
                                                WechatClient.sendReadDoctorByDoctorId(message.targetUserId, message);
                                            }
                                            //告知医生新消息
                                            WechatClient.sendSocketMessageToDoctor(message.targetUserId,message);
                                            if(config.environment!='local'){//pc版不推送个推
                                                WlyySDK.request(message.targetUserId, '', '', '', '/im/common/message/messages', 'POST', function (err, res) {
                                                    let count = 0;
                                                    res =  JSON.parse(res)
                                                    if (res.status == 200) {
                                                        let data = res.data;
                                                        count = parseInt(JSON.parse(data.imMsgCount).count) + parseInt(data.system.amount) + parseInt(data.healthIndex.amount) + parseInt(data.sign.amount);
                                                    }
                                                    AppClient.sendNotification(message.targetUserId, message,message.sessionType,count);
                                                });
                                            }
                                            //外网pcim通过socket推送
                                            WechatClient.sendPcImSocket(message.targetUserId,message,message.sessionType);
                                        }
                                    }
                                })
                                .catch(function (err) {
                                    logger.error("Update participant last fetch time failed: ", err);
                                });
                        }
                    }).catch(function (err) {
                        logger.error({message: "Error occurred while save message to session: " + err});
                    })
                } else {
                    logger.error({message: "当前会话找不到此发送者"});
                }
            });
        }
    }
    publish(channel,message)

+ 1 - 1
src/server/models/sessions/sessions.js

@ -786,7 +786,7 @@ class Sessions extends RedisModel {
                                    })
                            })
                            .catch(function (err) {
                                logger.error("Get sessions failed: ", err);
                                logger.error("Get sessions:"+sessionId+" failed: ", err);
                            });
                    };

+ 14 - 0
src/server/repository/mysql/participant.repo.js

@ -65,6 +65,20 @@ class ParticipantRepo {
        });
    }
    /**
     * 根据会话id查找会话成员
     * @param sessionId
     * @param handler
     */
    static findParricipantBySessionId(sessionId,handler){
        let sql = "select participant_id,participant_role from participants p where p.session_id = ?";
        ImDb.execQuery({
            "sql": sql,
            "args": [sessionId],
            "handler": handler
        });
    }
    /**
     * 获取会话医生的id
     * @param sessionId

+ 1 - 1
src/server/repository/mysql/session.repo.js

@ -21,7 +21,7 @@ class SessionRepo {
     * @param handler
     */
    static findOne(sessionId, handler) {
        let sessionSQL = "select id,name,type,create_date from " + DB_TABLES.Sessions + " s where s.id = ?";
        let sessionSQL = "select id,name,type,create_date,business_type from " + DB_TABLES.Sessions + " s where s.id = ?";
        ImDb.execQuery({
            "sql": sessionSQL,
            "args": [sessionId],