|
@ -6,6 +6,7 @@ import com.yihu.wlyy.figure.label.controller.JobController;
|
|
import com.yihu.wlyy.figure.label.convert.ConvertHelper;
|
|
import com.yihu.wlyy.figure.label.convert.ConvertHelper;
|
|
import com.yihu.wlyy.figure.label.dao.FlJobConfigDao;
|
|
import com.yihu.wlyy.figure.label.dao.FlJobConfigDao;
|
|
import com.yihu.wlyy.figure.label.dao.FlLabelDictJobDao;
|
|
import com.yihu.wlyy.figure.label.dao.FlLabelDictJobDao;
|
|
|
|
import com.yihu.wlyy.figure.label.enums.JobSqlFieldTypeEnum;
|
|
import com.yihu.wlyy.figure.label.extract.MysqlExtracter;
|
|
import com.yihu.wlyy.figure.label.extract.MysqlExtracter;
|
|
import com.yihu.wlyy.figure.label.model.DataModel;
|
|
import com.yihu.wlyy.figure.label.model.DataModel;
|
|
import com.yihu.wlyy.figure.label.model.SaveModel;
|
|
import com.yihu.wlyy.figure.label.model.SaveModel;
|
|
@ -21,6 +22,7 @@ import org.springframework.context.annotation.Scope;
|
|
import org.springframework.stereotype.Component;
|
|
import org.springframework.stereotype.Component;
|
|
import org.springframework.stereotype.Service;
|
|
import org.springframework.stereotype.Service;
|
|
import org.springframework.transaction.annotation.Transactional;
|
|
import org.springframework.transaction.annotation.Transactional;
|
|
|
|
import org.springframework.util.CollectionUtils;
|
|
import org.springframework.util.StringUtils;
|
|
import org.springframework.util.StringUtils;
|
|
|
|
|
|
import java.util.ArrayList;
|
|
import java.util.ArrayList;
|
|
@ -72,6 +74,11 @@ public class Mysql2ESJob implements Job {
|
|
|
|
|
|
private String source;
|
|
private String source;
|
|
|
|
|
|
|
|
/**
|
|
|
|
* 数据表的id,有些数据是按时间增量查询,有些数据是按表的主键id增量查询
|
|
|
|
*/
|
|
|
|
private long lastDataId;
|
|
|
|
|
|
|
|
|
|
@Override
|
|
@Override
|
|
public void execute(JobExecutionContext context) throws JobExecutionException {
|
|
public void execute(JobExecutionContext context) throws JobExecutionException {
|
|
@ -81,13 +88,21 @@ public class Mysql2ESJob implements Job {
|
|
//根据obconfig里配置的sql提取数据
|
|
//根据obconfig里配置的sql提取数据
|
|
extract();
|
|
extract();
|
|
//数据转换
|
|
//数据转换
|
|
List<SaveModel> list = new ArrayList<>();
|
|
|
|
list = convert();
|
|
|
|
//数据保存
|
|
|
|
boolean bool = save(list);
|
|
|
|
|
|
|
|
//增量存储成功后,修改增量的czrq时间为当前时间
|
|
|
|
jobService.updateFieldValuetoCurrentTime(this.flJobConfigId,bool);
|
|
|
|
|
|
List<SaveModel> list = convert();
|
|
|
|
//转换后的结果
|
|
|
|
if (!CollectionUtils.isEmpty(list)) {
|
|
|
|
//数据保存
|
|
|
|
boolean bool = save(list);
|
|
|
|
|
|
|
|
//增量存储成功后,修改增量的czrq时间为当前时间或主键id为上次获取到的数据的最后的一条的id
|
|
|
|
if(StringUtils.endsWithIgnoreCase(this.flJobConfig.getSqlFieldType().toString(), JobSqlFieldTypeEnum.TIME.toString())){
|
|
|
|
jobService.updateFieldValuetoCurrentTimeOrId(this.flJobConfigId,null,bool);
|
|
|
|
}else if(StringUtils.endsWithIgnoreCase(this.flJobConfig.getSqlFieldType().toString(), JobSqlFieldTypeEnum.NUM.toString())){
|
|
|
|
int index = dataModelList.size();
|
|
|
|
lastDataId = dataModelList.get(index -1).getId();
|
|
|
|
jobService.updateFieldValuetoCurrentTimeOrId(this.flJobConfigId,this.lastDataId,bool);
|
|
|
|
}
|
|
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
public void initParams(JobDataMap paramsMap){
|
|
public void initParams(JobDataMap paramsMap){
|
|
@ -96,9 +111,10 @@ public class Mysql2ESJob implements Job {
|
|
this.source = String.valueOf(paramsMap.get("source"));
|
|
this.source = String.valueOf(paramsMap.get("source"));
|
|
|
|
|
|
this.flJobConfig = flJobConfigDao.findById(this.flJobConfigId);
|
|
this.flJobConfig = flJobConfigDao.findById(this.flJobConfigId);
|
|
//this.sqlFiledValue = (String)paramsMap.get(this.flJobConfig.getSqlField().toString());
|
|
|
|
this.sqlFiledValue = String.valueOf(paramsMap.get("sqlFiledValue"));
|
|
|
|
|
|
// this.sqlFiledValue = String.valueOf(paramsMap.get("sqlFiledValue"));
|
|
this.sqlFiledCondition = paramsMap.getString("sqlFiledCondition");
|
|
this.sqlFiledCondition = paramsMap.getString("sqlFiledCondition");
|
|
|
|
this.sqlFiledValue = flJobConfig.getSqlFieldValue();
|
|
|
|
this.sqlFiledCondition = ">";
|
|
//没有传增量值,以数据库配置的默认值为查询条件
|
|
//没有传增量值,以数据库配置的默认值为查询条件
|
|
if(StringUtils.isEmpty(this.sqlFiledValue)){
|
|
if(StringUtils.isEmpty(this.sqlFiledValue)){
|
|
this.sqlFiledValue = this.flJobConfig.getSqlFieldValue();
|
|
this.sqlFiledValue = this.flJobConfig.getSqlFieldValue();
|
|
@ -193,10 +209,10 @@ public class Mysql2ESJob implements Job {
|
|
public String getFinalSql(String sql, String sqlFiled, String sqlFiledCondition, String sqlFiledValue) {
|
|
public String getFinalSql(String sql, String sqlFiled, String sqlFiledCondition, String sqlFiledValue) {
|
|
StringBuilder result = new StringBuilder();
|
|
StringBuilder result = new StringBuilder();
|
|
result.append(sql).append(" ");
|
|
result.append(sql).append(" ");
|
|
if (sqlFiledValue.contains(",") && sql.contains("where")) {
|
|
|
|
|
|
if (sqlFiledValue.contains(",") && (sql.contains("where") || sql.contains("WHERE"))) {
|
|
String[] sqlFiledValues = sqlFiledValue.split(",");
|
|
String[] sqlFiledValues = sqlFiledValue.split(",");
|
|
result.append("and ").append(sqlFiled).append(sqlFiledCondition).append("\'"+sqlFiledValues[1]+"\'").append(" and ").append(sqlFiled).append(sqlFiledCondition).append("\'"+sqlFiledValues[0]+"\'");
|
|
result.append("and ").append(sqlFiled).append(sqlFiledCondition).append("\'"+sqlFiledValues[1]+"\'").append(" and ").append(sqlFiled).append(sqlFiledCondition).append("\'"+sqlFiledValues[0]+"\'");
|
|
}else if(sql.contains("where")){
|
|
|
|
|
|
}else if((sql.contains("where") || sql.contains("WHERE"))){
|
|
result.append("and ").append(sqlFiled).append(sqlFiledCondition).append("\'"+sqlFiledValue+"\'");
|
|
result.append("and ").append(sqlFiled).append(sqlFiledCondition).append("\'"+sqlFiledValue+"\'");
|
|
}else if (sql.contains("$")){
|
|
}else if (sql.contains("$")){
|
|
StringBuffer otherCondition = new StringBuffer();
|
|
StringBuffer otherCondition = new StringBuffer();
|