使用 Snowpark Python JDBC

通过 Snowpark Python JDBC,Snowpark Python 用户能以编程方式将数据从外部数据库提取到 Snowflake 中。这允许您使用 JDBC 驱动程序连接到外部数据库。

利用这些 APIs,您可以无缝地将数据提取到 Snowflake 表中,并使用 Snowpark DataFrames 进行转换,以便实现高级分析。

JDBC 的使用方式与 Spark JDBC API 类似。大多数参数的设计保持相同或相似,以提升一致性。有关比较 Snowpark Python JDBC 与 Spark JDBC API 的更多信息,请参阅下表:

Snowpark JDBC 参数

参数

Snowpark Python JDBC

url

用于通过 JDBC 驱动程序连接到外部数据源的连接字符串

udtf_configs

包含创建 UDTF 所需配置的字典

properties

包含建立 JDBC 连接过程中所需的键值对的字典

table

源数据库中的表

query

包装为用于读取数据的子查询的 SQL 查询

column

用于并行读取的分区列

lower_bound

分区的下限

upper_bound

分区的上限

num_partitions

用于并行处理的分区数量

query_timeout

SQL 执行的超时时长(以秒为单位)。

fetch_size

每次往返提取的行数

custom_schema

用于从外部数据库提取数据的自定义架构

predicates

WHERE 子句分区的条件列表

session_init_statement

在会话初始化时执行 SQL 或 PL/SQL 语句

了解并行

Python JDBC 目前有一种形式的底层引入机制:

UDTF 引入

所有工作负载都在 Snowflake 服务器上运行。Snowpark 创建一个 Java UDTF 并并行调用它,以将数据引入到 Snowflake 临时表中。因此,此功能需要 udtf_configs 参数。

Snowpark Python JDBC 有两种并行化和加速引入的方法:

分区列

该方法在用户调用 jdbc() 时,根据四个参数将源数据划分为多个分区:

  • column

  • lower_bound

  • upper_bound

  • num_partitions

这四个参数必须同时设置,且 column 必须为数字或日期类型。

谓词

此方法根据参数谓词将源数据划分为多个分区,这些谓词是适合包含在 WHERE 子句中的表达式列表,其中每个表达式定义一个分区。谓词提供了一种更灵活的分区划分方式;例如,您可以在布尔列或非数字列上划分分区。

Snowpark Python JDBC 还允许调整分区内的并行度级别:

Fetch_size

在分区内,API 按由 fetch_size 定义的区块提取行。这些行在提取时会并行写入 Snowflake,从而允许读取和写入重叠进行,并最大限度地提高吞吐量。

使用 JDBC 从外部数据源引入数据

使用 JDBC 从 Snowpark 客户端引入数据

  1. 使用 Snowpark 或 Snowsight 将 JDBC 驱动程序 jar 文件上传到 Snowflake 暂存区

    • 使用 Snowpark 上传。

      在 Snowpark 中,创建会话后运行以下代码:

      session.file.put("<your directory>/<your file name>", "@<your stage name>/<stage path>")
      
      Copy
    • 按照以下步骤所述,使用 Snowsight 进行上传。

      1. 在 Snowsight 中,点击 Catalog -> Database Explorer

      2. 在数据库左侧搜索栏中,点击 [your database name] -> [your schema name] -> stages -> [your stage name]

      3. 点击暂存区页面的右上角的 “+File” 按钮。

  2. 配置密钥、网络规则和外部访问集成。

    -- Configure a secret to allow egress to the source endpoint
    CREATE OR REPLACE SECRET <your secret>
        TYPE = PASSWORD
        USERNAME = '<your username>'
        PASSWORD = '<your password>';
    
    -- Configure a network rule to allow egress to the source endpoint
    CREATE OR REPLACE NETWORK RULE <your network rule>
      TYPE = HOST_PORT
      MODE = EGRESS
      VALUE_LIST = ('<your host>:<your port>');
    
    -- Configure an external access integration
    CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION <your integration>
      ALLOWED_NETWORK_RULES = (<your network rule>)
      ALLOWED_AUTHENTICATION_SECRETS = (<your secret>)
      ENABLED = true;
    
    Copy
  3. 在 Snowpark 客户端使用 Snowpark JDBC 从目标提取数据。

    connection_str=f"jdbc:<your dbms>://<your host>:<your port>/<your db>"
    udtf_configs = {
        "external_access_integration": "<your integration>",
        "secret": "<your secret>",
        "imports": ["<your stage path to jdbc jar file>"]
    }
    
    # Call jdbc to pull data from target table
    df_table = session.read.jdbc(
            url=connection_str,
            udtf_configs=udtf_configs,
            table="<your table>",
        )
    
    # Call jdbc to pull data from target query
    df_query = session.read.jdbc(
            url=connection_str,
            udtf_configs=udtf_configs,
            query="select * from <your table>",
        )
    
    # Pull data from target table with parallelism using partition column
    df_table_partition_column = session.read.jdbc(
            url=connection_str,
            udtf_configs=udtf_configs,
            table="<your table>",
            fetch_size=100000,
            num_partitions=4,
            column="ID",
            upper_bound=10000,
            lower_bound=0
        )
    
    # Pull data from target table with parallelism using predicates
    df_table_predicates = session.read.jdbc(
            url=connection_str,
            udtf_configs=udtf_configs,
            table="<your table>",
            fetch_size=100000,
            predicates = [
                "ID < 3",
                "ID >= 3"
            ]
        )
    
    Copy

