DBMS级别的管道和过滤器:拆分合并输出流

Pipes and filters at DBMS-level: Splitting the MERGE output stream(DBMS级别的管道和过滤器:拆分合并输出流)

本文介绍了DBMS级别的管道和过滤器:拆分合并输出流的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

方案

我们有一个非常标准的数据导入过程,在该过程中我们加载 staging表,然后MERGE将其放入target表。 新需求(绿色)涉及捕获导入数据的子集 放到单独的queue表中以进行完全无关的处理。

"挑战"

(1)子集由以下记录的选择组成: 仅新插入target表。 (2)子集是一些插入列的投影,但也 至少有一列仅存在于源中(staging 表)。 (3)MERGE语句已使用OUTPUT..INTO子句 严格记录MERGE拍摄的$action,以便 PIVOT结果和COUNT插入、更新和 出于统计目的的删除。我们真的不喜欢缓冲 对整个数据集执行类似的操作,并且更倾向于聚合 飞翔上的金额。不用说,我们不想将更多数据添加到 此OUTPUT表。 (4)我们不想做MERGE 无论出于何种原因,即使是部分原因,都会执行第二次。这个 target表非常大,我们不能索引所有内容,并且 操作通常相当昂贵(分钟,而不是秒)。 (5)我们不考虑将MERGE的任何输出往返到 客户端只是为了使客户端可以通过以下方式将其路由到queue 马上把它送回去。数据必须保留在服务器上。 (6)我们希望避免在临时存储中缓冲整个数据集 在stagingqueue之间。

做这件事的最佳方式是什么?

故障

(A)仅将插入的记录入队的要求阻止我们 的OUTPUT..INTO子句中直接以queue表为目标 MERGE,因为它不允许任何WHERE子句。我们可以用一些 CASE标记不需要的记录以供后续删除的诡计 来自queue而不进行处理,但这似乎很疯狂。 (B)因为queue的某些列没有出现在 target表中,我们不能简单地在目标上添加插入触发器 表来加载queue。"数据流拆分"必须更早发生。 (C)由于我们已经在MERGE中使用了OUTPUT..INTO子句,因此我们 无法添加第二个OUTPUT子句MERGE嵌套到 INSERT..SELECT加载队列。这太可惜了,因为它 感觉对一些可行的东西来说,这是一个完全武断的限制 否则很好;SELECT只过滤具有 $action我们希望(INSERT)和INSERTqueue中 声明。因此,从理论上讲,DBMS可以避免缓冲整个 数据集,只需将其流式传输到queue。(注:我们没有追查 而且很可能它实际上并没有以这种方式优化计划。)

情况

我们觉得我们已经用尽了我们的选择,但决定求助于Hivemind 可以肯定的是。我们能想到的只有: (S1)创建target表的VIEW,该表也包含可为空的 仅用于queue的数据的列,并具有 SELECT语句将它们定义为NULL。然后,设置INSTEAD OF 同时填充target表和queue的触发器 恰如其分。最后,将MERGE连接到目标视图。这 是可行的,但是我们并不喜欢这个结构--它绝对是 看起来棘手。 (S2)放弃,使用以下命令在临时表中缓冲整个数据集 另一个MERGE..OUTPUT。在MERGE之后,立即复制数据 (再说一遍!)从临时表导入queue

推荐答案

我的理解是,主要障碍是SQL Server中OUTPUT子句的限制。它允许一个OUTPUT INTO table和/或一个OUTPUT将结果集返回给调用方。

您希望以两种不同的方式保存MERGE语句的结果:

  • MERGE影响的所有行,用于收集统计信息
  • 仅为queue
  • 插入行

简单变量

我会使用您的S2解决方案。至少一开始是这样。它很容易理解和维护,而且应该非常高效,因为最耗费资源的操作(MERGEintoTarget本身只执行一次)。下面还有第二个变体,比较它们在真实数据上的性能会很有趣。

