migration.js 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391
  1. /**
  2. * 群组,即原先的行政团队数据迁移。
  3. *
  4. * @author sand
  5. * @since 2017/1/9
  6. */
  7. 'use strict';
  8. let ImDb = require('../../repository/mysql/db/im.db');
  9. let ObjectUtil = require("../../util/object.util.js");
  10. let DbUtil = require("../../util/db.util.js");
  11. let async = require("async");
  12. let log = require("../../util/log.js");
  13. let fs = require('fs');
  14. let mongoose = require("mongoose");
  15. let vprintf = require('sprintf-js').vsprintf;
  16. const MIGRATION_SCRIPT_File_NAME = "./ichat_1.2.8_data_migration.sql";
  17. Date.prototype.format = function (fmt) { //author: meizz
  18. var o = {
  19. "M+": this.getMonth() + 1, //月份
  20. "d+": this.getDate(), //日
  21. "h+": this.getHours(), //小时
  22. "m+": this.getMinutes(), //分
  23. "s+": this.getSeconds(), //秒
  24. "q+": Math.floor((this.getMonth() + 3) / 3), //季度
  25. "S": this.getMilliseconds() //毫秒
  26. };
  27. if (/(y+)/.test(fmt)) fmt = fmt.replace(RegExp.$1, (this.getFullYear() + "").substr(4 - RegExp.$1.length));
  28. for (var k in o)
  29. if (new RegExp("(" + k + ")").test(fmt)) fmt = fmt.replace(RegExp.$1, (RegExp.$1.length == 1) ? (o[k]) : (("00" + o[k]).substr(("" + o[k]).length)));
  30. return fmt;
  31. };
  32. class Migration {
  33. constructor() {
  34. }
  35. /**
  36. * 迁移系统消息
  37. */
  38. static migrateSystem() {
  39. let data = "-- System sessions: \n";
  40. async.waterfall([
  41. function (callback) {
  42. // 所有的系统会话
  43. let sql = "select distinct to_uid participant_id from im_new.msg_system order by to_uid";
  44. ImDb.execQuery({
  45. sql: sql,
  46. args: [],
  47. handler: function (err, res) {
  48. if (err) {
  49. return callback(err, res);
  50. }
  51. let buffer = "insert into sessions(id, name, type, create_date) values ";
  52. res.forEach(function (session) {
  53. buffer += vprintf("\n('%s', '%s', 0, now()),", [
  54. DbUtil.stringArrayHash(['system', session.participant_id]),
  55. "系统消息"
  56. ]);
  57. });
  58. data += buffer.substr(0, buffer.length - 1) + ";\n";
  59. callback(null);
  60. }
  61. });
  62. },
  63. function (callback) {
  64. // 所有的系统消息
  65. let sql = "SELECT msg_id 'id', 'session_id', 'system' `sender_id`, '系统通知' `sender_name`, 1 `content_type`, DATA 'content', timestamp, `type` 'business_type', to_uid FROM im_new.msg_system ORDER BY to_uid";
  66. ImDb.execQuery({
  67. sql: sql,
  68. args: [],
  69. handler: function (err, res) {
  70. if (err) {
  71. return callback(err, res);
  72. }
  73. let buffer = "insert into system_messages(id, session_id, sender_id, sender_name, content_type, content, timestamp, business_type) values";
  74. res.forEach(function (message) {
  75. buffer += vprintf("\n('%s', '%s', '%s', '%s', 1, '%s', %s, '%s'),", [
  76. mongoose.Types.ObjectId().toString(),
  77. DbUtil.stringArrayHash(['system', message.to_uid]),
  78. 'system',
  79. '系统',
  80. message.content.replace(/'/g, "''"),
  81. ObjectUtil.timestampToLong(message.timestamp),
  82. message.business_type]);
  83. });
  84. data += buffer.substr(0, buffer.length - 1) + ";\n\n";
  85. callback(null, null);
  86. }
  87. });
  88. }],
  89. function (err, res) {
  90. if (err) {
  91. log.error("Error occurs while migrate system sessions: ", err);
  92. } else {
  93. fs.writeFileSync(MIGRATION_SCRIPT_File_NAME, data + "\n");
  94. log.info("Migrate System sessions succeed.");
  95. }
  96. });
  97. }
  98. /**
  99. * 迁移MUC及其消息。将原先的P2P中与患者有关的会话全部过来出来,再将其转为会话。议题从家庭医生库中提取出来。
  100. */
  101. static migrateMUC() {
  102. let data = "-- MUC legacy messages: \n";
  103. async.waterfall([
  104. function (callback) {
  105. //
  106. let sql = "SELECT concat(p.id, '_consult_2') session_id, p.id sender_id, p.name sender_name, m.msg_id message_id, m.`type` content_type, m.content, m.timestamp " +
  107. "FROM wlyy.msg_p2p m, patients p " +
  108. "WHERE m.`type` IN (6,7) AND ((m.from_uid IN ( " +
  109. " SELECT code " +
  110. "FROM wlyy.wlyy_patient) AND m.to_uid IN ( " +
  111. " SELECT code " +
  112. "FROM wlyy.wlyy_doctor)) OR (m.to_uid IN ( " +
  113. " SELECT code " +
  114. "FROM wlyy.wlyy_patient) AND m.from_uid IN ( " +
  115. " SELECT code " +
  116. "FROM wlyy.wlyy_doctor))) " +
  117. "AND m.from_uid = p.id " +
  118. "ORDER BY m.timestamp";
  119. ImDb.execQuery({
  120. sql: sql,
  121. args: [],
  122. handler: function (err, res) {
  123. if(err){
  124. return callback(err, null);
  125. }
  126. let messageBuffer = "insert into muc_messages(id, session_id, sender_id, sender_name, content_type, content, timestamp) values ";
  127. res.forEach(function (message) {
  128. messageBuffer += vprintf("\n('%s', '%s', '%s', '%s', '%s', '%s', '%s'),", [
  129. mongoose.Types.ObjectId().toString(),
  130. message.session_id,
  131. message.sender_id,
  132. message.sender_name,
  133. message.content_type,
  134. message.content.replace(/'/g, "''"),
  135. message.timestamp.format("yyyy-MM-dd hh:mm:ss")
  136. ]);
  137. });
  138. data += messageBuffer.substr(0, messageBuffer.length -1);
  139. callback(null, null);
  140. }
  141. });
  142. }
  143. ],
  144. function (err, res) {
  145. if (err) {
  146. log.error("Error occurs while migrate p2p sessions: ", err);
  147. } else {
  148. fs.writeFileSync(MIGRATION_SCRIPT_File_NAME, data + "\n");
  149. log.info("Migrate P2P sessions succeed.");
  150. }
  151. })
  152. }
  153. /**
  154. * 迁移P2P及其消息,新的IM中P2P仅给医生使用,因此过滤出原先发送与接收人均是医生的会话。
  155. */
  156. static migrateP2P() {
  157. let data = "-- P2P sessions: \n";
  158. async.waterfall([
  159. function (callback) {
  160. // 所有的P2P 会话,成员与消息
  161. let sql = "SELECT a.from_uid sender_id, d.name sender_name, a.to_uid to_id, e.name to_name, a.type content_type, a.content, a.timestamp " +
  162. " FROM im_new.msg_p2p a " +
  163. " LEFT JOIN wlyy.wlyy_doctor d ON a.from_uid = d.code " +
  164. " LEFT JOIN wlyy.wlyy_doctor e on a.to_uid = e.code " +
  165. " WHERE a.from_uid IN (SELECT code FROM wlyy.wlyy_doctor d) AND a.to_uid IN (SELECT code FROM wlyy.wlyy_doctor d)";
  166. ImDb.execQuery({
  167. sql: sql,
  168. args: [],
  169. handler: function (err, res) {
  170. if (err) return callback(err, null);
  171. let sessionsBuffer = "insert into sessions(id, name, type, create_date) values ";
  172. let participantBuffer = "insert into participants(session_id, participant_id, participant_role, last_fetch_time) values ";
  173. let messageBuffer = "insert into p2p_messages(id, session_id, sender_id, sender_name, content_type, content, timestamp) values ";
  174. res.forEach(function (message) {
  175. let sessionId = DbUtil.stringArrayHash([message.sender_id, message.to_id]);
  176. // 会话
  177. sessionsBuffer += vprintf("\n('%s', '%s', %s, %s),", [
  178. sessionId,
  179. 'P2P',
  180. 1,
  181. new Date('2016-11-22 12:00:00').getTime()
  182. ]);
  183. // 成员
  184. participantBuffer += vprintf("\n('%s', '%s', %s, %s),", [
  185. sessionId,
  186. message.sender_id,
  187. 0,
  188. new Date().getTime()
  189. ]);
  190. participantBuffer += vprintf("\n('%s', '%s', %s, %s),", [
  191. sessionId,
  192. message.to_id,
  193. 0,
  194. new Date().getTime()
  195. ]);
  196. // 消息
  197. messageBuffer += vprintf("\n('%s', '%s', '%s', '%s', '%s', '%s', %s),", [
  198. mongoose.Types.ObjectId().toString(),
  199. sessionId,
  200. message.sender_id,
  201. message.sender_name,
  202. message.content_type,
  203. message.content.replace(/'/g, "''"),
  204. ObjectUtil.timestampToLong(message.timestamp)
  205. ]);
  206. });
  207. data += sessionsBuffer.substr(0, sessionsBuffer.length - 1) + " ON DUPLICATE KEY UPDATE id = id;\n\n";
  208. data += participantBuffer.substr(0, participantBuffer.length - 1) + " ON DUPLICATE KEY UPDATE session_id = session_id;\n\n";
  209. data += messageBuffer.substr(0, messageBuffer.length - 1) + " ON DUPLICATE KEY UPDATE id = id;\n\n";
  210. callback(null);
  211. }
  212. });
  213. }
  214. ],
  215. function (err, res) {
  216. if (err) {
  217. log.error("Error occurs while migrate p2p sessions: ", err);
  218. } else {
  219. fs.writeFileSync(MIGRATION_SCRIPT_File_NAME, data + "\n");
  220. log.info("Migrate P2P sessions succeed.");
  221. }
  222. });
  223. }
  224. /**
  225. * 迁移行政团队及其消息。群聊会话规则:
  226. * 1 使用行政团队的ID作为会话的ID
  227. * 2 会话成员是医生
  228. * 3 原消息取出来后,需要重新生成UUID
  229. */
  230. static migrateGroups() {
  231. let data = "-- MUC messages: \n";
  232. async.waterfall([
  233. function (callback) {
  234. // 选择出所有的行政组,作为会话
  235. let sql = "select t.id, t.name, 3 `type`, t.create_time 'create_date' " +
  236. "from im_new.msg_group g, wlyy.wlyy_admin_team t " +
  237. "where g.to_gid = t.id group by g.to_gid order by g.to_gid";
  238. ImDb.execQuery({
  239. sql: sql,
  240. args: [],
  241. handler: function (err, sessions) {
  242. if (err) {
  243. return callback(err, res);
  244. }
  245. let buffer = "insert into sessions(id, name, type, create_date) values ";
  246. sessions.forEach(function (session) {
  247. buffer += vprintf("\n('%s', '%s', %s, %s),", [
  248. session.id,
  249. session.name,
  250. session.type,
  251. ObjectUtil.timestampToLong(session.create_date)
  252. ]);
  253. });
  254. data += buffer.substr(0, buffer.length - 1) + ";\n\n";
  255. callback(null);
  256. }
  257. });
  258. },
  259. function (callback) {
  260. // 选择出所有的行政组医生,作为会话成员
  261. let sql = "select t.id session_id, m.id participant_id, 0 'participant_role', 0 'last_fetch_time' " +
  262. "from im_new.msg_group g, wlyy.wlyy_admin_team t, wlyy.wlyy_admin_team_member m " +
  263. "where g.to_gid = t.id and t.id = m.team_id group by g.to_gid order by g.to_gid";
  264. ImDb.execQuery({
  265. sql: sql,
  266. args: [],
  267. handler: function (err, participants) {
  268. if (err) {
  269. return callback(err, res);
  270. }
  271. let buffer = "insert into participants(session_id, participant_id, participant_role, last_fetch_time) values ";
  272. participants.forEach(function (participant) {
  273. buffer += vprintf("\n('%s', '%s', %s, %s),", [
  274. participant.session_id,
  275. participant.participant_id,
  276. participant.participant_role,
  277. ObjectUtil.timestampToLong(participant.last_fetch_time)
  278. ]);
  279. });
  280. data += buffer.substr(0, buffer.length - 1) + ";\n\n";
  281. callback(null);
  282. }
  283. });
  284. },
  285. function (callback) {
  286. // 选择出所有的消息
  287. let sql = "SELECT g.msg_id 'id', g.to_gid 'session_id', g.from_uid 'sender_id', u.name 'sender_name', g.`type` 'content_type', g.content 'content', g.timestamp 'timestamp', g.at_uid 'at' " +
  288. "FROM im_new.msg_group g, wlyy.wlyy_doctor u " +
  289. "WHERE LENGTH(g.to_gid) < 20 and g.from_uid = u.code " +
  290. "ORDER BY g.to_gid";
  291. ImDb.execQuery({
  292. sql: sql,
  293. args: [],
  294. handler: function (err, messages) {
  295. if (err) {
  296. return callback(err, res);
  297. }
  298. let buffer = "insert into group_messages(id, session_id, sender_id, sender_name, content_type, content, timestamp, at) values ";
  299. messages.forEach(function (message) {
  300. buffer += vprintf("\n('%s', '%s', '%s', '%s', '%s', '%s', %s, '%s'),", [
  301. mongoose.Types.ObjectId().toString(),
  302. message.session_id,
  303. message.sender_id,
  304. message.sender_name,
  305. message.content_type,
  306. message.content.replace(/'/g, "''"),
  307. ObjectUtil.timestampToLong(message.timestamp),
  308. message.at
  309. ]);
  310. });
  311. data += buffer.substr(0, buffer.length - 1) + ";\n\n";
  312. callback(null, null);
  313. }
  314. });
  315. }
  316. ],
  317. function (err, res) {
  318. if (err) {
  319. log.error("Error occurs while migrate group sessions: ", err);
  320. } else {
  321. fs.appendFileSync(MIGRATION_SCRIPT_File_NAME, data, null);
  322. log.info("Migrate group sessions succeed.");
  323. process.exit(err != null ? 1 : 0);
  324. }
  325. });
  326. }
  327. }
  328. async.waterfall([
  329. function (callback) {
  330. //Migration.migrateSystem();
  331. callback(null);
  332. },
  333. function (callback) {
  334. Migration.migrateMUC();
  335. callback(null);
  336. },
  337. function (callback) {
  338. //Migration.migrateP2P();
  339. callback(null);
  340. },
  341. function (callback) {
  342. //Migration.migrateGroups();
  343. callback(null);
  344. }
  345. ],
  346. function (err, res) {
  347. });