使用 JDBC 从存储过程引入数据

  1. 使用 Snowsight 将 JDBC 驱动程序 jar 文件上传到 Snowflake 暂存区

    • 在 Snowsight 中,点击 Catalog -> Database Explorer

    • 数据库左侧搜索栏中,点击 [your database name] -> [your schema name] -> stages -> [your stage name]

    • 点击暂存区页面的右上角的 “+File” 按钮。

  2. 配置密钥、网络规则和外部访问集成。

    -- Configure a secret to allow egress to the source endpoint
    CREATE OR REPLACE SECRET <your secret>
        TYPE = PASSWORD
        USERNAME = '<your username>'
        PASSWORD = '<your password>';
    
    -- Configure a network rule to allow egress to the source endpoint
    CREATE OR REPLACE NETWORK RULE <your network rule>
      TYPE = HOST_PORT
      MODE = EGRESS
      VALUE_LIST = ('<your host>:<your port>');
    
    -- Configure an external access integration
    CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION <your integration>
      ALLOWED_NETWORK_RULES = (<your network rule>)
      ALLOWED_AUTHENTICATION_SECRETS = (<your secret>)
      ENABLED = true;
    
    Copy
  3. 在存储过程中使用 Snowpark JDBC 从目标提取数据。

    CREATE OR REPLACE PROCEDURE sp_jdbc()
    RETURNS STRING
    LANGUAGE PYTHON
    RUNTIME_VERSION = '3.10'
    PACKAGES = ('snowflake-snowpark-python')
    HANDLER = 'run'
    AS
    $$
    import time
    def run(session):
        connection_str=f"jdbc:<your dbms>://<your host>:<your port>/<your db>"
        udtf_configs = {
            "external_access_integration": "<your integration>",
            "secret": "<your secret>",
            "imports": ["<your stage path to jdbc jar file>"]
        }
    
        # Call jdbc to pull data from target table
        df_table = session.read.jdbc(
                url=connection_str,
                udtf_configs=udtf_configs,
                table="<your table>",
            )
    
        # Call jdbc to pull data from target query
        df_query = session.read.jdbc(
                url=connection_str,
                udtf_configs=udtf_configs,
                query="select * from <your table>",
            )
    
        # Pull data from target table with parallelism using partition column
        df_table_partition_column = session.read.jdbc(
                url=connection_str,
                udtf_configs=udtf_configs,
                table="<your table>",
                fetch_size=100000,
                num_partitions=4,
                column="ID",
                upper_bound=10000,
                lower_bound=0
            )
    
        # Pull data from target table with parallelism using predicates
        df_table_predicates = session.read.jdbc(
                url=connection_str,
                udtf_configs=udtf_configs,
                table="<your table>",
                fetch_size=100000,
                predicates = [
                    "ID < 3",
                    "ID >= 3"
                ]
            )
        df_table.write.save_as_table("snowflake_table", mode="overwrite")
        return f"success"
    
    $$
    ;
    
    call sp_jdbc();
    select * from snowflake_table ;
    
    Copy

