1.基础组件
1.1注解类控制代码执行启动、停止、顺序
/**
 * Metadata attached to a data-acquisition service class.
 *
 * <p>{@code DataAcquisitionStrategy} reads this annotation reflectively at runtime to decide
 * whether a service runs ({@link #isEnable()}), whether it matches a requested service name
 * ({@link #name()}), and in which position it runs ({@link #order()}).
 */
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
public @interface SyncMeta {

    /** Name used to select this service explicitly; defaults to the empty string. */
    String name() default "";

    /** Whether the annotated service takes part in synchronisation; defaults to {@code false}. */
    boolean isEnable() default false;

    /** Sort key for execution order; defaults to {@code 0}. */
    int order() default 0;
}
1.2数据获取基类
1.2.1数据获取基类接口(interface)
/**
 * Contract for a single data-acquisition job.
 *
 * <p>Implementations are looked up by type and invoked through {@link #acquire()}.
 */
public interface BaseDataAcquisition {

    /** Runs the acquisition job. */
    void acquire();
}
1.2.2数据获取基类抽象类(abstract)实现接口
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;
@Slf4j
public class BaseDataAcquisitionImpl<M extends BaseMapper<T>, T> implements BaseDataAcquisition {@Autowiredprotected M baseMapper;@Autowiredprotected SyncUrlProperties syncUrlProperties;@Resourceprotected ThirdDatabaseMapper thirdDatabaseMapper;@Overridepublic void acquire() {acquireData(DataSouceEnum.DATASOURCE);}protected void acquireData(DataSouceEnum dataSouceEnum) {List<T> allData = new ArrayList<>();switch (dataSouceEnum) {case DATASOURCE:allData = getThirdDataFromDB();break;case INTERFACE:allData = getDataFromUrl();break;default:break;}if (CollUtil.isEmpty(allData)) {log.error("采集数据为空,请确认数据采集方式");return;}addExtraData(allData);dealData(allData);List<List<T>> parts = Lists.partition(allData, 1000);parts.stream().forEach(list -> saveOrUpdate(list));extraDeal();}protected List<T> getThirdDataFromDB() {return null;}protected List<T> getDataFromUrl() {return covert(null);}protected void saveOrUpdate(List<T> part) {}protected List<T> covert(String body) {List<T> allData = new ArrayList<>();return allData;}protected void addExtraData(List<T> allData) {}protected void dealData(List<T> allData) {}protected void extraDeal() {}}
2.数据抽取实现
2.1人员数据抽取示例
@Service
@SyncMeta(name = Constants.PERSON_SERVICE, isEnable = false, order = 2)
@Slf4j
public class PersonDataAcquisitionImpl extends BaseDataAcquisitionImpl<ThirdPersonMapper, ThirdPerson>{@Overrideprotected List getThirdDataFromDB() {return thirdDatabaseMapper.getAllThirdPerson();}@Overrideprotected List getDataFromUrl() {String url = syncUrlProperties.getPersonUrl();String body = HttpRequest.get(url).timeout(3000).execute().body();return covert(body);}@Overrideprotected void saveOrUpdate(List part) {this.baseMapper.batchSaveOrUpdate(part);}@Overrideprotected List covert(String body) {List<ThirdPerson> thirdPeople = new ArrayList<>();return thirdPeople;}
}
2.2组织数据抽取示例
@Service
@SyncMeta(name = Constants.ORG_SERVICE, isEnable = false, order = 1)
@Slf4j
public class OrgDataAcquisitionImpl extends BaseDataAcquisitionImpl<ThirdOrgMapper, ThirdOrg> {@Autowiredprivate ThirdRedisOrgCache thirdRedisOrgCache;@Overrideprotected List<ThirdOrg> getThirdDataFromDB() {return thirdDatabaseMapper.getAllThirdOrg();}@Overrideprotected List<ThirdOrg> getDataFromUrl() {String url = syncUrlProperties.getOrgUrl();String body = HttpRequest.get(url).timeout(5000).execute().body();return covert(body);}@Overrideprotected void saveOrUpdate(List<ThirdOrg> part) {this.baseMapper.batchSaveOrUpdate(part);}@Overrideprotected List<ThirdOrg> covert(String body) {List<ThirdOrg> thirdOrgs = new ArrayList<>();return thirdOrgs;}@Overrideprotected void addExtraData(List<ThirdOrg> allData) {ThirdOrg thirdOrg = new ThirdOrg();thirdOrg.setName(Constants.NO_ORG_NAME_NAME);thirdOrg.setThirdCode(Constants.NO_ORG_NAME_CODE);thirdOrg.setType(0);allData.add(thirdOrg);}@Overrideprotected void extraDeal() {thirdRedisOrgCache.initData();}
}
2.3 MyBatis 数据批量新增或更新操作
<!--
  Batch upsert into third_person: inserts every element of "pojoList" in one statement and,
  when a row hits a duplicate key, overwrites the listed columns and sets UPDATE_TIME to now().
  "pojoList" must be non-empty, otherwise the VALUES clause is empty and the SQL is invalid.
  NOTE(review): CAR_NUMBER is inserted but not refreshed in the ON DUPLICATE KEY UPDATE list;
  confirm whether that omission is intentional.
