Browse Source

增加yuidoc生成脚本doc.bat;增加议题迁移

Sand 8 years ago
parent
commit
b23c49817f

+ 2 - 1
.gitignore

@ -1,4 +1,5 @@
.idea/*
npm-debug.log
src/server/node_modules/*
test/server/node_modules/*
test/server/node_modules/*
doc/yui/*

+ 1 - 5
readme.md

@ -25,11 +25,7 @@
## 运行
对Windows环境,可以预先安装Node.js。对Linux环境,代码包中包含Node.js程序,第一次部署需要使用chmod为此增加执行权限:
    chmod +x node
    
之后通过以下命令启动消息服务器:
对Windows环境,可以预先安装Node.js。对Linux环境,代码包中包含Node.js程序,第一次部署需要使用chmod为此增加执行权限`chmod +x node`,之后通过以下命令启动消息服务器:
    node app.armour.js

+ 1 - 0
src/server/doc.bat

@ -0,0 +1 @@
yuidoc . -o ../../doc/yui

+ 18 - 11
src/server/models/base.model.js

@ -1,31 +1,38 @@
/**
 * 模型基类。
 *
 * 模型基于异步模型,提供事件抽象。
 *
 * author: Sand
 * since: 2016.11.20
 */
"use strict";
let EventEmitter = require('events').EventEmitter;
/**
 * @module model
 */
/**
 * 模型基类。模型基于异步模型,提供事件抽象。
 *
 * @class BaseModel
 * @constructor
 *
 * @author: Sand
 * @since: 2016.11.20
 */
