LargeDataWithRunnable.java 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148
  1. package com.yihu.quota.etl.save;
  2. import com.yihu.quota.etl.model.EsConfig;
  3. import com.yihu.quota.util.ElasticsearchUtil;
  4. import com.yihu.quota.vo.SaveModel;
  5. import io.searchbox.client.JestClient;
  6. import io.searchbox.core.Bulk;
  7. import io.searchbox.core.BulkResult;
  8. import io.searchbox.core.Index;
  9. import net.sf.json.JSONObject;
  10. import org.slf4j.Logger;
  11. import org.slf4j.LoggerFactory;
  12. import java.util.Date;
  13. import java.util.List;
  14. /**
  15. * Created by janseny on 2018/5/23.
  16. */
  17. public class LargeDataWithRunnable implements Runnable {
  18. private Logger logger = LoggerFactory.getLogger(LargeDataWithRunnable.class);
  19. private ElasticsearchUtil esClientUtil;
  20. private String jsonConfig;
  21. private List<SaveModel> list;//待处理数据
  22. private int threadCount = 0;//初始化线程数
  23. private int flag = 1; //这是第几个线程
  24. private int perCount = 10000;//每个线程处理的数据量
  25. private int totalCount = 0;//待处理数据总数量
  26. private int havedCount = 0;//已经处理的数据量
  27. public LargeDataWithRunnable(List<SaveModel> saveModels, String jsonConfig, ElasticsearchUtil esClientUtil) {
  28. this.list = saveModels;
  29. int count = saveModels.size() / perCount;
  30. int remainder = saveModels.size() % perCount;
  31. if (remainder != 0) {
  32. count++;
  33. }
  34. this.threadCount = count;
  35. this.totalCount = list.size();
  36. this.jsonConfig = jsonConfig;
  37. this.esClientUtil = esClientUtil;
  38. }
  39. @Override
  40. public void run() {
  41. List<SaveModel> sublist = null;
  42. while (totalCount - havedCount > 0) {//线程会循环执行,直到所有数据都处理完
  43. synchronized (this) {//在分包时需要线程同步,避免线程间处理重复的数据
  44. if (totalCount - havedCount != 0) {
  45. sublist = list.subList(perCount * (flag - 1), totalCount - havedCount > perCount ? perCount * flag : perCount * (flag - 1) + (totalCount - havedCount));
  46. flag = flag + 1;
  47. havedCount = sublist.size() + havedCount;
  48. logger.debug("这是第" + (flag - 1) + "个线程;数据 = " + sublist.size());
  49. System.out.println(Thread.currentThread().getName() + "这是第" + (flag - 1) + "个线程;数据 = " + sublist.size());
  50. }
  51. if (sublist != null) {
  52. //此处为数据处理(简单打印 )
  53. BulkResult br = null;
  54. boolean isSuccessed = false;
  55. try {
  56. //得到链接
  57. EsConfig esConfig = (EsConfig) JSONObject.toBean(JSONObject.fromObject(jsonConfig), EsConfig.class);
  58. JestClient jestClient = esClientUtil.getJestClient(esConfig.getHost(), esConfig.getPort());
  59. Bulk.Builder bulk = new Bulk.Builder().defaultIndex(esConfig.getIndex()).defaultType(esConfig.getType());
  60. for (SaveModel obj : sublist) {
  61. obj.setCreateTime(new Date());
  62. Index index = new Index.Builder(obj).build();
  63. bulk.addAction(index);
  64. }
  65. br = jestClient.execute(bulk.build());
  66. //关闭链接
  67. jestClient.shutdownClient();
  68. isSuccessed = br.isSucceeded();
  69. } catch (Exception e) {
  70. throw new RuntimeException("ES 保存数据异常");
  71. }
  72. }
  73. }
  74. }
  75. }
  76. public List<SaveModel> getList() {
  77. return list;
  78. }
  79. public void setList(List<SaveModel> list) {
  80. this.list = list;
  81. }
  82. public int getThreadCount() {
  83. return threadCount;
  84. }
  85. public void setThreadCount(int threadCount) {
  86. this.threadCount = threadCount;
  87. }
  88. public int getFlag() {
  89. return flag;
  90. }
  91. public void setFlag(int flag) {
  92. this.flag = flag;
  93. }
  94. public int getPerCount() {
  95. return perCount;
  96. }
  97. public void setPerCount(int perCount) {
  98. this.perCount = perCount;
  99. }
  100. public int getTotalCount() {
  101. return totalCount;
  102. }
  103. public void setTotalCount(int totalCount) {
  104. this.totalCount = totalCount;
  105. }
  106. public int getHavedCount() {
  107. return havedCount;
  108. }
  109. public void setHavedCount(int havedCount) {
  110. this.havedCount = havedCount;
  111. }
  112. public String getJsonConfig() {
  113. return jsonConfig;
  114. }
  115. public void setJsonConfig(String jsonConfig) {
  116. this.jsonConfig = jsonConfig;
  117. }
  118. public ElasticsearchUtil getEsClientUtil() {
  119. return esClientUtil;
  120. }
  121. public void setEsClientUtil(ElasticsearchUtil esClientUtil) {
  122. this.esClientUtil = esClientUtil;
  123. }
  124. }