本例应用线程池、多线程、阻塞队列处理一个流程任务。本例处理一个订单流程,主要包括生成订单、订单处理、订单入库,下面我们一起看看
版本:Spring Boot 2.6.3
一、案例场景
1>web端接收restful请求生成任务A,并把任务放入队列Queue_A。
2>线程池A的任务线程从队列Queue_A取出任务,处理完成后放入Queue_B。
3>线程池B的任务线程从Queue_B取出任务,处理完成后入库。
本例就使用两个任务步骤,按需扩展延长任务链。
二、使用类
java.util.LinkedHashMap,双向链表。
java.util.concurrent.BlockingQueue,阻塞队列接口。
java.util.concurrent.LinkedBlockingQueue,阻塞队列实现类。
java.util.concurrent.CountDownLatch,线程计数器。
java.util.concurrent.locks.ReentrantLock,可重入锁。
三、本例说明
1.接收web请求
OrderController接收web请求,业务数据封装成任务对象,并写入队列QUEUE_A。Web请求结束,立即返回。
2.后台任务处理
FlowStarter流程启动器
管理FlowManager,创建流程管理器和启动流程管理器。创建线程池容器StepContainer,指定队列、线程池线程数量,以及业务处理Handler。
FlowManager流程管理器
管理线程池容器StepContainer。创建线程池容器,启动线程池容器,关闭线程池容器,线程池容器之间数据传递。使用LinkedHashMap维护一个流程中的多个线程池容器。
StepContainer线程池容器
创建线程池,启动线程执行器(Executor),初始化业务处理Handler,读写队列。使用LinkedHashMap维护一个流程中的多个StepExecutor。
StepExecutor线程执行器
执行抽象公用业务逻辑。实现线程Runnable接口。调用StepHandler的实现类的execute执行具体业务逻辑。
StepHandler业务处理器handler
具体业务在StepHandler的实现类的execute中实现。
任务模型对象StepModel和执行结果对象StepResult
每个具体业务数据必须包装成任务模型对象StepModel,执行结果包装成执行结果对象StepResult,才能在线程池和队列中流转。
3.关系说明
一个FlowStarter可以启动一个或者多个FlowManager。支持一对多和一对一,按需扩展。
一个FlowManager对应一个业务流程。一个业务流程可以拆分为多个步骤。一个步骤对应一个线程池容器StepContainer。一个线程池容器StepContainer,启动多个线程执行器StepExecutor。效果就是并发执行任务。
一个业务流程拆分成若干个步骤,每个步骤之间数据流转,使用任务模型StepModel中的状态标识isFinished,isPutInQueueAgain,isPutInQueueNext 字段来分析任务流向。使用StepModel的StepResult的 nextStepName字段来识别具体流向的线程池容器。
四、代码
1.OrderController
OrderController,接收请求、封装任务、写队列。
@Slf4j
@RestController
@RequestMapping("/order")
public class OrderController {
@PostMapping("/f1")
public Object f1(@RequestBody Object obj) {
log.info("OrderController->f1,接收参数,obj = " + obj.toString());
Map objMap = (Map) obj;
OrderInfo orderInfo = new OrderInfo();
orderInfo.setUserName((String) objMap.get("userName"));
orderInfo.setTradeName((String) objMap.get("tradeName"));
orderInfo.setOrderTime(System.currentTimeMillis());
LinkedBlockingQueue<StepModel> queueA = FlowQueue.getBlockingQueue("QUEUE_A");
QueueUtils.putStepPutInQueue(queueA,orderInfo);
log.info("OrderController->f1,返回." );
return ResultObj.builder().code("200").message("成功").build();
}
}
2.FlowStarter流程启动器
FlowStarter,后台任务线程池和线程启动。实现InitializingBean了接口。那么在spring初始化化bean完成后,就能触发启动线程池和线程。
@Slf4j
@Service
public class FlowStarter implements InitializingBean {
@Override
public void afterPropertiesSet() throws Exception {
log.info("FlowWorker创建流程.");
FlowManager flowManager = new FlowManager();
flowManager.buildContainer(ConstantUtils.STEP_01,5,
FlowQueue.getBlockingQueue("QUEUE_A"), Step01Handler.class
);
flowManager.buildContainer(ConstantUtils.STEP_02,5,
FlowQueue.getBlockingQueue("QUEUE_B"), Step02Handler.class
);
flowManager.startContainers();
log.info("FlowWorker启动流程完成.");
}
}
3.FlowManager流程管理器
一个FlowManager流程管理器,维护多个线程池容器StepContainer,共同完成一个流程的多个步骤。
public class FlowManager {
// 管理器名称
private String name;
// 管理线程池容器
private Map<String, StepContainer> stepContainerMap = new LinkedHashMap<>();
public FlowManager() {}
// 创建线程池容器
public void buildContainer(String name, int poolSize, BlockingQueue<StepModel> queue,
Class<? extends StepHandler> handlerClazz) {
StepContainer stepWorker = new StepContainer();
stepWorker.createThreadPool(poolSize, queue, handlerClazz);
stepWorker.setName(name);
stepWorker.setFlowManager(this);
this.stepContainerMap.put(name, stepWorker);
}
// 启动线程池容器
public void startContainers() {
for (StepContainer stepContainer : this.stepContainerMap.values()) {
stepContainer.startRunExecutor();
}
}
// 关闭线程池容器
public void stopContainers() {
for (StepContainer stepContainer : this.stepContainerMap.values()) {
stepContainer.stopRunExecutor();
}
this.stepContainerMap.clear();
}
// 任务放入下一个线程池
public boolean sendToNextContainer(String nextStepName, Object obj) {
if (nextStepName != null && !StringUtils.equals(nextStepName, "")) {
if (this.stepContainerMap.containsKey(nextStepName)) {
this.stepContainerMap.get(nextStepName).putStepInQueue(obj);
return true;
} else {
return false;
}
} else {
return false;
}
}
public String getName() {
return name;
}
}
4.StepContainer线程池容器
StepContainer线程池容器,维护多个线程执行器StepExecutor,实现多线程异步完成每个独立任务。
@Slf4j
public class StepContainer {
// 线程池名称
private String name;
// 线程池
private ExecutorService threadPool;
// 线程数目
private int nThreads = 0;
// 线程处理业务handler类
private Class handlerClazz;
// 线程处理业务队列
private BlockingQueue<StepModel> queue = null;
// 线程池内线程管理
private Map<String, StepExecutor> stepExecutorMap = new LinkedHashMap<>();
// 线程池运行状态
private boolean isRun = false;
// 线程池管理器
private FlowManager flowManager = null;
// 构造函数
public StepContainer() {}
// 创建线程池
public boolean createThreadPool(int nThreads, BlockingQueue<StepModel> queue,
Class<? extends StepHandler> handlerClazz) {
try {
this.nThreads = nThreads;
this.queue = queue;
this.handlerClazz = handlerClazz;
this.threadPool = Executors.newFixedThreadPool(this.nThreads, new ThreadFactory() {
@Override
public Thread newThread(Runnable runnable) {
return new Thread(runnable);
}
});
} catch (Exception e) {
e.printStackTrace();
return false;
}
return true;
}
// 启动线程
public void startRunExecutor() {
if (!this.isRun) {
if (this.handlerClazz != null) {
log.info("线程池: " + this.name + ",启动,加载线程Executor.");
StepExecutor stepExecutor;
String executorName = "";
for (int num = 0; num < this.nThreads; num++) {
try {
executorName = this.name + "_" + (num + 1);
StepHandler stepHandler = (StepHandler) createStepHandler(this.handlerClazz);
stepExecutor = new StepExecutor(executorName, this.queue, stepHandler, this);
this.threadPool.execute(stepExecutor);
this.stepExecutorMap.put(executorName, stepExecutor);
} catch (Exception e) {
e.printStackTrace();
}
}
this.isRun = true;
}
}
}
// 关闭线程
public void stopRunExecutor() {
if (isRun) {
Iterator iterator = this.stepExecutorMap.values().iterator();
while (iterator.hasNext()) {
StepExecutor stepExecutor = (StepExecutor) iterator.next();
stepExecutor.stop();
}
this.stepExecutorMap.clear();
this.isRun = false;
}
}
// 从队列获取任务
public StepModel getStepFromQueue() {
StepModel stepModel = null;
synchronized (this.queue) {
try {
if (this.queue.size() > 0) {
stepModel = this.queue.take();
}
} catch (Exception e) {
log.info("从队列获取任务异常.");
e.printStackTrace();
}
}
return stepModel;
}
// 任务放入队列
public void putStepInQueue(Object obj) {
try {
StepModel stepModel = new StepModel(obj);
stepModel.setPutInQueueTime(System.currentTimeMillis());
this.queue.put(stepModel);
} catch (InterruptedException e) {
log.info("任务放入队列异常.");
e.printStackTrace();
}
}
// 重新放入
public void putStepInQueueAgain(StepModel stepModel) {
stepModel.setFinished(false);
stepModel.setPutInQueueNext(false);
stepModel.setPutInQueueAgain(false);
try {
this.queue.put(stepModel);
} catch (InterruptedException e) {
log.info("任务重新放入队列异常.");
e.printStackTrace();
}
}
// 清空队列
public void clearQueue() {
if (this.queue != null) {
this.queue.clear();
}
}
// 初始化实例对象
public Object createStepHandler(Class clazz)
throws InstantiationException, IllegalAccessException {
Object object = clazz.newInstance();
return object;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public FlowManager getFlowManager() {
return flowManager;
}
public void setFlowManager(FlowManager flowManager) {
this.flowManager = flowManager;
}
}
5.StepExecutor线程执行器
StepExecutor线程执行器,实现Runnable接口。线程执行单元通用逻辑,具体业务逻辑通过调用StepHandler的execute方法实现。
@Slf4j
public class StepExecutor implements Runnable {
// 执行器名称
private String name;
// 线程执行的任务
private StepModel stepModel;
// 线程执行的队列
private BlockingQueue<StepModel> queue;
// 线程执行的业务处理逻辑
private Object stepHandler;
// 线程运行状态
private volatile boolean isRun = false;
// 线程开启(True)和关闭(False)
private volatile boolean isClose = false;
// 线程隶属容器
private StepContainer stepContainer;
// 线程计数器(关闭线程使用)
private CountDownLatch countDownLatch = null;
public StepExecutor() {}
public StepExecutor(String name, BlockingQueue<StepModel> queue,
StepHandler stepHandler, StepContainer stepContainer) {
this.name = name;
this.queue = queue;
this.stepHandler = stepHandler;
this.stepContainer = stepContainer;
}
@Override
public void run() {
this.isRun = true;
this.countDownLatch = new CountDownLatch(1);
// 没收到关闭信号,则循环运行
while (!this.isClose) {
this.stepModel = null;
String threadName = "【线程池:" + this.stepContainer.getName()
+ ",线程:" + Thread.currentThread().getName() + "】";
// 循环运行,为防止中断和卡主,需捕获异常
try {
StepHandler stepHandler = (StepHandler) this.stepHandler;
this.stepModel = this.stepContainer.getStepFromQueue();
if (this.stepModel != null) {
log.info(threadName + ",处理任务.");
this.stepModel.getStepResultList().clear();
stepHandler.execute(this.stepModel);
// 执行完成后结果数据
List<StepResult> stepResultList = this.stepModel.getStepResultList();
boolean isFinished = this.stepModel.isFinished();
boolean isPutInQueueAgain = this.stepModel.isPutInQueueAgain();
boolean isPutInQueueNext = this.stepModel.isPutInQueueNext();
if (isFinished && !isPutInQueueAgain && !isPutInQueueNext) {
log.info(threadName + ",任务结束.");
}
if (!isFinished && isPutInQueueAgain && !isPutInQueueNext) {
log.info(threadName + ",任务在本步骤未完成,重新放队列.");
this.stepContainer.putStepInQueueAgain(this.stepModel);
}
if (!isFinished && !isPutInQueueAgain && isPutInQueueNext) {
int resultNum = stepResultList.size();
if (resultNum > 0) {
for (StepResult stepResult : stepResultList) {
log.info(threadName + ",任务在本步骤已经完成,发送给下一个线程池: "
+ stepResult.getNextStepName() + ",执行.");
this.stepContainer.getFlowManager().sendToNextContainer(
stepResult.getNextStepName(),
stepResult.getResult());
}
}
}
} else {
threadToSleep(1000 * 3L);
}
} catch (Exception e) {
log.info("执行器异常.");
e.printStackTrace();
this.stepContainer.putStepInQueueAgain(this.stepModel);
}
}
// 跳出循环后,线程计数减1
this.countDownLatch.countDown();
this.isRun = false;
}
public void stop() {
this.isClose = true;
if (this.countDownLatch != null) {
while (this.countDownLatch.getCount() > 0L) {
try {
this.countDownLatch.await();
} catch (InterruptedException e) {
log.info("线程关闭异常.");
e.printStackTrace();
}
}
}
this.isClose = false;
}
public void threadToSleep(long time) {
try {
Thread.sleep(time);
} catch (Exception e) {
log.info("线程休眠异常.");
e.printStackTrace();
}
}
}
6.StepHandler业务处理handler
StepHandler是StepExecutor线程执行器,具体执行业务逻辑的入口。
StepHandler抽象类
每个具体的实现类都继承抽象的StepHandler。
public abstract class StepHandler {
public StepHandler() {}
public abstract void execute(StepModel stepModel);
}
Step01Handler
Step01Handler是StepHandler实现类,从队列中取任务执行,执行完成后放入下一个业务处理器Step02Handler。
@Slf4j
public class Step01Handler extends StepHandler {
@Override
public void execute(StepModel stepModel) {
log.info("Step01Handler执行开始,stepModel: " + stepModel.toString());
OrderInfo orderInfo = (OrderInfo) stepModel.getObj();
List<StepResult> stepResultList = stepModel.getStepResultList();
try {
log.info("Step01Handler执行,处理订单.");
String orderNo = UUID.randomUUID().toString()
.replace("-", "").toUpperCase();
orderInfo.setOrderNo(orderNo);
orderInfo.setPlatformType("线上");
orderInfo.setOrderSource("Web");
stepModel.setFinished(false);
stepModel.setPutInQueueNext(true);
stepModel.setPutInQueueAgain(false);
stepResultList.add(new StepResult(ConstantUtils.STEP_02, orderInfo));
} catch (Exception e) {
stepModel.setFinished(false);
stepModel.setPutInQueueNext(false);
stepModel.setPutInQueueAgain(true);
stepResultList.add(new StepResult(ConstantUtils.STEP_01, orderInfo));
}
log.info("Step01Handler执行完成,stepModel: " + stepModel.toString());
}
}
Step02Handler
Step02Handler是StepHandler实现类,从队列中取任务执行。
@Slf4j
public class Step02Handler extends StepHandler{
@Override
public void execute(StepModel stepModel) {
log.info("Step02Handler执行开始,stepModel: " + stepModel.toString());
OrderInfo orderInfo = (OrderInfo) stepModel.getObj();
List<StepResult> stepResultList = stepModel.getStepResultList();
try {
orderInfo.setEndTime(System.currentTimeMillis());
stepModel.setFinished(true);
stepModel.setPutInQueueNext(false);
stepModel.setPutInQueueAgain(false);
log.info("Step02Handler执行,入库.");
} catch (Exception e) {
stepModel.setFinished(true);
stepModel.setPutInQueueNext(false);
stepModel.setPutInQueueAgain(false);
}
log.info("Step02Handler执行完成,stepModel: " + stepModel.toString());
}
}
7.阻塞队列
BlockingQueue是线程安全的阻塞队列。
7.1 FlowQueue
FlowQueue,管理本例使用的两个阻塞队列。
public class FlowQueue {
private static final LinkedBlockingQueue<StepModel> queueA = new LinkedBlockingQueue<StepModel>();
private static final LinkedBlockingQueue<StepModel> queueB = new LinkedBlockingQueue<StepModel>();
public static LinkedBlockingQueue<StepModel> getBlockingQueue(String queueName) {
LinkedBlockingQueue<StepModel> queue = null;
switch (queueName) {
case "QUEUE_A":
queue = queueA;
break;
case "QUEUE_B":
queue = queueB;
break;
}
return queue;
}
}
7.2 QueueUtils
QueueUtils,队列简易工具。
@Slf4j
public class QueueUtils {
public static StepModel getStepFromQueue(
LinkedBlockingQueue<StepModel> queue) {
StepModel stepModel = null;
try {
if (queue.size() > 0) {
stepModel = queue.take();
}
} catch (Exception e) {
log.info("读队列异常.");
e.printStackTrace();
}
return stepModel;
}
public static void putStepPutInQueue(
LinkedBlockingQueue<StepModel> queue, Object obj) {
try {
StepModel stepModel = new StepModel(obj);
stepModel.setPutInQueueTime(System.currentTimeMillis());
queue.put(stepModel);
} catch (Exception e) {
log.info("写队列异常.");
e.printStackTrace();
}
}
public static int getQueueSize(
LinkedBlockingQueue<StepModel> queue) {
int size = 0;
try {
size = queue.size();
} catch (Exception e) {
log.info("获取队列Size异常.");
e.printStackTrace();
}
return size;
}
}
7.3 ConstantUtils
ConstantUtils,管理常量,即线程池名称。
public class ConstantUtils {
public static final String STEP_01 = "STEP_01_THREAD_POOL";
public static final String STEP_02 = "STEP_02_THREAD_POOL";
}
8.任务模型
任务模型,即具体需要处理对象,封装成线程使用的任务模型,这样可以把业务和流程框架解耦。
8.1 StepModel
StepModel,任务模型封装。
@Data
public class StepModel {
// 任务对象
private Object obj;
// 任务执行结果
private List<StepResult> stepResultList;
// 任务接收时间
private long putInQueueTime;
// 任务完成标识
private boolean isFinished = false;
// 任务重新放入队列标识
private boolean isPutInQueueAgain = false;
// 任务放入下一个队列标识
private boolean isPutInQueueNext = false;
public StepModel(Object object) {
this.obj = object;
this.stepResultList = new ArrayList<>();
}
}
8.2 StepResult
StepResult,执行结果模型封装。
@Data
public class StepResult {
// 目标线程池名
private String nextStepName;
// 执行结果
private Object result;
public StepResult(String nextStepName,Object result){
this.nextStepName = nextStepName;
this.result = result;
}
}
9.业务数据模型
业务数据模型,即生成具体需要处理的数据,在传入给线程池的线程执行前,需要封装成任务模型。
9.1 OrderInfo
OrderInfo,本例要处理的业务数据模型。
@Data
@NoArgsConstructor
public class OrderInfo {
private String userName;
private String orderNo;
private String tradeName;
private String platformType;
private String orderSource;
private long orderTime;
private long endTime;
}
9.2 ResultObj
ResultObj,web请求返回的统一封装对象。
@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class ResultObj {
private String code;
private String message;
}
10.测试
包括web请求和后台任务
10.1 web请求
请求URL: http://127.0.0.1:8080/server/order/f1
入参:
{
"userName": "HangZhou0614",
"tradeName": "Vue进阶教程"
}
返回值:
{
"code": "200",
"message": "成功"
}
10.2 后台任务日志
日志输出:
到此这篇关于Springboot详解线程池与多线程及阻塞队列的应用详解的文章就介绍到这了,更多相关Springboot线程池内容请搜索编程学习网以前的文章希望大家以后多多支持编程学习网!
本文标题为:Springboot详解线程池与多线程及阻塞队列的应用详解
基础教程推荐
- java实现多人聊天系统 2023-05-19
- springboot自定义starter方法及注解实例 2023-03-31
- JDK数组阻塞队列源码深入分析总结 2023-04-18
- ConditionalOnProperty配置swagger不生效问题及解决 2023-01-02
- Java并发编程进阶之线程控制篇 2023-03-07
- Java数据结构之对象比较详解 2023-03-07
- Java实现线程插队的示例代码 2022-09-03
- Java文件管理操作的知识点整理 2023-05-19
- Java实现查找文件和替换文件内容 2023-04-06
- java基础知识之FileInputStream流的使用 2023-08-11