class BaseModel{
    constructor() {
        this._eventEmitter = new EventEmitter();
    }
    /**
     * 获取模型的事件触发器。
     * 模型的事件触发器
     *
     * @returns {EventEmitter|*}
     * @property {EventEmitter} eventEmitter
     */
    get eventEmitter(){
        return this._eventEmitter;
    }
    /**
     * 监听事件。
     * 绑定事件处理器。
     *
     * @method on
     *
     * @param event
     * @param handler

+ 73 - 13
src/server/models/migration/migration.js

@ -16,7 +16,23 @@ let fs = require('fs');
let mongoose = require("mongoose");
let vprintf = require('sprintf-js').vsprintf;
const MIGRATION_SCRIPT_File_NAME = "../../resources/schema/ichat_1.2.8_data_migration.sql";
const MIGRATION_SCRIPT_File_NAME = "./ichat_1.2.8_data_migration.sql";
Date.prototype.format = function (fmt) { //author: meizz
    var o = {
        "M+": this.getMonth() + 1, //月份
        "d+": this.getDate(), //日
        "h+": this.getHours(), //小时
        "m+": this.getMinutes(), //分
        "s+": this.getSeconds(), //秒
        "q+": Math.floor((this.getMonth() + 3) / 3), //季度
        "S": this.getMilliseconds() //毫秒
    };
    if (/(y+)/.test(fmt)) fmt = fmt.replace(RegExp.$1, (this.getFullYear() + "").substr(4 - RegExp.$1.length));
    for (var k in o)
        if (new RegExp("(" + k + ")").test(fmt)) fmt = fmt.replace(RegExp.$1, (RegExp.$1.length == 1) ? (o[k]) : (("00" + o[k]).substr(("" + o[k]).length)));
    return fmt;
};
class Migration {
    constructor() {
@ -97,17 +113,61 @@ class Migration {
     * 迁移MUC及其消息。将原先的P2P中与患者有关的会话全部过来出来,再将其转为会话。议题从家庭医生库中提取出来。
     */
    static migrateMUC() {
        // 选择出所有的咨询组
        let sql = "select * from wlyy.msg_p2p p " +
            "where p.type = 1 and  ((p.from_uid in (select code from wlyy.wlyy_patient) and p.to_uid in (select code from wlyy.wlyy_doctor)) p " +
            "or (p.from_uid in (select code from wlyy.wlyy_doctor) and p.to_uid in (select code from wlyy.wlyy_patient)))";
        let data = "-- MUC legacy messages: \n";
        async.waterfall([
            function (callback) {
                //
                let sql = "SELECT concat(p.id, '_consult_2') session_id, p.id sender_id, p.name sender_name, m.msg_id message_id, m.`type` content_type, m.content, m.timestamp " +
                    "FROM wlyy.msg_p2p m, patients p  " +
                    "WHERE m.`type` IN (6,7) AND ((m.from_uid IN (  " +
                    "    SELECT code  " +
                    "FROM wlyy.wlyy_patient) AND m.to_uid IN (  " +
                    "    SELECT code  " +
                    "FROM wlyy.wlyy_doctor)) OR (m.to_uid IN (  " +
                    "    SELECT code  " +
                    "FROM wlyy.wlyy_patient) AND m.from_uid IN (  " +
                    "    SELECT code  " +
                    "FROM wlyy.wlyy_doctor)))  " +
                    "AND m.from_uid = p.id  " +
                    "ORDER BY m.timestamp";
                ImDb.execQuery({
                    sql: sql,
                    args: [],
                    handler: function (err, res) {
                        if(err){
                            return callback(err, null);
                        }
        // 选择出所有的咨询组成员
        sql = "select g.to_gid, g.from_uid from im_new.msg_group g where length(g.to_gid) > 20 group by g.to_gid order by g.to_gid";
        // 选择出所有的咨询组消息
        sql = "select g.to_gid, g.from_uid, g.msg_id, g.`type`, g.content, g.timestamp, g.at_uid from im_new.msg_group g where length(g.to_gid) > 20 order by g.to_gid";
                        let messageBuffer = "insert into muc_messages(id, session_id, sender_id, sender_name, content_type, content, timestamp) values ";
                        res.forEach(function (message) {
                            messageBuffer += vprintf("\n('%s', '%s', '%s', '%s', '%s', '%s', '%s'),", [
                                mongoose.Types.ObjectId().toString(),
                                message.session_id,
                                message.sender_id,
                                message.sender_name,
                                message.content_type,
                                message.content.replace(/'/g, "''"),
                                message.timestamp.format("yyyy-MM-dd hh:mm:ss")
                            ]);
                        });
                        data += messageBuffer.substr(0, messageBuffer.length -1);
                        callback(null, null);
                    }
                });
            }
        ],
        function (err, res) {
            if (err) {
                log.error("Error occurs while migrate p2p sessions: ", err);
            } else {
                fs.writeFileSync(MIGRATION_SCRIPT_File_NAME, data + "\n");
                log.info("Migrate P2P sessions succeed.");
            }
        })
    }
    /**
@ -197,7 +257,7 @@ class Migration {
     * 3 原消息取出来后,需要重新生成UUID
     */
    static migrateGroups() {
        let data = "-- Group sessions: \n";
        let data = "-- MUC messages: \n";
        async.waterfall([
                function (callback) {
                    // 选择出所有的行政组,作为会话
@ -312,11 +372,11 @@ async.waterfall([
            callback(null);
        },
        function (callback) {
            //Migration.migrateMUC();
            Migration.migrateMUC();
            callback(null);
        },
        function (callback) {
            Migration.migrateP2P();
            //Migration.migrateP2P();
            callback(null);
        },
        function (callback) {

+ 292 - 0
src/server/models/migration/migration.redis.js

@ -0,0 +1,292 @@
'use strict';
let RedisClient = require('../../repository/redis/redis.client');
let RedisModel = require('../redis.model');
let ImDb = require('../../repository/mysql/db/im.db');
let ParticipantRepo = require('../../repository/mysql/participant.repo');
let DoctorRepo = require('../../repository/mysql/doctor.repo');
let PatientRepo = require('../../repository/mysql/patient.repo');
let SessionRepo = require('../../repository/mysql/session.repo');
let MessageRepo = require('../../repository/mysql/message.repo');
let TopicRepo = require('../../repository/mysql/topics.repo');
let AppStatusRepo = require('../../repository/mysql/app.status.repo');
let ModelUtil = require('../../util/model.util');
let ObjectUtil = require("../../util/object.util.js");
let Patient = require('../user/patient');
let Doctor = require('../user/doctor');
let redisConn = RedisClient.redisClient().connection;
let async = require('async');
let log = require('../../util/log');
let configFile = require('../../include/commons').CONFIG_FILE;
let config = require('../../resources/config/' + configFile);
let fs = require('fs');
let mongoose = require("mongoose");
let vprintf = require('sprintf-js').vsprintf;
const REDIS_KEYS = require('../../include/commons').REDIS_KEYS;
const PLATFORMS = require('../../include/commons').PLATFORM;
const SESSION_TYPE = require('../../include/commons').SESSION_TYPES;
class MigrateRedis extends RedisModel{
    constructor(){
        super()
    }
    updateRedisCache(){
        let self = this;
        ImDb.execQuery({
            sql: "select user_id, platform, token, client_id, app_in_bg from im.app_status",
            args: [],
            handler: function (err, res) {
                res.forEach(function (appStatus, index) {
                    log.info("Processing at " + index + " / " + res.length);
                    self.login(appStatus.user_id, appStatus.platform, appStatus.token, appStatus.client_id);
                })
            }
        })
    }
    login(userId, platform, deviceToken, clientId) {
        let self = this;
        let loginFromApp = platform !== PLATFORMS.Wechat;
        let usersKey = REDIS_KEYS.Users;
        let userKey = RedisModel.makeRedisKey(REDIS_KEYS.User, userId);
        let userStatusKey = RedisModel.makeRedisKey(loginFromApp ? REDIS_KEYS.UserAppStatus : REDIS_KEYS.UserWechatStatus, userId);
        let lastLoginTime = new Date();
        async.waterfall([
                // get user info from mysql
                function (callback) {
                    self.getUserFromMySQL(userId, function (err, userInfo) {
                        if (!userInfo) {
                            ModelUtil.emitDataNotFound(self, 'User not exists.');
                            return;
                        }
                        callback(null, userInfo);
                    })
                },
                // cache user and app/wechat status
                function (userInfo, callback) {
                    let multi = redisConn.multi()
                        .zadd(usersKey, lastLoginTime.getTime(), userId);
                    /*.hmset(userKey,
                     'avatar', userInfo.avatar ? userInfo.avatar : '',
                     'birthdate', userInfo.birthdate ? ObjectUtil.timestampToLong(userInfo.birthdate) : '',
                     'name', userInfo.name,
                     'role', loginFromApp ? 'doctor' : 'patient');*/
                    if (loginFromApp) {
                        AppStatusRepo.save(userId, deviceToken, clientId, platform, function (err, res) {
                            if (err) log.error();
                        });
                        // cache app status
                        multi = multi.hmset(userStatusKey,
                            'app_in_bg', 0,
                            'client_id', clientId,
                            'device_token', deviceToken,
                            'last_login_time', lastLoginTime.getTime(),
                            'platform', platform);
                    } else {
                        // cache wechat status
                        multi = multi.hmset(userStatusKey,
                            'last_login_time', lastLoginTime.getTime(),
                            'openid', userInfo.openid,
                            'platform', platform);
                    }
                    multi.execAsync()
                        .then(function (res) {
                            callback(null);
                        })
                        .catch(function (ex) {
                            log.error("Login failed while cache user status: ", ex);
                        });
                },
                // cache sessions, participants, topics, messages
                function (callback) {
                    SessionRepo.findAll(userId, function (err, sessions) {
                        if (err) {
                            ModelUtil.emitError(self.eventEmitter, err.message);
                            return;
                        }
                        sessions.forEach(function (session) {
                            redisConn.zscore(REDIS_KEYS.Sessions, session.id, function (err, res) {
                                (function (sessionId, userId) {
                                    let business_type = session.business_type;
                                    if (!session.business_type && session.type == SESSION_TYPE.MUC) {
                                        business_type = 2
                                    } else if (!session.business_type && session.type != SESSION_TYPE.MUC) {
                                        business_type = 1
                                    }
                                    let redisSession = [
                                        "id", session.id,
                                        "name", session.name,
                                        "type", session.type,
                                        "business_type", business_type,
                                        "last_sender_id", session.last_sender_id == null ? "" : session.last_sender_id,
                                        "last_sender_name", session.last_sender_name == null ? "" : session.last_sender_name,
                                        "last_content_type", session.last_content_type == null ? "" : session.last_content_type,
                                        "last_content", session.last_content == null ? "" : session.last_content,
                                        "last_message_time", session.last_message_time == null ? "" : session.last_message_time,
                                        "create_date", ObjectUtil.timestampToLong(session.create_date),
                                        "status",session.status==null?0:session.status
                                    ];
                                    // cache sessions
                                    redisConn.multi()
                                        .zadd(REDIS_KEYS.Sessions, lastLoginTime.getTime(), sessionId)                                           // 会话的最后活动时间设置为此用户的登录时间
                                        .zadd(RedisModel.makeRedisKey(REDIS_KEYS.UserSessions, userId), lastLoginTime.getTime(), sessionId)      // 会话的最后活动时间设置为此用户的登录时间
                                        .hmset(RedisModel.makeRedisKey(REDIS_KEYS.Session, sessionId), redisSession)
                                        .execAsync()
                                        .then(function (res) {
                                            // cache participants
                                            let sessionParticipantsKey = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipants, sessionId);
                                            let sessionParticipantsRoleKey = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipantsRole, sessionId);
                                            ParticipantRepo.findAll(sessionId, function (err, participants) {
                                                if (err) {
                                                    ModelUtil.emitError(self.eventEmitter, err.message);
                                                    return;
                                                }
                                                let multi = redisConn.multi();
                                                participants.forEach(function (participant) {
                                                    let participantId = participant.id;
                                                    let participantRole = participant.role;
                                                    let score = ObjectUtil.timestampToLong(participant.last_fetch_time);
                                                    multi = multi.zadd(sessionParticipantsKey, score, participantId)
                                                        .hset(sessionParticipantsRoleKey, participantId, participantRole);
                                                });
                                                multi.execAsync()
                                                    .then(function (res) {
                                                    })
                                                    .catch(function (ex) {
                                                        log.error("Login failed while caching participants: ", ex);
                                                    });
                                            });
                                            // cache messages
                                            let messagesKey = RedisModel.makeRedisKey(REDIS_KEYS.Messages, sessionId);
                                            let messagesByTimestampKey = RedisModel.makeRedisKey(REDIS_KEYS.MessagesByTimestamp, sessionId);
                                            MessageRepo.findBySessionId(sessionId, 0, config.sessionConfig.maxMessageCount, null, function (err, messages) {
                                                if (err) {
                                                    ModelUtil.emitError(self.eventEmitter, err.message);
                                                    return;
                                                }
                                                let multi = redisConn.multi();
                                                messages.forEach(function (message) {
                                                    let msgJson = {
                                                        id: message.id,
                                                        sender_id: message.sender_id,
                                                        sender_name: message.sender_name,
                                                        timestamp: ObjectUtil.timestampToLong(message.timestamp),
                                                        content_type: message.content_type,
                                                        content: message.content
                                                    };
                                                    multi = multi.hset(messagesKey, message.id, JSON.stringify(msgJson))
                                                        .zadd(messagesByTimestampKey, ObjectUtil.timestampToLong(message.timestamp), message.id);
                                                });
                                                multi.execAsync()
                                                    .then(function (res) {
                                                    })
                                                    .catch(function (ex) {
                                                        log.error("Login failed while caching messages: ", ex);
                                                    });
                                            });
                                            // cache topics for MUC
                                            let topicsKey = RedisModel.makeRedisKey(REDIS_KEYS.Topics, sessionId);
                                            TopicRepo.findAllBySessionId(sessionId, function (err, topics) {
                                                if (err) {
                                                    ModelUtil.emitError(self.eventEmitter, err.message);
                                                    return;
                                                }
                                                topics.forEach(function (topic) {
                                                    let topicKey = RedisModel.makeRedisKey(REDIS_KEYS.Topic, topic.id);
                                                    let topicId = topic.id;
                                                    let name = topic.name == null ? "" : topic.name;
                                                    let createTime = ObjectUtil.timestampToLong(topic.create_time);
                                                    let endBy = topic.end_by == null ? "" : topic.end_by;
                                                    let endTime = topic.end_time == null ? 0 : ObjectUtil.timestampToLong(topic.end_time);
                                                    let startMessageId = topic.start_message_id == null ? "" : topic.start_message_id;
                                                    let endMessageId = topic.end_message_id == null ? "" : topic.end_message_id;
                                                    let description = topic.description == null ? "" : topic.description;
                                                    let status = topic.status == null ? 0 : topic.status;
                                                    redisConn.multi()
                                                        .zadd(topicsKey, createTime, topicId)
                                                        .hmset(topicKey,
                                                            'name', name,
                                                            'session_id', sessionId,
                                                            'create_time', createTime,
                                                            'end_by', endBy,
                                                            'end_time', endTime,
                                                            'start_message_id', startMessageId,
                                                            'end_message_id', endMessageId,
                                                            'description', description,
                                                            'status', status)
                                                        .execAsync()
                                                        .catch(function (ex) {
                                                            log.error("Login failed while caching topics: ", ex);
                                                        });
                                                });
                                            });
                                        })
                                        .catch(function (ex) {
                                            log.error("Login failed while caching sessions: ", ex);
                                        });
                                })(session.id, userId);
                            });
                        });
                    });
                    callback(null, null);
                }
            ],
            function (err, res) {
                log.info("Update user " + userId);
            });
    }
    /**
     * 获取用户,直接从MYSQL获取,缓存是否有在不能确定。
     *
     * @param userId
     * @param outCallback
     */
    getUserFromMySQL(userId, outCallback) {
        async.waterfall([
            // get from mysql
            function (isPatientId) {
                let repoProto = isPatientId ? PatientRepo : DoctorRepo;
                repoProto.findOne(userId, function (err, res) {
                    let user = isPatientId ? new Patient() : new Doctor();
                    if (res.length > 0) {
                        user.name = res[0].name;
                        user.sex = res[0].sex;
                        user.birthdate = res[0].birthdate;
                        user.avatar = res[0].avatar;
                        if (res[0].openid) user.openid = res[0].openid;
                    }
                    outCallback(null, user);
                });
            }
        ]);
    }
}
new MigrateRedis().updateRedisCache();

+ 13 - 8
src/server/models/redis.model.js

@ -1,9 +1,3 @@
/**
 * Redis模型基类,提供基础模型操作。
 *
 * author:linz
 * since: 2016.12.13
 */
"use strict";
let BaseModel = require('./base.model');
@ -11,6 +5,15 @@ let log = require("../util/log.js");
const RedisKeyReplacer = require('../include/commons').REDIS_KEY_REPLACER;
/**
 * Redis模型基类,提供基础模型操作。
 *
 * @class RedisModel
 * @constructor
 *
 * @author:linz
 * @since: 2016.12.13
 */
class RedisModel extends BaseModel {
    constructor() {
        super();
@ -19,8 +22,10 @@ class RedisModel extends BaseModel {
    /**
     * 替换Redis Key模板中的占位符,生成有效的Key。
     *
     * @param redisKey
     * @param keyValue
     * @method makeRedisKey
     * @static
     * @param {String} redisKey Redis中定义的键模板,参见:REDIS_KEYS
     * @param {String} keyValue 要修改的数据量
     */
    static makeRedisKey(redisKey, keyValue) {
        if (redisKey.indexOf(RedisKeyReplacer) >= 0) {

File diff suppressed because it is too large
+ 0 - 4215
src/server/resources/schema/ichat_1.2.8_data_migration.sql