|
@ -1,19 +1,18 @@
|
|
package com.yihu.jw.quota.etl.save.es;
|
|
package com.yihu.jw.quota.etl.save.es;
|
|
|
|
|
|
|
|
import com.alibaba.fastjson.JSON;
|
|
import com.yihu.jw.quota.etl.Contant;
|
|
import com.yihu.jw.quota.etl.Contant;
|
|
import com.yihu.jw.quota.etl.model.EsConfig;
|
|
import com.yihu.jw.quota.etl.model.EsConfig;
|
|
import com.yihu.jw.quota.etl.save.LargDataWithRunnable;
|
|
import com.yihu.jw.quota.etl.save.LargDataWithRunnable;
|
|
import com.yihu.jw.quota.vo.SaveModel;
|
|
import com.yihu.jw.quota.vo.SaveModel;
|
|
import io.searchbox.client.JestClient;
|
|
|
|
import io.searchbox.core.Bulk;
|
|
|
|
import io.searchbox.core.BulkResult;
|
|
|
|
import io.searchbox.core.Index;
|
|
|
|
import net.sf.json.JSONObject;
|
|
import net.sf.json.JSONObject;
|
|
import org.elasticsearch.action.bulk.BulkRequest;
|
|
import org.elasticsearch.action.bulk.BulkRequest;
|
|
import org.elasticsearch.action.bulk.BulkResponse;
|
|
import org.elasticsearch.action.bulk.BulkResponse;
|
|
import org.elasticsearch.action.index.IndexRequest;
|
|
import org.elasticsearch.action.index.IndexRequest;
|
|
import org.elasticsearch.client.RequestOptions;
|
|
import org.elasticsearch.client.RequestOptions;
|
|
import org.elasticsearch.client.RestHighLevelClient;
|
|
import org.elasticsearch.client.RestHighLevelClient;
|
|
|
|
import org.elasticsearch.common.unit.TimeValue;
|
|
|
|
import org.elasticsearch.common.xcontent.XContentType;
|
|
import org.slf4j.Logger;
|
|
import org.slf4j.Logger;
|
|
import org.slf4j.LoggerFactory;
|
|
import org.slf4j.LoggerFactory;
|
|
import org.springframework.context.annotation.Scope;
|
|
import org.springframework.context.annotation.Scope;
|
|
@ -100,7 +99,7 @@ public class ElastricSearchSave {
|
|
for (SaveModel obj : newList) {
|
|
for (SaveModel obj : newList) {
|
|
obj.setCreateTime( new Date());
|
|
obj.setCreateTime( new Date());
|
|
IndexRequest indexRequest = new IndexRequest(esConfig.getIndex());
|
|
IndexRequest indexRequest = new IndexRequest(esConfig.getIndex());
|
|
indexRequest.source(obj);
|
|
|
|
|
|
indexRequest.source(JSON.toJSONString(obj), XContentType.JSON);
|
|
bulkRequest.add(indexRequest);
|
|
bulkRequest.add(indexRequest);
|
|
}
|
|
}
|
|
br = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT) ;
|
|
br = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT) ;
|
|
@ -111,10 +110,11 @@ public class ElastricSearchSave {
|
|
}else{
|
|
}else{
|
|
//得到链接
|
|
//得到链接
|
|
BulkRequest request = new BulkRequest();
|
|
BulkRequest request = new BulkRequest();
|
|
|
|
request.timeout(TimeValue.timeValueSeconds(10));
|
|
for (SaveModel obj : smss) {
|
|
for (SaveModel obj : smss) {
|
|
obj.setCreateTime( new Date());
|
|
obj.setCreateTime( new Date());
|
|
IndexRequest indexRequest = new IndexRequest(esConfig.getIndex());
|
|
IndexRequest indexRequest = new IndexRequest(esConfig.getIndex());
|
|
indexRequest.source(obj);
|
|
|
|
|
|
indexRequest.source(JSON.toJSONString(obj), XContentType.JSON);
|
|
request.add(indexRequest);
|
|
request.add(indexRequest);
|
|
}
|
|
}
|
|
br = restHighLevelClient.bulk(request,RequestOptions.DEFAULT);
|
|
br = restHighLevelClient.bulk(request,RequestOptions.DEFAULT);
|
|
@ -124,6 +124,7 @@ public class ElastricSearchSave {
|
|
}
|
|
}
|
|
return isSuccessed;
|
|
return isSuccessed;
|
|
} catch (Exception e) {
|
|
} catch (Exception e) {
|
|
|
|
e.printStackTrace();
|
|
throw new RuntimeException("ES 保存数据异常"+ e.getMessage());
|
|
throw new RuntimeException("ES 保存数据异常"+ e.getMessage());
|
|
}
|
|
}
|
|
}
|
|
}
|