migration.js 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328
  1. /**
  2. * 群组,即原先的行政团队数据迁移。
  3. *
  4. * @author sand
  5. * @since 2017/1/9
  6. */
  7. 'use strict';
  8. let ImDb = require('../../repository/mysql/db/im.db');
  9. let ObjectUtil = require("../../util/object.util.js");
  10. let DbUtil = require("../../util/db.util.js");
  11. let async = require("async");
  12. let log = require("../../util/log.js");
  13. let fs = require('fs');
  14. let mongoose = require("mongoose");
  15. let vprintf = require('sprintf-js').vsprintf;
  16. const MIGRATION_SCRIPT_File_NAME = "../../resources/schema/ichat_1.2.8_data_migration.sql";
  /**
   * Escapes a value for use inside a single-quoted MySQL string literal.
   *
   * Backslashes are doubled first, because MySQL treats them as escape
   * characters unless NO_BACKSLASH_ESCAPES is enabled (this assumes the target
   * server runs with the default sql_mode), then single quotes are doubled.
   *
   * @param {*} value - value to embed; converted with String().
   * @returns {string} the escaped text, without surrounding quotes.
   */
  function escapeSqlText(value) {
    return String(value).replace(/\\/g, "\\\\").replace(/'/g, "''");
  }

  /**
   * Renders a value as a quoted, escaped SQL string literal.
   *
   * @param {*} value - value to quote.
   * @returns {string} e.g. O'Neil becomes 'O''Neil'.
   */
  function quoteSql(value) {
    return "'" + escapeSqlText(value) + "'";
  }

  /**
   * Renders one VALUES tuple from already-formatted SQL fragments.
   *
   * @param {Array} values - fragments (quoted literals, numbers, SQL expressions).
   * @returns {string} "(a, b, c)".
   */
  function sqlTuple(values) {
    // String() per item: Array#join alone would turn null/undefined into "".
    return "(" + values.map(function (value) {
      return String(value);
    }).join(", ") + ")";
  }

  /**
   * Builds one multi-row INSERT statement.
   *
   * @param {string} head - statement prefix up to and including "values".
   * @param {string[]} rows - tuples produced by sqlTuple().
   * @param {string} [tail] - clause appended before the terminating semicolon.
   * @returns {string} the statement ending in ";", or a SQL comment when there
   *     are no rows (an INSERT without tuples is not valid SQL).
   */
  function buildInsert(head, rows, tail) {
    if (rows.length === 0) {
      return "-- skipped (no rows): " + head.trim();
    }
    const body = rows.map(function (row) {
      return "\n" + row;
    }).join(",");
    return head + body + (tail || "") + ";";
  }

  /**
   * Runs a query without arguments through ImDb and forwards the outcome.
   *
   * @param {string} sql - query text.
   * @param {Function} callback - waterfall callback; receives the error, or
   *     null once onRows has returned.
   * @param {Function} onRows - receives the result rows (an empty array when the
   *     driver returns nothing).
   */
  function queryRows(sql, callback, onRows) {
    ImDb.execQuery({
      sql: sql,
      args: [],
      handler: function (err, rows) {
        if (err) {
          return callback(err);
        }
        onRows(rows || []);
        callback(null);
      }
    });
  }

  /**
   * Generates the SQL script that migrates the old IM data (im_new.*) into the
   * new sessions / participants / *_messages tables. Each method queries the old
   * data through ImDb and writes INSERT statements to MIGRATION_SCRIPT_File_NAME.
   */
  class Migration {
    constructor() {
    }

    /**
     * Migrates system messages: one session per receiver found in
     * im_new.msg_system, followed by every system message.
     *
     * Overwrites MIGRATION_SCRIPT_File_NAME with the generated script.
     */
    static migrateSystem() {
      let data = "-- System sessions: \n";

      async.waterfall([
          function (callback) {
            // All system sessions.
            const sql = "select distinct to_uid participant_id from im_new.msg_system order by to_uid";

            queryRows(sql, callback, function (sessions) {
              const rows = sessions.map(function (session) {
                return sqlTuple([
                  quoteSql(DbUtil.stringArrayHash(['system', session.participant_id])),
                  quoteSql("系统消息"),
                  0,
                  "now()"
                ]);
              });

              data += buildInsert("insert into sessions(id, name, type, create_date) values ", rows) + "\n";
            });
          },
          function (callback) {
            // All system messages.
            const sql = "SELECT msg_id 'id', 'session_id', 'system' `sender_id`, '系统通知' `sender_name`, 1 `content_type`, DATA 'content', timestamp, `type` 'business_type', to_uid FROM im_new.msg_system ORDER BY to_uid";

            queryRows(sql, callback, function (messages) {
              const rows = messages.map(function (message) {
                return sqlTuple([
                  quoteSql(mongoose.Types.ObjectId().toString()),
                  quoteSql(DbUtil.stringArrayHash(['system', message.to_uid])),
                  quoteSql('system'),
                  quoteSql('系统'),
                  1,
                  // A NULL content is migrated as an empty string instead of crashing.
                  quoteSql(message.content == null ? "" : message.content),
                  ObjectUtil.timestampToLong(message.timestamp),
                  quoteSql(message.business_type)
                ]);
              });

              data += buildInsert("insert into system_messages(id, session_id, sender_id, sender_name, content_type, content, timestamp, business_type) values", rows) + "\n\n";
            });
          }
        ],
        function (err) {
          if (err) {
            log.error("Error occurs while migrate system sessions: ", err);
            return;
          }

          fs.writeFileSync(MIGRATION_SCRIPT_File_NAME, data + "\n");
          log.info("Migrate System sessions succeed.");
        });
    }

    /**
     * Migrates MUCs and their messages: the patient-related conversations of the
     * old data are to be filtered out and converted into sessions, with topics
     * taken from the family-doctor database.
     *
     * TODO: not implemented yet. The queries below are only drafted and never
     * executed.
     */
    static migrateMUC() {
      // All consultation groups.
      let sql = "select g.to_gid from im_new.msg_group g where length(g.to_gid) > 20 group by g.to_gid order by g.to_gid";

      // All consultation group members.
      sql = "select g.to_gid, g.from_uid from im_new.msg_group g where length(g.to_gid) > 20 group by g.to_gid order by g.to_gid";

      // All consultation group messages.
      sql = "select g.to_gid, g.from_uid, g.msg_id, g.`type`, g.content, g.timestamp, g.at_uid from im_new.msg_group g where length(g.to_gid) > 20 order by g.to_gid";
    }

    /**
     * Migrates P2P sessions and their messages. In the new IM, P2P is used by
     * doctors only, so only conversations whose sender and receiver are both
     * doctors are selected.
     *
     * Overwrites MIGRATION_SCRIPT_File_NAME with the generated script.
     * NOTE(review): migrateSystem() also overwrites the same file while
     * migrateGroups() appends to it; confirm the intended combination before
     * enabling several steps at once.
     */
    static migrateP2P() {
      let data = "-- P2P sessions: \n";

      async.waterfall([
          function (callback) {
            // All P2P sessions, members and messages.
            const sql = "SELECT a.from_uid sender_id, d.name sender_name, a.to_uid to_id, e.name to_name, a.type content_type, a.content, a.timestamp " +
              " FROM im_new.msg_p2p a " +
              " LEFT JOIN wlyy.wlyy_doctor d ON a.from_uid = d.code " +
              " LEFT JOIN wlyy.wlyy_doctor e on a.to_uid = e.code " +
              " WHERE a.from_uid IN (SELECT code FROM wlyy.wlyy_doctor d) AND a.to_uid IN (SELECT code FROM wlyy.wlyy_doctor d)";

            queryRows(sql, callback, function (messages) {
              // Every migrated P2P session gets the same fixed creation time.
              const sessionCreateDate = new Date('2016-11-22 12:00:00').getTime();
              const knownSessions = new Set();
              const knownParticipants = new Set();
              const sessionRows = [];
              const participantRows = [];
              const messageRows = [];

              messages.forEach(function (message) {
                const sessionId = DbUtil.stringArrayHash([message.sender_id, message.to_id]);

                // Session: emitted once, not once per message.
                if (!knownSessions.has(sessionId)) {
                  knownSessions.add(sessionId);
                  sessionRows.push(sqlTuple([
                    quoteSql(sessionId),
                    quoteSql('P2P'),
                    1,
                    sessionCreateDate
                  ]));
                }

                // Members: sender and receiver, each emitted once per session.
                [message.sender_id, message.to_id].forEach(function (participantId) {
                  const participantKey = JSON.stringify([String(sessionId), String(participantId)]);
                  if (knownParticipants.has(participantKey)) {
                    return;
                  }
                  knownParticipants.add(participantKey);
                  participantRows.push(sqlTuple([
                    quoteSql(sessionId),
                    quoteSql(participantId),
                    0,
                    new Date().getTime()
                  ]));
                });

                // Message.
                messageRows.push(sqlTuple([
                  quoteSql(mongoose.Types.ObjectId().toString()),
                  quoteSql(sessionId),
                  quoteSql(message.sender_id),
                  quoteSql(message.sender_name),
                  quoteSql(message.content_type),
                  // A NULL content is migrated as an empty string instead of crashing.
                  quoteSql(message.content == null ? "" : message.content),
                  ObjectUtil.timestampToLong(message.timestamp)
                ]));
              });

              // The ON DUPLICATE KEY clauses stay so the script can be re-run.
              data += buildInsert("insert into sessions(id, name, type, create_date) values ",
                sessionRows, " ON DUPLICATE KEY UPDATE id = id") + "\n\n";
              data += buildInsert("insert into participants(session_id, participant_id, participant_role, last_fetch_time) values ",
                participantRows, " ON DUPLICATE KEY UPDATE session_id = session_id") + "\n\n";
              data += buildInsert("insert into p2p_messages(id, session_id, sender_id, sender_name, content_type, content, timestamp) values ",
                messageRows, " ON DUPLICATE KEY UPDATE id = id") + "\n\n";
            });
          }
        ],
        function (err) {
          if (err) {
            log.error("Error occurs while migrate p2p sessions: ", err);
            return;
          }

          fs.writeFileSync(MIGRATION_SCRIPT_File_NAME, data + "\n");
          log.info("Migrate P2P sessions succeed.");
        });
    }

    /**
     * Migrates the admin teams and their messages. Group session rules:
     * 1. the admin team ID is used as the session ID;
     * 2. the session members are the team's doctors;
     * 3. messages are re-inserted under newly generated IDs.
     *
     * Appends the generated script to MIGRATION_SCRIPT_File_NAME and then ends
     * the process with exit code 0. On a query error it only logs the error.
     */
    static migrateGroups() {
      let data = "-- Group sessions: \n";

      async.waterfall([
          function (callback) {
            // All admin teams that have group messages become sessions.
            const sql = "select t.id, t.name, 3 `type`, t.create_time 'create_date' " +
              "from im_new.msg_group g, wlyy.wlyy_admin_team t " +
              "where g.to_gid = t.id group by g.to_gid order by g.to_gid";

            queryRows(sql, callback, function (sessions) {
              const rows = sessions.map(function (session) {
                return sqlTuple([
                  quoteSql(session.id),
                  quoteSql(session.name),
                  session.type,
                  ObjectUtil.timestampToLong(session.create_date)
                ]);
              });

              data += buildInsert("insert into sessions(id, name, type, create_date) values ", rows) + "\n\n";
            });
          },
          function (callback) {
            // All doctors of those admin teams become session members. Grouping
            // by team AND member keeps one row per member; grouping by team
            // alone would keep a single arbitrary member per team.
            const sql = "select t.id session_id, m.id participant_id, 0 'participant_role', 0 'last_fetch_time' " +
              "from im_new.msg_group g, wlyy.wlyy_admin_team t, wlyy.wlyy_admin_team_member m " +
              "where g.to_gid = t.id and t.id = m.team_id group by g.to_gid, m.id order by g.to_gid";

            queryRows(sql, callback, function (participants) {
              const rows = participants.map(function (participant) {
                return sqlTuple([
                  quoteSql(participant.session_id),
                  quoteSql(participant.participant_id),
                  participant.participant_role,
                  ObjectUtil.timestampToLong(participant.last_fetch_time)
                ]);
              });

              data += buildInsert("insert into participants(session_id, participant_id, participant_role, last_fetch_time) values ", rows) + "\n\n";
            });
          },
          function (callback) {
            // All group messages.
            const sql = "SELECT g.msg_id 'id', g.to_gid 'session_id', g.from_uid 'sender_id', u.name 'sender_name', g.`type` 'content_type', g.content 'content', g.timestamp 'timestamp', g.at_uid 'at' " +
              "FROM im_new.msg_group g, wlyy.wlyy_doctor u " +
              "WHERE LENGTH(g.to_gid) < 20 and g.from_uid = u.code " +
              "ORDER BY g.to_gid";

            queryRows(sql, callback, function (messages) {
              const rows = messages.map(function (message) {
                return sqlTuple([
                  quoteSql(mongoose.Types.ObjectId().toString()),
                  quoteSql(message.session_id),
                  quoteSql(message.sender_id),
                  quoteSql(message.sender_name),
                  quoteSql(message.content_type),
                  // A NULL content is migrated as an empty string instead of crashing.
                  quoteSql(message.content == null ? "" : message.content),
                  ObjectUtil.timestampToLong(message.timestamp),
                  quoteSql(message.at)
                ]);
              });

              data += buildInsert("insert into group_messages(id, session_id, sender_id, sender_name, content_type, content, timestamp, at) values ", rows) + "\n\n";
            });
          }
        ],
        function (err) {
          if (err) {
            log.error("Error occurs while migrate group sessions: ", err);
            return;
          }

          fs.appendFileSync(MIGRATION_SCRIPT_File_NAME, data);
          log.info("Migrate group sessions succeed.");

          // This is the success path, so the exit code is always 0.
          process.exit(0);
        });
    }
  }
  // Script entry point: runs the migration steps in order. Each step only
  // starts its migration and hands control on immediately, so it does not wait
  // for the migration's own queries to finish. Disabled steps are kept as
  // commented-out calls so they can be switched back on.
  async.waterfall(
    [
      (next) => {
        //Migration.migrateSystem();
        next(null);
      },
      (next) => {
        //Migration.migrateMUC();
        next(null);
      },
      (next) => {
        Migration.migrateP2P();
        next(null);
      },
      (next) => {
        //Migration.migrateGroups();
        next(null);
      }
    ],
    // Nothing to do once every step has been started.
    () => {
    }
  );