所以:

  • MERGE
  • 中使用OUTPUT INTO @TempTable
  • 插入前INSERT@TempTableStats的所有行或聚合。如果您只需要聚合统计数据,则聚合此批次的结果并将其合并到最终Stats而不是复制所有行是有意义的。
  • INSERTQueue中仅"插入"了@TempTable中的行。

我将从@i-one的答案中提取样本数据。

架构

-- I'll return to commented lines later

CREATE TABLE [dbo].[TestTarget](
    -- [ID] [int] IDENTITY(1,1) NOT NULL,
    [foo] [varchar](10) NULL,
    [bar] [varchar](10) NULL
);

CREATE TABLE [dbo].[TestStaging](
    [foo] [varchar](10) NULL,
    [bar] [varchar](10) NULL,
    [baz] [varchar](10) NULL
);

CREATE TABLE [dbo].[TestStats](
    [MergeAction] [nvarchar](10) NOT NULL
);

CREATE TABLE [dbo].[TestQueue](
    -- [TargetID] [int] NOT NULL,
    [foo] [varchar](10) NULL,
    [baz] [varchar](10) NULL
);

样本数据

TRUNCATE TABLE [dbo].[TestTarget];
TRUNCATE TABLE [dbo].[TestStaging];
TRUNCATE TABLE [dbo].[TestStats];
TRUNCATE TABLE [dbo].[TestQueue];

INSERT INTO [dbo].[TestStaging]
    ([foo]
    ,[bar]
    ,[baz])
VALUES
    ('A', 'AA', 'AAA'),
    ('B', 'BB', 'BBB'),
    ('C', 'CC', 'CCC');

INSERT INTO [dbo].[TestTarget]
    ([foo]
    ,[bar])
VALUES
    ('A', 'A_'),
    ('B', 'B?');

合并

DECLARE @TempTable TABLE (
    MergeAction nvarchar(10) NOT NULL,
    foo varchar(10) NULL,
    baz varchar(10) NULL);

MERGE INTO TestTarget AS Dst
USING TestStaging AS Src
ON Dst.foo = Src.foo
WHEN MATCHED THEN
UPDATE SET
    Dst.bar = Src.bar
WHEN NOT MATCHED BY TARGET THEN
INSERT (foo, bar)
VALUES (Src.foo, Src.bar)
OUTPUT $action AS MergeAction, inserted.foo, Src.baz
INTO @TempTable(MergeAction, foo, baz)
;

INSERT INTO [dbo].[TestStats] (MergeAction)
SELECT T.MergeAction
FROM @TempTable AS T;

INSERT INTO [dbo].[TestQueue]
    ([foo]
    ,[baz])
SELECT
    T.foo
    ,T.baz
FROM @TempTable AS T
WHERE T.MergeAction = 'INSERT'
;

SELECT * FROM [dbo].[TestTarget];
SELECT * FROM [dbo].[TestStats];
SELECT * FROM [dbo].[TestQueue];

结果

TestTarget
+-----+-----+
| foo | bar |
+-----+-----+
| A   | AA  |
| B   | BB  |
| C   | CC  |
+-----+-----+

TestStats
+-------------+
| MergeAction |
+-------------+
| INSERT      |
| UPDATE      |
| UPDATE      |
+-------------+

TestQueue
+-----+-----+
| foo | baz |
+-----+-----+
| C   | CCC |
+-----+-----+

第二个变量

在SQL Server 2014 Express上测试。

OUTPUT子句可以将其结果集发送到表和调用方。因此,OUTPUT INTO可以直接进入Stats,如果我们将MERGE语句包装到存储过程中,则可以使用INSERT ... EXEC进入Queue

如果您检查执行计划,您会发现INSERT ... EXEC无论如何都会在后台创建一个临时表(另请参阅The Hidden Costs of INSERT EXECby Adam Machanic),所以我预计当您显式创建临时表时,总体性能将类似于第一个变体。

还有一个问题需要解决:Queue表应该只有"插入"的行,而不是所有受影响的行。要实现这一点,您可以在Queue表上使用触发器来丢弃"INSERT"之外的行。另一种可能性是使用IGNORE_DUP_KEY = ON定义唯一索引,并以这样的方式准备数据:"未插入"的行将违反唯一索引,并且不会插入到表中。

