Snowpark Migration Accelerator: 合并

描述

MERGE 语句将来自一个或多个源表的数据与目标表相结合,允许您在单个操作中执行更新和插入。根据您定义的条件,确定是更新现有行还是在目标表中插入新行。这使得它比使用单独的 INSERTUPDATEDELETE 语句更有效。使用相同的数据多次运行时,MERGE 语句始终会产生一致的结果。

在 Spark 中,您可以在 Spark 文档 (https://docs.databricks.com/en/sql/language-manual/delta-merge-into.html) 中找到 MERGE 语法。

MERGE INTO target_table_name [target_alias]
   USING source_table_reference [source_alias]
   ON merge_condition
   { WHEN MATCHED [ AND matched_condition ] THEN matched_action |
     WHEN NOT MATCHED [BY TARGET] [ AND not_matched_condition ] THEN not_matched_action |
     WHEN NOT MATCHED BY SOURCE [ AND not_matched_by_source_condition ] THEN not_matched_by_source_action } [...]

matched_action
 { DELETE |
   UPDATE SET * |
   UPDATE SET { column = { expr | DEFAULT } } [, ...] }

not_matched_action
 { INSERT * |
   INSERT (column1 [, ...] ) VALUES ( expr | DEFAULT ] [, ...] )

not_matched_by_source_action
 { DELETE |
   UPDATE SET { column = { expr | DEFAULT } } [, ...] }
Copy

在 Snowflake 中,MERGE 语句遵循以下语法(有关更多详细信息,请参阅 Snowflake 文档):

MERGE INTO <target_table> USING <source> ON <join_expr> { matchedClause | notMatchedClause } [ ... ]

matchedClause ::=
  WHEN MATCHED [ AND <case_predicate> ] THEN { UPDATE SET <col_name> = <expr> [ , <col_name2> = <expr2> ... ] | DELETE } [ ... ]
  
notMatchedClause ::=
   WHEN NOT MATCHED [ AND <case_predicate> ] THEN INSERT [ ( <col_name> [ , ... ] ) ] VALUES ( <expr> [ , ... ] )
Copy

关键区别在于 Snowflake 缺少与 WHEN NOT MATCHED BY SOURCE 子句直接等效的子句。要在 Snowflake 中实现类似的功能,需要一个变通解决方案。

示例源模式

辅助数据示例

备注

执行了以下代码示例以帮助您更好地了解它们的工作方式:

CREATE OR REPLACE people_source ( 
  person_id  INTEGER NOT NULL PRIMARY KEY, 
  first_name STRING NOT NULL, 
  last_name  STRING NOT NULL, 
  title      STRING NOT NULL,
);

CREATE OR REPLACE TABLE people_target ( 
  person_id  INTEGER NOT NULL PRIMARY KEY, 
  first_name STRING NOT NULL, 
  last_name  STRING NOT NULL, 
  title      STRING NOT NULL DEFAULT 'NONE'
);


INSERT INTO people_target VALUES (1, 'John', 'Smith', 'Mr');
INSERT INTO people_target VALUES (2, 'alice', 'jones', 'Mrs');
INSERT INTO people_source VALUES (2, 'Alice', 'Jones', 'Mrs.');
INSERT INTO people_source VALUES (3, 'Jane', 'Doe', 'Miss');
INSERT INTO people_source VALUES (4, 'Dave', 'Brown', 'Mr');
Copy
CREATE OR REPLACE TABLE people_source (
    person_id  INTEGER NOT NULL PRIMARY KEY,
    first_name VARCHAR(20) NOT NULL,
    last_name VARCHAR(20) NOT NULL,
    title VARCHAR(10) NOT NULL
);

CREATE OR REPLACE TABLE people_target (
    person_id  INTEGER NOT NULL PRIMARY KEY,
    first_name VARCHAR(20) NOT NULL,
    last_name VARCHAR(20) NOT NULL,
    title VARCHAR(10) NOT NULL DEFAULT 'NONE'
);


INSERT INTO people_target VALUES (1, 'John', 'Smith', 'Mr');
INSERT INTO people_target VALUES (2, 'alice', 'jones', 'Mrs');
INSERT INTO people_source VALUES (2, 'Alice', 'Jones', 'Mrs.');
INSERT INTO people_source VALUES (3, 'Jane', 'Doe', 'Miss');
INSERT INTO people_source VALUES (4, 'Dave', 'Brown', 'Mr');
Copy

MERGE 语句 – 插入和更新案例

Spark

MERGE INTO people_target pt 
USING people_source ps 
ON    (pt.person_id = ps.person_id) 
WHEN MATCHED THEN UPDATE 
  SET pt.first_name = ps.first_name, 
      pt.last_name = ps.last_name, 
      pt.title = DEFAULT 
WHEN NOT MATCHED THEN INSERT 
  (pt.person_id, pt.first_name, pt.last_name, pt.title) 
  VALUES (ps.person_id, ps.first_name, ps.last_name, ps.title);


SELECT * FROM people_target;
Copy
PERSON_ID|FIRST_NAME|LAST_NAME|TITLE|
---------+----------+---------+-----+
        1|John      |Smith    |Mr   |
        2|Alice     |Jones    |NONE |
        3|Jane      |Doe      |Miss |
        4|Dave      |Brown    |Mr   |
Copy

Snowflake

MERGE INTO people_target2 pt 
USING people_source ps 
ON    (pt.person_id = ps.person_id) 
WHEN MATCHED THEN UPDATE 
  SET pt.first_name = ps.first_name, 
      pt.last_name = ps.last_name, 
      pt.title = DEFAULT 
WHEN NOT MATCHED THEN INSERT 
  (pt.person_id, pt.first_name, pt.last_name, pt.title) 
  VALUES (ps.person_id, ps.first_name, ps.last_name, ps.title);


SELECT * FROM PUBLIC.people_target ORDER BY person_id;
Copy
PERSON_ID|FIRST_NAME|LAST_NAME|TITLE|
---------+----------+---------+-----+
        1|John      |Smith    |Mr   |
        2|Alice     |Jones    |NONE |
        3|Jane      |Doe      |Miss |
        4|Dave      |Brown    |Mr   |
Copy

在 Snowflake 中,INSERTUPDATE 操作的工作方式相同。在这两种 SQL 方言中,都可以使用 DEFAULT 作为表达式将列设置为其默认值。

Spark 允许在进行插入和更新操作时无需明确列出列名。如果未指定列,则该操作会影响表中的所有列。要使其正常工作,源表和目标表必须具有相同的列结构。如果列结构不匹配,您将收到解析错误。

UPDATE SET *
-- This is equivalent to UPDATE SET col1 = source.col1 [, col2 = source.col2 ...]

INSERT * 
-- This command copies all columns from the source table to the target table, matching columns by name. It is the same as explicitly listing all columns in both the INSERT and VALUES clauses.

Since Snowflake doesn't support these options, the migration process will instead list all columns from the target table.

### MERGE Statement - Delete Case

 
```{code} sql
:force:
MERGE INTO people_target pt 
USING people_source ps 
ON    (pt.person_id = ps.person_id)
WHEN MATCHED AND pt.person_id < 3 THEN DELETE
WHEN NOT MATCHED BY TARGET THEN INSERT *;

SELECT * FROM people_target;
Copy
PERSON_ID|FIRST_NAME|LAST_NAME|TITLE|
---------+----------+---------+-----+
        1|John      |Smith    |Mr   |
        3|Jane      |Doe      |Miss |
        4|Dave      |Brown    |Mr   |
Copy

Snowflake

MERGE INTO people_target pt 
USING people_source ps 
ON    (pt.person_id = ps.person_id)
WHEN MATCHED AND pt.person_id < 3 THEN DELETE
WHEN NOT MATCHED THEN INSERT 
  (pt.person_id, pt.first_name, pt.last_name, pt.title) 
  VALUES (ps.person_id, ps.first_name, ps.last_name, ps.title);


SELECT * FROM people_target;
Copy
PERSON_ID|FIRST_NAME|LAST_NAME|TITLE|
---------+----------+---------+-----+
        1|John      |Smith    |Mr   |
        3|Jane      |Doe      |Miss |
        4|Dave      |Brown    |Mr   |
Copy

Snowflake 中的 DELETE 操作的工作方式与其他数据库中的操作相同。您还可以在 MATCHEDNOT MATCHED 子句中添加其他条件。

WHEN NOT MATCHED BY TARGETWHEN NOT MATCHED 是等效子句,可以在 SQL merge 语句中交替使用。

MERGE 语句 – WHEN NOT MATCHED BY SOURCE

当目标表中的某行在源表中没有匹配的行时,就会触发 WHEN NOT MATCHED BY SOURCE 子句。当 merge_condition 和可选的 not_match_by_source_condition 的计算结果均为 true 时,就会出现这种情况。有关更多详细信息,请参阅 Spark 文档 (https://docs.databricks.com/en/sql/language-manual/delta-merge-into.html)。

Snowflake 不直接支持此子句。要处理此限制,可以对 DELETEUPDATE 操作使用以下替代方案。

MERGE INTO people_target pt 
USING people_source ps 
ON pt.person_id = ps.person_id
WHEN NOT MATCHED BY SOURCE THEN DELETE;


SELECT * FROM people_target;
Copy
PERSON_ID|FIRST_NAME|LAST_NAME|TITLE|
---------+----------+---------+-----+
        2|Alice     |Jones    |NONE |
Copy

Snowflake

MERGE INTO people_target pt 
USING (
    SELECT 
        pt.person_id 
    FROM
        people_target pt LEFT 
    JOIN people_source ps ON pt.person_id = ps.person_id
    WHERE 
        ps.person_id is null
) s_src
    ON s_src.person_id = pt.person_id
WHEN MATCHED THEN DELETE;

SELECT * FROM people_target;
Copy
PERSON_ID|FIRST_NAME|LAST_NAME|TITLE|
---------+----------+---------+-----+
        2|Alice     |Jones    |NONE |
Copy

Snowflake 中的 DELETE 操作的工作方式与其他数据库中的操作相同。您还可以在 MATCHEDNOT MATCHED 子句中添加其他条件。

已知问题

1. MERGE is very similar in both languages

虽然 Apache Spark 提供了其他功能,但您可以使用其他方法在 Snowflake 中实现类似的功能,如前面的示例所示。

语言: 中文