123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191 |
- package com.yihu.wlyy.statistics.task;
- import com.yihu.wlyy.statistics.util.http.HttpResponse;
- import net.sf.json.JSONArray;
- import net.sf.json.JSONObject;
- import org.apache.http.NameValuePair;
- import org.apache.http.client.config.RequestConfig;
- import org.apache.http.client.entity.UrlEncodedFormEntity;
- import org.apache.http.client.methods.CloseableHttpResponse;
- import org.apache.http.client.methods.HttpPost;
- import org.apache.http.impl.client.CloseableHttpClient;
- import org.apache.http.impl.client.HttpClients;
- import org.apache.http.message.BasicNameValuePair;
- import org.apache.http.util.EntityUtils;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import java.util.ArrayList;
- import java.util.List;
- import java.util.concurrent.LinkedBlockingQueue;
- public class PushMsgTask {
- public static String url="";
- private static Logger logger = LoggerFactory.getLogger(PushMsgTask.class);
- // 最大容量为50的数组堵塞队列
- private static LinkedBlockingQueue<JSONObject> queue = new LinkedBlockingQueue<JSONObject>();
- private static PushMsgTask instance;
- private static Object lock = new Object();
- public static PushMsgTask getInstance() {
- synchronized (lock) {
- if (instance == null) {
- instance = new PushMsgTask();
- instance.run();
- }
- }
- return instance;
- }
- /**
- * 添加一条推送消息
- * @param receiver 接收人
- * @param type 消息类型
- * @param title 消息标题
- * @param msg 消息内容
- * @param data 消息数据
- */
- public void put(String receiver, String type, String title, String msg, String data) {
- try {
- JSONObject json = new JSONObject();
- json.put("receiver", receiver);
- json.put("type", type);
- json.put("title", title);
- json.put("msg", msg);
- json.put("data", data);
- queue.put(json);
- } catch (Exception e) {
- logger.error("添加到消息队列失败!", e);
- e.printStackTrace();
- }
- }
- public void put(JSONArray array) {
- if (array == null || array.size() == 0) {
- return;
- }
- for (int i = 0; i < array.size(); i++) {
- JSONObject json = array.getJSONObject(i);
- if (json == null) {
- continue;
- }
- try {
- queue.put(json);
- } catch (Exception e) {
- logger.error("批量添加到消息队列失败!", e);
- }
- }
- }
- private void run() {
- new Thread(new ConsumerTask()).start();
- }
- // 消费者
- class ConsumerTask implements Runnable {
- @Override
- public void run() {
- try {
- while (true) {
- // 如果queue为空,则当前线程会堵塞,直到有新数据加入
- JSONObject json = queue.take();
- // 推送平台消息
- String receiver = json.has("receiver") ? json.getString("receiver") : "";
- String type = json.has("type") ? json.getString("type") : "";
- String title = json.has("title") ? json.getString("title") : "";
- String msg = json.has("msg") ? json.getString("msg") : "";
- String data = json.has("data") ? json.getString("data") : "";
- boolean res = pushMessage(receiver, type, title, msg, data);
- if (res) {
- logger.info("消息推送成功!");
- } else {
- logger.error("消息推送失败!");
- }
- }
- } catch (Exception ex) {
- ex.printStackTrace();
- }
- }
- }
- /**
- * 消息推送
- *
- * @param receiver 消息接收人
- * @param msgType 消息类型
- * @param title 消息标题
- * @param msg 消息内容
- * @param data 消息数据
- */
- public static boolean pushMessage(String receiver, String msgType, String title, String msg, String data) {
- try {
- List<NameValuePair> params = new ArrayList<NameValuePair>();
- params.add(new BasicNameValuePair("to", receiver));
- params.add(new BasicNameValuePair("content", msg));
- params.add(new BasicNameValuePair("contentType", msgType));
- params.add(new BasicNameValuePair("title", title));
- params.add(new BasicNameValuePair("summary", data));
- HttpResponse response = post(url, params);
- if (response != null && response.getStatusCode() == 200) {
- JSONObject json = JSONObject.fromObject(response.getBody());
- if (!"200".equals(json.optString("status"))) {
- throw new Exception(json.optString("msg"));
- }
- } else {
- throw new Exception("接口调用错误!"+response.getBody());
- }
- return true;
- } catch (Exception e) {
- logger.error("push message error:", e);
- }
- return false;
- }
- private static HttpResponse post(String url, List<NameValuePair> params) {
- HttpResponse re = new HttpResponse();
- CloseableHttpResponse response = null;
- CloseableHttpClient httpclient = HttpClients.createDefault();
- //设置请求信息
- try {
- RequestConfig requestConfig = RequestConfig.custom().
- setAuthenticationEnabled(true).build();
- HttpPost request = new HttpPost(url );
- request.setEntity(new UrlEncodedFormEntity(params,"UTF-8"));
- request.setConfig(requestConfig);
- response = httpclient.execute(request);
- re.setStatusCode(response.getStatusLine().getStatusCode());
- re.setBody(EntityUtils.toString(response.getEntity(), "UTF-8"));
- } catch (Exception e) {
- re.setStatusCode(201);
- re.setBody(e.getMessage());
- } finally {
- close(httpclient, response);
- }
- return re;
- }
- private static void close(CloseableHttpClient httpClient, CloseableHttpResponse response) {
- try {
- if (httpClient != null) {
- httpClient.close();
- }
- } catch (Exception e) {
- System.out.print("关闭httpClient失败");
- }
- try {
- if (response != null) {
- response.close();
- }
- } catch (Exception e) {
- System.out.print("关闭response失败");
- }
- }
- }
|