Browse Source

增加最近会话列表接口,按7天计算;增加通用请求参数判断

Sand 8 years ago
parent
commit
bc7e080fdb

+ 9 - 0
src/client/im.client.js

@ -389,6 +389,15 @@ var imClient = {
                {},
                success,
                failure);
        },
        // 获取最近7天内的会话列表
        getRecentSessions: function (userId, success, failure) {
            "use strict";
            httpClient.get(ENDPOINTS.Sessions.RecentSessions,
                {user_id: userId, date_span: 7},
                success,
                failure);
        }
    },
    Search: {}

+ 18 - 0
src/server/endpoints/v2/session.endpoint.js

@ -83,6 +83,24 @@ router.get("/", function (req, res) {
    sessions.getUserSessions(userId, page, size,businessType);
});
/**
 * 最近会话列表。
 *
 * URL:
 *  /sessions/recent?user_id=abc&date_span=7
 */
router.get(APIv2.Sessions.RecentSessions, function (req, res) {
    ControllerUtil.checkRequestQueryParams(req, ['user_id', 'date_span']);
    let userId = req.query.user_id;
    let dateSpan = req.query.date_span;
    let sessions = new Sessions();
    ControllerUtil.regModelEventHandler(sessions, res);
    sessions.getRecentSessions(userId, dateSpan);
});
/**
 * 某个聊天记录置顶操作
 *

+ 189 - 39
src/server/models/migration/migration.js

@ -7,24 +7,184 @@
'use strict';
let ImDb = require('../../repository/mysql/db/im.db');
var ObjectUtil = require("../../util/object.util.js");
let ObjectUtil = require("../../util/object.util.js");
let DbUtil = require("../../util/db.util.js");
let async = require("async");
let log = require("../../util/log.js");
let fs = require('fs');
var mongoose = require("mongoose");
let mongoose = require("mongoose");
let vprintf = require('sprintf-js').vsprintf;
const migrationFile = "../../resources/schema/ichat_1.2.8_data_migration.sql";
const MIGRATION_SCRIPT_File_NAME = "../../resources/schema/ichat_1.2.8_data_migration.sql";
class Migration {
    constructor() {
    }
    /**
     * 迁移P2P及其消息
     * 迁移系统消息
     */
    static migrateSystem() {
        let data = "-- System sessions: \n";
        async.waterfall([
                function (callback) {
                    // 所有的系统会话
                    let sql = "select distinct to_uid participant_id from im_new.msg_system order by to_uid";
                    ImDb.execQuery({
                        sql: sql,
                        args: [],
                        handler: function (err, res) {
                            if (err) {
                                return callback(err, res);
                            }
                            let buffer = "insert into sessions(id, name, type, create_date) values ";
                            res.forEach(function (session) {
                                buffer += vprintf("\n('%s', '%s', 0, now()),", [
                                    DbUtil.stringArrayHash(['system', session.participant_id]),
                                    "系统消息"
                                ]);
                            });
                            data += buffer.substr(0, buffer.length - 1) + ";\n";
                            callback(null);
                        }
                    });
                },
                function (callback) {
                    // 所有的系统消息
                    let sql = "SELECT msg_id 'id', 'session_id', 'system' `sender_id`, '系统通知' `sender_name`, 1 `content_type`, DATA 'content', timestamp, `type` 'business_type', to_uid FROM im_new.msg_system ORDER BY to_uid";
                    ImDb.execQuery({
                        sql: sql,
                        args: [],
                        handler: function (err, res) {
                            if (err) {
                                return callback(err, res);
                            }
                            let buffer = "insert into system_messages(id, session_id, sender_id, sender_name, content_type, content, timestamp, business_type) values";
                            res.forEach(function (message) {
                                buffer += vprintf("\n('%s', '%s', '%s', '%s', 1, '%s', %s, '%s'),", [
                                    mongoose.Types.ObjectId().toString(),
                                    DbUtil.stringArrayHash(['system', message.to_uid]),
                                    'system',
                                    '系统',
                                    message.content.replace(/'/g, "''"),
                                    ObjectUtil.timestampToLong(message.timestamp),
                                    message.business_type]);
                            });
                            data += buffer.substr(0, buffer.length - 1) + ";\n\n";
                            callback(null, null);
                        }
                    });
                }],
            function (err, res) {
                if (err) {
                    log.error("Error occurs while migrate system sessions: ", err);
                } else {
                    fs.writeFileSync(MIGRATION_SCRIPT_File_NAME, data + "\n");
                    log.info("Migrate System sessions succeed.");
                }
            });
    }
    /**
     * 迁移MUC及其消息。将原先的P2P中与患者有关的会话全部过来出来,再将其转为会话。议题从家庭医生库中提取出来。
     */
    static migrateMUC() {
        // 选择出所有的咨询组
        let sql = "select g.to_gid from im_new.msg_group g where length(g.to_gid) > 20 group by g.to_gid order by g.to_gid";
        // 选择出所有的咨询组成员
        sql = "select g.to_gid, g.from_uid from im_new.msg_group g where length(g.to_gid) > 20 group by g.to_gid order by g.to_gid";
        // 选择出所有的咨询组消息
        sql = "select g.to_gid, g.from_uid, g.msg_id, g.`type`, g.content, g.timestamp, g.at_uid from im_new.msg_group g where length(g.to_gid) > 20 order by g.to_gid";
    }
    /**
     * 迁移P2P及其消息,新的IM中P2P仅给医生使用,因此过滤出原先发送与接收人均是医生的会话。
     */
    static migrateP2P() {
        let data = "-- P2P sessions: \n";
        async.waterfall([
            function (callback) {
                // 所有的P2P 会话,成员与消息
                let sql = "SELECT a.from_uid sender_id, d.name sender_name, a.to_uid to_id, e.name to_name, a.type content_type, a.content, a.timestamp " +
                " FROM im_new.msg_p2p a " +
                " LEFT JOIN wlyy.wlyy_doctor d ON a.from_uid = d.code " +
                " LEFT JOIN wlyy.wlyy_doctor e on a.to_uid = e.code " +
                " WHERE a.from_uid IN (SELECT code FROM wlyy.wlyy_doctor d) AND a.to_uid IN (SELECT code FROM wlyy.wlyy_doctor d)";
                ImDb.execQuery({
                    sql: sql,
                    args: [],
                    handler: function (err, res) {
                        if(err) return callback(err, null);
                        let sessionsBuffer = "insert into sessions(id, name, type, create_date) values ";
                        let participantBuffer = "insert into participants(session_id, participant_id, participant_role, last_fetch_time) values ";
                        let messageBuffer = "insert into p2p_messages(id, session_id, sender_id, sender_name, content_type, content, timestamp) values ";
                        res.forEach(function (message) {
                            let sessionId = DbUtil.stringArrayHash([message.sender_id, message.to_id]);
                            // 会话
                            sessionsBuffer += vprintf("\n('%s', '%s', %s, %s),",  [
                                sessionId,
                                'P2P',
                                1,
                                new Date('2016-11-22 12:00:00').getTime()
                            ]);
                            // 成员
                            participantBuffer += vprintf("\n('%s', '%s', %s, %s),", [
                                sessionId,
                                message.sender_id,
                                0,
                                new Date().getTime()
                            ]);
                            participantBuffer += vprintf("\n('%s', '%s', %s, %s),", [
                                sessionId,
                                message.to_id,
                                0,
                                new Date().getTime()
                            ]);
                            // 消息
                            messageBuffer += vprintf("\n('%s', '%s', '%s', '%s', '%s', '%s', %s),", [
                                mongoose.Types.ObjectId().toString(),
                                sessionId,
                                message.sender_id,
                                message.sender_name,
                                message.content_type,
                                message.content.replace(/'/g, "''"),
                                ObjectUtil.timestampToLong(message.timestamp)
                            ]);
                        });
                        data += sessionsBuffer.substr(0, sessionsBuffer.length - 1) + " ON DUPLICATE KEY UPDATE id = id;\n\n";
                        data += participantBuffer.substr(0, participantBuffer.length - 1) + " ON DUPLICATE KEY UPDATE session_id = session_id;\n\n";
                        data += messageBuffer.substr(0, messageBuffer.length - 1) + " ON DUPLICATE KEY UPDATE id = id;\n\n";
                        callback(null);
                    }
                });
            }
        ],
        function (err, res) {
            if (err) {
                log.error("Error occurs while migrate p2p sessions: ", err);
            } else {
                fs.writeFileSync(MIGRATION_SCRIPT_File_NAME, data + "\n");
                log.info("Migrate P2P sessions succeed.");
            }
        });
    }
    /**
@ -34,6 +194,7 @@ class Migration {
     * 3 原消息取出来后,需要重新生成UUID
     */
    static migrateGroups() {
        let data = "-- Group sessions: \n";
        async.waterfall([
                function (callback) {
                    // 选择出所有的行政组,作为会话
@ -49,9 +210,9 @@ class Migration {
                                return callback(err, res);
                            }
                            let sqls = "insert into sessions(id, name, type, create_date) values ";
                            let buffer = "insert into sessions(id, name, type, create_date) values ";
                            sessions.forEach(function (session) {
                                sqls += vprintf("\n('%s', '%s', %s, %s)", [
                                buffer += vprintf("\n('%s', '%s', %s, %s),", [
                                    session.id,
                                    session.name,
                                    session.type,
@ -59,7 +220,7 @@ class Migration {
                                ]);
                            });
                            fs.writeFileSync(migrationFile, sqls + ";\n\n");
                            data += buffer.substr(0, buffer.length - 1) + ";\n\n";
                            callback(null);
                        }
                    });
@ -78,9 +239,9 @@ class Migration {
                                return callback(err, res);
                            }
                            let sqls = "insert into participants(session_id, participant_id, participant_role, last_fetch_time) values ";
                            let buffer = "insert into participants(session_id, participant_id, participant_role, last_fetch_time) values ";
                            participants.forEach(function (participant) {
                                sqls += vprintf("\n('%s', '%s', %s, %s)", [
                                buffer += vprintf("\n('%s', '%s', %s, %s),", [
                                    participant.session_id,
                                    participant.participant_id,
                                    participant.participant_role,
@ -88,7 +249,7 @@ class Migration {
                                ]);
                            });
                            fs.appendFileSync(migrationFile, sqls + ";\n\n");
                            data += buffer.substr(0, buffer.length - 1) + ";\n\n";
                            callback(null);
                        }
                    });
@ -108,71 +269,60 @@ class Migration {
                                return callback(err, res);
                            }
                            let sqls = "insert into group_messages(id, session_id, sender_id, sender_name, content_type, content, timestamp, at) values ";
                            let buffer = "insert into group_messages(id, session_id, sender_id, sender_name, content_type, content, timestamp, at) values ";
                            messages.forEach(function (message) {
                                sqls += vprintf("\n('%s', '%s', '%s', '%s', '%s', '%s', %s, '%s')", [
                                buffer += vprintf("\n('%s', '%s', '%s', '%s', '%s', '%s', %s, '%s'),", [
                                    mongoose.Types.ObjectId().toString(),
                                    message.session_id,
                                    message.sender_id,
                                    message.sender_name,
                                    message.content_type,
                                    message.content,
                                    message.content.replace(/'/g, "''"),
                                    ObjectUtil.timestampToLong(message.timestamp),
                                    message.at
                                ]);
                            });
                            fs.appendFileSync(migrationFile, sqls + ";\n\n", null, function (err) {
                                if (err) {
                                    return callback(err, null);
                                }
                            });
                            data += buffer.substr(0, buffer.length - 1) + ";\n\n";
                            callback(null, null);
                        }
                    });
                    callback(null, null, "OK");
                }
            ],
            function (err, res) {
                if (err) {
                    log.error("Error occures while migrate group sessions: ", err);
                    log.error("Error occurs while migrate group sessions: ", err);
                } else {
                    fs.appendFileSync(MIGRATION_SCRIPT_File_NAME, data, null);
                    log.info("Migrate group sessions succeed.");
                    process.exit(err != null ? 1 : 0);
                }
            });
    }
    /**
     * 迁移MUC及其消息
     */
    static migrateMUC() {
        // 选择出所有的咨询组
        let sql = "select g.to_gid from im_new.msg_group g where length(g.to_gid) > 20 group by g.to_gid order by g.to_gid";
        // 选择出所有的咨询组成员
        sql = "select g.to_gid, g.from_uid from im_new.msg_group g where length(g.to_gid) > 20 group by g.to_gid order by g.to_gid";
        // 选择出所有的咨询组消息
        sql = "select g.to_gid, g.from_uid, g.msg_id, g.`type`, g.content, g.timestamp, g.at_uid from im_new.msg_group g where length(g.to_gid) > 20 order by g.to_gid";
    }
}
async.waterfall([
        function (callback) {
            Migration.P2P();
            //Migration.migrateSystem();
            callback(null);
        },
        function (callback) {
            Migration.migrateMUC();
            //Migration.migrateMUC();
            callback(null);
        },
        function (callback) {
            Migration.migrateGroups();
            Migration.migrateP2P();
            callback(null);
        },
        function (callback) {
            //Migration.migrateGroups();
            callback(null);
        }
    ],
    function (err, res) {
        process.exit(err != null ? 1 : 0);
    });

+ 35 - 8
src/server/models/sessions/sessions.js

@ -159,7 +159,7 @@ class Sessions extends RedisModel {
    createSessionToMysql(sessionId, name, type, participantArray,messageId, handler){
        let self = this;
        //如果sessionId不存在则执行创建sessionId过程
        var participantIdArray = [];
        let participantIdArray = [];
        for (let i in participantArray) {
            participantIdArray.push(participantArray[i].split(":")[0]);
        }
@ -171,8 +171,7 @@ class Sessions extends RedisModel {
                    callBusinessType();
                });
            }else{
                handler("MUC模式和团队模式,不允许sessionId为空!",null);
                return;
                return handler("MUC模式和团队模式,不允许sessionId为空!",null);
            }
        }else{
            callBusinessType();
@ -200,7 +199,8 @@ class Sessions extends RedisModel {
                        type:type,
                        create_date:createDate.getTime(),
                        business_type:businessType
                    }
                    };
                    //将session写入数据库
                    self.saveSessionToMysql(sessionId, name, type, createDate, businessType, function (err, res) {
                        if(err){handler(err,null);return;};
@ -230,15 +230,13 @@ class Sessions extends RedisModel {
                timestamp: mesDate,
                id:messageId
            };
            session.last_sender_id = message.sender_id;
            session.last_sender_id = message.sender_id;
            session.last_sender_name = message.sender_name;
            session.last_message_time = mesDate.getTime();
            session.last_content  = message.content;
            session.last_content_type = message.content_type;
            SessionRepo.updateSessionLastStatus(message.sender_id,
                message.sender_name,
                message.timestamp,
@ -251,12 +249,41 @@ class Sessions extends RedisModel {
        }
    }
    /**
     * 最近会话列表,7天内。
     *
     * @param userId
     * @param dateSpan
     */
    getRecentSessions(userId, dateSpan){
        let self = this;
        SessionRepo.findAllByTimestampAndType(userId, dateSpan, function (err, res) {
            if(err){
                ModelUtil.emitError(self.eventEmitter, "Get recent sessions failed", err);
                return;
            }
            let sessions = [];
            res.forEach(function (session) {
                sessions.push({
                    id: session.id,
                    name: session.name,
                    type: session.type,
                    create_date: session.create_date
                })
            });
            ModelUtil.emitOK(self.eventEmitter, sessions);
        });
    }
    /**
     * 保存session到MySQL
     * @param sessionId
     * @param name
     * @param type
     * @param createDate
     * @param businessType
     * @param handler
     */
    saveSessionToMysql(sessionId, name, type, createDate, businessType, handler) {

+ 29 - 4
src/server/repository/mysql/session.repo.js

@ -48,19 +48,44 @@ class SessionRepo {
     * 获取用户全部会话
     *
     * @param userId
     * @param type
     * @param handler
     */
    static findAllByType(userId,type, handler) {
    static findAllByType(userId, type, handler) {
        let sql = "select session_id from " + DB_TABLES.Participants + " w where w.participant_id = ? and type=? group by w.session_id";
        let sessionSQL = "select id, name, type, create_date, last_sender_id, last_sender_name, last_content_type, last_content, last_message_time from "
            + DB_TABLES.Sessions + " s where s.id in(" + sql + ") ";
        ImDb.execQuery({
            "sql": sessionSQL,
            "args": [userId,type],
            "args": [userId, type],
            "handler": handler
        });
    }
    /**
     * 按时间跨度查询会话。
     *
     * @param userId
     * @param dateSpan
     * @param handler
     */
    static findAllByTimestampAndType(userId, dateSpan, handler) {
        let sql = "SELECT DISTINCT s.id, CASE WHEN TYPE = 2 THEN d.name ELSE s.name END 'name', s.type, s.create_date " +
        "FROM sessions s, participants p " +
        "LEFT JOIN doctors d ON p.participant_id = d.id " +
        "WHERE s.id = p.session_id AND s.last_sender_id <> 'system' " +
        "AND UNIX_TIMESTAMP(s.last_message_time) > UNIX_TIMESTAMP(NOW()) - ? " +
        "AND p.participant_id <> ? " +
        "AND s.id in (select s.id from sessions s, participants p where s.id = p.session_id and p.participant_id = ?) " +
        "ORDER BY s.last_message_time DESC";
        ImDb.execQuery({
            sql: sql,
            args: [dateSpan * 3600 * 24, userId, userId],
            handler: handler
        });
    }
    /**
     * 获取用户置顶会话
     *
@ -98,7 +123,7 @@ class SessionRepo {
            "ON DUPLICATE KEY UPDATE name = ?";
        ImDb.execQuery({
            "sql": sql,
            "args": [sessionId, name, type, createDate, businessType,name],
            "args": [sessionId, name, type, createDate, businessType, name],
            "handler": handler
        });
    }
@ -119,7 +144,7 @@ class SessionRepo {
        ImDb.execQuery({
            "sql": sql,
            "args": [lastSenderId, lastSenderName, lastMessageTime, lastContent, lastContentType, sessionId],
            "handler": handler||function(err,res){
            "handler": handler || function (err, res) {
                log.info("updateSessionLastStatus");
            }
        });

File diff suppressed because it is too large
+ 4211 - 586
src/server/resources/schema/ichat_1.2.8_data_migration.sql


+ 4 - 4
src/server/resources/schema/ichat_1.2.8_table_schema.sql

@ -49,7 +49,7 @@ CREATE TABLE `system_messages`
	`content_type` INTEGER NOT NULL COMMENT '消息类型,1文本,2图片,3语音,4文章,5跳转,6咨询开始,7咨询结束',
	`content` VARCHAR(1024) COMMENT '消息内容',
	`timestamp` TIMESTAMP(0) COMMENT '发送时间',
	`business_type` INT COMMENT '业务类型,IM不处理,只做存储与转发',
	`business_type` VARCHAR(50) COMMENT '业务类型,IM不处理,只做存储与转发',
	CONSTRAINT `PK_messages` PRIMARY KEY (`id`)
) COMMENT='P2P会话消息'
;
@ -108,7 +108,7 @@ CREATE TABLE `p2p_messages`
	`content_type` INTEGER NOT NULL COMMENT '消息类型,1文本,2图片,3语音,4文章,5跳转,6咨询开始,7咨询结束',
	`content` VARCHAR(1024) COMMENT '消息内容',
	`timestamp` TIMESTAMP(0) COMMENT '发送时间',
	`business_type` INT COMMENT '业务类型,IM不处理,只做存储与转发',
	`business_type` VARCHAR(50) COMMENT '业务类型,IM不处理,只做存储与转发',
	CONSTRAINT `PK_messages` PRIMARY KEY (`id`)
) COMMENT='P2P会话消息'
;
@ -122,7 +122,7 @@ CREATE TABLE `group_messages`
	`content_type` INTEGER NOT NULL COMMENT '消息类型,1文本,2图片,3语音,4文章,5跳转,6咨询开始,7咨询结束',
	`content` VARCHAR(1024) COMMENT '消息内容',
	`timestamp` TIMESTAMP(0) COMMENT '发送时间',
	`business_type` INT COMMENT '业务类型,IM不处理,只做存储与转发',
	`business_type` VARCHAR(50) COMMENT '业务类型,IM不处理,只做存储与转发',
	`at` VARCHAR(1024) COMMENT '发送时,at某人',
	CONSTRAINT `PK_messages` PRIMARY KEY (`id`)
) COMMENT='群会话消息'
@ -136,7 +136,7 @@ CREATE TABLE `muc_messages`
	`sender_name` VARCHAR(50),
	`content_type` INTEGER NOT NULL COMMENT '消息类型,1文本,2图片,3语音,4文章,5跳转,6咨询开始,7咨询结束',
	`content` VARCHAR(1024) COMMENT '消息内容',
	`business_type` INT COMMENT '业务类型,IM不处理,只做存储与转发',
	`business_type` VARCHAR(50) COMMENT '业务类型,IM不处理,只做存储与转发',
	`timestamp` TIMESTAMP(0) COMMENT '发送时间',
	CONSTRAINT `PK_messages` PRIMARY KEY (`id`)
) COMMENT='MUC会话消息'

+ 14 - 0
src/server/util/controller.util.js

@ -27,6 +27,20 @@ class ControllerUtil {
            response.status(500).json(data);
        });
    }
    static checkRequestQueryParams(req, params){
        let message = "Missing field(s): ";
        let missingField = false;
        params.forEach(function (param) {
            if(!req.query.hasOwnProperty(param)){
                message += param + ",";
                missingField = true;
            }
        });
        if(missingField) throw {message: message};
    }
}
module.exports = ControllerUtil;

+ 6 - 6
src/server/util/db.util.js

@ -3,10 +3,10 @@
 */
"use strict";
var configFile = require('../include/commons').CONFIG_FILE;
var config = require('../resources/config/' + configFile);
let configFile = require('../include/commons').CONFIG_FILE;
let config = require('../resources/config/' + configFile);
var log = require('./log');
let log = require('./log');
let crypto = require('crypto');
class DbUtil {
@ -18,9 +18,9 @@ class DbUtil {
        pool.getConnection(function (err, connection) {
            // 查询参数
            var sql = options['sql'];
            var args = options['args'];
            var handler = options['handler'];
            let sql = options['sql'];
            let args = options['args'];
            let handler = options['handler'];
            if (err) {
                //log.error('Database - get connection failed, ' + err);

+ 26 - 7
test/client/im.client.session.p2p.Test.js

@ -7,7 +7,7 @@
 * @since 2016/12/24
 */
"use strict";
var $ = require('jquery');
let $ = require('jquery');
let assert = require('assert');
let imClient = require('../../src/client/im.client');
@ -160,16 +160,35 @@ describe("Session P2P", function () {
        });
    });
    // 获取最近会话列表,默认按7天
    describe("Get recent sessions", function () {
        it("should return recent session in 7 days", function (done) {
            imClient.Sessions.getRecentSessions('shiliuD20160926008',
                function (data) {
                    assert.ok(data.length > 0, "Test data must return at least one session");
                    console.log(data);
                    done();
                },
                function (xhr, status, error) {
                    assert.ok(false, xhr.responseJSON.message);
                    done();
                });
        });
    });
    // 获取会话的消息
    describe("Get session messages", function () {
        it("should return session messages by page", function (done) {
            imClient.Sessions.getMessagesBySession(TD.SessionId, TD.DoctorB.id, null, null, null, null,
            function (messages) {
                assert.ok(messages.length > 0);
            },
            function (xhr, status, error) {
                assert.ok(false, message.responseJSON.message);
            });
                function (messages) {
                    assert.ok(messages.length > 0);
                },
                function (xhr, status, error) {
                    assert.ok(false, message.responseJSON.message);
                });
        });
    });