Explorar el Código

完成登录逻辑;退出逻辑

Sand hace 8 años
padre
commit
0359b5b3bc

+ 2 - 2
src/server/endpoints/chats.endpoint.js

@ -76,7 +76,7 @@ router.get(APIv1.Chats.TEST,function(req,res){
        let sessionId =req.query.sessionId;
        let sessions = new Sessions();
        ControllerUtil.regModelEventHandler(sessions, res);
        sessions.getSessionMessages(sessionId,user,page,pagesize);
        sessions.getMessages(sessionId,user,page,pagesize);
    }
    //http://192.168.131.107:3008/api/v1/chats/test?test=2&page=0&pagesize=10&user=3121
    if(test==2){//获取用户会话
@ -95,7 +95,7 @@ router.get(APIv1.Chats.TEST,function(req,res){
        let users = req.query.users;
        let name = req.query.name;
        let sessionType = req.query.sessionType;
        sessions.createSessions(sessionId,name,sessionType,users);
        sessions.createSession(sessionId,name,sessionType,users);
    }
    if(test==4){
        let sessions = new Sessions();

+ 0 - 1
src/server/endpoints/groups.endpoint.js

@ -9,7 +9,6 @@ let router = express.Router();
const APIv1 = require('../include/endpoints').APIv1;
const MODEL_EVENTS = require('../include/commons').MODEL_EVENTS;
//let GroupRepo = require('../repository/group.repo.js');
let Group = require('../models/group');
/**

+ 13 - 20
src/server/endpoints/session.endpoint.js

@ -8,19 +8,15 @@
let express = require('express');
let router = express.Router();
let http = require('http');
let log = require('../util/log.js');
let ObjectUtil = require("../util/object.util.js");
let ControllerUtil = require('../util/controller.util');
let Sessions = require('../models/sessions/sessions');
let Participants = require('../models/sessions/participants');
const APIv1 = require('../include/endpoints').APIv1;
const CONTENT_TYPES = require('../include/commons').CONTENT_TYPE;
const MAX_INT = require('../include/commons').MAX_INT;
const DEFAULT_PAGE_SIZE = require('../include/commons').DEFAULT_PAGE_SIZE;
const APIv1 = require('../include/endpoints').APIv1;
/**
 * 获取用户的聊天列表
@ -42,8 +38,7 @@ router.get(APIv1.Sessions.SessionList,function(req,res){
    let sessions = new Sessions();
    ControllerUtil.regModelEventHandler(sessions, res);
    sessions.getUserSessions(user,page,pagesize);
})
});
/**
 * 获取用户的聊天信息列表
@ -68,8 +63,8 @@ router.get(APIv1.Sessions.MessageList,function(req,res){
    }
    let sessions = new Sessions();
    ControllerUtil.regModelEventHandler(sessions, res);
    sessions.getSessionMessages(sessionId,user,page,pagesize);
})
    sessions.getMessages(sessionId,user,page,pagesize);
});
/**
 * 某个聊天记录置顶操作
@ -87,9 +82,7 @@ router.post(APIv1.Sessions.StickSession,function(req,res){
    let sessions = new Sessions();
    ControllerUtil.regModelEventHandler(sessions, res);
    sessions.stickSession(sessionId,user);
})
});
/**
 * 取消置顶
@ -107,7 +100,7 @@ router.post(APIv1.Sessions.UnStickSession,function(req,res){
    let sessions = new Sessions();
    ControllerUtil.regModelEventHandler(sessions, res);
    sessions.cancelStickSession(sessionId,user);
})
});
/**
@ -136,8 +129,8 @@ router.post(APIv1.Sessions.CreateSession,function(req,res){
    }
    let sessions = new Sessions();
    ControllerUtil.regModelEventHandler(sessions, res);
    sessions.createSessions(sessionId,name,sessionType,users);
})
    sessions.createSession(sessionId,name,sessionType,users);
});
/**
 * 发送消息
@ -162,7 +155,7 @@ router.post(APIv1.Sessions.SendMsg,function(req,res){
    ControllerUtil.regModelEventHandler(sessions, res);
    message.timestamp=new Date();
    sessions.saveMessageBySession(message,sessionId);
})
});
/**
@ -188,7 +181,7 @@ router.post(APIv1.Sessions.SendMsg,function(req,res){
    ControllerUtil.regModelEventHandler(sessions, res);
    message.timestamp=new Date();
    sessions.saveMessageBySession(message,sessionId);
})
});
/**
@ -208,7 +201,7 @@ router.post(APIv1.Sessions.RemoveSessionUser,function(req,res){
    let participants = new Participants();
    ControllerUtil.regModelEventHandler(sessions, res);
    participants.deleteUser(sessionId,user);
})
});
/**
 * 移除人员
@ -227,6 +220,6 @@ router.post(APIv1.Sessions.PushSessionUser,function(req,res){
    let participants = new Participants();
    ControllerUtil.regModelEventHandler(sessions, res);
    participants.pushUser(sessionId,user);
})
});
module.exports = router;

+ 14 - 10
src/server/include/commons.js

@ -68,6 +68,12 @@ exports.MODEL_EVENTS = {
 */
exports.MAX_INT = 9007199254740992;
/**
 * 置顶会话基础分值,以此为下限向上递增。
 * @type {number}
 */
exports.STICKY_SESSION_BASE_SCORE = 9000000000000;
/**
 * 默认分页大小。
 *
@ -100,14 +106,12 @@ exports.REDIS_KEYS = {
    MessagesByTimestamp: "sessions:" + REDIS_KEY_REPLACER + ":messages_by_timestamp"
};
exports.STICK_NUM = 9000000000000;
exports.IM_DB = {
    "P2PMSG": "p2p_messages",
    "MUCMSG": "muc_messages",
    "GROUPMSG": "group_messages",
    "PARTICIPANTS": "participants",
    "SESSIONS": "sessions",
    "TOPICS": "topics",
    "STICKY_SESSION":"sticky_sessions"
exports.DB_TABLES = {
    "P2pMessages": "p2p_messages",
    "MucMessages": "muc_messages",
    "GroupMessages": "group_messages",
    "Participants": "participants",
    "Sessions": "sessions",
    "Topics": "topics",
    "StickySessions":"sticky_sessions"
};

+ 22 - 37
src/server/models/messages/messages.js

@ -3,19 +3,16 @@
 */
"use strict";
let RedisClient = require('../../repository/redis/redis.client.js');
let redisClient = RedisClient.redisClient();
let redis = redisClient.connection;
let MessageRepo = require('../../repository/mysql/message.repo');
let RedisModel = require('./../redis.model.js');
let modelUtil = require('../../util/model.util');
var imDb = require('../../repository/mysql/db/im.db');
let RedisClient = require('../../repository/redis/redis.client.js');
let redis = RedisClient.redisClient().connection;
let log = require('../../util/log.js');
let Sessions = require('../sessions/sessions');
let MessageRepo = require('../../repository/mysql/message.repo');
let configFile = require('../../include/commons').CONFIG_FILE;
let config = require('../../resources/config/' + configFile);
const RedisKey = require('../../include/commons').REDIS_KEYS;
const IMTABLE = require('../../include/commons').IM_DB;
class Messages extends RedisModel {
    constructor() {
@ -26,28 +23,17 @@ class Messages extends RedisModel {
     * 根据topicId获取对应的议题的信息列表
     * @param topicId
     */
    getMessagesByTopicId(topicId){
        
    getMessagesByTopicId(topicId) {
    }
    /**
     * 分页
     * 根据topicId获取对应的议题的成员信息
     * @param topicId
     * @param page
     * @param pagesize
     */
    getMessagesByTopicIdForPage(topicId,page,pagesize){
    }
    /**
     * 根据sessionId获取对应的会话的信息列表mysql
     * @param sessionId
     */
    getMessagesBySession(sessionId,handler){
        MessageRepo.getMessagesBySession(sessionId,handler);
    getMessagesByTopicIdForPage(topicId, page, pagesize) {
    }
    /**
@ -55,41 +41,40 @@ class Messages extends RedisModel {
     * @param sessionId
     * @param page
     * @param pagesize
     * @param handler
     */
    getMessageByPage(sessionId,page,pagesize,handler){
        MessageRepo.getMessageByPage(sessionId,page,pagesize,handler);
    getMessageByPage(sessionId, page, pagesize, handler) {
        MessageRepo.findBySessionId(sessionId, page, pagesize, handler);
    }
    /**
     * 根据消息ID获取单条消息
     * @param messageId
     */
    getMessagesByid(messageId){
    getMessagesByid(messageId) {
    }
    saveMessageForRedis(message_id,sessionId,message){
        let message_key = super.makeRedisKey(RedisKey.Messages,sessionId);
        let message_timestamp_key = super.makeRedisKey(RedisKey.MessagesByTimestamp,sessionId);
    saveMessageForRedis(message_id, sessionId, message) {
        let message_key = super.makeRedisKey(RedisKey.Messages, sessionId);
        let message_timestamp_key = super.makeRedisKey(RedisKey.MessagesByTimestamp, sessionId);
        redis.hsetAsync(message_key, message_id, JSON.stringify(message)).then(function (res) {
            log.info("success save redis message by session :"+sessionId);
            log.info("success save redis message by session :" + sessionId);
            //保存message_timestamp_key redis
            return redis.zaddAsync(message_timestamp_key, message.timestamp.getTime(), message_id);
        });
    }
    /**
     * 保存Message 到mysql
     * @param messages 消息对象
     * @param type
     * @param messageId
     * @param sessionId
     * @type type 会话类型,1表示MUC会话,2表示P2P,3表示群会话,4表示临时讨论组
     */
    saveMessageForMysql(messages,type,messageid,sessionId){
        MessageRepo.saveMessageForMysql(messages,type,messageid,sessionId);
    saveMessageToMysql(messages, type, messageId, sessionId) {
        MessageRepo.saveMessageForMysql(messages, type, messageId, sessionId);
    }
}
// Expose class

+ 89 - 85
src/server/models/sessions/participants.js

@ -1,16 +1,17 @@
/**
 * 成员模型。
 * 会话成员模型。
 */
"use strict";
let RedisClient = require('../../repository/redis/redis.client.js');
let redisClient = RedisClient.redisClient();
let redis = redisClient.connection;
let RedisModel = require('./../redis.model.js');
let modelUtil = require('../../util/model.util');
let ModelUtil = require('../../util/model.util');
let RedisClient = require('../../repository/redis/redis.client.js');
let ParticipantRepo = require('../../repository/mysql/participant.repo');
let log = require('../../util/log.js');
const RedisKey = require('../../include/commons').REDIS_KEYS;
let redis = RedisClient.redisClient().connection;
const REDIS_KEYS = require('../../include/commons').REDIS_KEYS;
class Participants extends RedisModel {
    constructor() {
@ -18,147 +19,150 @@ class Participants extends RedisModel {
    }
    /**
     * 根据topicId获取对应的议题的成员信息
     * @param topicId
     */
    getParticipantsByTopicId(topicId){
        
    }
    /**
     * 根据sessionId获取对应的议题的成员信息
     * 获取会话的成员列表
     *
     * @param sessionId
     */
    getParticipantsBySessionId(sessionId,handler){
        let  participant_key =  super.makeRedisKey(RedisKey.Participants,sessionId);
        redis.existsAsync(participant_key).then(function(res){
            if(res){
                redis.zrangeAsync(participant_key,0,-1).then(function(res){
    getParticipants(sessionId, handler) {
        let participant_key = super.makeRedisKey(REDIS_KEYS.Participants, sessionId);
        redis.existsAsync(participant_key).then(function (res) {
            if (res) {
                redis.zrangeAsync(participant_key, 0, -1).then(function (res) {
                    handler(res);
                })
            }else{
                ParticipantRepo.getParticipantsBySessionId(sessionId,handler);
            } else {
                ParticipantRepo.findParticipants(sessionId, handler);
            }
        })
    }
    /**
     * 判断成员是否存在这个讨论组中
     * 会话中是否存在指定成员
     *
     * @param sessionId
     * @param userId
     * @param handler
     */
    existsUser(sessionId,userId,handler){
      let  participant_key =  super.makeRedisKey(RedisKey.Participants,sessionId);
      redis.existsAsync(participant_key).then(function(res){
          if(res){//存在redis中直接从redis校验
              redis.zrangeAsync(participant_key,0,-1).then(function(res){
                  let exists = false
                  for(var j in res){
                      var value = res[j];
                      if(value==userId){
                          exists  = true;
                          break;
                      }
                  }
                  handler(exists);
              })
          }else{//不存在从数据库中获取
              ParticipantRepo.existsUser(sessionId,userId,handler);
          }
      })
    existsParticipant(sessionId, userId, handler) {
        let participant_key = super.makeRedisKey(REDIS_KEYS.Participants, sessionId);
        redis.existsAsync(participant_key).then(function (res) {
            if (res) {
                // get from redis
                redis.zrangeAsync(participant_key, 0, -1).then(function (res) {
                    let exists = false;
                    for (var j in res) {
                        var value = res[j];
                        if (value == userId) {
                            exists = true;
                            break;
                        }
                    }
                    handler(null, exists);
                })
            } else {
                // get from mysql
                ParticipantRepo.existsParticipant(sessionId, userId, handler);
            }
        })
    }
    /**
     * 根据医生和患者
     * 获取P2P成员所在会话
     *
     * @param patient
     * @param doctor
     */
    getSessionIdByParticipants(patient,doctor,handler){
        ParticipantRepo.getSessionIdByParticipants(patient,doctor,handler);
    getSessionIdByParticipants(patient, doctor, handler) {
        ParticipantRepo.findSessionIdByParticipantIds(patient, doctor, handler);
    }
    /**
     * 将成员写入redis
     *
     * @param session_id 会话ID
     * @param users 用户集合
     * @param createDate 创建日期
     * @param handler 回调
     */
    createParticipantsToRedis(session_id,users,createDate,handler){
        let participants_key =  super.makeRedisKey(RedisKey.Participants,session_id);
        for(var j in users){
            let user_session_key =  super.makeRedisKey(RedisKey.UserSessions,users[j]);
            redis.zaddAsync(participants_key, createDate.getTime(),users[j]).then(function(res){
                    return  redis.zaddAsync(user_session_key,createDate.getTime(),session_id);
              }
            ).catch(function(err){
                log.error("createParticipantsToRedis is fail error:"+err+",session_id:"+session_id+",users:"+users.join(","));
                    handler(false);
    saveParticipantsToRedis(session_id, users, createDate, handler) {
        let participants_key = super.makeRedisKey(REDIS_KEYS.Participants, session_id);
        for (var j in users) {
            let user_session_key = super.makeRedisKey(REDIS_KEYS.UserSessions, users[j]);
            redis.zaddAsync(participants_key, createDate.getTime(), users[j]).then(function (res) {
                    return redis.zaddAsync(user_session_key, createDate.getTime(), session_id);
                }
            ).catch(function (err) {
                log.error("createParticipantsToRedis is fail error:" + err + ",session_id:" + session_id + ",users:" + users.join(","));
                handler(false);
            });
        }
        handler(true);
    }
    /**
     * mysql成员创建
     *
     * @param session_id
     * @param users
     */
    createParticipantsToMysql(session_id,users){
        return ParticipantRepo.createParticipantsToMysql(session_id,users);
    saveParticipantsToMysql(session_id, users) {
        return ParticipantRepo.createParticipantsToMysql(session_id, users);
    }
    /**
     * MUC成员创建
     * @param users
     */
    createMUCParticipants(users){
    createMUCParticipants(users) {
        return true;
    }
    /**
     * 移除讨论组成员
     * 移除成员
     * @param sessionId
     * @param user
     * @param userId
     */
    deleteUser(sessionId,user){
    deleteUser(sessionId, userId) {
        let self = this;
        let participants_key = super.makeRedisKey(RedisKeys.Participants,sessionId);
        let user_session_key = super.makeRedisKey(RedisKeys.UsersSessions,user);
        let participants_key = super.makeRedisKey(REDIS_KEYS.Participants, sessionId);
        let user_session_key = super.makeRedisKey(REDIS_KEYS.UsersSessions, userId);
        //1.移除SESSION成员表中的成员信息
        redis.zremAsync(participants_key,user).then(function(res){
            log.info("remove participants:"+res);
        redis.zremAsync(participants_key, userId).then(function (res) {
            log.info("remove participants:" + res);
            //2.删除对应人员的Session列表
            redis.zremAsync(user_session_key,sessionId).then(function(res){
                log.info("remove user_session:"+res);
            redis.zremAsync(user_session_key, sessionId).then(function (res) {
                log.info("remove user_session:" + res);
                //3.移除数据库中的人员记录
                self.deleteUserFromMysql(sessionId,user);
                modelUtil.emitData(self.eventEmitter,{"status":200,"msg":"人员删除成功!"});
            }).catch(function(err){
                modelUtil.emitData(self.eventEmitter,{"status":-1,"msg":"人员删除失败!"+err});
                self.deleteUserFromMysql(sessionId, userId);
                ModelUtil.emitData(self.eventEmitter, {"status": 200, "msg": "人员删除成功!"});
            }).catch(function (err) {
                ModelUtil.emitData(self.eventEmitter, {"status": -1, "msg": "人员删除失败!" + err});
            })
        }).catch(function(err){
            modelUtil.emitData(self.eventEmitter,{"status":-1,"msg":"人员删除失败!"+err});
        }).catch(function (err) {
            ModelUtil.emitData(self.eventEmitter, {"status": -1, "msg": "人员删除失败!" + err});
        })
    }
    /**
     * 添加讨论组成员
     * @param sessionId
     * @param user
     */
    pushUser(sessionId,user){
    pushUser(sessionId, user) {
        let self = this;
        let users = [];
        users.push(user);
        self.createParticipantsToRedis(sessionId,users,new Date(),function(res){
            if(res){
                self.createParticipantsToMysql(sessionId,users);
            }else{
                modelUtil.emitData(self.eventEmitter,{"status":-1,"msg":"人员添加失败!"});
        self.saveParticipantsToRedis(sessionId, users, new Date(), function (res) {
            if (res) {
                self.saveParticipantsToMysql(sessionId, users);
            } else {
                ModelUtil.emitData(self.eventEmitter, {"status": -1, "msg": "人员添加失败!"});
            }
        })
    }
@ -168,8 +172,8 @@ class Participants extends RedisModel {
     * @param sessionId 会话
     * @param user 用户
     */
    deleteUserFromMysql(sessionId,user){
        ParticipantRepo.deleteUserFromMysql(sessionId,user);
    deleteUserFromMysql(sessionId, user) {
        ParticipantRepo.deleteUserFromMysql(sessionId, user);
    }
}

+ 68 - 60
src/server/models/sessions/sessions.js

@ -23,13 +23,62 @@ class Sessions extends RedisModel {
        super();
    }
    /**
     * 创建会话
     *
     * @param sessionId 会话ID
     * @param name 会话名称
     * @param type 会话类型
     * @param users 会话成员
     */
    createSession(sessionId, name, type, users) {
        if (type == 2) {
        }
        let self = this;
        let createDate = new Date();
        let session_key = super.makeRedisKey(RedisKeys.Session, sessionId);
        let participants = new Participants();
        // 将session加入redis
        participants.saveParticipantsToRedis(sessionId, users.split(","), createDate, function (res) {
            if (!res) {
                modelUtil.emitData(self.eventEmitter, {"status": -1, "msg": res});
            } else {
                let messages = {};
                messages.senderId = "system";
                messages.senderName = "系统消息";
                messages.timestamp = createDate;
                messages.content = "";
                messages.contentType = "1";
                self.updateLastContent(session_key, type, name, messages);
                modelUtil.emitData(self.eventEmitter, {"status": 200, "msg": "session create success!"});
                self.saveSessionToMysql(sessionId, name, type, createDate);
                participants.saveParticipantsToMysql(sessionId, users.split(",")); //创建session成员到数据库
            }
        })
    }
    /**
     * 保存session到MySQL
     * @param sessionId
     * @param name
     * @param type
     * @param createDate
     */
    saveSessionToMysql(sessionId, name, type, createDate) {
        SessionRepo.saveSession(sessionId, name, type, createDate);
    }
    /**
     * 获取某个用户的全部session列表
     * @param userId
     * @param handler
     */
    getUserSessionsFromMysql(userId, handler) {
        SessionRepo.getUserSessionsFromMysql(userId, handler);
        SessionRepo.findAll(userId, handler);
    }
    /**
@ -38,12 +87,14 @@ class Sessions extends RedisModel {
     * @param handler
     */
    getSessions(sessionId, handler) {
        SessionRepo.getSessions(sessionId, handler);
        SessionRepo.findOne(sessionId, handler);
    }
    /**
     * 根据用户ID获取用户的session列表
     * @param userId
     * @param page
     * @param pagesize
     */
    getUserSessions(userId, page, pagesize) {
        let user_session_key = super.makeRedisKey(RedisKeys.UserSessions, userId);
@ -115,13 +166,14 @@ class Sessions extends RedisModel {
    }
    /**
     * 根据sessionId获取对应的消息
     * 根据会话中的消息
     *
     * @param sessionId 会话ID
     * @param user 拉取消息的人
     * @param page 第几页
     * @param pagesize 分页数量
     */
    getSessionMessages(sessionId, user, page, pagesize) {
    getMessages(sessionId, user, page, pagesize) {
        let self = this;
        let message_timestamp_key = super.makeRedisKey(RedisKeys.MessagesByTimestamp, sessionId);
        let message_key = super.makeRedisKey(RedisKeys.Messages, sessionId);
@ -142,7 +194,7 @@ class Sessions extends RedisModel {
                pagesize = pagesize + page;
            }
            let participants = new Participants();
            participants.existsUser(sessionId, user, function (res) {
            participants.existsParticipant(sessionId, user, function (res) {
                if (!res) {
                    modelUtil.emitData(self.eventEmitter, {"status": -1, "msg": "用户不在此会话中!"});
                } else {
@ -174,7 +226,8 @@ class Sessions extends RedisModel {
    }
    /**
     * 更新最后一条消息
     * 更新会话最后一条消息
     *
     * @param session_key rediskey
     * @param session_type
     * @param name 议题名称
@ -194,7 +247,8 @@ class Sessions extends RedisModel {
    }
    /**
     * 保存消息,用于消息发送
     * 保存消息
     *
     * @param message
     * @param sessionId
     */
@ -206,7 +260,7 @@ class Sessions extends RedisModel {
        let message_id = mongoose.Types.ObjectId().toString();
        let session_type = 0;
        let name = "";
        participants.existsUser(sessionId, messages.senderId, function (res) {
        participants.existsParticipant(sessionId, messages.senderId, function (res) {
            //校验发送成员是都在讨论组
            if (res) {
                redis.hmgetAsync(session_key, ["type", "name"]).then(function (res) {
@ -224,7 +278,7 @@ class Sessions extends RedisModel {
                    return self.updateLastContent(session_key, session_type, name, message);
                }).then(function (res) {
                    //操作mysql数据库
                    messages.saveMessageForMysql(message, session_type, message_id, sessionId);
                    messages.saveMessageToMysql(message, session_type, message_id, sessionId);
                    //返回数据给前端。
                    modelUtil.emitData(self.eventEmitter, {"status": 200, "msg": "发送成功!"});
                    //消息推送
@ -238,53 +292,7 @@ class Sessions extends RedisModel {
    }
    /**
     * 非MUC模式创建会话
     * @param sessionId 会话ID
     * @param name 会话名称
     * @param type 会话类型
     * @param users 会话成员
     */
    createSessions(sessionId, name, type, users) {
        if (type == 2) {
        }
        let self = this;
        let createDate = new Date();
        let session_key = super.makeRedisKey(RedisKeys.Session, sessionId);
        let participants = new Participants();
        // 将session加入redis
        participants.createParticipantsToRedis(sessionId, users.split(","), createDate, function (res) {
            if (!res) {
                modelUtil.emitData(self.eventEmitter, {"status": -1, "msg": res});
            } else {
                let messages = {};
                messages.senderId = "system";
                messages.senderName = "系统消息";
                messages.timestamp = createDate;
                messages.content = "";
                messages.contentType = "1";
                self.updateLastContent(session_key, type, name, messages);
                modelUtil.emitData(self.eventEmitter, {"status": 200, "msg": "session create success!"});
                self.saveSessionToMysql(sessionId, name, type, createDate);
                participants.createParticipantsToMysql(sessionId, users.split(",")); //创建session成员到数据库
            }
        })
    }
    /**
     * 保存session到sql数据库
     * @param sessionId
     * @param name
     * @param type
     * @param createDate
     */
    saveSessionToMysql(sessionId, name, type, createDate) {
        SessionRepo.saveSessionToMysql(sessionId, name, type, createDate);
    }
    /**
     *置顶操作
     * 置顶操作
     */
    stickSession(sessionId, user) {
        let user_session_key = super.makeRedisKey(RedisKeys.UserSessions, user);
@ -297,11 +305,11 @@ class Sessions extends RedisModel {
                //当前时间搓比redis的时间搓更早证明没有置顶过
                if (scoreres <= nowtime) {
                    //初始化置顶
                    redis.zaddAsync(user_session_key, Commons.STICK_NUM, sessionId).then(function (res) {
                    redis.zaddAsync(user_session_key, Commons.STICKY_SESSION_BASE_SCORE, sessionId).then(function (res) {
                        log.info("stickSession:" + sessionId + ",res:" + res);
                        modelUtil.emitData(self.eventEmitter, {"status": 200, "msg": "置顶成功!"});
                    }).then(function () {
                        SessionRepo.stickSession(sessionId, user, Commons.STICK_NUM);
                        SessionRepo.saveStickySession(sessionId, user, Commons.STICKY_SESSION_BASE_SCORE);
                    })
                } else {
                    //已有置顶的数据,取出来加1保存回去
@ -310,7 +318,7 @@ class Sessions extends RedisModel {
                        log.info("stickSession:" + sessionId + ",res:" + res);
                        modelUtil.emitData(self.eventEmitter, {"status": 200, "msg": "置顶成功!"});
                    }).then(function () {
                        SessionRepo.stickSession(sessionId, user, scoreres);
                        SessionRepo.saveStickySession(sessionId, user, scoreres);
                    })
                }
            })
@ -318,7 +326,7 @@ class Sessions extends RedisModel {
    }
    /**
     *取消置顶操作
     * 取消会话置顶
     */
    cancelStickSession(sessionId, user) {
        let user_session_key = super.makeRedisKey(RedisKeys.UserSessions, user);

+ 146 - 48
src/server/models/user/users.js

@ -12,18 +12,23 @@ const PLATFORMS = require('../../include/commons').PLATFORM;
let RedisModel = require('../redis.model');
let Doctor = require('./doctor');
let Patient = require('./patient');
let Sessions = require('../sessions/sessions');
let ImDb = require('../../repository/mysql/db/im.db');
let ParticipantRepo = require('../../repository/mysql/participant.repo');
let DoctorRepo = require('../../repository/mysql/doctor.repo');
let PatientRepo = require('../../repository/mysql/patient.repo');
let AppStatusRepo = require('../../repository/mysql/app.status.repo');
let SessionRepo = require('../../repository/mysql/session.repo');
let TopicRepo = require('../../repository/mysql/topic.repo');
let MessageRepo = require('../../repository/mysql/message.repo');
let ModelUtil = require('../../util/model.util');
let RedisClient = require('../../repository/redis/redis.client');
let redisConn = RedisClient.redisClient().connection;
let async = require('async');
let log = require('../../util/log');
let configFile = require('../../include/commons').CONFIG_FILE;
let config = require('../../resources/config/' + configFile);
class Users extends RedisModel {
    constructor() {
@ -52,7 +57,7 @@ class Users extends RedisModel {
                let repoProto = isPatientId ? PatientRepo : DoctorRepo;
                repoProto.findOne(userId, function (err, res) {
                    let user = isPatientId ? new Doctor() : new Patient();
                    if(res.length > 0){
                    if (res.length > 0) {
                        user.name = res[0].name;
                        user.sex = res[0].sex;
                        user.birthdate = res[0].birthdate;
@ -65,31 +70,49 @@ class Users extends RedisModel {
        ]);
    }
    /**
     * 取得用户微信端状态。
     *
     * @param userId
     * @param outCallback
     */
    getWechatStatus(userId) {
        let self = this;
        redisConn.hgetallAsync(self.makeRedisKey(REDIS_KEYS.UserWechatStatus, userId))
            .then(function (res) {
                if (res) {
                    ModelUtil.emitData(self, res);
                } else {
                    ModelUtil.emitDataNotFound(self, {"message": "User is offline, unable to get wechat status."});
                }
            });
    }
    /**
     * 获取客户端App状态。
     *
     * @param userId
     * @param outCallback
     */
    getAppStatus(userId, outCallback){
    getAppStatus(userId, outCallback) {
        let self = this;
        async.waterfall([
            // get from redis
            function (callback) {
                let userStatusKey = self.makeRedisKey(REDIS_KEYS.UserStatus, userId);
                redisConn.hgetallAsync(userStatusKey).then(function (res) {
                   if(res === null){
                       callback(null);  // get from mysql
                   } else {
                       outCallback(null, res);
                   }
                    if (res === null) {
                        callback(null);  // get from mysql
                    } else {
                        outCallback(null, res);
                    }
                });
            },
            // get from MySQL
            function () {
                AppStatusRepo.findOne(userId, function (err, res) {
                    let userStatus = null;
                    if(res.length > 0){
                    if (res.length > 0) {
                        userStatus = {};
                        userStatus.platform = res[0].platform;
                        userStatus.token = res[0].token;
@ -111,29 +134,18 @@ class Users extends RedisModel {
     * @param appInBg
     * @param outCallback
     */
    updateAppStatus(userId, appInBg, outCallback){
    updateAppStatus(userId, appInBg, outCallback) {
        let self = this;
        DoctorRepo.updateStatus(userId, status,
            function (err, result) {
                if (err) {
                    ModelUtil.emitDbError(self.eventEmitter, 'Update user status failed', err);
                    return;
        redisConn.hsetAsync(self.makeRedisKey(REDIS_KEYS.UserAppStatus, userId), 'app_in_bg', appInBg)
            .then(function (res) {
                if (res) {
                    ModelUtil.emitData(self.eventEmitter, {});
                } else {
                    ModelUtil.emitDataNotFound(self.eventEmitter, {"message": "User is offline, unable to update app status."});
                }
                ModelUtil.emitData(self.eventEmitter, {});
            });
    }
    /**
     * 取得用户微信端状态。
     *
     * @param userId
     * @param outCallback
     */
    getWechatStatus(userId, outCallback){
        let self = this;
    }
    /**
     * 用户登录。
     *
@ -146,7 +158,7 @@ class Users extends RedisModel {
     *
     * @return 用户token
     */
    login(userId, platform, token, clientId){
    login(userId, platform, token, clientId) {
        let self = this;
        let loginFromApp = platform === PLATFORMS.Wechat;
@ -159,7 +171,7 @@ class Users extends RedisModel {
            // get user info from mysql
            function (callback) {
                self.getUser(userId, function (err, userInfo) {
                    if(userInfo === null){
                    if (userInfo === null) {
                        ModelUtil.emitDataNotFound(self, 'User not exists.');
                        return;
                    }
@ -169,12 +181,12 @@ class Users extends RedisModel {
            },
            // cache user info and app/wechat status
            function (userInfo, callback) {
                let multi = redisConn.multi()
                let multi = redisConn.multiAsync()
                    .zadd(usersKey, lastLoginTime.getMilliseconds(), userId)
                    .hmset(userKey, 'avatar', userInfo.avatar, 'birthdate', userInfo.birthdate,
                        'name', userInfo.name, 'role', loginFromApp ? 'doctor' : 'patient');
                if(loginFromApp){
                if (loginFromApp) {
                    // cache app status
                    multi = multi.hmset(userStatusKey, 'platform', platform, 'app_in_bg', false, 'client_id', clientId,
                        'token', token, 'last_login_time', lastLoginTime);
@ -183,31 +195,117 @@ class Users extends RedisModel {
                    multi = multi.hmset(userKey, 'open_id', userInfo.open_id);
                }
                multi.execAsnyc().then(function (res) {
                        callback(null);
                    });
                multi.execAsync().then(function (res) {
                    callback(null);
                });
            },
            // cache sessions
            // cache sessions, participants, topics, messages
            function (callback) {
                let sessions = new Sessions();
                sessions.getUserSessionsFromMysql();
                SessionRepo.findAll(userId, function (err, sessions) {
                    for (let i = 0; i < sessions.length; ++i) {
                        let sessionId = sessions[i].id;
                        let name = sessions[i].name;
                        let type = sessions[i].type;
                        let createDate = sessions[i].create_date;
                        (function (sessionId, userId) {
                            // cache sessions
                            redisConn.multiAsync()
                                .zadd(self.makeRedisKey(REDIS_KEYS.UserSessions, userId))
                                .hmset(self.makeRedisKey(REDIS_KEYS.Session, sessionId, 'name', name, 'type', type, 'create_date', createDate))
                                .execAsync().then(function (res) {
                                // cache participants
                                let sessionParticipantsKey = self.makeRedisKey(REDIS_KEYS.Participants, sessionId);
                                ParticipantRepo.findParticipants(sessionId, function (err, participants) {
                                    for (let participant in participants) {
                                        let participantId = participant.participant_id;
                                        let score = new Date().getMilliseconds();
                                        redisConn.multiAsync()
                                            .zaddAsync(sessionParticipantsKey, participantId, score)
                                            .execAsync().then(function (res) {
                                        });
                                    }
                                });
                                // cache messages
                                let messagesKey = self.makeRedisKey(REDIS_KEYS.Messages, sessionId);
                                let messagesByTimestampKey = self.makeRedisKey(REDIS_KEYS.MessagesByTimestamp, sessionId);
                                MessageRepo.findBySessionId(sessionId, 0, config.sessionConfig.maxMessageCount, function (err, messages) {
                                    for (let message in messages) {
                                        let id = message.id;
                                        let msgJson = {
                                            sessionId: message.session_id,
                                            senderId: message.sender_id,
                                            senderName: message.sender_name,
                                            contentType: message.content_type,
                                            content: message.content,
                                            timestamp: message.timestamp
                                        };
                                        redisConn.multiAsync()
                                            .hset(messagesKey, id, msgJson)
                                            .zadd(messagesByTimestampKey, id)
                                            .execAsync()
                                            .then(function (res) {
                                            });
                                    }
                                });
                                // cache topics for MUC session
                                let topicsKey = self.makeRedisKey(REDIS_KEYS.Topics, sessionId);
                                TopicRepo.findAll(sessionId, function (err, topics) {
                                    for (let topic in topics) {
                                        let topicKey = self.makeRedisKey(REDIS_KEYS.Topic, topic.id);
                                        let topicId = topic.id;
                                        let name = topic.name;
                                        let createTime = topic.create_time;
                                        let endBy = topic.end_by;
                                        let startMesssageId = topic.start_message_id;
                                        let endMessageId = topic.end_message_id;
                                        redisConn.multiAsync()
                                            .zadd(topicsKey, topicId)
                                            .hmset(topicKey, 'name', name, 'session_id', sessionId, 'create_time',
                                                createTime, 'end_by', endBy, 'start_message_id',
                                                startMesssageId, 'end_message_id', endMessageId)
                                            .execAsync().then(function (res) {
                                        });
                                    }
                                });
                            });
                        })(sessionId, userId);
                    }
                });
                ModelUtil.emitData(self.eventEmitter, {token: token.value});
                ModelUtil.emitData(self.eventEmitter, {});
            }
        ]);
    }
    logout(userId, outCallback){
    logout(userId) {
        let self = this;
        DoctorRepo.logout(userId,
            function (err, result) {
                if (err) {
                    ModelUtil.emitDbError(self.eventEmitter, 'Logout failed', err);
                    return;
                }
        async.waterfall([
                function (callback) {
                    self.isPatientId(userId, function (err, isPatient) {
                        callback(null, isPatient)
                    });
                },
                function (callback, isPatient) {
                    let usersKey = REDIS_KEYS.Users;
                    let userKey = self.makeRedisKey(REDIS_KEYS.User, userId);
                    let userStatusKey = self.makeRedisKey(isPatient ? REDIS_KEYS.UserWechatStatus : REDIS_KEYS.UserAppStatus, userId);
                    redisConn.multiAsync()
                        .del(usersKey)
                        .del(userKey)
                        .del(userStatusKey)
                        .execAsync().then(function (res) {
                    })
                }],
            function (err, res) {
                ModelUtil.emitData(self.eventEmitter, {});
            });
            }
        );
    }
    /**

+ 49 - 78
src/server/repository/mysql/message.repo.js

@ -1,94 +1,64 @@
/**
 * 搜索功能。
 * 消息库。
 */
"use strict";
let configFile = require('../../include/commons').CONFIG_FILE;
let config = require('../../resources/config/' + configFile);
let ImDb = require('../mysql/db/im.db');
let log = require('../../util/log.js');
const IMTABLE = require('../../include/commons').IM_DB;
const DB_TABLES = require('../../include/commons').DB_TABLES;
class MessageRepo {
    constructor() {
    }
    /**
     * 根据sessionId获取对应的会话的信息列表mysql
     * @param sessionId
     */
    static getMessagesBySession(sessionId,handler){
        let session = new  Sessions();
        session.getSessions(sessionId,function(err,res){
            if(err){
                return;
            }else{
                if(res.length==0){
                    log.warn("session is not found!");
                    return;
                }
                let type = res[0].type;
                let db ="";
                if(type==1){
                    db = IMTABLE.MUCMSG;
                }else if(type==2){
                    db = IMTABLE.P2PMSG;
                }else{
                    db = IMTABLE.GROUPMSG;
                }
                let sql = "select * from "+db+" w where w.session_id = ? limit 0,"+config.sessionConfig.maxMessageCount;
                imDb.execQuery({
                    "sql": sessionsql,
                    "args": [sessionId],
                    "handler": function (err, res) {
                        if(err) {
                            log.error("sql:"+sql+"data:sessionId:"+sessionId);
                        }else{
                            log.info("getMessagesBySession success by sessionId :"+sessionId);
                        }
                        handler(err,res);
                    }
                })
            }
        })
    }
    /**
     * 分页获取消息MySQL
     * 分页获取消息
     *
     * @param sessionId
     * @param page
     * @param pagesize
     * @param size
     * @param handler
     */
    static getMessageByPage(sessionId,page,pagesize,handler){
        if(page>0){
            page =page*pagesize;
    static findBySessionId(sessionId, page, size, handler) {
        if (page <= 0) {
            page = 1;
        }
        let session = new  Sessions();
        session.getSessions(sessionId,function(err,res){
            if(err){
                return;
            }else{
                if(res.length==0){
                    log.warn("session is not found!");
        let session = new Sessions();
        session.getSessions(sessionId, function (err, res) {
            if (!err) {
                if (res.length == 0) {
                    log.warn("Session is not found!");
                    handler(null, null);
                    return;
                }
                let type = res[0].type;
                let db ="";
                if(type==1){
                    db = IMTABLE.MUCMSG;
                }else if(type==2){
                    db = IMTABLE.P2PMSG;
                }else{
                    db = IMTABLE.GROUPMSG;
                let MessageTable = "";
                if (type == 1) {
                    MessageTable = DB_TABLES.MucMessages;
                } else if (type == 2) {
                    MessageTable = DB_TABLES.P2pMessages;
                } else {
                    MessageTable = DB_TABLES.GroupMessages;
                }
                let sql = "select * from "+db+" w where w.session_id = ? limit ?,?";
                imDb.execQuery({
                let sql = "select id, session_id, sender_id, sender_name, content_type, content, timestamp from " + MessageTable + " w where w.session_id = ? limit ?, ?";
                ImDb.execQuery({
                    "sql": sessionsql,
                    "args": [sessionId,page,pagesize],
                    "args": [sessionId, page, size],
                    "handler": function (err, res) {
                        if(err) {
                            log.error("sql:"+sql+"data:sessionId:"+sessionId);
                        }else{
                            log.info("getMessagesBySession success by sessionId :"+sessionId);
                        if (err) {
                            log.error("sql:" + sql + "data:sessionId:" + sessionId);
                        } else {
                            log.info("getMessagesBySession success by sessionId :" + sessionId);
                        }
                        handler(err,res);
                        handler(err, res);
                    }
                })
            }
@ -96,20 +66,21 @@ class MessageRepo {
    }
    /**
     * 保存Message 到mysql
     * 保存消息
     *
     * @param messages 消息对象
     * @type type 会话类型,1表示MUC会话,2表示P2P,3表示群会话,4表示临时讨论组
     */
    static saveMessageForMysql(messages,type,messageid,sessionId){
        var sql = "INSERT INTO "+(type==1?IMTABLE.MUCMSG:type==2?IMTABLE.P2PMSG:IMTABLE.GROUPMSG)+" (id, session_id, sender_id, sender_name,content_type, content, timestamp) VALUES (?,?,?,?,?,?,?) ";
        imDb.execQuery({
    static saveMessageForMysql(messages, type, messageid, sessionId) {
        var sql = "INSERT INTO " + (type == 1 ? DB_TABLES.MucMessages : type == 2 ? DB_TABLES.P2pMessages : DB_TABLES.GroupMessages) + " (id, session_id, sender_id, sender_name,content_type, content, timestamp) VALUES (?,?,?,?,?,?,?) ";
        ImDb.execQuery({
            "sql": sql,
            "args": [messageid,sessionId,messages.senderId,messages.senderName,messages.contentType,messages.content,messages.timestamp],
            "args": [messageid, sessionId, messages.senderId, messages.senderName, messages.contentType, messages.content, messages.timestamp],
            "handler": function (err, res) {
                if(err) {
                    log.error("sql:"+sql+",error:"+err+",data:"+JSON.stringify(messages)+",messageid:"+messageid+",sessionId:"+sessionId);
                }else{
                    log.info("save message to mysql is success by session :"+sessionId);
                if (err) {
                    log.error("sql:" + sql + ",error:" + err + ",data:" + JSON.stringify(messages) + ",messageid:" + messageid + ",sessionId:" + sessionId);
                } else {
                    log.info("save message to mysql is success by session :" + sessionId);
                }
            }
        });

+ 61 - 47
src/server/repository/mysql/participant.repo.js

@ -4,95 +4,109 @@
"use strict";
let ImDb = require('../mysql/db/im.db');
let DbUtil = require('../../util/db.util');
let log = require('../../util/log.js');
const IMTABLE = require('../../include/commons').IM_DB;
const DB_TABLES = require('../../include/commons').DB_TABLES;
class ParticipantRepo {
    constructor() {
    }
    /**
     * 获取某个会话中的成员信息
     * 获取会话的成员列表
     *
     * @param sessionId
     * @param handler
     */
    static getParticipantsBySessionId(sessionId,handler){
        let sql ="select p.* from participants w,patients p where and p.id = w.participaint_id and  w.session_id =? ";
    static findParticipants(sessionId, handler) {
        let sql = "select participant_id, participant_role from participants where session_id = ? ";
        ImDb.execQuery({
            "sql": sql,
            "args": [sessionId],
            "handler": function (err, res) {
                if(err) {
                    log.error("getParticipantsBySessionId is fail error: "+err);
                if (err) {
                    log.error("getParticipantsBySessionId is fail error: " + err);
                }
                handler(res);
            }
        });
    }
    /**
     * 判断用户是否存在session中
     * @param sessionId
     * 获取P2P成员所在会话。将成员的ID排序后取哈希值即可。
     *
     * @param userId
     * @param anotherUserId
     * @param handler
     */
    static existsUser(sessionId,userId,handler){
        let sql ="select count(1) as count from participants w where w.session_id =? and w.participaint_id = ? ";
    static findSessionIdByParticipantIds(userId, anotherUserId, handler) {
        let sessionId = DbUtil.stringArrayHash([userId, anotherUserId]);
        handler(null, sessionId);
        /*let sql = "select session_id from " + DB_TABLES.Participants + " p1 ," + DB_TABLES.Participants + " p2 " +
            "where p1.session_id = p2.session_id and " +
            "((p1.participant_id = ? and p2.participant_id = ?) or (p1.participant_id = ? and p2.participant_id = ?))";
        ImDb.execQuery({
            "sql": sql,
            "args": [sessionId,userId],
            "args": [userId, anotherUserId, anotherUserId, userId],
            "handler": function (err, res) {
                if(err) {
                    log.error("existsUser is fail error: "+err);
                if (err) {
                    log.error("getSessionIdByParticipants is fail error: " + err);
                }
                handler(res[0].count);
                handler(err, res);
            }
        });
        });*/
    }
     /**
     * 根据医生和患者
     * @param patient
     * @param doctor
    /**
     * 用户是否在指定Session中
     *
     * @param sessionId
     * @param userId
     * @param handler
     */
     static getSessionIdByParticipants(patient,doctor,handler){
        let sql ="select session_id from "+IMTABLE.PARTICIPANTS+" p1 ,"+IMTABLE.PARTICIPANTS+" p2 " +
            "where p1.session_id = p2.session_id and " +
            "((p1.participaint_id = ? and p2.participaint_id = ?) or (p1.participaint_id = ? and p2.participaint_id = ?))"
         ImDb.execQuery({
    static existsParticipant(sessionId, userId, handler) {
        let sql = "select case when count(*) > 0 then True else False end as count from participants w where w.session_id =? and w.participant_id = ? ";
        ImDb.execQuery({
            "sql": sql,
            "args": [patient,doctor,doctor,patient],
            "args": [sessionId, userId],
            "handler": function (err, res) {
                if(err) {
                    log.error("getSessionIdByParticipants is fail error: "+err);
                if (err) {
                    log.error("existsUser is fail error: " + err);
                }
                handler(err,res);
                handler(res[0].count);
            }
        });
    }
    /**
     * mysql成员创建
     * @param users
     *
     * @param userIds
     */
    createParticipantsToMysql(session_id,users){
        let sql="insert into "+IMTABLE.PARTICIPANTS +" (session_id,participaint_id,participaint_role,receiving) VALUES "
        let args=[];
        for(var j in users){
            sql+="(?,?,?,?),";
    saveParticipantsToMysql(session_id, userIds) {
        let sql = "insert into " + DB_TABLES.Participants + " (session_id,participant_id,participant_role,receiving) VALUES "
        let args = [];
        for (var j in userIds) {
            sql += "(?,?,?,?),";
            args.push(session_id);
            args.push(users[j]);
            args.push(userIds[j]);
            args.push(0);
            args.push(0);
        }
        sql = sql.substring(0,sql.lastIndexOf(","));
        sql = sql.substring(0, sql.lastIndexOf(","));
        ImDb.execQuery({
            "sql": sql,
            "args": args,
            "handler": function (err, res) {
                if(err) {
                    log.error("createParticipantsForMysql is fail error: "+err+",session_id:"+session_id+",users:"+users.join(","));
                }else{
                if (err) {
                    log.error("createParticipantsForMysql is fail error: " + err + ",session_id:" + session_id + ",users:" + userIds.join(","));
                } else {
                    return res;
                }
            }
@ -100,16 +114,16 @@ class ParticipantRepo {
        return true;
    }
    static deleteUserFromMysql(sessionId,User){
        let sql ="delete from "+IMTABLE.PARTICIPANTS+" where user_id=? and session_id=? ";
    static deleteUserFromMysql(sessionId, userId) {
        let sql = "delete from " + DB_TABLES.Participants + " where user_id=? and session_id=? ";
        ImDb.execQuery({
            "sql": sql,
            "args": [user,sessionId],
            "args": [userId, sessionId],
            "handler": function (err, res) {
                if(err) {
                    log.error("sql:"+sql+"data:sessionId:"+sessionId+",user:"+user);
                }else{
                    log.info("delete deleteUser to mysql is success by session :"+sessionId);
                if (err) {
                    log.error("sql:" + sql + "data:sessionId:" + sessionId + ",user:" + userId);
                } else {
                    log.info("delete deleteUser to mysql is success by session :" + sessionId);
                }
            }
        });

+ 70 - 59
src/server/repository/mysql/session.repo.js

@ -1,132 +1,143 @@
/**
 * 搜索功能。
 * 会话库。
 */
"use strict";
let ImDb = require('../mysql/db/im.db');
let log = require('../../util/log.js');
const IMTABLE = require('../../include/commons').IM_DB;
const DB_TABLES = require('../../include/commons').DB_TABLES;
class SessionRepo {
    constructor() {
    }
    /**
     * 获取某个用户的全部session列表
     * @param userId
     * 获取单个session对象
     *
     * @param sessionId
     * @param handler
     */
    static getUserSessionsFromMysql(userId,handler){
        let sql ="select session_id from "+IMTABLE.PARTICIPANTS+" w where w.participaint_id = ? group by w.session_id";
        let sessionsql = "select id,name,type,create_date from "+IMTABLE.SESSIONS+" s where s.id in("+sql+")";
    static findOne(sessionId, handler) {
        let sessionSQL = "select id,name,type,create_date from " + DB_TABLES.Sessions + " s where s.id=?";
        ImDb.execQuery({
            "sql": sessionsql,
            "args": [userId],
            "sql": sessionSQL,
            "args": [sessionId],
            "handler": function (err, res) {
                if(err) {
                    log.error("sql:"+sessionsql+"data:userId:"+userId);
                }else{
                    log.info("getMysqlUserSessions success by userId :"+userId);
                if (err) {
                    log.error("sql:" + sessionSQL + "data:sessionId:" + sessionId);
                }
                handler(err,res);
                handler(err, res);
            }
        });
    }
    /**
     * 获取置顶的消息
     * 获取用户全部会话
     *
     * @param userId
     * @param handler
     */
    static getUserStickSessionsFromMysql(userId,handler){
        let sql ="select session_id from "+IMTABLE.PARTICIPANTS+" w where w.participaint_id = ? group by w.session_id";
        let sessionsql = "select s.id,s.name,s.type,s.create_date from "+IMTABLE.SESSIONS+" s,"+IMTABLE.STICKY_SESSION+" ss  where s.id = ss.session_id s.id in("+sql+")";
    static findAll(userId, handler) {
        let sql = "select session_id from " + DB_TABLES.Participants + " w where w.participant_id = ? group by w.session_id";
        let sessionSQL = "select id,name,type,create_date from " + DB_TABLES.Sessions + " s where s.id in(" + sql + ")";
        ImDb.execQuery({
            "sql": sessionsql,
            "sql": sessionSQL,
            "args": [userId],
            "handler": function (err, res) {
                if(err) {
                    log.error("sql:"+sessionsql+"data:userId:"+userId);
                }else{
                    log.info("getMysqlUserSessions success by userId :"+userId);
                if (err) {
                    log.error("sql:" + sessionSQL + "data:userId:" + userId);
                } else {
                    log.info("getUserSessionsFromMysql success by userId :" + userId);
                }
                handler(err,res);
                handler(err, res);
            }
        });
    }
    /**
     * 获取session单个对象
     * @param sessionId
     * 获取用户置顶会话
     *
     * @param userId
     * @param handler
     */
    static getSessions(sessionId,handler){
        let sessionsql = "select id,name,type,create_date from "+IMTABLE.SESSIONS+" s where s.id=?";
    static findStickySessions(userId, handler) {
        let sql = "select session_id from " + DB_TABLES.Participants + " w where w.participant_id = ? group by w.session_id";
        let sessionSQL = "select s.id,s.name,s.type,s.create_date from " + DB_TABLES.Sessions + " s," + DB_TABLES.StickySessions + " ss  where s.id = ss.session_id s.id in(" + sql + ")";
        ImDb.execQuery({
            "sql": sessionsql,
            "args": [sessionId],
            "sql": sessionSQL,
            "args": [userId],
            "handler": function (err, res) {
                if(err) {
                    log.error("sql:"+sessionsql+"data:sessionId:"+sessionId);
                }else{
                    log.info("getSessions success by sessionId :"+sessionId);
                if (err) {
                    log.error("sql:" + sessionSQL + "data:userId:" + userId);
                }
                handler(err,res);
                
                handler(err, res);
            }
        });
    }
    /**
     * 保存session到sql数据库
     * 保存session。
     *
     * @param sessionId
     * @param name
     * @param type
     * @param createDate
     */
    static saveSessionToMysql(sessionId,name,type,createDate){
        let sql ="insert into "+IMTABLE.SESSIONS+" (id,name,type,create_date) VALUES (?,?,?,?) ";
    static saveSession(sessionId, name, type, createDate) {
        let sql = "insert into " + DB_TABLES.Sessions + " (id,name,type,create_date) VALUES (?,?,?,?) ";
        ImDb.execQuery({
            "sql": sql,
            "args": [sessionId,name,type,createDate],
            "args": [sessionId, name, type, createDate],
            "handler": function (err, res) {
                if(err) {
                    log.error("sql:"+sql+"data:sessionId:"+sessionId+",name:"+name+",type:"+type+",createDate:"+createDate);
                }else{
                    log.info("save session to mysql is success by session :"+sessionId);
                if (err) {
                    log.error("sql:" + sql + "data:sessionId:" + sessionId + ",name:" + name + ",type:" + type + ",createDate:" + createDate);
                }
            }
        });
    }
    static stickSession(sessionId,user,score){
        let sql ="insert into "+IMTABLE.STICKY_SESSION+" (user_id,session_id,score) VALUES (?,?,?) ";
    /**
     * 保存置顶会话。
     *
     * @param sessionId
     * @param user
     * @param score
     */
    static saveStickySession(sessionId, user, score) {
        let sql = "insert into " + DB_TABLES.StickySessions + " (user_id,session_id,score) VALUES (?,?,?) ";
        ImDb.execQuery({
            "sql": sql,
            "args": [user,sessionId,score],
            "args": [user, sessionId, score],
            "handler": function (err, res) {
                if(err) {
                    log.error("sql:"+sql+"data:sessionId:"+sessionId+",user:"+user+",score:"+score);
                }else{
                    log.info("save stickSession to mysql is success by session :"+sessionId);
                if (err) {
                    log.error("sql:" + sql + "data:sessionId:" + sessionId + ",user:" + user + ",score:" + score);
                }
            }
        });
    }
    static unstickSession(sessionId,user){
        let sql ="delete from "+IMTABLE.STICKY_SESSION+" where user_id=? and session_id=? ";
    /**
     * 取消会话置顶。
     *
     * @param sessionId
     * @param userId
     */
    static unstickSession(sessionId, userId) {
        let sql = "delete from " + DB_TABLES.StickySessions + " where user_id=? and session_id=? ";
        ImDb.execQuery({
            "sql": sql,
            "args": [user,sessionId],
            "args": [userId, sessionId],
            "handler": function (err, res) {
                if(err) {
                    log.error("sql:"+sql+"data:sessionId:"+sessionId+",user:"+user);
                }else{
                    log.info("delete unstickSession to mysql is success by session :"+sessionId);
                if (err) {
                    log.error("sql:" + sql + "data:sessionId:" + sessionId + ",user:" + userId);
                }
            }
        });
    }
}
module.exports = SessionRepo;

+ 31 - 0
src/server/repository/mysql/topic.repo.js

@ -0,0 +1,31 @@
/**
 * 会话议题库。
 *
 * author: Sand
 * since: 12/21/2016
 */
"use strict";
let ImDb = require('./db/im.db');
class TopicsRepo {
    constructor(){}
    /**
     * 获取会话中的议题。
     *
     * @param sessionId
     * @param handler
     */
    static findAll(sessionId, handler){
        let sql = "select id, session_id, name, create_time, end_by, start_message_id, end_message_id from topics where session_id = ?";
        ImDb.execQuery({
            sql: sql,
            args: [sessionId],
            handler: handler
        });
    }
}
module.exports = TopicsRepo;

+ 0 - 1
src/server/resources/config/config.dev.js

@ -71,7 +71,6 @@ let sessionConfig = {
    maxMessageTimespan: 7 * 24 * 3600
};
exports.app = 'IM.Server';
exports.version = '1.2.7';
exports.debug = true;

+ 3 - 3
src/server/resources/schema/ichat_schema.1.2.8.sql

@ -103,10 +103,10 @@ CREATE TABLE `muc_messages`
CREATE TABLE `participants`
(
	`session_id` VARCHAR(50) NOT NULL COMMENT '会话ID。ID结构:以患者ID+最大次数',
	`participaint_id` VARCHAR(50) NOT NULL COMMENT '参与者ID',
	`participaint_role` INTEGER COMMENT '参与者角色,MUC模式中的主持人/普通参与者',
	`participant_id` VARCHAR(50) NOT NULL COMMENT '参与者ID',
	`participant_role` INTEGER COMMENT '参与者角色,MUC模式中的主持人/普通参与者',
	`receiving` TINYINT COMMENT '当前是否正在接收',
	CONSTRAINT `PK_participants` PRIMARY KEY (`session_id`,`participaint_id`)
	CONSTRAINT `PK_participants` PRIMARY KEY (`session_id`,`participant_id`)
) COMMENT='会话参与者'
;

+ 61 - 43
src/server/util/db.util.js

@ -1,62 +1,80 @@
"use strict";
/**
 * 数据库工具,使用数据库连接池获取连接,执行查询,之后将连接返回连接池。
 */
"use strict";
var configFile = require('../include/commons').CONFIG_FILE;
var config = require('../resources/config/' + configFile);
var log = require('./log');
let crypto = require('crypto');
/**
 * 数据库查询工具,使用数据库连接池获取连接,执行查询,之后将连接返回连接池。
 */
exports.execQuery = function (pool, options) {
    if(config.showSQL) log.info(options.sql);
class DbUtil {
    constructor() {
    }
    pool.getConnection(function (err, connection) {
        // 查询参数
        var sql = options['sql'];
        var args = options['args'];
        var handler = options['handler'];
    static execQuery(pool, options) {
        if (config.showSQL) log.info(options.sql);
        if (err) {
            //log.error('Database - get connection failed, ' + err);
            handler(err, 'db-getConnection');
        pool.getConnection(function (err, connection) {
            // 查询参数
            var sql = options['sql'];
            var args = options['args'];
            var handler = options['handler'];
            return;
        }
            if (err) {
                //log.error('Database - get connection failed, ' + err);
                handler(err, 'db-getConnection');
        // 执行查询
        if (args) {
            var query = connection.query(sql, args, function (err, results) {
                if (err) {
                    //log.error('Database - execute query failed, ' + err);
                    handler(err, results);
                return;
            }
                    return;
                }
            // 执行查询
            if (args) {
                connection.query(sql, args, function (err, results) {
                    if (err) {
                        //log.error('Database - execute query failed, ' + err);
                        handler(err, results);
                // 处理结果
                handler(err, results);
            });
                        return;
                    }
        } else {
            var query = connection.query(sql, function (err, results) {
                if (err) {
                    //log.error('Database - execute query failed, ' + err);
                    // 处理结果
                    handler(err, results);
                });
                    return;
                }
            } else {
                connection.query(sql, function (err, results) {
                    if (err) {
                        //log.error('Database - execute query failed, ' + err);
                        handler(err, results);
                // 处理结果
                handler(err, results);
            });
        }
                        return;
                    }
        // 返回连接池
        connection.release(function (err) {
            if (err) {
                log.error('Database - release connection failed, ' + err);
                    // 处理结果
                    handler(err, results);
                });
            }
            // 返回连接池
            connection.release(function (err) {
                if (err) {
                    log.error('Database - release connection failed, ' + err);
                }
            });
        });
    });
};
    }
    /**
     * 为字符串数组生成一个Hash值,为保证唯一性,生成前先对数组进行排序。
     *
     * @param stringArray
     */
    static stringArrayHash(stringArray){
        let sortedArr = stringArray.sort();
        return crypto.createHash("sha256").update(sortedArr.join(","));
    }
}
module.exports = DbUtil;