Explorar el Código

修改消息表的主键为varchar类型,增加系统消息表,结构与其他消息表一致;common.js增加会话类型常量;增加会话清理MessageCleaner及配套会话参数

Sand hace 8 años
padre
commit
26f42826ec

+ 47 - 16
src/server/include/commons.js

@ -21,6 +21,19 @@ if (process.env.IM_PROFILE === "prod") {
exports.CONFIG_FILE = configFile;
/**
 * 会话类型
 */
const SESSION_TYPES = {
    SYSTEM: 0,          // 系统会话
    MUC: 1,             // MUC会话
    P2P: 2,             // P2P
    GROUP: 3,           // 固定组
    DISCUSSION: 4       // 临时讨论组
};
exports.SESSION_TYPES = SESSION_TYPES;
/**
 *  消息内容类型。
 */
@ -30,8 +43,8 @@ exports.CONTENT_TYPE = {
    Audio: 3,       // 语音信息
    Article: 4,     // 文章信息
    GoTo: 5,        // 跳转信息
    SessionBegin: 6,// 咨询开始
    SessionEnd: 7   // 咨询结束
    TopicBegin: 6,  // 议题开始
    TopicEnd: 7     // 议题结束
};
/**
@ -63,21 +76,17 @@ exports.MODEL_EVENTS = {
};
/**
 * 默认整型最大值。
 * @type {number}
 * 整型最大值。
 */
exports.MAX_INT = 9007199254740992;
/**
 * 置顶会话基础分值,以此为下限向上递增。
 * @type {number}
 */
exports.STICKY_SESSION_BASE_SCORE = 9000000000000;
/**
 * 默认分页大小。
 *
 * @type {number}
 */
exports.DEFAULT_PAGE_SIZE = 100;
@ -90,15 +99,17 @@ exports.REDIS_KEY_REPLACER = REDIS_KEY_REPLACER;
exports.REDIS_KEYS = {
    Users: "users:",
    User: "users:" + REDIS_KEY_REPLACER,
    UserAppStatus: "users:" + REDIS_KEY_REPLACER + ":app_status",
    UserWechatStatus: "users:" + REDIS_KEY_REPLACER + ":wechat_status",
    UserSessions: "users:" + REDIS_KEY_REPLACER + ":sessions",
    Sessions: "sessions:",
    Session: "sessions:" + REDIS_KEY_REPLACER,
    Participants: "participants:" + REDIS_KEY_REPLACER,
    ParticipantsRole:"participants:" + REDIS_KEY_REPLACER+":role",
    ParticipantsRole: "participants:" + REDIS_KEY_REPLACER + ":role",
    Topics: "sessions:" + REDIS_KEY_REPLACER + ":topics",
    Topic: "topics:" + REDIS_KEY_REPLACER,
@ -108,11 +119,31 @@ exports.REDIS_KEYS = {
};
exports.DB_TABLES = {
    "P2pMessages": "p2p_messages",
    "MucMessages": "muc_messages",
    "GroupMessages": "group_messages",
    "Participants": "participants",
    "Sessions": "sessions",
    "Topics": "topics",
    "StickySessions":"sticky_sessions"
    P2pMessages: "p2p_messages",
    MucMessages: "muc_messages",
    GroupMessages: "group_messages",
    DiscussionMessages: "discussion_messages",
    SystemMessages: "system_messages",
    Participants: "participants",
    Sessions: "sessions",
    Topics: "topics",
    StickySessions: "sticky_sessions",
    sesstionTypeToTableName: function (sessionType) {
        switch (sessionType) {
            case SESSION_TYPES.SYSTEM:
                return DB_TABLES.SystemMessages;
            case SESSION_TYPES.MUC:
                return DB_TABLES.MucMessages;
            case SESSION_TYPES.P2P:
                return DB_TABLES.P2pMessages;
            case SESSION_TYPES.GROUP:
                return DB_TABLES.GroupMessages;
            case SESSION_TYPES.DISCUSSION:
                return DB_TABLES.DiscussionMessages;
            default:
                throw {message: "Unknown session type"};
        }
    }
};

+ 42 - 13
src/server/models/messages/messages.js

@ -12,7 +12,7 @@ let log = require('../../util/log.js');
let configFile = require('../../include/commons').CONFIG_FILE;
let config = require('../../resources/config/' + configFile);
const RedisKey = require('../../include/commons').REDIS_KEYS;
const REDIS_KEYS = require('../../include/commons').REDIS_KEYS;
class Messages extends RedisModel {
    constructor() {
@ -37,30 +37,37 @@ class Messages extends RedisModel {
    }
    /**
     * 分页获取消息MySQL
     * 获取消息MySQL
     * @param sessionId
     * @param page
     * @param pagesize
     * @param handler
     */
    getMessageByPage(sessionId, page, pagesize, handler) {
    getMessageFromMySQL(sessionId, page, pagesize, handler) {
        MessageRepo.findBySessionId(sessionId, page, pagesize, handler);
    }
    /**
     * 根据消息ID获取单条消息
     * 获取单条消息
     *
     * @param messageId
     */
    getMessagesByid(messageId) {
    getMessageById(messageId) {
    }
    saveMessageForRedis(message_id, sessionId, message) {
        let message_key = super.makeRedisKey(RedisKey.Messages, sessionId);
        let message_timestamp_key = super.makeRedisKey(RedisKey.MessagesByTimestamp, sessionId);
        redis.hsetAsync(message_key, message_id, JSON.stringify(message)).then(function (res) {
            log.info("success save redis message by session :" + sessionId);
            //保存message_timestamp_key redis
            return redis.zaddAsync(message_timestamp_key, message.timestamp.getTime(), message_id);
    saveMessageToRedis(message_id, sessionId, message) {
        let self = this;
        let message_key = super.makeRedisKey(REDIS_KEYS.Messages, sessionId);
        let message_timestamp_key = super.makeRedisKey(REDIS_KEYS.MessagesByTimestamp, sessionId);
        redis.multi()
            .hset(message_key, message_id, JSON.stringify(message))
            .zadd(message_timestamp_key, message.timestamp.getTime(), message_id)
            .execAsync().then(function (res) {
            log.info("Save redis message to session " + sessionId);
            // clean out range messages
            self.cleanOutRangeMessage(sessionId);
        });
    }
@ -73,7 +80,29 @@ class Messages extends RedisModel {
     * @type type 会话类型,1表示MUC会话,2表示P2P,3表示群会话,4表示临时讨论组
     */
    saveMessageToMysql(messages, type, messageId, sessionId) {
        MessageRepo.saveMessageForMysql(messages, type, messageId, sessionId);
        MessageRepo.save(messages, type, messageId, sessionId);
    }
    /**
     * 清理会话中超出范围的消息。
     *
     * 目前只实现超出1000条数据清理的逻辑,7天前的消息清除逻辑暂不实现。
     *
     * @param sessionId
     */
    cleanOutRangeMessage(sessionId) {
        let maxMessageCacheCount = config.sessionConfig.maxMessageCount;
        let messageById = this.makeRedisKey(REDIS_KEYS.Messages, sessionId);
        let messagesByTimestampKey = this.makeRedisKey(REDIS_KEYS.MessagesByTimestamp, sessionId);
        redis.zcardAsync(messagesByTimestampKey).then(function (count) {
           if(count > maxMessageCacheCount){
               redis.zrevrangeAsync(messagesByTimestampKey, 0, count - maxMessageCacheCount).then(function (idList) {
                   redis.zremAsync(messagesByTimestampKey, idList).then(function (res) {
                       redis.hdel(messageById, idList);
                   })
               });
           }
        });
    }
}

+ 82 - 0
src/server/models/sessions/session.cleaner.js

@ -0,0 +1,82 @@
/**
 * Redis会话清除器。会话清理暂时不使用。
 *
 * Redis中的会话有效期为40分钟,即自会话中最后一次时间算起,如果40分钟内没有成员发送、读取消息则将此会话从内存中清除。
 * 为减少冲突,每次只清理时间最久的前10个会话。
 *
 * 清理逻辑:首先检查哪些会话是过期的,取得前10个过期会话,然后取得会话中的消息、MUC议题、成员列表、成员信息及客户端状态,将这些内容全部删除。
 *
 * PS: 目前此策略有一个问题:在并发量大的时候,有可能在清理时,刚好用户在登录,此时会发生冲突。
 *
 * author: Sand
 * since: 12/23/2016
 */
"use strict";
let RedisModel = require('../redis.model');
let RedisClient = require('../../repository/redis/redis.client.js');
let async = require('async');
let configFile = require('../../include/commons').CONFIG_FILE;
let config = require('../../resources/config/' + configFile);
let redis = RedisClient.redisClient().connection;
let log = require('../../util/log');
const REDIS_KEYS = require('../../include/commons').REDIS_KEYS;
class SessionCleaner {
    constructor() {
    }
    /**
     * 清理过期会话,但目前不能清理用户信息,因为用户可能在其他活动的会话中,
     * 除非后续有对每个用户做会话引用,便可以在清除会话时根据引用数量清理用户信息,
     * 因此当前有以下内容不清理:
     * users:
     * users:user_id
     * users:user_id:sessions
     * users:user_id:app_status
     * users:user_id:wechat_status
     *
     * @param count 清理多少个
     */
    static cleanExpiredSessions(count) {
        let baseScore = new Date().getMilliseconds() - config.sessionConfig.expireTime;
        let sessionsKey = RedisModel.makeRedisKey(REDIS_KEYS.Sessions);
        redis.zrevrangebyscoreAsync(sessionsKey, baseScore, config.MAX_INT).then(function (sessionIds) {
            if (sessionIds.length > count) {
                sessionIds = sessionIds.splice(count, sessionIds.length - count);
            }
            // 构建对每个会话的清理操作
            let sessionCleanCalls = [];
            sessionIds.forEach(function (sessionId) {
                sessionCleanCalls.push(function (callback) {
                    let topicsKey = RedisModel.makeRedisKey(REDIS_KEYS.Topics, sessionId);
                    // 清理议题
                    redis.zrangeAsync(topicsKey, 0, -1).then(function (topicIds) {
                        redis.del(topicIds.forEach(function (topicId) {
                            return RedisModel.makeRedisKey(REDIS_KEYS.Topic, topicId);
                        }));
                    });
                    // 清理会话、议题、参与者、消息
                    let toDeleteKeys = [REDIS_KEYS.Sessions, REDIS_KEYS.Session, REDIS_KEYS.Topics,
                        REDIS_KEYS.ParticipantsRole, REDIS_KEYS.Participants, REDIS_KEYS.Messages, REDIS_KEYS.MessagesByTimestamp];
                    redis.del(toDeleteKeys.forEach(function (key) {
                        return RedisModel.makeRedisKey(key, sessionId);
                    }));
                });
            });
            async.parallel(sessionCleanCalls, function (err, result) {
                if (err) log.error("Clean redis session failed: ", err);
                log.info(result);
            });
        })
    }
}
module.exports = SessionCleaner;

+ 30 - 30
src/server/models/sessions/sessions.js

@ -4,21 +4,22 @@
"use strict";
let RedisClient = require('../../repository/redis/redis.client.js');
let redisClient = RedisClient.redisClient();
let redis = redisClient.connection;
let RedisModel = require('./../redis.model.js');
let modelUtil = require('../../util/model.util');
let ModelUtil = require('../../util/model.util');
let Messages = require('../messages/messages');
let Participants = require('./Participants');
let log = require('../../util/log.js');
const RedisKeys = require('../../include/commons').REDIS_KEYS;
const Commons = require('../../include/commons');
let configFile = require('../../include/commons').CONFIG_FILE;
let config = require('../../resources/config/' + configFile);
let SessionRepo = require('../../repository/mysql/session.repo');
let ParticipantRepo = require('../../repository/mysql/participant.repo');
let configFile = require('../../include/commons').CONFIG_FILE;
let config = require('../../resources/config/' + configFile);
let redis = RedisClient.redisClient().connection;
let log = require('../../util/log.js');
let mongoose = require('mongoose');
const RedisKeys = require('../../include/commons').REDIS_KEYS;
const Commons = require('../../include/commons');
class Sessions extends RedisModel {
    constructor() {
        super();
@ -45,13 +46,13 @@ class Sessions extends RedisModel {
                userArray.push(key);
            }
            if(userArray.length>2){
                modelUtil.emitData(self.eventEmitter, {"status": -1, "msg": "会话人数超过2个无法创建P2P会话!"});
                ModelUtil.emitData(self.eventEmitter, {"status": -1, "msg": "会话人数超过2个无法创建P2P会话!"});
                return false;
            }
            ParticipantRepo.findSessionIdByParticipantIds(userArray[0],userArray[0],function(err,res){
                sessionId = res;
                callcreate(sessionId);
            })
            });
        }else{
            callcreate();
        }
@ -63,7 +64,7 @@ class Sessions extends RedisModel {
            // 将session加入redis
            participants.saveParticipantsToRedis(sessionId, users, createDate, function (res) {
                if (!res) {
                    modelUtil.emitData(self.eventEmitter, {"status": -1, "msg": res});
                    ModelUtil.emitData(self.eventEmitter, {"status": -1, "msg": res});
                } else {
                    let messages = {};
                    messages.senderId = "system";
@ -72,7 +73,7 @@ class Sessions extends RedisModel {
                    messages.content = "";
                    messages.contentType = "1";
                    self.updateLastContent(session_key, type, name, messages);
                    modelUtil.emitData(self.eventEmitter, {"status": 200, "msg": "session create success!"});
                    ModelUtil.emitData(self.eventEmitter, {"status": 200, "msg": "session create success!"});
                    self.saveSessionToMysql(sessionId, name, type, createDate);
                    participants.saveParticipantsToMysql(sessionId, users); //创建session成员到数据库
                }
@ -128,7 +129,7 @@ class Sessions extends RedisModel {
        redis.zrevrangeAsync(user_session_key, page, pagesize).then(function (res) {
            let sessionlist = [];
            if (res.length == 0) {
                modelUtil.emitData(self.eventEmitter, {"status": 200, "data": res});
                ModelUtil.emitData(self.eventEmitter, {"status": 200, "data": res});
            } else {
                for (var j in res) {
                    calllist(res[j], j, res.length);
@ -189,11 +190,11 @@ class Sessions extends RedisModel {
            function callback(res, j, _len) {
                sessionlist.push(res);
                if (j == (_len - 1)) {
                    modelUtil.emitData(self.eventEmitter, {"status": 200, "data": sessionlist});
                    ModelUtil.emitData(self.eventEmitter, {"status": 200, "data": sessionlist});
                }
            }
        }).catch(function (res) {
            modelUtil.emitData(self.eventEmitter, "get list error " + res + ",user:" + userId);
            ModelUtil.emitData(self.eventEmitter, "get list error " + res + ",user:" + userId);
        })
    }
@ -212,12 +213,11 @@ class Sessions extends RedisModel {
        let participants_key = super.makeRedisKey(RedisKeys.Participants, sessionId);
        //超过最大限制后从mysql获取数据
        if (page * pagesize >= config.sessionConfig.maxMessageCount) {
            let message = new Messages();
            message.getMessageByPage(sessionId, page, pagesize, function (err, res) {
            self.getMessageFromMySQL(sessionId, page, pagesize, function (err, res) {
                if (!err) {
                    modelUtil.emitData(self.eventEmitter, {"status": 200, "data": res});
                    ModelUtil.emitData(self.eventEmitter, {"status": 200, "data": res});
                } else {
                    modelUtil.emitData(self.eventEmitter, {"status": -1, "data": err});
                    ModelUtil.emitData(self.eventEmitter, {"status": -1, "data": err});
                }
            })
        } else {
@ -228,19 +228,19 @@ class Sessions extends RedisModel {
            let participants = new Participants();
            participants.existsParticipant(sessionId, user, function (res) {
                if (!res) {
                    modelUtil.emitData(self.eventEmitter, {"status": -1, "msg": "用户不在此会话中!"});
                    ModelUtil.emitData(self.eventEmitter, {"status": -1, "msg": "用户不在此会话中!"});
                } else {
                    //倒序取出最后N条消息
                    redis.zrevrangeAsync(message_timestamp_key, page, pagesize).then(function (res) {
                        //取出消息实体
                        if (res.length == 0) {
                            modelUtil.emitData(self.eventEmitter, {"status": 200, "data": []});
                            ModelUtil.emitData(self.eventEmitter, {"status": 200, "data": []});
                            return;
                        }
                        redis.hmgetAsync(message_key, res).then(function (messages) {
                            console.log(messages)
                            //将取到的消息返回给前端
                            modelUtil.emitData(self.eventEmitter, {"status": 200, "data": messages});
                            ModelUtil.emitData(self.eventEmitter, {"status": 200, "data": messages});
                        }).then(function () {
                            //更新患者最后一次获取消息的日期
                            redis.zaddAsync(participants_key, (new Date().getTime()), user).then(function (res) {
@ -250,7 +250,7 @@ class Sessions extends RedisModel {
                            })
                        })
                    }).catch(function (res) {
                        modelUtil.emitData(self.eventEmitter, {"status": -1, "msg": res});
                        ModelUtil.emitData(self.eventEmitter, {"status": -1, "msg": res});
                    })
                }
            })
@ -304,7 +304,7 @@ class Sessions extends RedisModel {
                    }
                }).then(function (res) {
                    //更新消息相关
                    return messages.saveMessageForRedis(message_id, sessionId, message);
                    return messages.saveMessageToRedis(message_id, sessionId, message);
                }).then(function (res) {
                    //更新session的最后一条聊天记录
                    return self.updateLastContent(session_key, session_type, name, message);
@ -312,13 +312,13 @@ class Sessions extends RedisModel {
                    //操作mysql数据库
                    messages.saveMessageToMysql(message, session_type, message_id, sessionId);
                    //返回数据给前端。
                    modelUtil.emitData(self.eventEmitter, {"status": 200, "msg": "发送成功!"});
                    ModelUtil.emitData(self.eventEmitter, {"status": 200, "msg": "发送成功!"});
                    //消息推送
                }).catch(function (res) {
                    modelUtil.emitData(self.eventEmitter, {"status": -1, "msg": res});
                    ModelUtil.emitData(self.eventEmitter, {"status": -1, "msg": res});
                })
            } else {
                modelUtil.emitData(self.eventEmitter, {"status": -1, "msg": "用户不在此会话当中!"});
                ModelUtil.emitData(self.eventEmitter, {"status": -1, "msg": "用户不在此会话当中!"});
            }
        })
    }
@ -339,7 +339,7 @@ class Sessions extends RedisModel {
                    //初始化置顶
                    redis.zaddAsync(user_session_key, Commons.STICKY_SESSION_BASE_SCORE, sessionId).then(function (res) {
                        log.info("stickSession:" + sessionId + ",res:" + res);
                        modelUtil.emitData(self.eventEmitter, {"status": 200, "msg": "置顶成功!"});
                        ModelUtil.emitData(self.eventEmitter, {"status": 200, "msg": "置顶成功!"});
                    }).then(function () {
                        SessionRepo.saveStickySession(sessionId, user, Commons.STICKY_SESSION_BASE_SCORE);
                    })
@ -348,7 +348,7 @@ class Sessions extends RedisModel {
                    scoreres = Number(scoreres) + 1;
                    redis.zaddAsync(user_session_key, scoreres, sessionId).then(function () {
                        log.info("stickSession:" + sessionId + ",res:" + res);
                        modelUtil.emitData(self.eventEmitter, {"status": 200, "msg": "置顶成功!"});
                        ModelUtil.emitData(self.eventEmitter, {"status": 200, "msg": "置顶成功!"});
                    }).then(function () {
                        SessionRepo.saveStickySession(sessionId, user, scoreres);
                    })
@ -370,7 +370,7 @@ class Sessions extends RedisModel {
            }
            redis.zaddAsync(user_session_key, res, sessionId).then(function (res) {
                log.info("cancelStickSession:" + sessionId);
                modelUtil.emitData(self.eventEmitter, {"status": 200, "msg": "取消置顶成功!"});
                ModelUtil.emitData(self.eventEmitter, {"status": 200, "msg": "取消置顶成功!"});
            }).then(function () {
                SessionRepo.unstickSession(sessionId, user);
            });

+ 2 - 2
src/server/models/user/patient.js

@ -137,8 +137,8 @@ class Patient extends RedisModel {
                case 0:
                case CONTENT_TYPES.Article:
                case CONTENT_TYPES.GoTo:
                case CONTENT_TYPES.SessionBegin:
                case CONTENT_TYPES.SessionEnd:
                case CONTENT_TYPES.TopicBegin:
                case CONTENT_TYPES.TopicEnd:
                    return;
                default:
                    break;

+ 10 - 8
src/server/models/user/users.js

@ -6,9 +6,6 @@
 */
"use strict";
const REDIS_KEYS = require('../../include/commons').REDIS_KEYS;
const PLATFORMS = require('../../include/commons').PLATFORM;
let RedisModel = require('../redis.model');
let Doctor = require('./doctor');
let Patient = require('./patient');
@ -30,6 +27,9 @@ let log = require('../../util/log');
let configFile = require('../../include/commons').CONFIG_FILE;
let config = require('../../resources/config/' + configFile);
const REDIS_KEYS = require('../../include/commons').REDIS_KEYS;
const PLATFORMS = require('../../include/commons').PLATFORM;
class Users extends RedisModel {
    constructor() {
        super();
@ -210,7 +210,8 @@ class Users extends RedisModel {
                        (function (sessionId, userId) {
                            // cache sessions
                            redisConn.multi()
                                .zadd(self.makeRedisKey(REDIS_KEYS.UserSessions, userId))
                                .zadd(self.makeRedisKey(REDIS_KEYS.Sessions), lastLoginTime)                // 会话的最后活动时间设置为此用户的登录时间
                                .zadd(self.makeRedisKey(REDIS_KEYS.UserSessions, userId), lastLoginTime)    // 会话的最后活动时间设置为此用户的登录时间
                                .hmset(self.makeRedisKey(REDIS_KEYS.Session, sessionId, 'name', name, 'type', type, 'create_date', createDate))
                                .execAsync().then(function (res) {
@ -255,7 +256,7 @@ class Users extends RedisModel {
                                    }
                                });
                                // cache topics for MUC session
                                // cache topics for MUC
                                let topicsKey = self.makeRedisKey(REDIS_KEYS.Topics, sessionId);
                                TopicRepo.findAll(sessionId, function (err, topics) {
                                    for (let topic in topics) {
@ -264,13 +265,14 @@ class Users extends RedisModel {
                                        let name = topic.name;
                                        let createTime = topic.create_time;
                                        let endBy = topic.end_by;
                                        let startMesssageId = topic.start_message_id;
                                        let endTime = topic.end_time;
                                        let startMessageId = topic.start_message_id;
                                        let endMessageId = topic.end_message_id;
                                        redisConn.multi()
                                            .zadd(topicsKey, topicId)
                                            .hmset(topicKey, 'name', name, 'session_id', sessionId, 'create_time',
                                                createTime, 'end_by', endBy, 'start_message_id',
                                                startMesssageId, 'end_message_id', endMessageId)
                                                createTime, 'end_by', endBy, 'end_time', endTime, 'start_message_id',
                                                startMessageId, 'end_message_id', endMessageId)
                                            .execAsync().then(function (res) {
                                        });
                                    }

+ 8 - 5
src/server/repository/mysql/message.repo.js

@ -69,16 +69,19 @@ class MessageRepo {
     * 保存消息
     *
     * @param messages 消息对象
     * @type type 会话类型,1表示MUC会话,2表示P2P,3表示群会话,4表示临时讨论组
     * @param sessionType 会话类型,1表示MUC会话,2表示P2P,3表示群会话,4表示临时讨论组
     * @param messageId
     * @param sessionId
     */
    static saveMessageForMysql(messages, type, messageid, sessionId) {
        var sql = "INSERT INTO " + (type == 1 ? DB_TABLES.MucMessages : type == 2 ? DB_TABLES.P2pMessages : DB_TABLES.GroupMessages) + " (id, session_id, sender_id, sender_name,content_type, content, timestamp) VALUES (?,?,?,?,?,?,?) ";
    static save(messages, sessionType, messageId, sessionId) {
        var sql = "INSERT INTO " + DB_TABLES.sesstionTypeToTableName(sessionType) +
            " (id, session_id, sender_id, sender_name,content_type, content, timestamp) VALUES (?,?,?,?,?,?,?)";
        ImDb.execQuery({
            "sql": sql,
            "args": [messageid, sessionId, messages.senderId, messages.senderName, messages.contentType, messages.content, messages.timestamp],
            "args": [messageId, sessionId, messages.senderId, messages.senderName, messages.contentType, messages.content, messages.timestamp],
            "handler": function (err, res) {
                if (err) {
                    log.error("sql:" + sql + ",error:" + err + ",data:" + JSON.stringify(messages) + ",messageid:" + messageid + ",sessionId:" + sessionId);
                    log.error("sql:" + sql + ",error:" + err + ",data:" + JSON.stringify(messages) + ",messageid:" + messageId + ",sessionId:" + sessionId);
                } else {
                    log.info("save message to mysql is success by session :" + sessionId);
                }

+ 1 - 1
src/server/repository/mysql/topic.repo.js

@ -18,7 +18,7 @@ class TopicsRepo {
     * @param handler
     */
    static findAll(sessionId, handler){
        let sql = "select id, session_id, name, create_time, end_by, start_message_id, end_message_id from topics where session_id = ?";
        let sql = "select id, session_id, name, create_time, end_by, end_time, start_message_id, end_message_id from topics where session_id = ?";
        ImDb.execQuery({
            sql: sql,
            args: [sessionId],

+ 7 - 3
src/server/resources/config/config.dev.js

@ -66,13 +66,17 @@ let wechatConfig = {
    }
};
// 会话配置
let sessionConfig = {
    maxMessageCount: 1000,
    maxMessageTimespan: 7 * 24 * 3600
    maxMessageCount: 1000,                  // 会话缓存的消息数量
    maxMessageTimespan: 7 * 24 * 3600,      // 会话缓存的最大时间跨度
    expireTime: 3 * 60 * 60 * 1000,         // 会话过期时间,以毫秒计
    expireSessionCleanCount: 10             // 每次清理多少个过期会话
};
exports.app = 'IM.Server';
exports.version = '1.2.7';
exports.version = '1.2.8';
exports.debug = true;
exports.serverPort = 3008;
exports.sessionExpire = 1800;

+ 4 - 2
src/server/resources/config/config.prod.js

@ -66,8 +66,10 @@ let wechatConfig = {
let sessionConfig = {
    maxMessageCount: 1000,
    maxMessageTimespan: 7 * 24 * 3600
    maxMessageCount: 1000,                  // 会话缓存的消息数量
    maxMessageTimespan: 7 * 24 * 3600,      // 会话缓存的最大时间跨度,此条件暂时不使用
    expireSessionCleanCount: 10             // 每次清理多少个过期会话
};
exports.app = 'im.server';

+ 4 - 2
src/server/resources/config/config.test.js

@ -64,8 +64,10 @@ let wechatConfig = {
    }
};
let sessionConfig = {
    maxMessageCount: 1000,
    maxMessageTimespan: 7 * 24 * 3600
    maxMessageCount: 1000,                  // 会话缓存的消息数量
    maxMessageTimespan: 7 * 24 * 3600,      // 会话缓存的最大时间跨度
    expireSessionCleanCount: 10             // 每次清理多少个过期会话
};
exports.app = 'im.server';
exports.version = '1.0.2.20160805';

+ 57 - 31
src/server/resources/schema/ichat_schema.1.2.8.sql

@ -1,13 +1,22 @@
/* ---------------------------------------------------- */
/*  Generated by Enterprise Architect Version 12.0 		*/
/*  Created On : 09-Dec-2016 10:34:58 AM 				*/
/*  Created On : 23-Dec-2016 3:16:13 PM 				*/
/*  DBMS       : MySql 						*/
/* ---------------------------------------------------- */
SET FOREIGN_KEY_CHECKS=0;
SET FOREIGN_KEY_CHECKS=0
/* Drop Tables */
DROP TABLE IF EXISTS `system_messages` CASCADE
;
DROP TABLE IF EXISTS `sticky_sessions` CASCADE
;
DROP TABLE IF EXISTS `wechat_access_tokens` CASCADE
;
DROP TABLE IF EXISTS `app_status` CASCADE
;
@ -29,13 +38,38 @@ DROP TABLE IF EXISTS `participants` CASCADE
DROP TABLE IF EXISTS `sessions` CASCADE
;
DROP TABLE IF EXISTS `sticky_sessions` CASCADE
/* Create Tables */
CREATE TABLE `system_messages`
(
	`id` VARCHAR(32) NOT NULL COMMENT '消息ID',
	`session_id` VARCHAR(50) NOT NULL COMMENT '所属会话',
	`sender_id` VARCHAR(50) NOT NULL COMMENT '消息发送者',
	`sender_name` VARCHAR(50),
	`content_type` INTEGER NOT NULL COMMENT '消息类型,1文本,2图片,3语音,4文章,5跳转,6咨询开始,7咨询结束',
	`content` VARCHAR(1024) COMMENT '消息内容',
	`timestamp` TIMESTAMP(0) COMMENT '发送时间',
	CONSTRAINT `PK_messages` PRIMARY KEY (`id`)
) COMMENT='P2P会话消息'
;
DROP TABLE IF EXISTS `wechat_access_tokens` CASCADE
CREATE TABLE `sticky_sessions`
(
	`user_id` VARCHAR(50) NOT NULL COMMENT '用户ID',
	`session_id` VARCHAR(50) NOT NULL COMMENT '会话ID',
	`score` NUMERIC(15,0) COMMENT '置顶分值',
	CONSTRAINT `PK_sticky_sessions` PRIMARY KEY (`user_id`,`session_id`)
) COMMENT='置顶会话'
;
/* Create Tables */
CREATE TABLE `wechat_access_tokens`
(
	`access_token` VARCHAR(50) NOT NULL COMMENT '访问token',
	`expiry_date` TIMESTAMP(0) COMMENT '过期时间',
	`create_time` TIMESTAMP(0) COMMENT '数据创建时间',
	CONSTRAINT `PK_wechat_access_tokens` PRIMARY KEY (`access_token`)
) COMMENT='微信接口调用所需要token'
;
CREATE TABLE `app_status`
(
@ -44,17 +78,18 @@ CREATE TABLE `app_status`
	`token` VARCHAR(100) COMMENT '个推Token',
	`client_id` VARCHAR(100) COMMENT '客户端ID',
	`app_in_bg` TINYINT COMMENT 'App是否处于后台状态',
	`last_login_time` TIMESTAMP(0) COMMENT '最后登录 时间',
	CONSTRAINT `PK_user_status` PRIMARY KEY (`user_id`)
) COMMENT='app端状态'
;
CREATE TABLE `topics`
(
	`id` INTEGER NOT NULL COMMENT 'ID',
	`id` VARCHAR(32) NOT NULL COMMENT 'ID',
	`session_id` VARCHAR(50) NOT NULL COMMENT 'MUC会话ID',
	`name` VARCHAR(50) COMMENT '议题名称',
	`create_time` TIMESTAMP(0) COMMENT '创建时间',
	`end_by` VARCHAR(50) COMMENT '结束人ID',
	`end_time` TIMESTAMP(0) COMMENT '结束时间',
	`start_message_id` INTEGER COMMENT '消息起始ID',
	`end_message_id` INTEGER COMMENT '消息结束ID',
	CONSTRAINT `PK_topics` PRIMARY KEY (`id`)
@ -65,8 +100,8 @@ CREATE TABLE `p2p_messages`
(
	`id` VARCHAR(32) NOT NULL COMMENT '消息ID',
	`session_id` VARCHAR(50) NOT NULL COMMENT '所属会话',
	`sender_id` VARCHAR(50) NOT NULL COMMENT '消息发送者ID',
    `sender_name` VARCHAR(50) NOT NULL COMMENT '消息发送者姓名',
	`sender_id` VARCHAR(50) NOT NULL COMMENT '消息发送者',
	`sender_name` VARCHAR(50),
	`content_type` INTEGER NOT NULL COMMENT '消息类型,1文本,2图片,3语音,4文章,5跳转,6咨询开始,7咨询结束',
	`content` VARCHAR(1024) COMMENT '消息内容',
	`timestamp` TIMESTAMP(0) COMMENT '发送时间',
@ -78,8 +113,8 @@ CREATE TABLE `group_messages`
(
	`id` VARCHAR(32) NOT NULL COMMENT '消息ID',
	`session_id` VARCHAR(50) NOT NULL COMMENT '所属会话',
	`sender_id` VARCHAR(50) NOT NULL COMMENT '消息发送者ID',
    `sender_name` VARCHAR(50) NOT NULL COMMENT '消息发送者姓名',
	`sender_id` VARCHAR(50) NOT NULL COMMENT '消息发送者',
	`sender_name` VARCHAR(50),
	`content_type` INTEGER NOT NULL COMMENT '消息类型,1文本,2图片,3语音,4文章,5跳转,6咨询开始,7咨询结束',
	`content` VARCHAR(1024) COMMENT '消息内容',
	`timestamp` TIMESTAMP(0) COMMENT '发送时间',
@ -91,8 +126,8 @@ CREATE TABLE `muc_messages`
(
	`id` VARCHAR(32) NOT NULL COMMENT '消息ID',
	`session_id` VARCHAR(50) NOT NULL COMMENT '所属会话',
	`sender_id` VARCHAR(50) NOT NULL COMMENT '消息发送者ID',
    `sender_name` VARCHAR(50) NOT NULL COMMENT '消息发送者姓名',
	`sender_id` VARCHAR(50) NOT NULL COMMENT '消息发送者',
	`sender_name` VARCHAR(50),
	`content_type` INTEGER NOT NULL COMMENT '消息类型,1文本,2图片,3语音,4文章,5跳转,6咨询开始,7咨询结束',
	`content` VARCHAR(1024) COMMENT '消息内容',
	`timestamp` TIMESTAMP(0) COMMENT '发送时间',
@ -105,7 +140,7 @@ CREATE TABLE `participants`
	`session_id` VARCHAR(50) NOT NULL COMMENT '会话ID。ID结构:以患者ID+最大次数',
	`participant_id` VARCHAR(50) NOT NULL COMMENT '参与者ID',
	`participant_role` INTEGER COMMENT '参与者角色,MUC模式中的主持人/普通参与者',
	`receiving` TINYINT COMMENT '当前是否正在接收',
	`receiving` TINYINT COMMENT '暂未使用',
	CONSTRAINT `PK_participants` PRIMARY KEY (`session_id`,`participant_id`)
) COMMENT='会话参与者'
;
@ -120,25 +155,15 @@ CREATE TABLE `sessions`
) COMMENT='会话'
;
CREATE TABLE `sticky_sessions`
(
	`user_id` VARCHAR(50) NOT NULL COMMENT '用户ID',
	`session_id` VARCHAR(50) NOT NULL COMMENT '会话ID',
	`score` NUMERIC(15,0) COMMENT '置顶分值',
	CONSTRAINT `PK_sticky_sessions` PRIMARY KEY (`user_id`,`session_id`)
) COMMENT='置顶会话'
;
/* Create Primary Keys, Indexes, Uniques, Checks */
CREATE TABLE `wechat_access_tokens`
(
	`access_token` VARCHAR(50) NOT NULL COMMENT '访问token',
	`expiry_date` TIMESTAMP(0) COMMENT '过期时间',
	`create_time` TIMESTAMP(0) COMMENT '数据创建时间',
	CONSTRAINT `PK_wechat_access_tokens` PRIMARY KEY (`access_token`)
) COMMENT='微信接口调用所需要token'
ALTER TABLE `system_messages` 
 ADD INDEX `IXFK_messages_participants` (`session_id` ASC,`sender_id` ASC)
;
/* Create Primary Keys, Indexes, Uniques, Checks */
ALTER TABLE `system_messages` 
 ADD INDEX `IXFK_messages_sessions` (`session_id` ASC)
;
ALTER TABLE `topics` 
 ADD INDEX `IXFK_topics_sessions` (`session_id` ASC)
@ -168,7 +193,8 @@ ALTER TABLE `muc_messages`
 ADD INDEX `IXFK_messages_sessions` (`session_id` ASC)
;
SET FOREIGN_KEY_CHECKS=1;
SET FOREIGN_KEY_CHECKS=1
;
/* 用户视图:医生、患者、用户微信状态*/
create or replace view doctors as 

+ 172 - 1
src/server/resources/schema/temp.sql

@ -1,6 +1,6 @@
/* ---------------------------------------------------- */
/*  Generated by Enterprise Architect Version 12.0 		*/
/*  Created On : 19-12月-2016 11:01:00 				*/
/*  Created On : 23-Dec-2016 3:13:54 PM 				*/
/*  DBMS       : MySql 						*/
/* ---------------------------------------------------- */
@ -8,11 +8,51 @@ SET FOREIGN_KEY_CHECKS=0
/* Drop Tables */
DROP TABLE IF EXISTS `system_messages` CASCADE
;
DROP TABLE IF EXISTS `sticky_sessions` CASCADE
;
DROP TABLE IF EXISTS `wechat_access_tokens` CASCADE
;
DROP TABLE IF EXISTS `app_status` CASCADE
;
DROP TABLE IF EXISTS `topics` CASCADE
;
DROP TABLE IF EXISTS `p2p_messages` CASCADE
;
DROP TABLE IF EXISTS `group_messages` CASCADE
;
DROP TABLE IF EXISTS `muc_messages` CASCADE
;
DROP TABLE IF EXISTS `participants` CASCADE
;
DROP TABLE IF EXISTS `sessions` CASCADE
;
/* Create Tables */
CREATE TABLE `system_messages`
(
	`id` VARCHAR(32) NOT NULL COMMENT '消息ID',
	`session_id` VARCHAR(50) NOT NULL COMMENT '所属会话',
	`sender_id` VARCHAR(50) NOT NULL COMMENT '消息发送者',
	`sender_name` VARCHAR(50),
	`content_type` INTEGER NOT NULL COMMENT '消息类型,1文本,2图片,3语音,4文章,5跳转,6咨询开始,7咨询结束',
	`content` VARCHAR(1024) COMMENT '消息内容',
	`timestamp` TIMESTAMP(0) COMMENT '发送时间',
	CONSTRAINT `PK_messages` PRIMARY KEY (`id`)
) COMMENT='P2P会话消息'
;
CREATE TABLE `sticky_sessions`
(
	`user_id` VARCHAR(50) NOT NULL COMMENT '用户ID',
@ -22,4 +62,135 @@ CREATE TABLE `sticky_sessions`
) COMMENT='置顶会话'
;
CREATE TABLE `wechat_access_tokens`
(
	`access_token` VARCHAR(50) NOT NULL COMMENT '访问token',
	`expiry_date` TIMESTAMP(0) COMMENT '过期时间',
	`create_time` TIMESTAMP(0) COMMENT '数据创建时间',
	CONSTRAINT `PK_wechat_access_tokens` PRIMARY KEY (`access_token`)
) COMMENT='微信接口调用所需要token'
;
CREATE TABLE `app_status`
(
	`user_id` VARCHAR(50) NOT NULL COMMENT '用户ID',
	`platform` TINYINT COMMENT '平台,0为iOS,1为安卓',
	`token` VARCHAR(100) COMMENT '个推Token',
	`client_id` VARCHAR(100) COMMENT '客户端ID',
	`app_in_bg` TINYINT COMMENT 'App是否处于后台状态',
	CONSTRAINT `PK_user_status` PRIMARY KEY (`user_id`)
) COMMENT='app端状态'
;
CREATE TABLE `topics`
(
	`id` VARCHAR(32) NOT NULL COMMENT 'ID',
	`session_id` VARCHAR(50) NOT NULL COMMENT 'MUC会话ID',
	`name` VARCHAR(50) COMMENT '议题名称',
	`create_time` TIMESTAMP(0) COMMENT '创建时间',
	`end_by` VARCHAR(50) COMMENT '结束人ID',
	`end_time` TIMESTAMP(0) COMMENT '结束时间',
	`start_message_id` INTEGER COMMENT '消息起始ID',
	`end_message_id` INTEGER COMMENT '消息结束ID',
	CONSTRAINT `PK_topics` PRIMARY KEY (`id`)
) COMMENT='议题,仅MUC模式使用。'
;
CREATE TABLE `p2p_messages`
(
	`id` VARCHAR(32) NOT NULL COMMENT '消息ID',
	`session_id` VARCHAR(50) NOT NULL COMMENT '所属会话',
	`sender_id` VARCHAR(50) NOT NULL COMMENT '消息发送者',
	`sender_name` VARCHAR(50),
	`content_type` INTEGER NOT NULL COMMENT '消息类型,1文本,2图片,3语音,4文章,5跳转,6咨询开始,7咨询结束',
	`content` VARCHAR(1024) COMMENT '消息内容',
	`timestamp` TIMESTAMP(0) COMMENT '发送时间',
	CONSTRAINT `PK_messages` PRIMARY KEY (`id`)
) COMMENT='P2P会话消息'
;
CREATE TABLE `group_messages`
(
	`id` VARCHAR(32) NOT NULL COMMENT '消息ID',
	`session_id` VARCHAR(50) NOT NULL COMMENT '所属会话',
	`sender_id` VARCHAR(50) NOT NULL COMMENT '消息发送者',
	`sender_name` VARCHAR(50),
	`content_type` INTEGER NOT NULL COMMENT '消息类型,1文本,2图片,3语音,4文章,5跳转,6咨询开始,7咨询结束',
	`content` VARCHAR(1024) COMMENT '消息内容',
	`timestamp` TIMESTAMP(0) COMMENT '发送时间',
	CONSTRAINT `PK_messages` PRIMARY KEY (`id`)
) COMMENT='群会话消息'
;
CREATE TABLE `muc_messages`
(
	`id` VARCHAR(32) NOT NULL COMMENT '消息ID',
	`session_id` VARCHAR(50) NOT NULL COMMENT '所属会话',
	`sender_id` VARCHAR(50) NOT NULL COMMENT '消息发送者',
	`sender_name` VARCHAR(50),
	`content_type` INTEGER NOT NULL COMMENT '消息类型,1文本,2图片,3语音,4文章,5跳转,6咨询开始,7咨询结束',
	`content` VARCHAR(1024) COMMENT '消息内容',
	`timestamp` TIMESTAMP(0) COMMENT '发送时间',
	CONSTRAINT `PK_messages` PRIMARY KEY (`id`)
) COMMENT='MUC会话消息'
;
CREATE TABLE `participants`
(
	`session_id` VARCHAR(50) NOT NULL COMMENT '会话ID。ID结构:以患者ID+最大次数',
	`participant_id` VARCHAR(50) NOT NULL COMMENT '参与者ID',
	`participant_role` INTEGER COMMENT '参与者角色,MUC模式中的主持人/普通参与者',
	`receiving` TINYINT COMMENT '暂未使用',
	CONSTRAINT `PK_participants` PRIMARY KEY (`session_id`,`participant_id`)
) COMMENT='会话参与者'
;
CREATE TABLE `sessions`
(
	`id` VARCHAR(50) NOT NULL COMMENT '会话标识。会话标识来源根据业务场景:1 医生间P2P会话使用随机生成的ID;2 医生间的群会话使用行政团队的ID;3 医生与患者间的咨询以患者的ID+当前咨询次数为ID',
	`name` VARCHAR(50) NOT NULL COMMENT '会话名称',
	`type` INTEGER NOT NULL COMMENT '会话类型,1表示MUC会话,2表示P2P,3表示群会话,4表示临时讨论组',
	`create_date` DATE NOT NULL COMMENT '创建时间',
	CONSTRAINT `PK_sessions` PRIMARY KEY (`id`)
) COMMENT='会话'
;
/* Create Primary Keys, Indexes, Uniques, Checks */
ALTER TABLE `system_messages` 
 ADD INDEX `IXFK_messages_participants` (`session_id` ASC,`sender_id` ASC)
;
ALTER TABLE `system_messages` 
 ADD INDEX `IXFK_messages_sessions` (`session_id` ASC)
;
ALTER TABLE `topics` 
 ADD INDEX `IXFK_topics_sessions` (`session_id` ASC)
;
ALTER TABLE `p2p_messages` 
 ADD INDEX `IXFK_messages_participants` (`session_id` ASC,`sender_id` ASC)
;
ALTER TABLE `p2p_messages` 
 ADD INDEX `IXFK_messages_sessions` (`session_id` ASC)
;
ALTER TABLE `group_messages` 
 ADD INDEX `IXFK_messages_participants` (`session_id` ASC,`sender_id` ASC)
;
ALTER TABLE `group_messages` 
 ADD INDEX `IXFK_messages_sessions` (`session_id` ASC)
;
ALTER TABLE `muc_messages` 
 ADD INDEX `IXFK_messages_participants` (`session_id` ASC,`sender_id` ASC)
;
ALTER TABLE `muc_messages` 
 ADD INDEX `IXFK_messages_sessions` (`session_id` ASC)
;
SET FOREIGN_KEY_CHECKS=1