소스 검색

代码修改

yeshijie 7 년 전
부모
커밋
0d456115f2

+ 1 - 1
src/server/include/commons.js

@ -122,7 +122,7 @@ exports.SOCKET_TYPES={
exports.PLATFORM = {
    iOS: 0,
    Android: 1,
    PC: 3,
    PC: 4,
    Wechat: 10
};

+ 7 - 7
src/server/models/client/wechat.client.js

@ -247,13 +247,13 @@ class WechatClient extends RedisModel {
            log.warn("target doctor is not online!");
            return;
        }
        let sendClient = clientCache.findByIdAndType(message.sender_id,SOCKET_TYPES.DOCTOR);//app医生发送的消息
        if(!sendClient){//pc医生发送的消息
            sendClient = clientCache.findByIdAndType("pc_"+message.sender_id,SOCKET_TYPES.PC_DOCTOR);
        }
        if(!sendClient){//居民发送的消息
            sendClient = clientCache.findByIdAndType(message.sender_id,SOCKET_TYPES.PATIENT);
        }
        // let sendClient = clientCache.findByIdAndType(message.sender_id,SOCKET_TYPES.DOCTOR);//app医生发送的消息
        // if(!sendClient){//pc医生发送的消息
        //     sendClient = clientCache.findByIdAndType("pc_"+message.sender_id,SOCKET_TYPES.PC_DOCTOR);
        // }
        // if(!sendClient){//居民发送的消息
        //     sendClient = clientCache.findByIdAndType(message.sender_id,SOCKET_TYPES.PATIENT);
        // }
        var count = 0;
        if(doctorClient&&message.session_id==doctorClient.sessionId){

+ 93 - 29
src/server/models/redis/pubSub.js

@ -15,52 +15,116 @@ let config = require('../../resources/config/' + configFile);
let RedisModel = require('./../redis.model');
let RedisSubClient = require('./redisSubClient');
let RedisPubClient = require('./redisPubClient');
let RedisClient = require('../../repository/redis/redis.client.js');
let redisPubConn = RedisPubClient.redisClient().connection;
let redisSubConn = RedisSubClient.redisClient().connection;
let Sessions = require('../sessions/sessions.js');
let redis = RedisClient.redisClient().connection;
let Sessions = require('../../models/sessions/sessions');
let WechatClient = require("../client/wechat.client.js");
let WlyySDK = require("../../util/wlyy.sdk");
let AppClient = require("../client/app.client.js");
let Participants = require('../sessions/participants');
let Messages = require('../messages/messages');
const REDIS_KEYS = require('../../include/commons').REDIS_KEYS;
let ObjectUtil = require("../../util/object.util.js");
let logger = require('../../util/log.js');
class PubSub{
    constructor(){
        this.sub=redisSubConn;
        this.handlers=new Map();
        this.subAction=(channle,message)=>{
            let actions= this.handlers.get(channle)||new Set();
            for(let action of actions)
            {
                console.log("接收消息:"+message);
                if(config.pubSubSwitch){//接收订阅消息处理开关,本地运行和测试库单独运行时防止用户接收消息2次
                    message = JSON.parse(message);
                    //Sessions.getRedisPushNotification(message);这里不知为什么无法调用这个方法,提示getRedisPushNotification不是方法
                    if (message.targetType=='patient') {
                        if(config.environment!='local'){//pc版接收要发给居民的消息不做处理
                            WechatClient.sendMessage(message.targetUserId, message.targetUserName, message);
                        }
                message = JSON.parse(message);
                message.timestamp = new Date(message.timestamp);
                let sessionId = message.session_id;
                let participants = new Participants();
                let sessionKey = RedisModel.makeRedisKey(REDIS_KEYS.Session, sessionId);
                let sessionType =0;
                // 检查会话中是否存在此成员
                participants.existsParticipant(sessionId, message.sender_id, function (err, res) {
                    if (res) {
                        redis.hmgetAsync(sessionKey, ["type", "name"]).then(function (res) {
                            sessionType = res[0];
                            let sessionName = res[1];
                            if (sessionType) {
                                // 消息保存到Redis,并更新会话最后状态、用户最后消息获取时间
                                let sessionKey = RedisModel.makeRedisKey(REDIS_KEYS.Session, sessionId);
                                let messageKey = RedisModel.makeRedisKey(REDIS_KEYS.Messages, sessionId);
                                let messageTimestampKey = RedisModel.makeRedisKey(REDIS_KEYS.MessagesByTimestamp, sessionId);
                                let msgJson = {
                                    id: message.id,
                                    sender_id: message.sender_id,
                                    sender_name: message.sender_name,
                                    timestamp: message.timestamp.getTime(),
                                    content_type: message.content_type,
                                    content: message.content,
                                    business_type:message.business_type||1
                                };
                                redis.multi()
                                    .hset(messageKey, message.id, JSON.stringify(msgJson))               // 保存消息
                                    .zadd(messageTimestampKey, message.timestamp.getTime(), message.id)  // 保存消息时间
                                    .execAsync()
                                    .then(function (res) {
                                        Messages.updateLastContent(sessionKey, sessionType, null, message);
                                        Messages.cleanOutRangeMessage(sessionId); // clean out range messages
                                    })
                                    .catch(function (ex) {
                                        logger.error("Save message to redis failed: ", ex);
                                    });
                                Messages.updateLastContent(sessionKey, sessionType, sessionName, message);
                                var score = message.timestamp.getTime() + 1;
                                let participantsKey = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipants, sessionId);
                                redis.zaddAsync(participantsKey, score, message.sender_id)
                                    .then(function (res) {
                                        //这个代码需要消息存入缓存后才能执行,否则用户拉取消息列表时可能缺少消息
                                        if(config.pubSubSwitch){//接收订阅消息处理开关,本地运行和测试库单独运行时防止用户接收消息2次
                                            //Sessions.getRedisPushNotification(message);这里不知为什么无法调用这个方法,提示getRedisPushNotification不是方法
                                            if (message.targetType=='patient') {
                                                if(config.environment!='local'){//pc版接收要发给居民的消息不做处理
                                                    WechatClient.sendMessage(message.targetUserId, message.targetUserName, message);
                                                }
                                            } else {
                                                if(message.sessionType=="1"){
                                                    WechatClient.sendReadDoctorByDoctorId(message.targetUserId, message);
                                                }
                                                //告知医生新消息
                                                WechatClient.sendSocketMessageToDoctor(message.targetUserId,message);
                                                if(config.environment!='local'){//pc版不推送个推
                                                    WlyySDK.request(message.targetUserId, '', '', '', '/im/common/message/messages', 'POST', function (err, res) {
                                                        let count = 0;
                                                        res =  JSON.parse(res)
                                                        if (res.status == 200) {
                                                            let data = res.data;
                                                            count = parseInt(JSON.parse(data.imMsgCount).count) + parseInt(data.system.amount) + parseInt(data.healthIndex.amount) + parseInt(data.sign.amount);
                                                        }
                                                        AppClient.sendNotification(message.targetUserId, message,message.sessionType,count);
                                                    });
                                                }
                                                //外网pcim通过socket推送
                                                WechatClient.sendPcImSocket(message.targetUserId,message,message.sessionType);
                                            }
                                        }
                                    })
                                    .catch(function (err) {
                                        logger.error("Update participant last fetch time failed: ", err);
                                    });
                            }
                        }).catch(function (err) {
                            logger.error({message: "Error occurred while save message to session: " + err});
                        })
                    } else {
                        if(message.sessionType=="1"){
                            WechatClient.sendReadDoctorByDoctorId(message.targetUserId, message);
                        }
                        //告知医生新消息
                        WechatClient.sendSocketMessageToDoctor(message.targetUserId,message);
                        if(config.environment!='local'){//pc版不推送个推
                            WlyySDK.request(message.targetUserId, '', '', '', '/im/common/message/messages', 'POST', function (err, res) {
                                let count = 0;
                                res =  JSON.parse(res)
                                if (res.status == 200) {
                                    let data = res.data;
                                    count = parseInt(JSON.parse(data.imMsgCount).count) + parseInt(data.system.amount) + parseInt(data.healthIndex.amount) + parseInt(data.sign.amount);
                                }
                                if(config.environment!='local'){//pc版不推送个推,通过redis的publish
                                    AppClient.sendNotification(message.targetUserId, message,message.sessionType,count);
                                }
                                //外网pcim通过socket推送
                                WechatClient.sendPcImSocket(message.targetUserId,message,message.sessionType);
                            });
                        }
                        logger.error({message: "当前会话找不到此发送者"});
                    }
                }
                });
                //action(message);
            }
        }

+ 172 - 6
src/server/models/sessions/sessions.js

@ -15,6 +15,7 @@ let MessageRepo = require('../../repository/mysql/message.repo');
let ParticipantRepo = require('../../repository/mysql/participant.repo');
let ImDb = require('../../repository/mysql/db/im.db');
let WlyySDK = require("../../util/wlyy.sdk");
let ObjectUtil = require("../../util/object.util.js");
let WechatClient = require("../client/wechat.client.js");
let AppClient = require("../client/app.client.js");
@ -440,6 +441,137 @@ class Sessions extends RedisModel {
                                let userRoles = res[5];
                                let participantsTime = [];
                                let isInvite = true;
                                //处理session未加入redis的bug
                                if(session==null){
                                    let lastLoginTime = new Date();
                                    SessionRepo.findOne(sessionId, function (err, res) {
                                        if(res){
                                            session = res;
                                            let redisSession = [
                                                "id", session.id,
                                                "name", session.name,
                                                "type", session.type,
                                                "business_type", session.business_type,
                                                "last_sender_id", session.last_sender_id||"",
                                                "last_sender_name", session.last_sender_name||"",
                                                "last_content_type", session.last_content_type||"",
                                                "last_content", session.last_content||"",
                                                "last_message_time", session.last_message_time||"",
                                                "create_date", ObjectUtil.timestampToLong(session.create_date),
                                                "status",session.status==null?0:session.status
                                            ];
                                            // cache sessions
                                            redis.multi()
                                                .zadd(REDIS_KEYS.Sessions, lastLoginTime.getTime(), sessionId)                                           // 会话的最后活动时间设置为此用户的登录时间
                                                .zadd(RedisModel.makeRedisKey(REDIS_KEYS.UserSessions, userId), lastLoginTime.getTime(), sessionId)      // 会话的最后活动时间设置为此用户的登录时间
                                                .hmset(RedisModel.makeRedisKey(REDIS_KEYS.Session, sessionId), redisSession)
                                                .execAsync()
                                                .then(function (res) {
                                                    // cache participants
                                                    let sessionParticipantsKey = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipants, sessionId);
                                                    let sessionParticipantsRoleKey = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipantsRole, sessionId);
                                                    ParticipantRepo.findAll(sessionId, function (err, participants) {
                                                        if (err) {
                                                            ModelUtil.emitError(self.eventEmitter, err.message);
                                                            return;
                                                        }
                                                        let multi = redis.multi();
                                                        participants.forEach(function (participant) {
                                                            let participantId = participant.id;
                                                            let participantRole = participant.role;
                                                            let score = ObjectUtil.timestampToLong(participant.last_fetch_time||(new Date()));
                                                            multi = multi.zadd(sessionParticipantsKey, score, participantId)
                                                                .hset(sessionParticipantsRoleKey, participantId, participantRole);
                                                        });
                                                        multi.execAsync()
                                                            .then(function (res) {
                                                            })
                                                            .catch(function (ex) {
                                                                log.error("Login failed while caching participants: ", ex);
                                                            });
                                                    });
                                                    // cache messages
                                                    let messagesKey = RedisModel.makeRedisKey(REDIS_KEYS.Messages, sessionId);
                                                    let messagesByTimestampKey = RedisModel.makeRedisKey(REDIS_KEYS.MessagesByTimestamp, sessionId);
                                                    MessageRepo.findBySessionId(sessionId, 0, config.sessionConfig.maxMessageCount, null, function (err, messages) {
                                                        if (err) {
                                                            ModelUtil.emitError(self.eventEmitter, err.message);
                                                            return;
                                                        }
                                                        let multi = redis.multi();
                                                        messages.forEach(function (message) {
                                                            let msgJson = {
                                                                id: message.id,
                                                                sender_id: message.sender_id,
                                                                sender_name: message.sender_name,
                                                                timestamp: ObjectUtil.timestampToLong(message.timestamp),
                                                                content_type: message.content_type,
                                                                content: message.content
                                                            };
                                                            multi = multi.hset(messagesKey, message.id, JSON.stringify(msgJson))
                                                                .zadd(messagesByTimestampKey, ObjectUtil.timestampToLong(message.timestamp), message.id);
                                                        });
                                                        multi.execAsync()
                                                            .then(function (res) {
                                                            })
                                                            .catch(function (ex) {
                                                                log.error("Login failed while caching messages: ", ex);
                                                            });
                                                    });
                                                    // cache topics for MUC
                                                    let topicsKey = RedisModel.makeRedisKey(REDIS_KEYS.Topics, sessionId);
                                                    TopicRepo.findAllBySessionId(sessionId, function (err, topics) {
                                                        if (err) {
                                                            ModelUtil.emitError(self.eventEmitter, err.message);
                                                            return;
                                                        }
                                                        topics.forEach(function (topic) {
                                                            let topicKey = RedisModel.makeRedisKey(REDIS_KEYS.Topic, topic.id);
                                                            let topicId = topic.id;
                                                            let name = topic.name == null ? "" : topic.name;
                                                            let createTime = ObjectUtil.timestampToLong(topic.create_time);
                                                            let endBy = topic.end_by == null ? "" : topic.end_by;
                                                            let endTime = topic.end_time == null ? 0 : ObjectUtil.timestampToLong(topic.end_time);
                                                            let startMessageId = topic.start_message_id == null ? "" : topic.start_message_id;
                                                            let endMessageId = topic.end_message_id == null ? "" : topic.end_message_id;
                                                            let description = topic.description == null ? "" : topic.description;
                                                            let status = topic.status == null ? 0 : topic.status;
                                                            redisConn.multi()
                                                                .zadd(topicsKey, createTime, topicId)
                                                                .hmset(topicKey,
                                                                    'name', name,
                                                                    'session_id', sessionId,
                                                                    'create_time', createTime,
                                                                    'end_by', endBy,
                                                                    'end_time', endTime,
                                                                    'start_message_id', startMessageId,
                                                                    'end_message_id', endMessageId,
                                                                    'description', description,
                                                                    'status', status)
                                                                .execAsync()
                                                                .catch(function (ex) {
                                                                    log.error("Login failed while caching topics: ", ex);
                                                                });
                                                        });
                                                    });
                                                })
                                                .catch(function (ex) {
                                                    log.error("Login failed while caching sessions: ", ex);
                                                });
                                        }
                                    });
                                }
                                for(var j in userRoles){
                                    if(userRoles[j]==1){
                                        isInvite = false;
@ -687,12 +819,46 @@ class Sessions extends RedisModel {
        if (!start_msg_id && !end_msg_id) {
            redis.zrevrangeAsync(message_timestamp_key, 0, 0).then(function (res) {
                if (res.length == 0) {
                    if (handler) {
                        handler(null, res);
                        return;
                    }
                    ModelUtil.emitOK(self.eventEmitter, res);
                    return;
                    //修复应redis没有缓冲聊天记录导致会话列表加载不出来
                    // cache messages
                    let messagesKey = RedisModel.makeRedisKey(REDIS_KEYS.Messages, sessionId);
                    MessageRepo.findBySessionId(sessionId, 0, config.sessionConfig.maxMessageCount, null, function (err, messages) {
                        if (err) {
                            ModelUtil.emitError(self.eventEmitter, err.message);
                            return;
                        }
                        let multi = redis.multi();
                        messages.forEach(function (message) {
                            let msgJson = {
                                id: message.id,
                                sender_id: message.sender_id,
                                sender_name: message.sender_name,
                                timestamp: ObjectUtil.timestampToLong(message.timestamp),
                                content_type: message.content_type,
                                content: message.content
                            };
                            multi = multi.hset(messagesKey, message.id, JSON.stringify(msgJson))
                                .zadd(message_timestamp_key, ObjectUtil.timestampToLong(message.timestamp), message.id);
                        });
                        multi.execAsync()
                            .then(function (res) {
                            })
                            .catch(function (ex) {
                                log.error("Login failed while caching messages: ", ex);
                                ModelUtil.emitOK(self.eventEmitter, res);
                                return;
                            });
                    });
                    // if (handler) {
                    //     handler(null, res);
                    //     return;
                    // }
                    // ModelUtil.emitOK(self.eventEmitter, res);
                    // return;
                }
                start_msg_id = res[0];
                redis.zrangeAsync(message_timestamp_key, 0, 0).then(function (res) {

+ 10 - 2
src/server/models/user/users.js

@ -86,8 +86,11 @@ class Users extends RedisModel {
     */
    login(userId, platform, deviceToken, clientId) {
        let self = this;
        let loginFromApp = (platform !== PLATFORMS.Wechat)&&(platform !== PLATFORMS.PC);
        let loginFromPc = platform !== PLATFORMS.PC;
        if(platform){
            platform = parseInt(platform);
        }
        let loginFromApp = (platform === PLATFORMS.Android)||(platform === PLATFORMS.iOS);
        let loginFromPc = platform === PLATFORMS.PC;
        log.error(userId+"  "+ platform+"  "+deviceToken+"  "+clientId);
        let usersKey = REDIS_KEYS.Users;
        let userKey = RedisModel.makeRedisKey(REDIS_KEYS.User, userId);
@ -159,6 +162,11 @@ class Users extends RedisModel {
                        sessions.forEach(function (session) {
                            redisConn.zscore(REDIS_KEYS.Sessions, session.id, function (err, res) {
                                if (res != null) return;    // 已经缓存过的会话不再缓存
                                // if(session.id =="915ce5ab-5b1d-11e6-8344-fa163e8aee56_828c1a62000d4838a0c8bab1acdfadff_8"){
                                //     log.error("1111");
                                // }else if(res != null){
                                //     return;
                                // }
                                (function (sessionId, userId) {
                                    let redisSession = [
                                        "id", session.id,

+ 1 - 1
src/server/repository/mysql/message.repo.js

@ -37,7 +37,7 @@ class MessageRepo {
                let params = [];
                let type = res[0].type;
                let MessageTable = "";
                if (type == 1) {
                if (type == 1 || type == 8) {
                    MessageTable = DB_TABLES.MucMessages;
                } else if (type == 2) {
                    MessageTable = DB_TABLES.P2pMessages;

+ 1 - 1
src/server/repository/mysql/search.repo.js

@ -117,7 +117,7 @@ class SearchRepo {
        if (userTable === DB_TABLES.Doctors) {
            sql += " AND s.type in (2) and s.business_type = 1 ";
        }else{
            sql += " AND s.type in (1,2)  and s.business_type = 2 ";
            sql += " AND s.type in (1,2,8)  and s.business_type = 2 ";
        }
        sql += " limit ?, ? ";
        sql = vsprintf(sql, [userTable == DB_TABLES.Doctors ? ', hospital_name' : '']);

+ 1 - 1
src/server/repository/mysql/session.repo.js

@ -181,7 +181,7 @@ class SessionRepo {
        let sql ="";
        sql = "select session_id from " + DB_TABLES.Participants + " w where w.participant_id = ? and participant_role ="+PARTICIPANT_ROLES.HOST+" group by w.session_id";
        sessionSQL =  "select * from "
            + DB_TABLES.Sessions + " s where s.id in(" + sql + ") and s.business_type = ? limit "+page+","+pagesize;
            + DB_TABLES.Sessions + " s where s.id in(" + sql + ") and s.business_type = ? and s.type!=0 limit "+page+","+pagesize;
        ImDb.execQuery({
            "sql": sessionSQL,
            "args": [userId, businessType],