Переглянути джерело

Merge branch 'feature-refactor' of http://192.168.1.220:10080/Amoy/im.doctor into feature-refactor

sand 8 роки тому
батько
коміт
0d88f7b8ce

+ 26 - 3
src/server/endpoints/chats.endpoint.js

@ -69,7 +69,7 @@ router.post(APIv1.Chats.SM, function (req, res) {
router.get(APIv1.Chats.TEST,function(req,res){
    let test = req.query.test;
    //http://192.168.131.107:3008/api/v1/chats/test?test=1&page=0&pagesize=10&user=3121&sessionId=testsessionmsg1
    if(test==1){
    if(test==1){//获取会话消息列表
        let page = req.query.page;
        let pagesize = req.query.pagesize;
        let user = req.query.user;
@ -79,7 +79,7 @@ router.get(APIv1.Chats.TEST,function(req,res){
        sessions.getSessionMessages(sessionId,user,page,pagesize);
    }
    //http://192.168.131.107:3008/api/v1/chats/test?test=2&page=0&pagesize=10&user=3121
    if(test==2){
    if(test==2){//获取用户会话
        let sessions = new Sessions();
        let page = req.query.page;
        let pagesize = req.query.pagesize;
@ -87,15 +87,38 @@ router.get(APIv1.Chats.TEST,function(req,res){
        controllerUtil.regModelEventHandler(sessions, res);
        sessions.getUserSessions(user,page,pagesize);
    }
    //
    //http://192.168.131.107:3008/api/v1/chats/test?test=3&sessionId=132312312&users=10,2,3&name=3121&sessionType=2
    if(test==3){
        let sessions = new Sessions();
        controllerUtil.regModelEventHandler(sessions, res);
        let sessionId = req.query.sessionId;
        let users = req.query.users;
        let name = req.query.name;
        let sessionType = req.query.sessionType;
        sessions.createSessions(sessionId,name,sessionType,users);
    }
    if(test==4){
        let sessions = new Sessions();
        controllerUtil.regModelEventHandler(sessions, res);
        let sessionId = req.query.sessionId;
        let message ={};
        message.contentType =1;
        message.timestamp=new Date();
        message.content ="test send message";
        message.senderId="10";
        message.senderName="test1";
        sessions.saveMessageBySession(message,sessionId);
    }
    //http://192.168.131.107:3008/api/v1/chats/test?test=5&page=0&pagesize=10&user=3121&sessionId=testsessionmsg1
    if(test==5){
        let sessions = new Sessions();
        controllerUtil.regModelEventHandler(sessions, res);
        let sessionId = req.query.sessionId;
        let user = req.query.user;
        let page = req.query.page;
        let pagesize = req.query.pagesize;
        sessions.getSessionMessages(sessionId,user,page,pagesize);
    }
})

+ 3 - 0
src/server/include/commons.js

@ -82,6 +82,7 @@ exports.DEFAULT_PAGE_SIZE = 100;
 * Redis Key列表与占位符。
 */
const REDIS_KEY_REPLACER = "{id}";
exports.REDIS_KEY_REPLACER = REDIS_KEY_REPLACER;
exports.REDIS_KEYS = {
@ -102,6 +103,8 @@ exports.REDIS_KEYS = {
    Topic: "topics:" + REDIS_KEY_REPLACER
};
exports.STICK_NUM = 90000000000000;
exports.IM_DB = {
    "P2PMSG": "p2p_messages",
    "MUCMSG": "muc_messages",

+ 79 - 7
src/server/models/messages/messages.js

@ -10,6 +10,9 @@ let RedisModel = require('./../redis.model.js');
let modelUtil = require('../../util/modelUtil');
var imDb = require('../../repository/mysql/db/im.db');
let log = require('../../util/log.js');
let Sessions = require('../sessions/sessions');
let configFile = require('../../include/commons').CONFIG_FILE;
let config = require('../../resources/config/' + configFile);
const RedisKey = require('../../include/commons').REDIS_KEYS;
const IMTABLE = require('../../include/commons').IM_DB;
@ -39,23 +42,92 @@ class Messages extends RedisModel {
    /**
     * 根据sessionId获取对应的会话的信息列表
     * 根据sessionId获取对应的会话的信息列表mysql
     * @param sessionId
     */
    getMessagesBySession(sessionId){
    getMessagesBySession(sessionId,handler){
       let session = new  Sessions();
       session.getSessions(sessionId,function(err,res){
            if(err){
                return;
            }else{
                if(res.length==0){
                    log.warn("session is not found!");
                    return;
                }
                let type = res[0].type;
                let db ="";
                if(type==1){
                    db = IMTABLE.MUCMSG;
                }else if(type==2){
                    db = IMTABLE.P2PMSG;
                }else{
                    db = IMTABLE.GROUPMSG;
                }
                let sql = "select * from "+db+" w where w.session_id = ? limit 0,"+config.sessionConfig.maxMessageCount;
                imDb.execQuery({
                    "sql": sessionsql,
                    "args": [sessionId],
                    "handler": function (err, res) {
                        if(err) {
                            log.error("sql:"+sql+"data:sessionId:"+sessionId);
                        }else{
                            log.info("getMessagesBySession success by sessionId :"+sessionId);
                        }
                        handler(err,res);
                    }
                })
            }
       })
    }
    /**
     * 分页
     * 根据sessionId获取对应的会话的信息列表
     * 分页获取消息MySQL
     * @param sessionId
     * @param page
     * @param pagesize
     */
    getMessagesBySessionForPage(sessionId,page,pagesize){
    getMessageByPage(sessionId,page,pagesize){
        if(page>0){
            page =page*pagesize;
        }
        let session = new  Sessions();
        session.getSessions(sessionId,function(err,res){
            if(err){
                return;
            }else{
                if(res.length==0){
                    log.warn("session is not found!");
                    return;
                }
                let type = res[0].type;
                let db ="";
                if(type==1){
                    db = IMTABLE.MUCMSG;
                }else if(type==2){
                    db = IMTABLE.P2PMSG;
                }else{
                    db = IMTABLE.GROUPMSG;
                }
                let sql = "select * from "+db+" w where w.session_id = ? limit ?,?";
                imDb.execQuery({
                    "sql": sessionsql,
                    "args": [sessionId,page,pagesize],
                    "handler": function (err, res) {
                        if(err) {
                            log.error("sql:"+sql+"data:sessionId:"+sessionId);
                        }else{
                            log.info("getMessagesBySession success by sessionId :"+sessionId);
                        }
                        handler(err,res);
                    }
                })
            }
        })
    }
    }
    /**
     * 根据消息ID获取单条消息

+ 65 - 1
src/server/models/sessions/participants.js

@ -32,10 +32,66 @@ class Participants extends RedisModel {
     * 根据sessionId获取对应的议题的成员信息
     * @param sessionId
     */
    getParticipantsBySessionId(sessionId){
    getParticipantsBySessionId(sessionId,handler){
        let  participant_key =  super.makeRedisKey(RedisKey.Participants,sessionId);
        redis.existsAsync(participant_key).then(function(res){
            if(res){
                redis.zrangeAsync(participant_key,0,-1).then(function(res){
                    handler(res);
                })
            }else{
                let sql ="select w.* from participants w where w.session_id =? ";
                imDb.execQuery({
                    "sql": sql,
                    "args": [sessionId],
                    "handler": function (err, res) {
                        if(err) {
                            log.error("getParticipantsBySessionId is fail error: "+err);
                        }
                        handler(res);
                    }
                });
            }
        })
    }
    /**
     * 判断成员是否存在这个讨论组中
     * @param sessionId
     * @param userId
     */
    existsUser(sessionId,userId,handler){
      let  participant_key =  super.makeRedisKey(RedisKey.Participants,sessionId);
      redis.existsAsync(participant_key).then(function(res){
          if(res){//存在redis中直接从redis校验
              redis.zrangeAsync(participant_key,0,-1).then(function(res){
                  let exists = false
                  for(var j in res){
                      var value = res[j];
                      if(value==userId){
                          exists  = true;
                          break;
                      }
                  }
                  handler(exists);
              })
          }else{//不存在从数据库中获取
              let sql ="select count(1) as count from participants w where w.session_id =? and w.participaint_id = ? ";
              imDb.execQuery({
                  "sql": sql,
                  "args": [sessionId,userId],
                  "handler": function (err, res) {
                      if(err) {
                          log.error("existsUser is fail error: "+err);
                      }
                      handler(res[0].count);
                  }
              });
          }
      })
    }
    /**
     * 根据医生和患者
     * @param patient
@ -57,6 +113,14 @@ class Participants extends RedisModel {
        });
    }
    /**
     * 将成员写入redis
     * @param session_id 会话ID
     * @param users 用户集合
     * @param createDate 创建日期
     * @param handler 回调
     */
    createParticipantsToRedis(session_id,users,createDate,handler){
        let participants_key =  super.makeRedisKey(RedisKey.Participants,session_id);
        for(var j in users){

+ 150 - 63
src/server/models/sessions/sessions.js

@ -14,6 +14,7 @@ let imDb = require('../../repository/mysql/db/im.db');
let log = require('../../util/log.js');
const RedisKeys = require('../../include/commons').REDIS_KEYS;
const IMTABLE = require('../../include/commons').IM_DB;
const Commons = require('../../include/commons')
let mongoose = require('mongoose');
class Sessions extends RedisModel {
@ -22,13 +23,48 @@ class Sessions extends RedisModel {
    }
    /**
     * 根据sessionId获取对应的Session
     * @param sessionId
     * 获取某个用户的全部session列表
     * @param userId
     * @param handler
     */
    getSessionsById(sessionId){
    getMysqlUserSessions(userId,handler){
        let sql ="select select session_id from participants w where w.participaint_id = ? group by w.session_id";
        let sessionsql = "select id,name,type,create_date from session s where s.id in("+sql+")";
        imDb.execQuery({
            "sql": sessionsql,
            "args": [userId],
            "handler": function (err, res) {
                if(err) {
                    log.error("sql:"+sessionsql+"data:userId:"+userId);
                }else{
                    log.info("getMysqlUserSessions success by userId :"+userId);
                }
                handler(err,res);
            }
        });
    }
    /**
     * 获取session单个对象
     * @param sessionId
     * @param handler
     */
    getSessions(sessionId,handler){
        let sessionsql = "select id,name,type,create_date from session s where s.id=?";
        imDb.execQuery({
            "sql": sessionsql,
            "args": [sessionId],
            "handler": function (err, res) {
                if(err) {
                    log.error("sql:"+sessionsql+"data:sessionId:"+sessionId);
                }else{
                    log.info("getSessions success by sessionId :"+sessionId);
                }
                handler(err,res);
            }
        });
    }
    /**
     * 根据用户ID获取用户的session列表
     * @param userId
@ -51,7 +87,6 @@ class Sessions extends RedisModel {
                    calllist(res[j],j,res.length);
                }
            }
            function calllist(session,j,_len){
                let session_key =_super(RedisKeys.Session,session);
                redis.hgetallAsync(session_key).then(function(res){
@ -113,34 +148,42 @@ class Sessions extends RedisModel {
        let self = this;
        let message_timestamp_key = super.makeRedisKey(RedisKeys.MessagesTimestamp,sessionId);
        let message_key = super.makeRedisKey(RedisKeys.Messages,sessionId);
        let participants = super.makeRedisKey(RedisKeys.Participants,sessionId);
        let participants_key = super.makeRedisKey(RedisKeys.Participants,sessionId);
        if(page>0){
            page = page*pagesize;
            pagesize = pagesize+page;
        }
        //倒序取出最后N条消息
        redis.zrevrangeAsync(message_timestamp_key,page,pagesize).then(function(res){
            //取出消息实体
            if(res.length==0){
                modelUtil.emitData(self.eventEmitter,{"status":200,"data":[]});
                return;
            }
            redis.hmgetAsync(message_key,res).then(function(messages) {
                console.log(messages)
                //将取到的消息返回给前端
                modelUtil.emitData(self.eventEmitter,{"status":200,"data":messages});
            }).then(function(){
                //更新患者最后一次获取消息的日期
                redis.zaddAsync(participants, (new Date().getTime()),user).then(function(res){
                    console.log(res);
        let participants = new Participants();
        participants.existsUser(sessionId,user,function(res){
            if(!res){
                modelUtil.emitData(self.eventEmitter,{"status":-1,"msg":"用户不在此会话中!"});
            }else{
                //倒序取出最后N条消息
                redis.zrevrangeAsync(message_timestamp_key,page,pagesize).then(function(res){
                    //取出消息实体
                    if(res.length==0){
                        modelUtil.emitData(self.eventEmitter,{"status":200,"data":[]});
                        return;
                    }
                    redis.hmgetAsync(message_key,res).then(function(messages) {
                        console.log(messages)
                        //将取到的消息返回给前端
                        modelUtil.emitData(self.eventEmitter,{"status":200,"data":messages});
                    }).then(function(){
                        //更新患者最后一次获取消息的日期
                        redis.zaddAsync(participants_key, (new Date().getTime()),user).then(function(res){
                            console.log(res);
                        }).catch(function(res){
                            throw res;
                        })
                    })
                }).catch(function(res){
                    throw res;
                    modelUtil.emitData(self.eventEmitter,{"status":-1,"msg":res});
                })
            })
        }).catch(function(res){
            modelUtil.emitData(self.eventEmitter,{"status":-1,"msg":res});
        })
            }
         })
    }
    /**
     * 更新最后一条消息
     * @param session_key rediskey
@ -149,7 +192,7 @@ class Sessions extends RedisModel {
     * @param message
     * @returns {*}
     */
     updateLastContent(session_key,session_type,name,message){
    updateLastContent(session_key,session_type,name,message){
        return redis.hmsetAsync(session_key,
            "create_date", message.timestamp,
            "last_content", message.content,
@ -161,52 +204,50 @@ class Sessions extends RedisModel {
        );
    }
    /**
     * 保存消息,用于消息发送
     * @param message
     * @param sessionId
     */
    saveMessageBySession(message,sessionId) {
        let self = this;
        let messages = new Messages();
        let participants = new Participants();
        let session_key = super.makeRedisKey(RedisKeys.Session,sessionId);
        let message_id = mongoose.Types.ObjectId().toString();
        let session_type = 0;
        let name = "";
        let messages = new Messages();
        redis.hmgetAsync(session_key, ["type","name"]).then(function(res){
            session_type = res[0];
            name  = res[1];
            if(!session_type||!name){
                log.error("session is error for key "+session_key);
                throw "session is not found";
        participants.existsUser(sessionId,messages.senderId,function(res){
            //校验发送成员是都在讨论组
            if(res){
                redis.hmgetAsync(session_key, ["type","name"]).then(function(res){
                    session_type = res[0];
                    name  = res[1];
                    if(!session_type||!name){
                        log.error("session is error for key "+session_key);
                        throw "session is not found";
                    }
                }).then(function(res){
                    //更新消息相关
                    return  messages.saveMessageForRedis(message_id,sessionId,message);
                }).then(function (res) {
                    //更新session的最后一条聊天记录
                    return self.updateLastContent(session_key,session_type,name,message);
                }).then(function (res) {
                    //操作mysql数据库
                    messages.saveMessageForMysql(message,session_type,message_id,sessionId);
                    //返回数据给前端。
                    modelUtil.emitData(self.eventEmitter, {"status":200,"msg":"发送成功!"});
                    //消息推送
                }).catch(function (res) {
                    modelUtil.emitData(self.eventEmitter,{"status":-1,"msg":res});
                })
            }else{
                modelUtil.emitData(self.eventEmitter,{"status":-1,"msg":"用户不在此会话当中!"});
            }
        }).then(function(res){
            //更新消息相关
            return  messages.saveMessageForRedis(message_id,sessionId,message);
        }).then(function (res) {
            //更新session的最后一条聊天记录
            return self.updateLastContent(session_key,session_type,name,message);
        }).then(function (res) {
            //操作mysql数据库
            messages.saveMessageForMysql(message,session_type,message_id,sessionId);
            //返回数据给前端。
            modelUtil.emitData(self.eventEmitter, {"status":200,"msg":"发送成功!"});
            //消息推送
        }).catch(function (res) {
            modelUtil.emitData(self.eventEmitter,{"status":-1,"msg":res});
        })
    }
    /**
     * 根据sessionId获取对应的消息
     * @param sessionId
     */
    getSessionMessagesByPage(sessionId,page,pageSize){
    }
    /**
     * 非MUC模式创建会话
     * @param sessionId 会话ID
@ -261,7 +302,53 @@ class Sessions extends RedisModel {
        });
    }
    /**
     *置顶操作
     */
    stickSession(sessionId,user){
        let user_session_key = super.makeRedisKey(RedisKeys.UsersSessions,user);
        let self = this;
        //取出最大的session
        redis.zrevrangeAsync(user_session_key,0,0).then(function(res){
            //获取该session的时间搓
            redis.zscoreAsync(user_session_key,res).then(function(scoreres){
                let nowtime = new Date().getTime();
                //当前时间搓比redis的时间搓更早证明没有置顶过
                if(scoreres<=nowtime){
                    //初始化置顶
                   redis.zaddAsync(user_session_key, Commons.STICK_NUM,sessionId).then(function(res){
                       log.info("stickSession:"+sessionId+",res:"+res);
                       modelUtil.emitData(self.eventEmitter,{"status":200,"msg":"置顶成功!"});
                   })
                }else{
                    //已有置顶的数据,取出来加1保存回去
                    scoreres =  scoreres+1;
                    redis.zaddAsync(user_session_key, scoreres,sessionId).then(function(){
                        log.info("stickSession:"+sessionId+",res:"+res);
                        modelUtil.emitData(self.eventEmitter,{"status":200,"msg":"置顶成功!"});
                    })
                }
            })
        })
    }
    /**
     *取消置顶操作
     */
    cancelStickSession(sessionId,user){
        let user_session_key = super.makeRedisKey(RedisKeys.UsersSessions,user);
        let participants_key = super.makeRedisKey(RedisKeys.Participants,sessionId);
        let self = this;
        redis.zscoreAsync(participants_key,user).then(function(res){
            if(!res){
                res = new Date().getTime();
            }
            redis.zaddAsync(user_session_key, res,sessionId).then(function(res){
                log.info("cancelStickSession:"+sessionId);
                modelUtil.emitData(self.eventEmitter,{"status":200,"msg":"取消置顶成功!"});
            });
        })
    }
}

+ 6 - 0
src/server/resources/config/config.dev.js

@ -66,6 +66,12 @@ let wechatConfig = {
    }
};
let sessionConfig = {
    maxMessageCount: 1000,
    maxMessageTimespan: 7 * 24 * 3600
};
exports.app = 'IM.Server';
exports.version = '1.2.7';
exports.debug = true;