这篇文章主要为大家详细介绍了Java实现多线程大批量同步数据(分页),文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
背景
最近遇到个功能,两个月有300w+的数据,之后还在累加,因一开始该数据就全部存储在mysql表,现需要展示在页面,还需要关联另一张表的数据,而且产品要求页面的查询条件多达20个条件,最终,这个功能卡的要死,基本查不出来数据。
最后是打算把这两张表的数据同时存储到MongoDB中去,以提高查询效率。
一开始同步的时候,采用单线程,循环以分页的模式去同步这两张表数据,结果是…一晚上,只同步了30w数据,特慢!!!
最后,改造了一番,2小时,就成功同步了300w+数据。
以下是主要逻辑。
线程的个数请根据你自己的服务器性能酌情设置。
思路
先通过count查出结果集的总条数,设置每个线程分页查询的条数,通过总条数和单次条数得到线程数量,通过改变limit的下标实现分批查询。
代码实现
package com.github.admin.controller.loans;
import com.baomidou.mybatisplus.mapper.EntityWrapper;
import com.github.admin.model.entity.CaseCheckCallRecord;
import com.github.admin.model.entity.duyan.DuyanCallRecordDetail;
import com.github.admin.model.entity.loans.CaseCallRemarkRecord;
import com.github.admin.service.duyan.DuyanCallRecordDetailService;
import com.github.admin.service.loans.CaseCallRemarkRecordService;
import com.github.common.constant.MongodbConstant;
import com.github.common.util.DingDingMsgSendUtils;
import com.github.common.util.ListUtils;
import com.github.common.util.Response;
import com.github.common.util.concurrent.Executors;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
/**
* 多线程同步历史数据
* @author songfayuan
* @date 2019-09-26 15:38
*/
@Slf4j
@RestController
@RequestMapping("/demo")
public class SynchronizeHistoricalDataController implements DisposableBean {
private ExecutorService executor = Executors.newFixedThreadPool(10, "SynchronizeHistoricalDataController"); //newFixedThreadPool 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
@Value("${spring.profiles.active}")
private String profile;
@Autowired
private DuyanCallRecordDetailService duyanCallRecordDetailService;
@Autowired
private MongoTemplate mongoTemplate;
@Autowired
private CaseCallRemarkRecordService caseCallRemarkRecordService;
/**
* 多线程同步通话记录历史数据
* @param params
* @return
* @throws Exception
*/
@GetMapping("/syncHistoryData")
public Response syncHistoryData(Map<String, Object> params) throws Exception {
executor.execute(new Runnable() {
@Override
public void run() {
try {
logicHandler(params);
} catch (Exception e) {
log.warn("多线程同步稽查通话记录历史数据才处理异常,errMsg = {}", e);
DingDingMsgSendUtils.sendDingDingGroupMsg("【系统消息】" + profile + "环境,多线程同步稽查通话记录历史数据才处理异常,errMsg = "+e);
}
}
});
return Response.success("请求成功");
}
/**
* 处理数据逻辑
* @param params
* @throws Exception
*/
private void logicHandler(Map<String, Object> params) throws Exception {
/******返回结果:多线程处理完的最终数据******/
List<DuyanCallRecordDetail> result = new ArrayList<>();
/******查询数据库总的数据条数******/
int count = this.duyanCallRecordDetailService.selectCount(new EntityWrapper<DuyanCallRecordDetail>()
.eq("is_delete", 0)
.eq("platform_type", 1));
DingDingMsgSendUtils.sendDingDingGroupMsg("【系统消息】" + profile + "环境,本次需要同步" + count + "条历史稽查通话记录数据。");
// int count = 2620266;
/******限制每次查询的条数******/
int num = 1000;
/******计算需要查询的次数******/
int times = count / num;
if (count % num != 0) {
times = times + 1;
}
/******每个线程开始查询的行数******/
int offset = 0;
/******添加任务******/
List<Callable<List<DuyanCallRecordDetail>>> tasks = new ArrayList<>();
for (int i = 0; i < times; i++) {
Callable<List<DuyanCallRecordDetail>> qfe = new ThredQuery(duyanCallRecordDetailService, params, offset, num);
tasks.add(qfe);
offset = offset + num;
}
/******为避免太多任务的最终数据全部存在list导致内存溢出,故将任务再次拆分单独处理******/
List<List<Callable<List<DuyanCallRecordDetail>>>> smallList = ListUtils.partition(tasks, 10);
for (List<Callable<List<DuyanCallRecordDetail>>> callableList : smallList) {
if (CollectionUtils.isNotEmpty(callableList)) {
// executor.execute(new Runnable() {
// @Override
// public void run() {
// log.info("任务拆分执行开始:线程{}拆分处理开始...", Thread.currentThread().getName());
//
// log.info("任务拆分执行结束:线程{}拆分处理开始...", Thread.currentThread().getName());
// }
// });
try {
List<Future<List<DuyanCallRecordDetail>>> futures = executor.invokeAll(callableList);
/******处理线程返回结果******/
if (futures != null && futures.size() > 0) {
for (Future<List<DuyanCallRecordDetail>> future : futures) {
List<DuyanCallRecordDetail> duyanCallRecordDetailList = future.get();
if (CollectionUtils.isNotEmpty(duyanCallRecordDetailList)){
executor.execute(new Runnable() {
@Override
public void run() {
/******异步存储******/
log.info("异步存储MongoDB开始:线程{}拆分处理开始...", Thread.currentThread().getName());
saveMongoDB(duyanCallRecordDetailList);
log.info("异步存储MongoDB结束:线程{}拆分处理开始...", Thread.currentThread().getName());
}
});
}
//result.addAll(future.get());
}
}
} catch (Exception e) {
log.warn("任务拆分执行异常,errMsg = {}", e);
DingDingMsgSendUtils.sendDingDingGroupMsg("【系统消息】" + profile + "环境,任务拆分执行异常,errMsg = "+e);
}
}
}
}
/**
* 数据存储MongoDB
* @param duyanCallRecordDetailList
*/
private void saveMongoDB(List<DuyanCallRecordDetail> duyanCallRecordDetailList) {
for (DuyanCallRecordDetail duyanCallRecordDetail : duyanCallRecordDetailList) {
/******重复数据不同步MongoDB******/
org.springframework.data.mongodb.core.query.Query query = new org.springframework.data.mongodb.core.query.Query();
query.addCriteria(Criteria.where("callUuid").is(duyanCallRecordDetail.getCallUuid()));
List<CaseCheckCallRecord> caseCheckCallRecordList = mongoTemplate.find(query, CaseCheckCallRecord.class, MongodbConstant.CASE_CHECK_CALL_RECORD);
if (CollectionUtils.isNotEmpty(caseCheckCallRecordList)) {
log.warn("call_uuid = {}在MongoDB已经存在数据,后面数据将被舍弃...", duyanCallRecordDetail.getCallUuid());
continue;
}
/******关联填写的记录******/
CaseCallRemarkRecord caseCallRemarkRecord = this.caseCallRemarkRecordService.selectOne(new EntityWrapper<CaseCallRemarkRecord>()
.eq("is_delete", 0)
.eq("call_uuid", duyanCallRecordDetail.getCallUuid()));
CaseCheckCallRecord caseCheckCallRecord = new CaseCheckCallRecord();
BeanUtils.copyProperties(duyanCallRecordDetail, caseCheckCallRecord);
//补充
caseCheckCallRecord.setCollectorUserId(duyanCallRecordDetail.getUserId());
if (caseCallRemarkRecord != null) {
//补充
caseCheckCallRecord.setCalleeName(caseCallRemarkRecord.getContactName());
}
log.info("正在存储数据到MongoDB:{}", caseCheckCallRecord.toString());
this.mongoTemplate.save(caseCheckCallRecord, MongodbConstant.CASE_CHECK_CALL_RECORD);
}
}
@Override
public void destroy() throws Exception {
executor.shutdown();
}
}
class ThredQuery implements Callable<List<DuyanCallRecordDetail>> {
/******需要通过构造方法把对应的业务service传进来 实际用的时候把类型变为对应的类型******/
private DuyanCallRecordDetailService myService;
/******查询条件 根据条件来定义该类的属性******/
private Map<String, Object> params;
/******分页index******/
private int offset;
/******数量******/
private int num;
public ThredQuery(DuyanCallRecordDetailService myService, Map<String, Object> params, int offset, int num) {
this.myService = myService;
this.params = params;
this.offset = offset;
this.num = num;
}
@Override
public List<DuyanCallRecordDetail> call() throws Exception {
/******通过service查询得到对应结果******/
List<DuyanCallRecordDetail> duyanCallRecordDetailList = myService.selectList(new EntityWrapper<DuyanCallRecordDetail>()
.eq("is_delete", 0)
.eq("platform_type", 1)
.last("limit "+offset+", "+num));
return duyanCallRecordDetailList;
}
}
ListUtils工具
package com.github.common.util;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import java.io.*;
import java.util.ArrayList;
import java.util.List;
/**
* 描述:List工具类
* @author songfayuan
* 2018年7月22日下午2:23:22
*/
@Slf4j
public class ListUtils {
/**
* 描述:list集合深拷贝
* @param src
* @return
* @throws IOException
* @throws ClassNotFoundException
* @author songfayuan
* 2018年7月22日下午2:35:23
*/
public static <T> List<T> deepCopy(List<T> src) {
try {
ByteArrayOutputStream byteout = new ByteArrayOutputStream();
ObjectOutputStream out = new ObjectOutputStream(byteout);
out.writeObject(src);
ByteArrayInputStream bytein = new ByteArrayInputStream(byteout.toByteArray());
ObjectInputStream in = new ObjectInputStream(bytein);
@SuppressWarnings("unchecked")
List<T> dest = (List<T>) in.readObject();
return dest;
} catch (ClassNotFoundException e) {
e.printStackTrace();
return null;
} catch (IOException e) {
e.printStackTrace();
return null;
}
}
/**
* 描述:对象深拷贝
* @param src
* @return
* @throws IOException
* @throws ClassNotFoundException
* @author songfayuan
* 2018年12月14日
*/
public static <T> T objDeepCopy(T src) {
try {
ByteArrayOutputStream byteout = new ByteArrayOutputStream();
ObjectOutputStream out = new ObjectOutputStream(byteout);
out.writeObject(src);
ByteArrayInputStream bytein = new ByteArrayInputStream(byteout.toByteArray());
ObjectInputStream in = new ObjectInputStream(bytein);
@SuppressWarnings("unchecked")
T dest = (T) in.readObject();
return dest;
} catch (ClassNotFoundException e) {
log.error("errMsg = {}", e);
return null;
} catch (IOException e) {
log.error("errMsg = {}", e);
return null;
}
}
/**
* 将一个list均分成n个list,主要通过偏移量来实现的
* @author songfayuan
* 2018年12月14日
*/
public static <T> List<List<T>> averageAssign(List<T> source, int n) {
List<List<T>> result = new ArrayList<List<T>>();
int remaider = source.size() % n; //(先计算出余数)
int number = source.size() / n; //然后是商
int offset = 0;//偏移量
for (int i = 0; i < n; i++) {
List<T> value = null;
if (remaider > 0) {
value = source.subList(i * number + offset, (i + 1) * number + offset + 1);
remaider--;
offset++;
} else {
value = source.subList(i * number + offset, (i + 1) * number + offset);
}
result.add(value);
}
return result;
}
/**
* List按指定长度分割
* @param list the list to return consecutive sublists of (需要分隔的list)
* @param size the desired size of each sublist (the last may be smaller) (分隔的长度)
* @author songfayuan
* @date 2019-07-07 21:37
*/
public static <T> List<List<T>> partition(List<T> list, int size){
return Lists.partition(list, size); // 使用guava
}
/**
* 测试
* @param args
*/
public static void main(String[] args) {
List<Integer> bigList = new ArrayList<>();
for (int i = 0; i < 101; i++){
bigList.add(i);
}
log.info("bigList长度为:{}", bigList.size());
log.info("bigList为:{}", bigList);
List<List<Integer>> smallists = partition(bigList, 20);
log.info("smallists长度为:{}", smallists.size());
for (List<Integer> smallist : smallists) {
log.info("拆分结果:{},长度为:{}", smallist, smallist.size());
}
}
}
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持编程学习网。
沃梦达教程
本文标题为:Java实现多线程大批量同步数据(分页)
基础教程推荐
猜你喜欢
- Java数据结构之对象比较详解 2023-03-07
- Java实现线程插队的示例代码 2022-09-03
- java实现多人聊天系统 2023-05-19
- ConditionalOnProperty配置swagger不生效问题及解决 2023-01-02
- java基础知识之FileInputStream流的使用 2023-08-11
- Java文件管理操作的知识点整理 2023-05-19
- Java并发编程进阶之线程控制篇 2023-03-07
- springboot自定义starter方法及注解实例 2023-03-31
- JDK数组阻塞队列源码深入分析总结 2023-04-18
- Java实现查找文件和替换文件内容 2023-04-06