因此,我将向Target表中添加一个ID IDENTITY列,并向Queue表中添加一个TargetID列。(在上面的脚本中取消对它们的注释)。 另外,我将向Queue表添加索引:

CREATE UNIQUE NONCLUSTERED INDEX [IX_TargetID] ON [dbo].[TestQueue]
(
    [TargetID] ASC
) WITH (
PAD_INDEX = OFF, 
STATISTICS_NORECOMPUTE = OFF, 
SORT_IN_TEMPDB = OFF, 
IGNORE_DUP_KEY = ON, 
DROP_EXISTING = OFF, 
ONLINE = OFF, 
ALLOW_ROW_LOCKS = ON, 
ALLOW_PAGE_LOCKS = ON)

重要部分是UNIQUEIGNORE_DUP_KEY = ON

以下是MERGE的存储过程:

CREATE PROCEDURE [dbo].[TestMerge]
AS
BEGIN
    SET NOCOUNT ON;
    SET XACT_ABORT ON;

    MERGE INTO dbo.TestTarget AS Dst
    USING dbo.TestStaging AS Src
    ON Dst.foo = Src.foo
    WHEN MATCHED THEN
    UPDATE SET
        Dst.bar = Src.bar
    WHEN NOT MATCHED BY TARGET THEN
    INSERT (foo, bar)
    VALUES (Src.foo, Src.bar)
    OUTPUT $action INTO dbo.TestStats(MergeAction)
    OUTPUT CASE WHEN $action = 'INSERT' THEN inserted.ID ELSE 0 END AS TargetID, 
    inserted.foo,
    Src.baz
    ;

END

用法

TRUNCATE TABLE [dbo].[TestTarget];
TRUNCATE TABLE [dbo].[TestStaging];
TRUNCATE TABLE [dbo].[TestStats];
TRUNCATE TABLE [dbo].[TestQueue];

-- Make sure that `Queue` has one special row with TargetID=0 in advance.
INSERT INTO [dbo].[TestQueue]
    ([TargetID]
    ,[foo]
    ,[baz])
VALUES
    (0
    ,NULL
    ,NULL);

INSERT INTO [dbo].[TestStaging]
    ([foo]
    ,[bar]
    ,[baz])
VALUES
    ('A', 'AA', 'AAA'),
    ('B', 'BB', 'BBB'),
    ('C', 'CC', 'CCC');

INSERT INTO [dbo].[TestTarget]
    ([foo]
    ,[bar])
VALUES
    ('A', 'A_'),
    ('B', 'B?');

INSERT INTO [dbo].[TestQueue]
EXEC [dbo].[TestMerge];

SELECT * FROM [dbo].[TestTarget];
SELECT * FROM [dbo].[TestStats];
SELECT * FROM [dbo].[TestQueue];

结果

TestTarget
+----+-----+-----+
| ID | foo | bar |
+----+-----+-----+
|  1 | A   | AA  |
|  2 | B   | BB  |
|  3 | C   | CC  |
+----+-----+-----+

TestStats
+-------------+
| MergeAction |
+-------------+
| INSERT      |
| UPDATE      |
| UPDATE      |
+-------------+

TestQueue
+----------+------+------+
| TargetID | foo  | baz  |
+----------+------+------+
|        0 | NULL | NULL |
|        3 | C    | CCC  |
+----------+------+------+

INSERT ... EXEC期间将有额外的消息:

Duplicate key was ignored.

IFMERGE更新了一些行。当唯一索引由于IGNORE_DUP_KEY = ON而在INSERT期间丢弃某些行时发送此警告消息。

插入重复键值时将出现警告消息 转换为唯一索引。只有违反唯一性约束的行 将失败。

这篇关于DBMS级别的管道和过滤器:拆分合并输出流的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持编程学习网!

本文标题为:DBMS级别的管道和过滤器:拆分合并输出流

基础教程推荐