|
@ -153,6 +153,11 @@ public class CurrentMysqlToEsQuotaJob implements Job {
|
|
|
} catch (Exception e) {
|
|
|
e.printStackTrace();
|
|
|
}
|
|
|
try {
|
|
|
Thread.sleep(20000L);
|
|
|
} catch (InterruptedException e) {
|
|
|
e.printStackTrace();
|
|
|
}
|
|
|
});
|
|
|
}
|
|
|
|
|
@ -177,32 +182,22 @@ public class CurrentMysqlToEsQuotaJob implements Job {
|
|
|
Search search = new Search.Builder(searchSourceBuilder.toString()).addIndex(esIndex).addType(esType)
|
|
|
.build();
|
|
|
SearchResult result = jestClient.execute(search);
|
|
|
List<SaveModel> saveModels = result.getSourceAsObjectList(SaveModel.class);
|
|
|
//如果之前有值批量更新
|
|
|
if (saveModels != null && saveModels.size() > 0) {
|
|
|
List<SaveModel> quarySaveModels = result.getSourceAsObjectList(SaveModel.class);
|
|
|
//如果之前有值就用查询出来的然后修改数目即可
|
|
|
if (quarySaveModels != null && quarySaveModels.size() > 0) {
|
|
|
//list转map
|
|
|
Map<String, SaveModel> smsMap = new HashMap<>();
|
|
|
sms.stream().forEach(one -> {
|
|
|
smsMap.put(one.getId(), one);
|
|
|
smsMap.put(one.getTeam(), one);
|
|
|
});
|
|
|
//根据id批量更新
|
|
|
Bulk.Builder bulk = new Bulk.Builder().defaultIndex(esIndex).defaultType(esType);
|
|
|
for (SaveModel obj : saveModels) {
|
|
|
SaveModel newResult = smsMap.get(obj.getId());
|
|
|
for (SaveModel obj : quarySaveModels) {
|
|
|
SaveModel newResult = smsMap.get(obj.getTeam());
|
|
|
obj.setResult1(newResult.getResult1());
|
|
|
obj.setResult2(newResult.getResult2());
|
|
|
Update index = new Update.Builder(obj.getId()).build();
|
|
|
bulk.addAction(index);
|
|
|
}
|
|
|
BulkResult br = jestClient.execute(bulk.build());
|
|
|
|
|
|
logger.info("update data count:" + saveModels.size());
|
|
|
logger.info("update flag:" + br.isSucceeded());
|
|
|
return br.isSucceeded();
|
|
|
} else {
|
|
|
//批量插入
|
|
|
return saveDate(sms);
|
|
|
sms=quarySaveModels;
|
|
|
}
|
|
|
return saveDate(sms);
|
|
|
} catch (Exception e) {
|
|
|
e.printStackTrace();
|
|
|
}
|