Bladeren bron

修复获取未读消息数读取错误问题

Sand 8 jaren geleden
bovenliggende
commit
70274c833d

+ 2 - 0
src/server/models/messages/messages.js

@ -59,6 +59,8 @@ class Messages extends RedisModel {
    /**
     * 将消息保存至REDIS,并更新会话的最后状态,清理会话的过期消息。
     *
     * PS:更新此处的消息缓存代码,记得更新Users.login()
     *
     * @param messageId
     * @param sessionId
     * @param sessionType

+ 62 - 53
src/server/models/sessions/sessions.js

@ -294,56 +294,56 @@ class Sessions extends RedisModel {
     * 分页获取会话消息。
     *
     * @param sessionId     必选。会话ID
     * @param user          必选。用户ID
     * @param userId        必选。用户ID
     * @param startMsgId    必选。会话的的起始消息ID,作为检索的起始依据
     * @param endMsgId      必选。会话中的结束消息ID
     * @param offset        必选。页码
     * @param count         必选。页面大小
     * @param page          必选。页码
     * @param size          必选。页面大小
     * @param handler       必选。回调
     */
    getMessagesByPage(sessionId, user, startMsgId, endMsgId, page, size, handler) {
        let self = this;
        let message_timestamp_key = RedisModel.makeRedisKey(REDIS_KEYS.MessagesByTimestamp, sessionId);
        let message_key = RedisModel.makeRedisKey(REDIS_KEYS.Messages, sessionId);
        let participants_key = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipants, sessionId);
        let participants = new Participants();
    getMessagesByPage(sessionId, userId, startMsgId, endMsgId, page, size, handler) {
        let messagesTimestampKey = RedisModel.makeRedisKey(REDIS_KEYS.MessagesByTimestamp, sessionId);
        let messagesKey = RedisModel.makeRedisKey(REDIS_KEYS.Messages, sessionId);
        let offset = (page - 1 < 0 ? 0 : page - 1) * size+1;
        let count = 20;
        let participants = new Participants();
        let offset = (page - 1 < 0 ? 0 : page - 1) * size;
        let count = size + 1;
        participants.existsParticipant(sessionId, user, function (err, res) {
        participants.existsParticipant(sessionId, userId, function (err, res) {
            if (!res) {
                handler("用户不在此会话中!", false);
                handler(Error("User not found in session " + sessionId), null);
            } else {
                //将消息ID转换成分值
                redis.zscoreAsync(message_timestamp_key, startMsgId).then(function (res) {
                    let startMsgScore = res;
                    redis.zscoreAsync(message_timestamp_key, endMsgId).then(function (res) {
                        let endMsgScore = res;
                        //倒序取出最后N条消息
                        redis.zrevrangebyscoreAsync(message_timestamp_key, startMsgScore, endMsgScore, "limit", offset, count).then(function (res) {
                            //取出消息实体
                            if (res.length == 0) {
                                handler(null, res);
                                return;
                            }
                            redis.hmgetAsync(message_key, res).then(function (messages) {
                                console.log(messages)
                                //将取到的消息返回给前端
                                handler(null, messages);
                            }).then(function () {
                                //更新患者最后一次获取消息的日期
                                redis.zaddAsync(participants_key, (new Date().getTime()), user).then(function (res) {
                                    console.log(res);
                                }).catch(function (res) {
                                    throw res;
                redis.multi()
                    .zscore(messagesTimestampKey, startMsgId)
                    .zscore(messagesTimestampKey, endMsgId)
                    .execAsync()
                    .then(function (res) {
                        let startMsgScore = res[1];
                        let endMsgScore = res[0];
                        if(startMsgScore == null || endMsgScore == null){
                            handler(null, []);
                            return;
                        }
                        // 从消息时间表中过滤出要获取的消息ID列表,倒序取出消息
                        redis.zrevrangebyscoreAsync(messagesTimestampKey, startMsgScore, endMsgScore, "limit", offset, count)
                            .then(function (res) {
                                if (res.length == 0) {
                                    handler(null, []);
                                    return;
                                }
                                redis.hmgetAsync(messagesKey, res).then(function (messages) {
                                    handler(null, messages);
                                }).then(function () {
                                    Sessions.updateParticipantLastFetchTime(sessionId, userId);
                                })
                            })
                        }).catch(function (res) {
                            }).catch(function (res) {
                            handler(res, false);
                        })
                    })
                })
            }
        })
    }
@ -365,7 +365,7 @@ class Sessions extends RedisModel {
    getSessionUnreadMessageCount(sessionId, userId) {
        let self = this;
        let messagesByTimestampKey = RedisModel.makeRedisKey(REDIS_KEYS.MessagesByTimestamp, sessionId);
        let participantsKey = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipants, userId);
        let participantsKey = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipants, sessionId);
        async.waterfall([
            // 此成员最后获取消息的时间
@ -396,9 +396,8 @@ class Sessions extends RedisModel {
     */
    getSessionUnreadMessages(sessionId, userId) {
        let self = this;
        let messagesKey = RedisModel.makeRedisKey(REDIS_KEYS.Messages, sessionId);
        let messagesByTimestampKey = RedisModel.makeRedisKey(REDIS_KEYS.MessagesByTimestamp, sessionId);
        let participantsKey = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipants, userId);
        let participantsKey = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipants, sessionId);
        async.waterfall([
            // 此成员最后获取消息的时间
@ -419,15 +418,21 @@ class Sessions extends RedisModel {
            },
            // 获取消息
            function (messageIds, callback) {
                redis.hmgetAsync(messagesKey, messageIds)
                    .then(function (res) {
                        let messages = [];
                        res.forEach(function (message) {
                            messages.push(message);
                        });
                if(messageIds.length == 0){
                    ModelUtil.emitOK(self.eventEmitter, []);
                    return;
                }
                        ModelUtil.emitOK(self.eventEmitter, messages);
                    })
                let startMsgId = messageIds[0];
                let endMsgId = messageIds[messageIds.length - 1];
                self.getMessagesByPage(sessionId, userId, startMsgId, endMsgId, 0, messageIds.length, function (err, res) {
                    if(err){
                        ModelUtil.emitError(self.eventEmitter, err.message);
                        return;
                    }
                    ModelUtil.emitOK(self.eventEmitter, res);
                });
            }
        ], function (err, res) {
            if (err) {
@ -452,7 +457,9 @@ class Sessions extends RedisModel {
        let participants = new Participants();
        let sessionKey = RedisModel.makeRedisKey(REDIS_KEYS.Session, sessionId);
        let messageId = mongoose.Types.ObjectId().toString();
        message.id = messageId;
        // 检查会话中是否存在此成员
        participants.existsParticipant(sessionId, message.sender_id, function (err, res) {
            if (err) {
@ -500,9 +507,10 @@ class Sessions extends RedisModel {
        let participants = new Participants();
        let session_key = RedisModel.makeRedisKey(REDIS_KEYS.Session, sessionId);
        let messageId = mongoose.Types.ObjectId().toString();
        message.id = messageId;
        let sessionType = 0;
        let name = "";
        message.id = messageId;
        participants.existsParticipant(sessionId, message.sender_id, function (err, res) {
            //校验发送成员是都在讨论组
            if (res) {
@ -596,12 +604,13 @@ class Sessions extends RedisModel {
     */
    static updateParticipantLastFetchTime(sessionId, userId) {
        let participantsKey = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipants, sessionId);
        redis.zaddAsync(participantsKey, new Date().getTime(), userId)
        let score = new Date().getTime();
        redis.zaddAsync(participantsKey, score, userId)
            .then(function (res) {
                console.log(res);
                logger.info("update participant last fetch time success.");
            })
            .catch(function (res) {
                throw res;
            .catch(function (err) {
                logger.error("Update participant last fetch time error: ", err);
            });
    }
}

+ 7 - 7
src/server/models/user/users.js

@ -296,14 +296,14 @@ class Users extends RedisModel {
                                                topics.forEach(function (topic) {
                                                    let topicKey = RedisModel.makeRedisKey(REDIS_KEYS.Topic, topic.id);
                                                    let topicId = topic.id;
                                                    let name = topic.name;
                                                    let name = topic.name == null ? "" : topic.name;
                                                    let createTime = ObjectUtil.timestampToLong(topic.create_time);
                                                    let endBy = topic.end_by;
                                                    let endTime = ObjectUtil.timestampToLong(topic.end_time);
                                                    let startMessageId = topic.start_message_id;
                                                    let endMessageId = topic.end_message_id;
                                                    let description = topic.description;
                                                    let status = topic.status;
                                                    let endBy = topic.end_by == null ? "" : topic.end_by;
                                                    let endTime = topic.end_time == null ? 0 : ObjectUtil.timestampToLong(topic.end_time);
                                                    let startMessageId = topic.start_message_id == null ? "" : topic.start_message_id;
                                                    let endMessageId = topic.end_message_id == null ? "" : topic.end_message_id;
                                                    let description = topic.description == null ? "" : topic.description;
                                                    let status = topic.status == null ? 0 : topic.status;
                                                    redisConn.multi()
                                                        .zadd(topicsKey, topicId)
                                                        .hmset(topicKey,

+ 1 - 7
test/client/im.client.session.p2p.Test.js

@ -113,7 +113,7 @@ describe("Session P2P", function () {
    });
    // 获取会话列表及消息数
    describe("Get sessions and unread messages for B", function () {
    describe("B: Get sessions and unread messages", function () {
        it("every operation must be ok", function (done) {
            imClient.Sessions.getSessionsWithDoctor(TD.P2P.DoctorB.id, 0, 10,
                function (sessions) {
@ -160,12 +160,6 @@ describe("Session P2P", function () {
        });
    });
    // 发送消息: B -> A
    describe("Send message from B to A", function () {
        it("every message must be ok", function (done) {
        });
    });
    // 退出
    describe("User logout", function () {
        it("all user must be success", function (done) {