使用 JDBC 从 Snowflake 笔记本引入数据

  1. 使用 Snowsight 将 JDBC 驱动程序 jar 文件上传到 Snowflake 暂存区

    • 在 Snowsight 中,点击 Catalog -> Database Explorer

    • 数据库左侧搜索栏中,点击 [your database name] -> [your schema name] -> stages -> [your stage name]

    • 点击暂存区页面的右上角的 “+File” 按钮。

  2. 配置密钥、网络规则和外部访问集成。

    -- Configure a secret to allow egress to the source endpoint
    CREATE OR REPLACE SECRET <your secret>
        TYPE = PASSWORD
        USERNAME = '<your username>'
        PASSWORD = '<your password>';
    
    -- Configure a network rule to allow egress to the source endpoint
    CREATE OR REPLACE NETWORK RULE <your network rule>
      TYPE = HOST_PORT
      MODE = EGRESS
      VALUE_LIST = ('<your host>:<your port>');
    
    -- Configure an external access integration
    CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION <your integration>
      ALLOWED_NETWORK_RULES = (<your network rule>)
      ALLOWED_AUTHENTICATION_SECRETS = (<your secret>)
      ENABLED = true;
    
    Copy
  3. 在 Snowflake 笔记本中使用 Snowpark JDBC 从目标提取数据。

    import snowflake.snowpark.context
    session = snowflake.snowpark.context.get_active_session()
    connection_str=f"jdbc:<your dbms>://<your host>:<your port>/<your db>"
    udtf_configs = {
            "external_access_integration": "<your integration>",
            "secret": "<your secret>",
            "imports": ["<your stage path to jdbc jar file>"]
        }
    
    # Call jdbc to pull data from target table
    df_table = session.read.jdbc(
            url=connection_str,
            udtf_configs=udtf_configs,
            table="<your table>",
        )
    
    # Call jdbc to pull data from target query
    df_query = session.read.jdbc(
            url=connection_str,
            udtf_configs=udtf_configs,
            query="select * from <your table>",
        )
    
    # Pull data from target table with parallelism using partition column
    df_table_partition_column = session.read.jdbc(
            url=connection_str,
            udtf_configs=udtf_configs,
            table="<your table>",
            fetch_size=100000,
            num_partitions=4,
            column="ID",
            upper_bound=10000,
            lower_bound=0
        )
    
    # Pull data from target table with parallelism using predicates
    df_table_predicates = session.read.jdbc(
            url=connection_str,
            udtf_configs=udtf_configs,
            table="<your table>",
            fetch_size=100000,
            predicates = [
                "ID < 3",
                "ID >= 3"
            ]
        )
    
    Copy

源跟踪

使用 Snowpark JDBC 连接到 MySQL 时的源跟踪

  1. 在创建连接的函数中包含一个 Snowpark 标签:

    connection_str="jdbc:mysql://<your host>:<your port>/<your db>?applicationName=snowflake-snowpark-python"
    udtf_configs = {
        "external_access_integration": "<your integration>",
        "secret": "<your secret>",
        "imports": ["<your stage path to jdbc jar file>"]
    }
    
    # Call dbapi to pull data from target table
    df_table = session.read.jdbc(
            url=connection_str,
            udtf_configs=udtf_configs,
            table="<your table>",
        )
    
    Copy
  2. 在数据源中运行以下 SQL,以捕获来自 Snowpark 且仍处于活动状态的查询:

    SELECT *
    FROM performance_schema.events_statements_history_long
    WHERE THREAD_ID = (
      SELECT THREAD_ID, NAME FROM performance_schema.threads WHERE NAME LIKE '%snowflake-snowpark-python%';
    )
    
    Copy

使用 Snowpark JDBC 连接到 SQL Server 时的源跟踪

  1. 在创建连接的函数中包含一个 Snowpark 标签:

    connection_str="jdbc:mysql://<your host>:<your port>/<your db>?applicationName=snowflake-snowpark-python"
    udtf_configs = {
    "external_access_integration": "<your integration>",
    "secret": "<your secret>",
    "imports": ["<your stage path to jdbc jar file>"]
    }
    # Call dbapi to pull data from target table
    df_table = session.read.jdbc(
          url=connection_str,
          udtf_configs=udtf_configs,
          table="<your table>",
      )
    
    Copy
  2. 在数据源中运行以下 SQL,以捕获来自 Snowpark 且仍处于活动状态的查询:

    SELECT
      s.session_id,
      s.program_name,
      r.status,
      t.text AS sql_text
    FROM sys.dm_exec_sessions s
    JOIN sys.dm_exec_requests r ON s.session_id = r.session_id
    CROSS APPLY sys.dm_exec_sql_text(r.sql_handle) AS t
    WHERE s.program_name = 'snowflake-snowpark-python';
    
    Copy

