Bläddra i källkod

增加未读消息接口

Sand 8 år sedan
förälder
incheckning
c819445ab2

+ 1 - 1
src/client/im.client.js

@ -341,7 +341,7 @@ var imClient = {
        },
        // 获取指定会话的未读消息
        getSessionUnreadMessages: function (sessionId, userId, count, success, failure) {
        getSessionUnreadMessages: function (sessionId, userId, success, failure) {
            httpClient.get(ENDPOINTS.Sessions.SessionUnreadMessages.replace(SessionPath, sessionId),
                {user_id: userId},
                success,

+ 12 - 1
src/server/endpoints/v2/session.endpoint.js

@ -213,11 +213,22 @@ router.get(APIv2.Sessions.Messages, function (req, res) {
    if (!sessionId) {
        throw {httpStatus: 406, message: 'Missing sessionId.'};
    }
    let sessions = new Sessions();
    ControllerUtil.regModelEventHandler(sessions, res);
    sessions.getMessages(sessionId, user, page, size);
});
router.get(APIv2.Sessions.SessionUnreadMessages, function (req, res) {
    let sessionId = req.params.session_id;
    let userId = req.query.User_id;
    let sessions = new Sessions();
    ControllerUtil.regModelEventHandler(sessions, res);
    sessions.getSessionUnreadMessages(sessionId, userId);
});
/**
 * 获取所有会话未读消息数。
 *
@ -229,7 +240,7 @@ router.get(APIv2.Sessions.SessionsUnreadMessageCount, function (req, res) {
   let sessions = new Sessions();
   ControllerUtil.regModelEventHandler(sessions, res);
   sessions.getAllSessionsUnreadMessageCount();
   sessions.getAllSessionsUnreadMessageCount(userId);
});
/**

+ 1 - 1
src/server/models/redis.model.js

@ -26,7 +26,7 @@ class RedisModel extends BaseModel {
        if (redisKey.indexOf(RedisKeyReplacer) >= 0) {
            return redisKey.replace(RedisKeyReplacer, keyValue);
        } else {
            log.warn("redisModelKey is not found");
            log.warn("Redis model key is not found");
            return redisKey;
        }
    }

+ 93 - 16
src/server/models/sessions/sessions.js

@ -17,6 +17,7 @@ let redis = RedisClient.redisClient().connection;
let logger = require('../../util/log.js');
let mongoose = require('mongoose');
var async = require("async");
var ObjectUtil = require("../../util/object.util.js");
const REDIS_KEYS = require('../../include/commons').REDIS_KEYS;
const SESSION_TYPES = require('../../include/commons').SESSION_TYPES;
@ -79,6 +80,11 @@ class Sessions extends RedisModel {
                // 保存会话及成员至MySQL中
                self.saveSessionToMysql(sessionId, name, type, createDate, function (err, res) {
                    Participants.saveParticipantsToMysql(sessionId, participantArray, function (err, res) {
                        if(err){
                            ModelUtil.emitError(self.eventEmitter, err.message);
                            return;
                        }
                        // 保存会话及成员至Redis中,并更新会话的最后状态
                        let isMucSession = SESSION_TYPES.MUC == type;
                        let message = {
@ -230,18 +236,19 @@ class Sessions extends RedisModel {
    }
    /**
     * 根据会话中的消息
     * 获取会话消息。全部,不管已读/未读状态。
     *
     * @param sessionId 会话ID
     * @param user 拉取消息的人
     * @param userId 拉取消息的人
     * @param page 第几页
     * @param pagesize 分页数量
     */
    getMessages(sessionId, user, page, pagesize) {
    getMessages(sessionId, userId, page, pagesize) {
        let self = this;
        let message_timestamp_key = RedisModel.makeRedisKey(REDIS_KEYS.MessagesByTimestamp, sessionId);
        let message_key = RedisModel.makeRedisKey(REDIS_KEYS.Messages, sessionId);
        let participants_key = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipants, sessionId);
        //超过最大限制后从mysql获取数据
        if (page * pagesize >= config.sessionConfig.maxMessageCount) {
            self.getMessageFromMySQL(sessionId, page, pagesize, function (err, res) {
@ -257,7 +264,7 @@ class Sessions extends RedisModel {
                pagesize = pagesize + page;
            }
            let participants = new Participants();
            participants.existsParticipant(sessionId, user, function (res) {
            participants.existsParticipant(sessionId, userId, function (res) {
                if (!res) {
                    ModelUtil.emitOK(self.eventEmitter, {"status": -1, "msg": "用户不在此会话中!"});
                } else {
@ -274,46 +281,112 @@ class Sessions extends RedisModel {
                            ModelUtil.emitOK(self.eventEmitter, {"status": 200, "data": messages});
                        }).then(function () {
                            //更新患者最后一次获取消息的日期
                            redis.zaddAsync(participants_key, (new Date().getTime()), user).then(function (res) {
                            redis.zaddAsync(participants_key, (new Date().getTime()), userId).then(function (res) {
                                console.log(res);
                            }).catch(function (res) {
                                throw res;
                            })
                        })
                    }).catch(function (res) {
                        ModelUtil.emitOK(self.eventEmitter, {"status": -1, "msg": res});
                        ModelUtil.emitOK(self.eventEmitter, {message: res});
                    })
                }
            })
        }
    }
    getAllSessionsUnreadMessageCount() {
    /**
     * 获取所有会话的未读消息数。
     */
    getAllSessionsUnreadMessageCount(userId) {
        let self = this;
        ModelUtil.emitError(self.eventEmitter, {message: "not implemented."}, null);
    }
    /**
     * 获取会话的未读消息数。
     * 获取会话的未读消息数。根据成员最后一次获取消息的时候与当前时间。
     *
     * @param sessionId
     * @param userId
     */
    getSessionUnreadMessageCount(sessionId, userId) {
        let self = this;
        let sessionKey = RedisModel.makeRedisKey(REDIS_KEYS.Session, sessionId);
        let messagesByTimestampKey = RedisModel.makeRedisKey(REDIS_KEYS.MessagesByTimestamp, sessionId);
        let participantsKey = RedisModel.makeRedisKey(REDIS_KEYS.Participants, userId);
        let participantsKey = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipants, userId);
        async.waterfall([
            // 取得会话最后消息时间与此成员最后的消息获取时间
            // 此成员最后获取消息的时间
            function (callback) {
                redis.zscoreAsync(participantsKey)
                    .then(function (lastFatechTime) {
                        callback();
                redis.zscoreAsync(participantsKey, userId)
                    .then(function (lastFetchTime) {
                        callback(null, lastFetchTime);
                    })
            },
            // 计算最后获取消息的时间之后到现在有多少条消息
            function (lastFetchTime, callback) {
                if (!lastFetchTime) lastFetchTime = 0;
                let now = new Date().getTime();
                redis.zcountAsync(messagesByTimestampKey, lastFetchTime, now)
                    .then(function (count) {
                        ModelUtil.emitOK(self.eventEmitter, {count: count});
                    })
            }
        ], function (err, res) {
            if (err) {
                ModelUtil.emitError(self.eventEmitter, "Get session unread message count failed.")
            }
        ]);
        });
    }
    /**
     * 获取会话未读消息数。根据成员最后一次获取消息的时候与当前时间。
     */
    getSessionUnreadMessages(sessionId, userId) {
        let self = this;
        let messagesKey = RedisModel.makeRedisKey(REDIS_KEYS.Messages, sessionId);
        let messagesByTimestampKey = RedisModel.makeRedisKey(REDIS_KEYS.MessagesByTimestamp, sessionId);
        let participantsKey = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipants, userId);
        ModelUtil.emitOK(self.eventEmitter, {count: 9});
        async.waterfall([
            // 此成员最后获取消息的时间
            function (callback) {
                redis.zscoreAsync(participantsKey, userId)
                    .then(function (lastFetchTime) {
                        callback(null, lastFetchTime);
                    })
            },
            // 最后获取消息的时间之后到现在的消息ID列表
            function (lastFetchTime, callback) {
                if (!lastFetchTime) lastFetchTime = 0;
                let now = new Date().getTime();
                redis.zrangebyscoreAsync(messagesByTimestampKey, lastFetchTime, now)
                    .then(function (messageIds) {
                        callback(null, messageIds);
                    })
            },
            // 获取消息
            function (messageIds, callback) {
                redis.hmgetAsync(messagesKey, messageIds)
                    .then(function (res) {
                        let messages = [];
                        res.forEach(function (message) {
                            messages.push({
                                sender_id: message.sender_id,
                                sender_name: message.sender_name,
                                content_type: message.content_type,
                                content: message.content,
                                timestamp: ObjectUtil.timestampToLong(message.timestamp)
                            });
                        });
                        ModelUtil.emitOK(self.eventEmitter, messages);
                    })
            }
        ], function (err, res) {
            if (err) {
                ModelUtil.emitError(self.eventEmitter, "Get session unread message count failed.")
            }
        });
    }
    /**
@ -343,6 +416,10 @@ class Sessions extends RedisModel {
            if (res) {
                redis.hmgetAsync(sessionKey, ["type", "name"]).then(function (res) {
                    let sessionType = res[0];
                    if(sessionType == null){
                        ModelUtil.emitError(self.eventEmitter, "Session with id " + sessionId + " not found.");
                        return;
                    }
                    messages.saveMessageToRedis(sessionId, sessionType, messageId, message);
                    messages.saveMessageToMysql(sessionId, sessionType, messageId, message, function (err, res) {

+ 68 - 68
src/server/models/user/users.js

@ -185,8 +185,8 @@ class Users extends RedisModel {
                function (userInfo, callback) {
                    let multi = redisConn.multi()
                        .zadd(usersKey, lastLoginTime.getTime(), userId);
                        //.hmset(userKey, 'avatar', userInfo.avatar ? userInfo.avatar : '', 'birthdate', userInfo.birthdate ? userInfo.birthdate : '',
                        //    'name', userInfo.name, 'role', loginFromApp ? 'doctor' : 'patient');
                    //.hmset(userKey, 'avatar', userInfo.avatar ? userInfo.avatar : '', 'birthdate', userInfo.birthdate ? userInfo.birthdate : '',
                    //    'name', userInfo.name, 'role', loginFromApp ? 'doctor' : 'patient');
                    if (loginFromApp) {
                        // cache app status
@ -204,7 +204,7 @@ class Users extends RedisModel {
                // cache sessions, participants, topics, messages
                function (callback) {
                    SessionRepo.findAll(userId, function (err, sessions) {
                        for (let session in sessions) {
                        sessions.forEach(function (session) {
                            let sessionId = session.id;
                            let name = session.name;
                            let type = session.type;
@ -213,76 +213,76 @@ class Users extends RedisModel {
                            (function (sessionId, userId) {
                                // cache sessions
                                redisConn.multi()
                                    .zadd(RedisModel.makeRedisKey(REDIS_KEYS.Sessions), lastLoginTime)                // 会话的最后活动时间设置为此用户的登录时间
                                    .zadd(RedisModel.makeRedisKey(REDIS_KEYS.UserSessions, userId), lastLoginTime)    // 会话的最后活动时间设置为此用户的登录时间
                                    .hmset(RedisModel.makeRedisKey(REDIS_KEYS.Session, sessionId, 'name', name, 'type', type, 'create_date', createDate))
                                    .execAsync().then(function (res) {
                                    // cache participants
                                    let sessionParticipantsKey = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipants, sessionId);
                                    let sessionParticipantsRoleKey = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipantsRole, sessionId);
                                    ParticipantRepo.findParticipants(sessionId, function (err, participants) {
                                        for (let participant in participants) {
                                            let participantId = participant.participant_id;
                                            let participantRole = participant.participant_role;
                                            let score = new Date().getTime();
                                            redisConn.multi()
                                                .zadd(sessionParticipantsKey, participantId, score)
                                                .hset(sessionParticipantsRoleKey, participantId, participantRole)
                                                .execAsync().then(function (res) {
                                    .zadd(REDIS_KEYS.Sessions, lastLoginTime.getTime(), sessionId)                                           // 会话的最后活动时间设置为此用户的登录时间
                                    .zadd(RedisModel.makeRedisKey(REDIS_KEYS.UserSessions, userId), lastLoginTime.getTime(), sessionId)      // 会话的最后活动时间设置为此用户的登录时间
                                    .hmset(RedisModel.makeRedisKey(REDIS_KEYS.Session, sessionId), 'name', name, 'type', type, 'create_date', createDate.getTime())
                                    .execAsync()
                                    .then(function (res) {
                                        // cache participants
                                        let sessionParticipantsKey = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipants, sessionId);
                                        let sessionParticipantsRoleKey = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipantsRole, sessionId);
                                        ParticipantRepo.findParticipants(sessionId, function (err, participants) {
                                            participants.forEach(function (participant) {
                                                let participantId = participant.participant_id;
                                                let participantRole = participant.participant_role;
                                                let score = new Date().getTime();
                                                redisConn.multi()
                                                    .zadd(sessionParticipantsKey, participantId, score)
                                                    .hset(sessionParticipantsRoleKey, participantId, participantRole)
                                                    .execAsync().then(function (res) {
                                                });
                                            });
                                        }
                                    });
                                    // cache messages
                                    let messagesKey = RedisModel.makeRedisKey(REDIS_KEYS.Messages, sessionId);
                                    let messagesByTimestampKey = RedisModel.makeRedisKey(REDIS_KEYS.MessagesByTimestamp, sessionId);
                                    MessageRepo.findBySessionId(sessionId, 0, config.sessionConfig.maxMessageCount, function (err, messages) {
                                        for (let message in messages) {
                                            let id = message.id;
                                            let msgJson = {
                                                sessionId: message.session_id,
                                                senderId: message.sender_id,
                                                senderName: message.sender_name,
                                                contentType: message.content_type,
                                                content: message.content,
                                                timestamp: message.timestamp
                                            };
                                            redisConn.multi()
                                                .hset(messagesKey, id, msgJson)
                                                .zadd(messagesByTimestampKey, id)
                                                .execAsync()
                                                .then(function (res) {
                                        });
                                        // cache messages
                                        let messagesKey = RedisModel.makeRedisKey(REDIS_KEYS.Messages, sessionId);
                                        let messagesByTimestampKey = RedisModel.makeRedisKey(REDIS_KEYS.MessagesByTimestamp, sessionId);
                                        MessageRepo.findBySessionId(sessionId, 0, config.sessionConfig.maxMessageCount, function (err, messages) {
                                            messages.forEach(function (message) {
                                                let id = message.id;
                                                let msgJson = {
                                                    sessionId: message.session_id,
                                                    senderId: message.sender_id,
                                                    senderName: message.sender_name,
                                                    contentType: message.content_type,
                                                    content: message.content,
                                                    timestamp: message.timestamp
                                                };
                                                redisConn.multi()
                                                    .hset(messagesKey, id, msgJson)
                                                    .zadd(messagesByTimestampKey, id)
                                                    .execAsync()
                                                    .then(function (res) {
                                                    });
                                            });
                                        });
                                        // cache topics for MUC
                                        let topicsKey = RedisModel.makeRedisKey(REDIS_KEYS.Topics, sessionId);
                                        TopicRepo.findAll(sessionId, function (err, topics) {
                                            topics.forEach(function (topic) {
                                                let topicKey = RedisModel.makeRedisKey(REDIS_KEYS.Topic, topic.id);
                                                let topicId = topic.id;
                                                let name = topic.name;
                                                let createTime = ObjectUtil.timestampToLong(topic.create_time);
                                                let endBy = topic.end_by;
                                                let endTime = ObjectUtil.timestampToLong(topic.end_time);
                                                let startMessageId = topic.start_message_id;
                                                let endMessageId = topic.end_message_id;
                                                redisConn.multi()
                                                    .zadd(topicsKey, topicId)
                                                    .hmset(topicKey, 'name', name, 'session_id', sessionId, 'create_time',
                                                        createTime, 'end_by', endBy, 'end_time', endTime, 'start_message_id',
                                                        startMessageId, 'end_message_id', endMessageId)
                                                    .execAsync().then(function (res) {
                                                });
                                        }
                                    });
                                    // cache topics for MUC
                                    let topicsKey = RedisModel.makeRedisKey(REDIS_KEYS.Topics, sessionId);
                                    TopicRepo.findAll(sessionId, function (err, topics) {
                                        for (let topic in topics) {
                                            let topicKey = RedisModel.makeRedisKey(REDIS_KEYS.Topic, topic.id);
                                            let topicId = topic.id;
                                            let name = topic.name;
                                            let createTime = ObjectUtil.timestampToLong(topic.create_time);
                                            let endBy = topic.end_by;
                                            let endTime = ObjectUtil.timestampToLong(topic.end_time);
                                            let startMessageId = topic.start_message_id;
                                            let endMessageId = topic.end_message_id;
                                            redisConn.multi()
                                                .zadd(topicsKey, topicId)
                                                .hmset(topicKey, 'name', name, 'session_id', sessionId, 'create_time',
                                                    createTime, 'end_by', endBy, 'end_time', endTime, 'start_message_id',
                                                    startMessageId, 'end_message_id', endMessageId)
                                                .execAsync().then(function (res) {
                                            });
                                        }
                                        });
                                    });
                                });
                            })(sessionId, userId);
                        }
                        });
                    });
                    callback(null, null);

+ 3 - 2
src/server/repository/mysql/message.repo.js

@ -5,9 +5,10 @@
let configFile = require('../../include/commons').CONFIG_FILE;
let config = require('../../resources/config/' + configFile);
let log = require('../../util/log.js');
let Sessions = require('../../models/sessions/sessions');
let ImDb = require('../mysql/db/im.db');
let log = require('../../util/log.js');
const DB_TABLES = require('../../include/commons').DB_TABLES;
@ -50,7 +51,7 @@ class MessageRepo {
                let sql = "select id, session_id, sender_id, sender_name, content_type, content, timestamp from " + MessageTable + " w where w.session_id = ? limit ?, ?";
                ImDb.execQuery({
                    "sql": sessionsql,
                    "sql": session,
                    "args": [sessionId, page, size],
                    "handler": function (err, res) {
                        if (err) {

+ 5 - 23
src/server/repository/mysql/participant.repo.js

@ -24,13 +24,7 @@ class ParticipantRepo {
        ImDb.execQuery({
            "sql": sql,
            "args": [sessionId],
            "handler": function (err, res) {
                if (err) {
                    log.error("getParticipantsBySessionId is fail error: " + err);
                }
                handler(res);
            }
            "handler": handler
        });
    }
@ -45,13 +39,7 @@ class ParticipantRepo {
        ImDb.execQuery({
            "sql": sql,
            "args": [role, sessionId, participant_id],
            "handler": function (err, res) {
                if (err) {
                    log.error("updateParticipant is fail error: " + err);
                } else {
                    log.info("updateParticipant is success");
                }
            }
            "handler": handler
        });
    }
@ -91,7 +79,7 @@ class ParticipantRepo {
     * @param handler
     */
    static saveParticipantsToMysql(sessionId, users, handler) {
        let sql = "insert into " + DB_TABLES.SessionParticipants + " (session_id,participant_id,participant_role) VALUES "
        let sql = "insert into " + DB_TABLES.Participants + " (session_id,participant_id,participant_role) VALUES "
        let args = [];
        for (var j in users) {
            sql += "(?,?,?)";
@ -114,17 +102,11 @@ class ParticipantRepo {
    }
    static deleteUserFromMysql(sessionId, userId) {
        let sql = "delete from " + DB_TABLES.SessionParticipants + " where user_id=? and session_id=? ";
        let sql = "delete from " + DB_TABLES.Participants + " where user_id=? and session_id=? ";
        ImDb.execQuery({
            "sql": sql,
            "args": [userId, sessionId],
            "handler": function (err, res) {
                if (err) {
                    log.error("sql:" + sql + "data:sessionId:" + sessionId + ",user:" + userId);
                } else {
                    log.info("delete deleteUser to mysql is success by session :" + sessionId);
                }
            }
            "handler": handler
        });
    }

+ 6 - 22
src/server/repository/mysql/session.repo.js

@ -23,13 +23,7 @@ class SessionRepo {
        ImDb.execQuery({
            "sql": sessionSQL,
            "args": [sessionId],
            "handler": function (err, res) {
                if (err) {
                    log.error("sql:" + sessionSQL + "data:sessionId:" + sessionId);
                }
                handler(err, res);
            }
            "handler": handler
        });
    }
@ -39,22 +33,13 @@ class SessionRepo {
     * @param userId
     * @param handler
     */
    static
    findAll(userId, handler) {
        let sql = "select session_id from " + DB_TABLES.SessionParticipants + " w where w.participant_id = ? group by w.session_id";
    static findAll(userId, handler) {
        let sql = "select session_id from " + DB_TABLES.Participants + " w where w.participant_id = ? group by w.session_id";
        let sessionSQL = "select id,name,type,create_date from " + DB_TABLES.Sessions + " s where s.id in(" + sql + ")";
        ImDb.execQuery({
            "sql": sessionSQL,
            "args": [userId],
            "handler": function (err, res) {
                if (err) {
                    log.error("sql:" + sessionSQL + "data:userId:" + userId);
                } else {
                    log.info("getUserSessionsFromMysql success by userId :" + userId);
                }
                handler(err, res);
            }
            "handler": handler
        });
    }
@ -64,9 +49,8 @@ class SessionRepo {
     * @param userId
     * @param handler
     */
    static
    findStickySessions(userId, handler) {
        let sql = "select session_id from " + DB_TABLES.SessionParticipants + " w where w.participant_id = ? group by w.session_id";
    static findStickySessions(userId, handler) {
        let sql = "select session_id from " + DB_TABLES.Participants + " w where w.participant_id = ? group by w.session_id";
        let sessionSQL = "select s.id,s.name,s.type,s.create_date from " + DB_TABLES.Sessions + " s," + DB_TABLES.StickySessions + " ss  where s.id = ss.session_id s.id in(" + sql + ")";
        ImDb.execQuery({
            "sql": sessionSQL,

+ 37 - 0
src/server/util/object.util.js

@ -21,6 +21,43 @@ class ObjectUtil {
        return {}.constructor === value.constructor;
    };
    /**
     * 克隆一个对象。
     */
    static cloneObject(obj){
        var copy;
        // Handle the 3 simple types, and null or undefined
        if (null == obj || "object" != typeof obj) return obj;
        // Handle Date
        if (obj instanceof Date) {
            copy = new Date();
            copy.setTime(obj.getTime());
            return copy;
        }
        // Handle Array
        if (obj instanceof Array) {
            copy = [];
            for (var i = 0, len = obj.length; i < len; i++) {
                copy[i] = ObjectUtil.cloneObject(obj[i]);
            }
            return copy;
        }
        // Handle Object
        if (obj instanceof Object) {
            copy = {};
            for (var attr in obj) {
                if (obj.hasOwnProperty(attr)) copy[attr] = ObjectUtil.cloneObject(obj[attr]);
            }
            return copy;
        }
        throw new Error("Unable to copy obj! Its type isn't supported.");
    }
    /**
     * 检查对象是否具有指定的属性列表。
     *

+ 9 - 0
test/client/im.client.session.p2p.Test.js

@ -132,6 +132,15 @@ describe("Session P2P", function () {
                    imClient.Sessions.getSessionUnreadMessageCount(TD.SessionId, TD.P2P.DoctorB.id,
                        function (data) {
                            assert.strictEqual(data.count, TD.UnreadMessageCount, "Unread message count dismatch.");
                        },
                        function (xhr, status, error) {
                            assert(false, xhr.responseJSON.message);
                        });
                    // 获取未读消息
                    imClient.Sessions.getSessionUnreadMessages(TD.SessionId, TD.P2P.DoctorB.id,
                        function (data) {
                            assert.strictEqual(data.length, TD.UnreadMessageCount, "Get session unread messages failed.");
                            done();
                        },
                        function (xhr, status, error) {