migration.topics.js 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177
  1. /**
  2. * 群组,即原先的行政团队数据迁移。
  3. *
  4. * @author sand
  5. * @since 2017/1/9
  6. */
  7. 'use strict';
  8. let ImDb = require('../../repository/oracle/db/im.db');
  9. let ObjectUtil = require("../../util/object.util.js");
  10. let DbUtil = require("../../util/db.util.js");
  11. let async = require("async");
  12. let log = require("../../util/log.js");
  13. let fs = require('fs');
  14. let mongoose = require("mongoose");
  15. let vprintf = require('sprintf-js').vsprintf;
  16. const MIGRATION_SCRIPT_File_NAME = "./ichat_1.2.8_topics_migration.sql";
  17. class Migration {
  18. constructor() {
  19. }
  20. /**
  21. * 迁移议题及会话。
  22. */
  23. static migrateTopicAndSession() {
  24. let sessionIds = [];
  25. let data = "-- Topics update: \n";
  26. async.waterfall([
  27. /*function (callback) {
  28. // 未导入的历史咨询
  29. let sql = "select t.consult id, concat(t.patient, '_consult_2') session_id, t.name name, t.czrq create_time, t.end_operator end_by, t.end_time end_time, 0 'status', t.symptoms description " +
  30. "from wlyy.wlyy_consult_team t where t.consult not in (select id from im.topics)";
  31. ImDb.execQuery({
  32. sql: sql,
  33. args: [],
  34. handler: function (err, res) {
  35. if (err) {
  36. return callback(err, res);
  37. }
  38. let buffer = "insert into topics(id, session_id, name, create_time, end_by, end_time, status, description) values ";
  39. res.forEach(function (topic) {
  40. buffer += vprintf("\n('%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s'),", [
  41. topic.id,
  42. topic.session_id,
  43. topic.name,
  44. topic.create_time,
  45. topic.end_by,
  46. topic.end_time,
  47. topic.status,
  48. topic.description
  49. ]);
  50. topics.push(topic.id);
  51. });
  52. data += buffer.substr(0, buffer.length - 1) + " ON DUPLICATE KEY UPDATE id = id;\n";
  53. callback(null);
  54. }
  55. });
  56. },
  57. function (callback) {
  58. // 未导入的历史咨询所属会话
  59. let sql = "select concat(c.patient, '_consult_2') id, c.name, 1 'type', 2 'business_type', c.czrq 'create_date' " +
  60. "from wlyy.wlyy_consult_team c where concat(c.patient, '_consult_2') not in (select id from im.sessions s) group by c.patient order by c.patient";
  61. ImDb.execQuery({
  62. sql: sql,
  63. args: [],
  64. handler: function (err, res) {
  65. if (err) {
  66. return callback(err, res);
  67. }
  68. let buffer = "insert into sessions(id, name, type, business_type, create_date) values";
  69. res.forEach(function (session) {
  70. buffer += vprintf("\n('%s', '%s', '%s', '%s', '%s'),", [
  71. session.id,
  72. session.name,
  73. session.type,
  74. session.business_type,
  75. session.create_date]);
  76. });
  77. data += buffer.substr(0, buffer.length - 1) + " ON DUPLICATE KEY UPDATE id = id;\n\n";
  78. callback(null, null);
  79. }
  80. });
  81. },*/
  82. function (callback) {
  83. // 更新与未导入咨询相关的始末消息ID
  84. let sql = "select t.id, t.session_id, t.name, t.create_time, t.end_by " +
  85. "from topics t " +
  86. "where t.start_message_id is null and t.end_message_id is null group by t.session_id " +
  87. "order by session_id";
  88. ImDb.execQuery({
  89. sql: sql,
  90. args: [],
  91. handler: function (err, res) {
  92. res.forEach(function (topic) {
  93. let sessionId = topic.session_id;
  94. sessionIds.push(sessionId);
  95. });
  96. callback(null);
  97. }
  98. });
  99. },
  100. function (callback) {
  101. ImDb.execQuery({
  102. sql: "select session_id, id message_id from muc_messages m where m.session_id in (?) order by session_id, m.id",
  103. args: [sessionIds],
  104. handler: function (err, res) {
  105. let lastSessionId = null;
  106. let lastMsgId = null;
  107. let minMsgId = null;
  108. let maxMsgId = null;
  109. for(let i = 0; i < res.length; ++i){
  110. console.log("Processing item " + i);
  111. let item = res[i];
  112. if(lastSessionId !== item.session_id){
  113. lastSessionId = item.session_id;
  114. maxMsgId = lastMsgId;
  115. if(maxMsgId !== null){
  116. let updateSQL = "update topics t set t.start_message_id = '%s', t.end_message_id = '%s' where t.session_id = '%s'; \n";
  117. updateSQL = vprintf(updateSQL, [minMsgId, maxMsgId, lastSessionId]);
  118. data += updateSQL;
  119. }
  120. minMsgId = item.message_id;
  121. }
  122. lastMsgId = item.message_id;
  123. if(i === res.length - 1){
  124. let updateSQL = "update topics t set t.start_message_id = '%s', t.end_message_id = '%s' where t.session_id = '%s'; \n";
  125. updateSQL = vprintf(updateSQL, [minMsgId, lastMsgId, lastSessionId]);
  126. data += updateSQL;
  127. callback(null, null);
  128. }
  129. }
  130. }
  131. });
  132. }],
  133. function (err, res) {
  134. if (err) {
  135. log.error("Error occurs while migrating topics and sessions: ", err);
  136. } else {
  137. fs.writeFileSync(MIGRATION_SCRIPT_File_NAME, data + "\n");
  138. log.info("Migrate topics and sessions succeed");
  139. }
  140. });
  141. }
  142. }
  143. async.waterfall([
  144. function (callback) {
  145. Migration.migrateTopicAndSession();
  146. callback(null);
  147. }
  148. ],
  149. function (err, res) {
  150. });