/** * 群组,即原先的行政团队数据迁移。 * * @author sand * @since 2017/1/9 */ 'use strict'; let ImDb = require('../../repository/mysql/db/im.db'); let ObjectUtil = require("../../util/object.util.js"); let DbUtil = require("../../util/db.util.js"); let async = require("async"); let log = require("../../util/log.js"); let fs = require('fs'); let mongoose = require("mongoose"); let vprintf = require('sprintf-js').vsprintf; const MIGRATION_SCRIPT_File_NAME = "../../resources/schema/ichat_1.2.8_data_migration.sql"; class Migration { constructor() { } /** * 迁移系统消息 */ static migrateSystem() { let data = "-- System sessions: \n"; async.waterfall([ function (callback) { // 所有的系统会话 let sql = "select distinct to_uid participant_id from im_new.msg_system order by to_uid"; ImDb.execQuery({ sql: sql, args: [], handler: function (err, res) { if (err) { return callback(err, res); } let buffer = "insert into sessions(id, name, type, create_date) values "; res.forEach(function (session) { buffer += vprintf("\n('%s', '%s', 0, now()),", [ DbUtil.stringArrayHash(['system', session.participant_id]), "系统消息" ]); }); data += buffer.substr(0, buffer.length - 1) + ";\n"; callback(null); } }); }, function (callback) { // 所有的系统消息 let sql = "SELECT msg_id 'id', 'session_id', 'system' `sender_id`, '系统通知' `sender_name`, 1 `content_type`, DATA 'content', timestamp, `type` 'business_type', to_uid FROM im_new.msg_system ORDER BY to_uid"; ImDb.execQuery({ sql: sql, args: [], handler: function (err, res) { if (err) { return callback(err, res); } let buffer = "insert into system_messages(id, session_id, sender_id, sender_name, content_type, content, timestamp, business_type) values"; res.forEach(function (message) { buffer += vprintf("\n('%s', '%s', '%s', '%s', 1, '%s', %s, '%s'),", [ mongoose.Types.ObjectId().toString(), DbUtil.stringArrayHash(['system', message.to_uid]), 'system', '系统', message.content.replace(/'/g, "''"), ObjectUtil.timestampToLong(message.timestamp), message.business_type]); }); data += buffer.substr(0, buffer.length - 1) + ";\n\n"; callback(null, null); } }); }], function (err, res) { if (err) { log.error("Error occurs while migrate system sessions: ", err); } else { fs.writeFileSync(MIGRATION_SCRIPT_File_NAME, data + "\n"); log.info("Migrate System sessions succeed."); } }); } /** * 迁移MUC及其消息。将原先的P2P中与患者有关的会话全部过来出来,再将其转为会话。议题从家庭医生库中提取出来。 */ static migrateMUC() { // 选择出所有的咨询组 let sql = "select g.to_gid from im_new.msg_group g where length(g.to_gid) > 20 group by g.to_gid order by g.to_gid"; // 选择出所有的咨询组成员 sql = "select g.to_gid, g.from_uid from im_new.msg_group g where length(g.to_gid) > 20 group by g.to_gid order by g.to_gid"; // 选择出所有的咨询组消息 sql = "select g.to_gid, g.from_uid, g.msg_id, g.`type`, g.content, g.timestamp, g.at_uid from im_new.msg_group g where length(g.to_gid) > 20 order by g.to_gid"; } /** * 迁移P2P及其消息,新的IM中P2P仅给医生使用,因此过滤出原先发送与接收人均是医生的会话。 */ static migrateP2P() { let data = "-- P2P sessions: \n"; async.waterfall([ function (callback) { // 所有的P2P 会话,成员与消息 let sql = "SELECT a.from_uid sender_id, d.name sender_name, a.to_uid to_id, e.name to_name, a.type content_type, a.content, a.timestamp " + " FROM im_new.msg_p2p a " + " LEFT JOIN wlyy.wlyy_doctor d ON a.from_uid = d.code " + " LEFT JOIN wlyy.wlyy_doctor e on a.to_uid = e.code " + " WHERE a.from_uid IN (SELECT code FROM wlyy.wlyy_doctor d) AND a.to_uid IN (SELECT code FROM wlyy.wlyy_doctor d)"; ImDb.execQuery({ sql: sql, args: [], handler: function (err, res) { if(err) return callback(err, null); let sessionsBuffer = "insert into sessions(id, name, type, create_date) values "; let participantBuffer = "insert into participants(session_id, participant_id, participant_role, last_fetch_time) values "; let messageBuffer = "insert into p2p_messages(id, session_id, sender_id, sender_name, content_type, content, timestamp) values "; res.forEach(function (message) { let sessionId = DbUtil.stringArrayHash([message.sender_id, message.to_id]); // 会话 sessionsBuffer += vprintf("\n('%s', '%s', %s, %s),", [ sessionId, 'P2P', 1, new Date('2016-11-22 12:00:00').getTime() ]); // 成员 participantBuffer += vprintf("\n('%s', '%s', %s, %s),", [ sessionId, message.sender_id, 0, new Date().getTime() ]); participantBuffer += vprintf("\n('%s', '%s', %s, %s),", [ sessionId, message.to_id, 0, new Date().getTime() ]); // 消息 messageBuffer += vprintf("\n('%s', '%s', '%s', '%s', '%s', '%s', %s),", [ mongoose.Types.ObjectId().toString(), sessionId, message.sender_id, message.sender_name, message.content_type, message.content.replace(/'/g, "''"), ObjectUtil.timestampToLong(message.timestamp) ]); }); data += sessionsBuffer.substr(0, sessionsBuffer.length - 1) + " ON DUPLICATE KEY UPDATE id = id;\n\n"; data += participantBuffer.substr(0, participantBuffer.length - 1) + " ON DUPLICATE KEY UPDATE session_id = session_id;\n\n"; data += messageBuffer.substr(0, messageBuffer.length - 1) + " ON DUPLICATE KEY UPDATE id = id;\n\n"; callback(null); } }); } ], function (err, res) { if (err) { log.error("Error occurs while migrate p2p sessions: ", err); } else { fs.writeFileSync(MIGRATION_SCRIPT_File_NAME, data + "\n"); log.info("Migrate P2P sessions succeed."); } }); } /** * 迁移行政团队及其消息。群聊会话规则: * 1 使用行政团队的ID作为会话的ID * 2 会话成员是医生 * 3 原消息取出来后,需要重新生成UUID */ static migrateGroups() { let data = "-- Group sessions: \n"; async.waterfall([ function (callback) { // 选择出所有的行政组,作为会话 let sql = "select t.id, t.name, 3 `type`, t.create_time 'create_date' " + "from im_new.msg_group g, wlyy.wlyy_admin_team t " + "where g.to_gid = t.id group by g.to_gid order by g.to_gid"; ImDb.execQuery({ sql: sql, args: [], handler: function (err, sessions) { if (err) { return callback(err, res); } let buffer = "insert into sessions(id, name, type, create_date) values "; sessions.forEach(function (session) { buffer += vprintf("\n('%s', '%s', %s, %s),", [ session.id, session.name, session.type, ObjectUtil.timestampToLong(session.create_date) ]); }); data += buffer.substr(0, buffer.length - 1) + ";\n\n"; callback(null); } }); }, function (callback) { // 选择出所有的行政组医生,作为会话成员 let sql = "select t.id session_id, m.id participant_id, 0 'participant_role', 0 'last_fetch_time' " + "from im_new.msg_group g, wlyy.wlyy_admin_team t, wlyy.wlyy_admin_team_member m " + "where g.to_gid = t.id and t.id = m.team_id group by g.to_gid order by g.to_gid"; ImDb.execQuery({ sql: sql, args: [], handler: function (err, participants) { if (err) { return callback(err, res); } let buffer = "insert into participants(session_id, participant_id, participant_role, last_fetch_time) values "; participants.forEach(function (participant) { buffer += vprintf("\n('%s', '%s', %s, %s),", [ participant.session_id, participant.participant_id, participant.participant_role, ObjectUtil.timestampToLong(participant.last_fetch_time) ]); }); data += buffer.substr(0, buffer.length - 1) + ";\n\n"; callback(null); } }); }, function (callback) { // 选择出所有的消息 let sql = "SELECT g.msg_id 'id', g.to_gid 'session_id', g.from_uid 'sender_id', u.name 'sender_name', g.`type` 'content_type', g.content 'content', g.timestamp 'timestamp', g.at_uid 'at' " + "FROM im_new.msg_group g, wlyy.wlyy_doctor u " + "WHERE LENGTH(g.to_gid) < 20 and g.from_uid = u.code " + "ORDER BY g.to_gid"; ImDb.execQuery({ sql: sql, args: [], handler: function (err, messages) { if (err) { return callback(err, res); } let buffer = "insert into group_messages(id, session_id, sender_id, sender_name, content_type, content, timestamp, at) values "; messages.forEach(function (message) { buffer += vprintf("\n('%s', '%s', '%s', '%s', '%s', '%s', %s, '%s'),", [ mongoose.Types.ObjectId().toString(), message.session_id, message.sender_id, message.sender_name, message.content_type, message.content.replace(/'/g, "''"), ObjectUtil.timestampToLong(message.timestamp), message.at ]); }); data += buffer.substr(0, buffer.length - 1) + ";\n\n"; callback(null, null); } }); } ], function (err, res) { if (err) { log.error("Error occurs while migrate group sessions: ", err); } else { fs.appendFileSync(MIGRATION_SCRIPT_File_NAME, data, null); log.info("Migrate group sessions succeed."); process.exit(err != null ? 1 : 0); } }); } } async.waterfall([ function (callback) { //Migration.migrateSystem(); callback(null); }, function (callback) { //Migration.migrateMUC(); callback(null); }, function (callback) { Migration.migrateP2P(); callback(null); }, function (callback) { //Migration.migrateGroups(); callback(null); } ], function (err, res) { });