Processing tasks in parallel and sequentially in Java
Problem description
In my program, the user can trigger different tasks through an interface. The tasks take some time to process, so they are executed by threads. So far I have an executor with a single thread that runs all tasks one after the other, but now I would like to parallelize things a bit.
That is, I would like to run tasks in parallel unless they have the same path, in which case they should run sequentially. For example, I have 10 threads in my pool; when a task comes in, it should be assigned to the worker that is currently processing a task with the same path. If no worker is currently processing a task with that path, the task should go to a worker that is currently free.
Additional info: a task is any operation executed on a file in the local file system, for example renaming a file. Therefore, each task has the attribute path. I don't want to execute two tasks on the same file at the same time, so tasks with the same path should be performed sequentially.
Here is my sample code, but there is still work to do:
One of my problems is that I need a safe way to check whether a worker is currently running and to get the path of the task it is running. By safe I mean that no concurrent-access or other threading problems can occur.
import com.google.common.base.Preconditions; // Guava
import java.util.concurrent.*;

public class TasksOrderingExecutor {

    public interface Task extends Runnable {
        //Task code here
        String getPath();
    }

    private static class Worker implements Runnable {
        private final LinkedBlockingQueue<Task> tasks = new LinkedBlockingQueue<>();
        //some variable or mechanic to give the actual path of the running tasks??
        private volatile boolean stopped;

        void schedule(Task task) {
            tasks.add(task);
        }

        void stop() {
            stopped = true;
        }

        @Override
        public void run() {
            while (!stopped) {
                try {
                    // blocks until a task is available
                    Task task = tasks.take();
                    task.run();
                } catch (InterruptedException ie) {
                    // perhaps, handle somehow
                }
            }
        }
    }

    private final Worker[] workers;
    private final ExecutorService executorService;

    /**
     * @param queuesNr nr of concurrent task queues
     */
    public TasksOrderingExecutor(int queuesNr) {
        Preconditions.checkArgument(queuesNr >= 1, "queuesNr >= 1");
        executorService = new ThreadPoolExecutor(queuesNr, queuesNr, 0, TimeUnit.SECONDS, new SynchronousQueue<>());
        workers = new Worker[queuesNr];
        for (int i = 0; i < queuesNr; i++) {
            Worker worker = new Worker();
            executorService.submit(worker);
            workers[i] = worker;
        }
    }

    public void submit(Task task) {
        Worker worker = getWorker(task);
        worker.schedule(task);
    }

    public void stop() {
        for (Worker w : workers) w.stop();
        executorService.shutdown();
    }

    private Worker getWorker(Task task) {
        //check here if a running worker with a specific path exists? If yes return it, else return a free worker. How do I check if a worker is currently running?
        return workers[task.getPath() //HERE I NEED HELP//];
    }
}
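For the getWorker placeholder above, one simple option is to pick the worker by the hash of the path, so that equal paths always map to the same single-threaded queue. The sketch below is only an illustration under that assumption: the class StripedExecutorSketch and its methods are hypothetical names, not part of the original code. Distinct paths that happen to collide on the same index are also serialized, which is stricter than the question requires.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: one single-threaded executor per "stripe",
// chosen by the hash of the task's path.
public class StripedExecutorSketch {
    private final ExecutorService[] stripes;

    public StripedExecutorSketch(int stripeCount) {
        if (stripeCount < 1) throw new IllegalArgumentException("stripeCount >= 1");
        stripes = new ExecutorService[stripeCount];
        for (int i = 0; i < stripeCount; i++) {
            stripes[i] = Executors.newSingleThreadExecutor();
        }
    }

    // Equal paths have equal hash codes, so they always get the same index.
    // floorMod keeps the index non-negative even for negative hash codes.
    public static int stripeIndex(String path, int stripeCount) {
        return Math.floorMod(path.hashCode(), stripeCount);
    }

    public void submit(String path, Runnable task) {
        stripes[stripeIndex(path, stripes.length)].execute(task);
    }

    // Lets queued tasks finish, then waits up to 10 seconds per stripe.
    public void stop() {
        for (ExecutorService s : stripes) s.shutdown();
        try {
            for (ExecutorService s : stripes) s.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        StripedExecutorSketch executor = new StripedExecutorSketch(10);
        executor.submit("path1", () -> System.out.println("path1/task1"));
        executor.submit("path1", () -> System.out.println("path1/task2"));
        executor.submit("path2", () -> System.out.println("path2/task1"));
        executor.stop();
    }
}
```

With this approach there is no need to ask a worker what it is currently running: the mapping from path to worker is fixed, so no shared mutable state has to be inspected.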
All you need is a hash map of actors with the file path as the key. Different actors run in parallel, while each individual actor handles its tasks sequentially.
Your solution is wrong because the Worker class uses the blocking operation take but is executed in a limited thread pool, which may lead to thread starvation (a kind of deadlock). Actors do not block while waiting for the next message.
import org.df4j.core.dataflow.ClassicActor;
import java.util.Map;
import java.util.concurrent.*;

public class TasksOrderingExecutor {

    public static class Task implements Runnable {
        private final String path;
        private final String task;

        public Task(String path, String task) {
            this.path = path;
            this.task = task;
        }

        //Task code here
        String getPath() {
            return path;
        }

        @Override
        public void run() {
            System.out.println(path+"/"+task+" started");
            try {
                Thread.sleep(500); // simulates the actual file operation
            } catch (InterruptedException e) {
                // restore the interrupt flag instead of swallowing it
                Thread.currentThread().interrupt();
            }
            System.out.println(path+"/"+task+" stopped");
        }
    }

    // One actor per path: messages (tasks) sent to the same actor run one at a time.
    static class Worker extends ClassicActor<Task> {
        @Override
        protected void runAction(Task task) throws Throwable {
            task.run();
        }
    }

    private final ExecutorService executorService;

    // ConcurrentHashMap so that submit() may be called from several threads;
    // get() lazily creates and starts the actor for a path on first use.
    private final Map<String,Worker> workers = new ConcurrentHashMap<String,Worker>(){
        @Override
        public Worker get(Object key) {
            return super.computeIfAbsent((String) key, (k) -> {
                Worker res = new Worker();
                res.setExecutor(executorService);
                res.start();
                return res;
            });
        }
    };

    /**
     * @param queuesNr nr of concurrent task queues
     *                 (unused in this version: the common pool determines the parallelism)
     */
    public TasksOrderingExecutor(int queuesNr) {
        executorService = ForkJoinPool.commonPool();
    }

    public void submit(Task task) {
        Worker worker = getWorker(task);
        worker.onNext(task);
    }

    public void stop() throws InterruptedException {
        for (Worker w : workers.values()) {
            w.onComplete();
        }
        // shutdown() has no effect on the common pool; it matters only if a dedicated pool is used
        executorService.shutdown();
        executorService.awaitTermination(10, TimeUnit.SECONDS);
    }

    private Worker getWorker(Task task) {
        // Returns the actor for this path, creating it if none exists yet.
        return workers.get(task.getPath());
    }

    public static void main(String[] args) throws InterruptedException {
        TasksOrderingExecutor orderingExecutor = new TasksOrderingExecutor(20);
        orderingExecutor.submit(new Task("path1", "task1"));
        orderingExecutor.submit(new Task("path1", "task2"));
        orderingExecutor.submit(new Task("path2", "task1"));
        orderingExecutor.submit(new Task("path3", "task1"));
        orderingExecutor.submit(new Task("path2", "task2"));
        orderingExecutor.stop();
    }
}
The execution log shows that tasks with the same key are executed sequentially and tasks with different keys are executed in parallel:
path3/task1 started
path2/task1 started
path1/task1 started
path3/task1 stopped
path2/task1 stopped
path1/task1 stopped
path2/task2 started
path1/task2 started
path2/task2 stopped
path1/task2 stopped
I used my own actor library DF4J, but any other actor library can be used.
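If pulling in an actor library is not an option, the same idea (one non-blocking sequential queue per path) can be approximated with the JDK alone. The following is a minimal sketch, not the DF4J implementation: KeyedSerialExecutor is a hypothetical class that keeps, for each path, the CompletableFuture of the last submitted task and chains each new task after it. It assumes that stop() is called only after all submissions are done.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: tasks with the same path are chained one after another,
// tasks with different paths share the pool and may run in parallel.
public class KeyedSerialExecutor {
    private final ExecutorService pool;
    // For each path, the future of the most recently submitted task (the "tail").
    private final ConcurrentHashMap<String, CompletableFuture<Void>> tails = new ConcurrentHashMap<>();

    public KeyedSerialExecutor(int threads) {
        pool = Executors.newFixedThreadPool(threads);
    }

    public CompletableFuture<Void> submit(String path, Runnable task) {
        // compute() is atomic per key, so concurrent submits for one path are ordered.
        CompletableFuture<Void> next = tails.compute(path, (k, tail) -> {
            CompletableFuture<Void> prev =
                    (tail == null) ? CompletableFuture.<Void>completedFuture(null) : tail;
            // exceptionally(...) keeps the chain alive if an earlier task failed.
            return prev.exceptionally(t -> null).thenRunAsync(task, pool);
        });
        // Drop the map entry once this task is done, unless a newer task was chained.
        next.whenComplete((r, e) -> tails.remove(path, next));
        return next;
    }

    // Waits for all chained tasks, then shuts the pool down.
    public void stop() {
        CompletableFuture<?>[] pending = tails.values().toArray(new CompletableFuture<?>[0]);
        CompletableFuture.allOf(pending).handle((r, e) -> null).join();
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        KeyedSerialExecutor executor = new KeyedSerialExecutor(4);
        executor.submit("path1", () -> System.out.println("path1/task1"));
        executor.submit("path1", () -> System.out.println("path1/task2"));
        executor.submit("path2", () -> System.out.println("path2/task1"));
        executor.stop();
    }
}
```

No pool thread ever waits for a queue here: a task is handed to the pool only when its predecessor for the same path has finished, which is the property the actor-based version relies on as well.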