|
@ -9,7 +9,9 @@ import org.springframework.jdbc.core.BeanPropertyRowMapper;
|
|
|
import org.springframework.jdbc.core.JdbcTemplate;
|
|
|
import org.springframework.stereotype.Component;
|
|
|
|
|
|
import java.util.ArrayList;
|
|
|
import java.util.List;
|
|
|
import java.util.concurrent.*;
|
|
|
|
|
|
/**
|
|
|
* Created by chenweida on 2018/3/7.
|
|
@ -19,19 +21,58 @@ public class MysqlExtracter implements Extracter {
|
|
|
|
|
|
private Logger logger = LoggerFactory.getLogger(MysqlExtracter.class);
|
|
|
|
|
|
private int numPerPage = 100000; //一次性最多查询10万条
|
|
|
|
|
|
@Autowired
|
|
|
private JdbcTemplate jdbcTemplate;
|
|
|
|
|
|
/* @Override
|
|
|
public List<ExtractModel> extract() {
|
|
|
return null;
|
|
|
}*/
|
|
|
|
|
|
@Override
|
|
|
public List<DataModel> extractDataByJobConfigsql(String sql) {
|
|
|
List<DataModel> datas = jdbcTemplate.query(sql,new BeanPropertyRowMapper(DataModel.class));
|
|
|
logger.info("job get data counts:" + datas.size());
|
|
|
List<DataModel> datas = new ArrayList<>();
|
|
|
getDataByThread(sql);
|
|
|
return datas;
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* 根据数据量采用多线程分页查询数据
|
|
|
* @param sql
|
|
|
* @return
|
|
|
*/
|
|
|
public List<DataModel> getDataByThread(String sql){
|
|
|
List<DataModel> datas = new ArrayList<>();
|
|
|
int size = this.getCount(sql);
|
|
|
int number = size / numPerPage + 1;
|
|
|
CountDownLatch countDownLatch = new CountDownLatch(number);
|
|
|
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(number);
|
|
|
List<Callable<List<DataModel>>> threadList = new ArrayList<>();
|
|
|
try {
|
|
|
for (int i = 0; i < number; i++) {
|
|
|
sql = sql + " limit " + i * numPerPage + "," + (i + 1) * numPerPage;
|
|
|
MutilThreadMysqlExtracter mutilThreadMysqlExtracter = new MutilThreadMysqlExtracter(jdbcTemplate, sql);
|
|
|
threadList.add(mutilThreadMysqlExtracter);
|
|
|
}
|
|
|
List<Future<List<DataModel>>> futureList = fixedThreadPool.invokeAll(threadList);
|
|
|
countDownLatch.await();
|
|
|
for (Future future : futureList) {
|
|
|
datas.addAll((List<DataModel>) future.get());
|
|
|
}
|
|
|
} catch (Exception e) {
|
|
|
logger.error("MutilThreadMysqlExtracter call failed!");
|
|
|
}finally {
|
|
|
countDownLatch.countDown();
|
|
|
fixedThreadPool.shutdown();
|
|
|
}
|
|
|
return datas;
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* 统计有多少条数据
|
|
|
* @param sql
|
|
|
* @return
|
|
|
*/
|
|
|
public Integer getCount(String sql){
|
|
|
String countSql = "select count(*) from " + sql.split("from")[1];
|
|
|
Integer count = jdbcTemplate.queryForObject(countSql,Integer.class);
|
|
|
return count;
|
|
|
}
|
|
|
}
|