ソースを参照

为会话及议题增加消息推送

Sand 8 年 前
コミット
3bd78eab6c

+ 4 - 2
src/server/endpoints/v2/session.endpoint.js

@ -71,6 +71,7 @@ router.get("/", function (req, res) {
    let page = req.query.page;
    let size = req.query.size;
    let userId = req.query.user_id;
    let businessType = req.query.business_type;
    if (!page) {
        throw {httpStatus: 406, message: 'Missing page.'};
    }
@ -84,7 +85,7 @@ router.get("/", function (req, res) {
    let sessions = new Sessions();
    ControllerUtil.regModelEventHandler(sessions, res);
    sessions.getUserSessions(userId, page, size);
    sessions.getUserSessions(userId, page, size,businessType);
});
/**
@ -235,6 +236,7 @@ router.get(APIv2.Sessions.Messages, function (req, res) {
    let user = req.query.user;
    let sessionId = req.query.session_id;
    let content_type = req.query.content_type;
    let isoffset = req.query.isoffset;
    if (!user) {
        throw {httpStatus: 406, message: 'Missing user.'};
    }
@ -255,7 +257,7 @@ router.get(APIv2.Sessions.Messages, function (req, res) {
        let sessions = new Sessions();
        ControllerUtil.regModelEventHandler(sessions, res);
        sessions.getMessages(sessionId, user, startMsgId,endMsgId,page,pagesize);
        sessions.getMessages(sessionId, user, startMsgId,endMsgId,page,pagesize,isoffset);
    }
});

+ 7 - 3
src/server/models/messages/messages.js

@ -4,9 +4,10 @@
"use strict";
let MessageRepo = require('../../repository/mysql/message.repo');
let SessionRepo = require('../../repository/mysql/session.repo');
let RedisModel = require('./../redis.model.js');
let SessionRepo = require('../../repository/mysql/session.repo');
let RedisClient = require('../../repository/redis/redis.client.js');
let ModelUtil = require('../../util/model.util');
var ObjectUtil = require("../../util/object.util.js");
let redis = RedisClient.redisClient().connection;
@ -58,6 +59,9 @@ class Messages extends RedisModel {
     * @param handler
     */
    getMessageByType(sessionId, page, pagesize, messageType) {
        let self = this;
        page = parseInt(page);
        pagesize = parseInt(pagesize);
        MessageRepo.findBySessionId(sessionId, page, pagesize, messageType, function (err, res) {
            if (err) {
                ModelUtil.emitError(self.eventEmitter, {message: "Error get message by session and type : " + err});
@ -94,7 +98,7 @@ class Messages extends RedisModel {
            id: message.id,
            sender_id: message.sender_id,
            sender_name: message.sender_name,
            timestamp: ObjectUtil.timestampToLong(message.timestamp),
            timestamp: message.timestamp.getTime(),
            content_type: message.content_type,
            content: message.content
        };
@ -148,7 +152,7 @@ class Messages extends RedisModel {
            message.content,
            message.content_type,
            sessionid, function (err, res) {
                if(err) logger.error("Update session last status failed: ", err);
                if (err) logger.error("Update session last status failed: ", err);
            });
        if (name != null) redis.hsetAsync(sessionKey, "name", name);

+ 5 - 5
src/server/models/sessions/participants.js

@ -27,7 +27,7 @@ class Participants extends RedisModel {
        let self = this;
        ParticipantRepo.findAll(sessionId, function (err, participants) {
            if(err){
            if (err) {
                ModelUtil.emitError(self.eventEmitter, "Get session participants error", err);
                return;
            }
@ -41,11 +41,11 @@ class Participants extends RedisModel {
     *
     * @param sessionId
     */
    getParticipantsAvatar(sessionId){
    getParticipantsAvatar(sessionId) {
        let self = this;
        ParticipantRepo.findAllAvatars(sessionId, function (err, participantsAvatars) {
            if(err){
            if (err) {
                ModelUtil.emitError(self.eventEmitter, "Get session participant's avatars error", err);
                return;
            }
@ -117,7 +117,7 @@ class Participants extends RedisModel {
            .hmset(sessionParticipantsRoleKey, sessionParticipantsRoles);
        // 更新用户参与的会话列表
        for(let key in userSessions){
        for (let key in userSessions) {
            multi = multi.zadd(key, userSessions[key]);
        }
@ -177,7 +177,7 @@ class Participants extends RedisModel {
    updateUser(sessionId, user, role) {
        let participants_role_key = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipantsRole, sessionId);
        redis.hsetAsync(sessionId, user, role).then(function (res) {
            ParticipantRepo.updateParticipant(sessionId, user, role,function(err,res){
            ParticipantRepo.updateParticipant(sessionId, user, role, function (err, res) {
            });
        })

+ 56 - 37
src/server/models/sessions/sessions.js

@ -63,14 +63,14 @@ class Sessions extends RedisModel {
        } else {
            callBusinessType(sessionId);
        }
        var businessType = SESSION_BUSINESS_TYPE.DOCTOR;
        function callBusinessType(sessionId) {
            var businessType = SESSION_BUSINESS_TYPE.DOCTOR;
            for (var j = 0; j < participantArray.length; j++)
                callIsPatient(j, businessType, participantArray.length);
                callIsPatient(j, participantArray.length);
        }
        function callIsPatient(j, businessType, length) {
        function callIsPatient(j, length) {
            Users.isPatientId(participantArray[j], function (isPatient) {
                if (isPatient) {
                    businessType = SESSION_BUSINESS_TYPE.PATIENT
@ -167,7 +167,7 @@ class Sessions extends RedisModel {
     * @param page
     * @param size
     */
    getUserSessions(userId, page, size) {
    getUserSessions(userId, page, size, businessType) {
        let userSessionKey = RedisModel.makeRedisKey(REDIS_KEYS.UserSessions, userId);
        let self = this;
        if (page > 0) {
@ -205,27 +205,32 @@ class Sessions extends RedisModel {
                            let session = res[0];
                            let role = res[1];
                            let lastFetchTime = res[2];
                            // 计算未读消息数
                            let messagesByTimestampKey = RedisModel.makeRedisKey(REDIS_KEYS.MessagesByTimestamp, sessionId);
                            redis.zcountAsync(messagesByTimestampKey, lastFetchTime, new Date().getTime())
                                .then(function (count) {
                                    sessionList.push({
                                        id: sessionId,
                                        name: session.name,
                                        create_date: session.create_date,
                                        last_content_type: session.last_content_type,
                                        last_content: session.last_content,
                                        sender_id: session.sender_id,
                                        sender_name: session.sender_name,
                                        unread_count: count,
                                        my_role: role
                                    });
                                    if (sessionId === sessionIds[sessionIds.length - 1]) {
                                        ModelUtil.emitOK(self.eventEmitter, sessionList);
                                    }
                                });
                            if (businessType && businessType != session.business_type) {
                                logger.info("businessType:" + businessType + "<>" + session.business_type);
                            } else {
                                // 计算未读消息数
                                let messagesByTimestampKey = RedisModel.makeRedisKey(REDIS_KEYS.MessagesByTimestamp, sessionId);
                                redis.zcountAsync(messagesByTimestampKey, lastFetchTime, new Date().getTime())
                                    .then(function (count) {
                                        sessionList.push({
                                            id: sessionId,
                                            name: session.name,
                                            create_date: session.create_date,
                                            last_content_type: session.last_content_type,
                                            last_content: session.last_content,
                                            sender_id: session.sender_id,
                                            sender_name: session.sender_name,
                                            unread_count: count,
                                            business_type: session.business_type,
                                            my_role: role
                                        });
                                        if (sessionId === sessionIds[sessionIds.length - 1]) {
                                            ModelUtil.emitOK(self.eventEmitter, sessionList);
                                        }
                                    })
                            }
                            ;
                        })
                        .catch(function (err) {
                            ModelUtil.emitError(self.eventEmitter, "Get sessions failed: " + err);
@ -245,7 +250,7 @@ class Sessions extends RedisModel {
     * @param start_msg_id 消息会话最新的一条消息的ID
     * @param end_msg_id 消息会话刚开始的消息ID
     */
    getMessages(sessionId, user, start_msg_id, end_msg_id, page, pagesize) {
    getMessages(sessionId, user, start_msg_id, end_msg_id, page, pagesize, isoffset) {
        let self = this;
        let message_timestamp_key = RedisModel.makeRedisKey(REDIS_KEYS.MessagesByTimestamp, sessionId);
        if (!start_msg_id && !end_msg_id) {
@ -261,7 +266,7 @@ class Sessions extends RedisModel {
                        return;
                    }
                    end_msg_id = res[0];
                    self.getMessagesByPage(sessionId, user, end_msg_id, start_msg_id, page, pagesize, function (err, res) {
                    self.getMessagesByPage(sessionId, user, end_msg_id, start_msg_id, page, pagesize, isoffset, function (err, res) {
                        if (err) {
                            logger.error("getMessagesByPage error" + err);
                            ModelUtil.emitError(self.eventEmitter, err, err);
@ -278,7 +283,7 @@ class Sessions extends RedisModel {
                    return;
                }
                start_msg_id = res[0];
                self.getMessagesByPage(sessionId, user, end_msg_id, start_msg_id, page, pagesize, function (err, res) {
                self.getMessagesByPage(sessionId, user, end_msg_id, start_msg_id, page, pagesize, isoffset, function (err, res) {
                    if (err) {
                        logger.error("getMessagesByPage error" + err);
                        ModelUtil.emitError(self.eventEmitter, err, err);
@ -294,7 +299,7 @@ class Sessions extends RedisModel {
                    return;
                }
                end_msg_id = res[0];
                self.getMessagesByPage(sessionId, user, start_msg_id, end_msg_id, page, pagesize, function (err, res) {
                self.getMessagesByPage(sessionId, user, start_msg_id, end_msg_id, page, pagesize, isoffset, function (err, res) {
                    if (err) {
                        logger.error("getMessagesByPage error" + err);
                        ModelUtil.emitError(self.eventEmitter, err, err);
@ -304,7 +309,7 @@ class Sessions extends RedisModel {
                })
            })
        } else {
            self.getMessagesByPage(sessionId, user, end_msg_id, start_msg_id, page, pagesize, function (err, res) {
            self.getMessagesByPage(sessionId, user, end_msg_id, start_msg_id, page, pagesize, isoffset, function (err, res) {
                if (err) {
                    logger.error("getMessagesByPage error" + err);
                    ModelUtil.emitError(self.eventEmitter, err, err);
@ -326,17 +331,16 @@ class Sessions extends RedisModel {
     * @param size          必选。页面大小
     * @param handler       必选。回调
     */
    getMessagesByPage(sessionId, userId, startMsgId, endMsgId, page, size, handler) {
    getMessagesByPage(sessionId, userId, startMsgId, endMsgId, page, size, isoffset, handler) {
        let messagesTimestampKey = RedisModel.makeRedisKey(REDIS_KEYS.MessagesByTimestamp, sessionId);
        let messagesKey = RedisModel.makeRedisKey(REDIS_KEYS.Messages, sessionId);
        let participants = new Participants();
        let offset = (page - 1 < 0 ? 0 : page - 1) * size;
        let count = size;
        if (page > 1 || startMsgId) {//翻页由于闭区间,需跳过本身数据
        if (page > 1 || isoffset == 1) {//翻页由于闭区间,需跳过本身数据
            offset += 1;
        }
        participants.existsParticipant(sessionId, userId, function (err, res) {
            if (!res) {
                handler(Error("User not found in session " + sessionId), null);
@ -350,7 +354,7 @@ class Sessions extends RedisModel {
                        let startMsgScore = res[1];
                        let endMsgScore = res[0];
                        if (startMsgScore == null || endMsgScore == null) {
                        if (startMsgScore == null || endMsgScore == null || startMsgScore == endMsgScore) {
                            handler(null, []);
                            return;
                        }
@ -453,7 +457,7 @@ class Sessions extends RedisModel {
                let startMsgId = messageIds[0];
                let endMsgId = messageIds[messageIds.length - 1];
                self.getMessagesByPage(sessionId, userId, startMsgId, endMsgId, 0, messageIds.length, function (err, res) {
                self.getMessagesByPage(sessionId, userId, startMsgId, endMsgId, 0, messageIds.length, 0, function (err, res) {
                    if (err) {
                        ModelUtil.emitError(self.eventEmitter, err.message);
                        return;
@ -517,7 +521,7 @@ class Sessions extends RedisModel {
                    // 推送消息
                    ParticipantRepo.findIds(sessionId, function (err, res) {
                        if (err) {
                            ModelUtil.logError("Push message: get participant's id list failed: ", err);
                            ModelUtil.logError("Push message from session: get participant's id list failed: ", err);
                        } else {
                            message.session_id = sessionId;
@ -576,8 +580,23 @@ class Sessions extends RedisModel {
                    handler(null, messageId);
                }).then(function (res) {
                    // TODO: 消息推送
                    // 推送消息
                    ParticipantRepo.findIds(sessionId, function (err, res) {
                        if (err) {
                            ModelUtil.logError("Push message from topic: get participant's id list failed: ", err);
                        } else {
                            message.session_id = sessionId;
                            res.forEach(function (participant) {
                                if (participant.id !== message.sender_id) {
                                    Sessions.pushNotification(participant.id, message);
                                }
                            });
                        }
                    })
                }).catch(function (err) {
                    ModelUtil.emitError(self.eventEmitter, {message: "Error occurred while save message to topic: " + err});
                    handler(err, messageId)
                })
            } else {