-->
<insert id="batchSaveOrUpdate">
    insert into third_person(NAME, CODE, SEX, PHONE, CARD_NUMBER, ID_CARD, EMAIL, ENTRY_DATE, CAR_NUMBER, THIRD_ORG_CODE, THIRD_DORM_CODE, TYPE)
    values
    <foreach collection="pojoList" item="item" index="index" separator=",">
        (#{item.name},#{item.code},#{item.sex},#{item.phone},#{item.cardNumber},#{item.idCard},#{item.email},#{item.entryDate},#{item.carNumber},#{item.thirdOrgCode},#{item.thirdDormCode},#{item.type})
    </foreach>
    ON DUPLICATE KEY UPDATE
    NAME = values(NAME),
    SEX = values(SEX),
    PHONE = values(PHONE),
    CARD_NUMBER = values(CARD_NUMBER),
    EMAIL = values(EMAIL),
    ID_CARD = values(ID_CARD),
    ENTRY_DATE = values(ENTRY_DATE),
    THIRD_ORG_CODE = values(THIRD_ORG_CODE),
    THIRD_DORM_CODE = values(THIRD_DORM_CODE),
    TYPE = values(TYPE),
    UPDATE_TIME = now()
</insert>
3.数据抽取触发
/**
 * Entry point that discovers every {@link BaseDataAcquisition} bean and runs the enabled ones
 * in ascending {@code SyncMeta.order()}.
 */
@Service
public class DataAcquisitionStrategy {

    /**
     * Runs the selected acquisition services one after another.
     *
     * <p>A service is selected when its class carries {@code @SyncMeta} with
     * {@code isEnable = true} and, if {@code serviceNames} is non-empty, its
     * {@code SyncMeta.name()} is contained in that list. Services sharing the same order value
     * all run, in the order the bean lookup returned them.
     *
     * @param serviceNames names to restrict the run to; {@code null} or empty means
     *                     "all enabled services"
     */
    public void process(List<String> serviceNames) {
        Map<String, BaseDataAcquisition> allServiceMap =
                AppContextHelper.getContext().getBeansOfType(BaseDataAcquisition.class);
        // Sort instead of keying a map by order(): a map would keep only one service per
        // order value and silently drop the rest. sorted() is stable for this ordered stream.
        allServiceMap.values().stream()
                .filter(service -> isSelected(service, serviceNames))
                .sorted((left, right) -> Integer.compare(orderOf(left), orderOf(right)))
                .forEach(service -> service.acquire());
    }

    /** Returns whether the service is annotated, enabled and matches the optional name filter. */
    private static boolean isSelected(BaseDataAcquisition service, List<String> serviceNames) {
        // NOTE(review): getClass() is the runtime class; if beans are wrapped in a subclass
        // proxy the non-inherited annotation may not be visible here - confirm.
        SyncMeta syncMeta = service.getClass().getAnnotation(SyncMeta.class);
        if (syncMeta == null || !syncMeta.isEnable()) {
            return false;
        }
        return CollUtil.isEmpty(serviceNames) || serviceNames.contains(syncMeta.name());
    }

    /** Reads the order of a service that already passed {@link #isSelected}. */
    private static int orderOf(BaseDataAcquisition service) {
        return service.getClass().getAnnotation(SyncMeta.class).order();
    }
}