使用 Snowpark JDBC 连接到 PostgresSQL 时的源跟踪

  1. 在创建连接的函数中包含一个 Snowpark 标签:

    connection_str="jdbc:mysql://<your host>:<your port>/<your db>?applicationName=snowflake-snowpark-python"
    udtf_configs = {
    "external_access_integration": "<your integration>",
    "secret": "<your secret>",
    "imports": ["<your stage path to jdbc jar file>"]
    }
    
    # Call dbapi to pull data from target table
    df_table = session.read.jdbc(
            url=connection_str,
            udtf_configs=udtf_configs,
            table="<your table>",
        )
    
    Copy
  2. 在数据源中运行以下 SQL,以捕获来自 Snowpark 且仍处于活动状态的查询:

    SELECT
      pid,
      usename AS username,
      datname AS database,
      application_name,
      client_addr,
      state,
      query_start,
      query
    FROM
      pg_stat_activity
    WHERE
      application_name = 'snowflake-snowpark-python';
    
    Copy

使用 Snowpark JDBC 连接到 Oracle 时的源跟踪

  1. 在创建连接的函数中包含一个 Snowpark 标签:

    connection_str="jdbc:mysql://<your host>:<your port>/<your db>?applicationName=snowflake-snowpark-python"
    udtf_configs = {
    "external_access_integration": "<your integration>",
    "secret": "<your secret>",
    "imports": ["<your stage path to jdbc jar file>"]
    }
    # Call dbapi to pull data from target table
    df_table = session.read.jdbc(
            url=connection_str,
            udtf_configs=udtf_configs,
            table="<your table>",
        )
    
    Copy
  2. 在数据源中运行以下 SQL,以捕获来自 Snowpark 且仍处于活动状态的查询:

    SELECT
      sid,
      serial#,
      username,
      program,
      module,
      action,
      client_identifier,
      client_info,
      osuser,
      machine
    FROM v$session
    WHERE program = 'snowflake-snowpark-python';
    
    Copy

常用 DBMS 和类型支持

以下是不同 DBMS 系统的认证数据类型列表。如果源数据涉及其他数据类型,Snowpark Python JDBC 将尝试将其映射为尽力而为的 Snowflake 数据类型,或回退为字符串。

Oracle

  • INTEGER

  • NUMBER

  • BINARY_FLOAT

  • BINARY_DOUBLE

  • VARCHAR2

  • CHAR

  • CLOB

  • NCHAR

  • NVARCHAR2

  • NCLOB

  • DATE

  • TIMESTAMP

  • TIMESTAMP WITH TIME ZONE

  • TIMESTAMP WITH LOCAL TIME ZONE

  • RAW

PostgresSQL

  • BIGINT

  • BIGSERIAL

  • BIT

  • BIT VARYING

  • BOOLEAN

  • BOX

  • BYTEA

  • CHAR

  • VARCHAR

  • CIDR

  • CIRCLE

  • DATE

  • DOUBLE PRECISION

  • INET

  • INTEGER

  • INTERVAL

  • JSON

  • JSONB

  • LINE

  • LSEG

  • MACADDR

  • POINT

  • POLYGON

  • REAL

  • SMALLINT

  • SMALLSERIAL

  • SERIAL

  • TEXT

  • TIME

  • TIMESTAMP

  • TIMESTAMPTZ

  • TSQUERY

  • TSVECTOR

  • TXID_SNAPSHOT

  • UUID

  • XML

MySQL

  • INT

  • DECIMAL

  • INT

  • TINYINT

  • SMALLINT

  • MEDIUMINT

  • BIGINT

  • YEAR

  • FLOAT

  • DOUBLE

  • CHAR

  • VARCHAR

  • TINYTEXT

  • TEXT

  • MEDIUMTEXT

  • LONGTEXT

  • ENUM

  • SET

  • BIT

  • BINARY

  • VARBINARY

  • TINYBLOB

  • BLOB

  • MEDIUMBLOB

  • LONGBLOB

  • DATE

  • DATETIME

  • TIMESTAMP

  • TIME

  • JSON

SQL Server

  • INT

  • BIGINT

  • INT

  • SMALLINT

  • TINYINT

  • BIT

  • DECIMAL

  • NUMERIC

  • MONEY

  • SMALLMONEY

  • FLOAT

  • REAL

  • DATE

  • TIME

  • DATETIME

  • DATETIME2

  • SMALLDATETIME

  • CHAR

  • VARCHAR

  • VARCHAR(MAX)

  • TEXT

  • NCHAR

  • NVARCHAR

  • NVARCHAR(MAX)

  • NTEXT

  • BINARY

  • VARBINARY

  • VARBINARY(MAX)

  • IMAGE

  • UNIQUEIDENTIFIER

  • TIMESTAMP

Databricks

目前不支持使用 Snowpark Python JDBC 连接到 Databricks。

语言: 中文