PushMsgTask.java 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151
  1. package com.yihu.wlyy.statistics.task;
  2. import com.yihu.wlyy.statistics.util.HttpClientUtil;
  3. import com.yihu.wlyy.statistics.util.ImUtill;
  4. import net.sf.json.JSONArray;
  5. import net.sf.json.JSONObject;
  6. import org.apache.http.NameValuePair;
  7. import org.apache.http.client.config.RequestConfig;
  8. import org.apache.http.client.entity.UrlEncodedFormEntity;
  9. import org.apache.http.client.methods.CloseableHttpResponse;
  10. import org.apache.http.client.methods.HttpPost;
  11. import org.apache.http.impl.client.CloseableHttpClient;
  12. import org.apache.http.impl.client.HttpClients;
  13. import org.apache.http.message.BasicNameValuePair;
  14. import org.apache.http.util.EntityUtils;
  15. import org.slf4j.Logger;
  16. import org.slf4j.LoggerFactory;
  17. import java.util.ArrayList;
  18. import java.util.List;
  19. import java.util.concurrent.LinkedBlockingQueue;
  20. public class PushMsgTask {
  21. public static String url="";
  22. private static Logger logger = LoggerFactory.getLogger(PushMsgTask.class);
  23. // 最大容量为50的数组堵塞队列
  24. private static LinkedBlockingQueue<JSONObject> queue = new LinkedBlockingQueue<JSONObject>();
  25. private static PushMsgTask instance;
  26. private static Object lock = new Object();
  27. public static PushMsgTask getInstance() {
  28. synchronized (lock) {
  29. if (instance == null) {
  30. instance = new PushMsgTask();
  31. instance.run();
  32. }
  33. }
  34. return instance;
  35. }
  36. /**
  37. * 添加一条推送消息
  38. * @param receiver 接收人
  39. * @param type 消息类型
  40. * @param title 消息标题
  41. * @param msg 消息内容
  42. * @param data 消息数据
  43. */
  44. public void put(String receiver, String type, String title, String msg, String data) {
  45. try {
  46. JSONObject json = new JSONObject();
  47. json.put("receiver", receiver);
  48. json.put("type", type);
  49. json.put("title", title);
  50. json.put("msg", msg);
  51. json.put("data", data);
  52. queue.put(json);
  53. } catch (Exception e) {
  54. logger.error("添加到消息队列失败!", e);
  55. e.printStackTrace();
  56. }
  57. }
  58. public void put(JSONArray array) {
  59. if (array == null || array.size() == 0) {
  60. return;
  61. }
  62. for (int i = 0; i < array.size(); i++) {
  63. JSONObject json = array.getJSONObject(i);
  64. if (json == null) {
  65. continue;
  66. }
  67. try {
  68. queue.put(json);
  69. } catch (Exception e) {
  70. logger.error("批量添加到消息队列失败!", e);
  71. }
  72. }
  73. }
  74. private void run() {
  75. new Thread(new ConsumerTask()).start();
  76. }
  77. // 消费者
  78. class ConsumerTask implements Runnable {
  79. @Override
  80. public void run() {
  81. try {
  82. while (true) {
  83. // 如果queue为空,则当前线程会堵塞,直到有新数据加入
  84. JSONObject json = queue.take();
  85. logger.info("发送前:"+json);
  86. // 推送平台消息
  87. String receiver = json.containsKey("receiver") ? json.getString("receiver") : "";
  88. String type = json.containsKey("type") ? json.getString("type") : "";
  89. String title = json.containsKey("title") ? json.getString("title") : "";
  90. String msg = json.containsKey("msg") ? json.getString("msg") : "";
  91. String data = json.containsKey("data") ? json.getString("data") : "";
  92. boolean res = pushMessage(receiver, type, title, msg, data);
  93. if (res) {
  94. logger.info("消息推送成功!");
  95. } else {
  96. logger.error("消息推送失败!");
  97. }
  98. }
  99. } catch (Exception ex) {
  100. ex.printStackTrace();
  101. }
  102. }
  103. }
  104. /**
  105. * 消息推送
  106. *
  107. * @param receiver 消息接收人
  108. * @param msgType 消息类型
  109. * @param title 消息标题
  110. * @param msg 消息内容
  111. * @param data 消息数据
  112. */
  113. public static boolean pushMessage(String receiver, String msgType, String title, String msg, String data) {
  114. try {
  115. //List<NameValuePair> params = new ArrayList<NameValuePair>();
  116. //params.add(new BasicNameValuePair("to", receiver));
  117. //params.add(new BasicNameValuePair("content", msg));
  118. //params.add(new BasicNameValuePair("contentType", msgType));
  119. //params.add(new BasicNameValuePair("title", title));
  120. //params.add(new BasicNameValuePair("summary", data));
  121. //String response = HttpClientUtil.post(url, params, "UTF-8");
  122. org.json.JSONObject participants = new org.json.JSONObject();
  123. participants.put("system",0);
  124. participants.put(receiver,0);
  125. org.json.JSONObject sessionObj = ImUtill.createSession(url,participants,"0","系统消息","");
  126. if(sessionObj.getInt("status")==-1){
  127. throw new RuntimeException(sessionObj.getString("message"));
  128. }
  129. org.json.JSONObject session = sessionObj.getJSONObject("data");
  130. ImUtill.sendImMsg(url,"system","系统",session.getString("id"),"1", msg,msgType);
  131. return true;
  132. } catch (Exception e) {
  133. logger.error("push message error:", e);
  134. }
  135. return false;
  136. }
  137. }