我正在处理的应用程序是一个基于Java的ETL过程,它将数据加载到多个表中. DBMS是Infobright(基于MYSQL的DBMS,适用于数据仓库).数据加载应以原子方式完成;但是,出于性能原因,我想同时将数据加载到多个表中(使用LOAD D...
我正在处理的应用程序是一个基于Java的ETL过程,它将数据加载到多个表中. DBMS是Infobright(基于MYSQL的DBMS,适用于数据仓库).
数据加载应以原子方式完成;但是,出于性能原因,我想同时将数据加载到多个表中(使用LOAD DATA INFILE命令).这意味着我需要打开多个连接.
有没有任何解决方案允许我原子地并行地进行负载?
(我猜测答案可能取决于我加载的表格的引擎;大多数是Brighthouse,它允许事务,但没有XA和没有保存点).
为了进一步澄清,我想避免让我们说:
>我将数据加载到5个表中
>我提交前4个表的负载
>第5个表的提交失败
在这种情况下,我无法回滚前4个加载,因为它们已经被提交.
解决方法:
介绍
正如我所承诺的,我已经破解了一个完整的例子.我使用MySQL并创建了三个表,如下所示:
CREATE TABLE `test{1,2,3}` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`data` varchar(255) NOT NULL UNIQUE,
PRIMARY KEY (`id`)
);
test2最初包含一行.
INSERT INTO `test2` (`data`) VALUES ('a');
(I’ve posted the full code to http://pastebin.com.)
以下示例执行了几项操作.
>将线程设置为3,确定要并行运行的作业数.
>创建线程连接数.
>为每个表分配一些示例数据(默认情况下,每个表的数据都是一个).
>创建要运行的作业的线程数,并使用数据加载它们.
>以线程数运行线程数并等待它们完成(成功与否).
>如果没有发生异常,则提交每个连接;否则它会回滚每一个.
>关闭连接(但这些可以重复使用).
(注意,我在SQLTask.call()中使用过Java 7的自动资源管理功能.)
逻辑
public static void main(String[] args) throws SQLException, InterruptedException {
int threads = 3;
List<Connection> connections = getConnections(threads);
Map<String, String> tableData = getTableData(threads);
List<SQLTask> tasks = getTasks(threads, connections);
setData(tableData, tasks);
try {
runTasks(tasks);
commitConnections(connections);
} catch (ExecutionException ex) {
rollbackConnections(connections);
} finally {
closeConnections(connections);
}
}
数据
private static Map<String, String> getTableData(int threads) {
Map<String, String> tableData = new HashMap<>();
for (int i = 1; i <= threads; i++)
tableData.put("test" + i, "a");
return tableData;
}
任务
private static final class SQLTask implements Callable<Void> {
private final Connection connection;
private String data;
private String table;
public SQLTask(Connection connection) {
this.connection = connection;
}
public void setTable(String table) {
this.table = table;
}
public void setData(String data) {
this.data = data;
}
@Override
public Void call() throws SQLException {
try (Statement statement = connection.createStatement()) {
statement.executeUpdate(String.format(
"INSERT INTO `%s` (data) VALUES ('%s');", table, data));
}
return null;
}
}
private static List<SQLTask> getTasks(int threads, List<Connection> connections) {
List<SQLTask> tasks = new ArrayList<>();
for (int i = 0; i < threads; i++)
tasks.add(new SQLTask(connections.get(i)));
return tasks;
}
private static void setData(Map<String, String> tableData, List<SQLTask> tasks) {
Iterator<Entry<String, String>> i = tableData.entrySet().iterator();
Iterator<SQLTask> j = tasks.iterator();
while (i.hasNext()) {
Entry<String, String> entry = i.next();
SQLTask task = j.next();
task.setTable(entry.getKey());
task.setData(entry.getValue());
}
}
跑
private static void runTasks(List<SQLTask> tasks)
throws ExecutionException, InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(tasks.size());
List<Future<Void>> futures = executorService.invokeAll(tasks);
executorService.shutdown();
for (Future<Void> future : futures)
future.get();
}
结果
给定getTableData(…)返回的默认数据
test1 -> `a`
test2 -> `a`
test3 -> `a`
并且test2已经包含一个(并且数据列是唯一的)这一事实将导致第二个作业失败并抛出异常,因此每个连接都将被回滚.
如果不是返回bs,那么将安全地提交连接.
这可以与LOAD DATA类似地完成.
OP对我的回答做出回应之后,我意识到她/他想要做的事情是不可能以简单明了的方式做到的.
基本上问题是,在成功提交之后,无法回滚已提交的数据,因为操作是原子操作.鉴于在给定的情况下需要多次提交,除非一个人跟踪所有数据(在所有事务中)并且如果发生某些事情,则删除所有成功提交的内容.
关于提交和回滚问题有一个nice answer.
本文标题为:java – 使用多个连接的单个事务. (MYSQL / JDBC)
基础教程推荐
- Java深入讲解SPI的使用 2023-01-09
- springboot2学习世界著名程序springboot开发体验 2022-11-20
- Java设计模式之模板方法详解 2023-03-21
- java调用shell脚本及注意事项说明 2023-01-18
- SpringMVC使用注解配置方式 2022-11-20
- 智能 AI 代码生成工具 Cursor 安装和使用超详细教程 2023-07-15
- Java Ribbon与openfeign区别和用法讲解 2023-03-22
- SpringMVC实现上传下载文件 2023-05-14
- Spring Cloud Alibaba微服务组件Sentinel实现熔断限流 2023-01-08
- SpringBoot如何使用RateLimiter通过AOP方式进行限流 2022-12-07