使用 Snowpark Python JDBC¶
通过 Snowpark Python JDBC,Snowpark Python 用户能以编程方式将数据从外部数据库提取到 Snowflake 中。这允许您使用 JDBC 驱动程序连接到外部数据库。
利用这些 APIs,您可以无缝地将数据提取到 Snowflake 表中,并使用 Snowpark DataFrames 进行转换,以便实现高级分析。
JDBC 的使用方式与 Spark JDBC API 类似。大多数参数的设计保持相同或相似,以提升一致性。有关比较 Snowpark Python JDBC 与 Spark JDBC API 的更多信息,请参阅下表:
Snowpark JDBC 参数¶
参数 |
Snowpark Python JDBC |
|---|---|
|
用于通过 JDBC 驱动程序连接到外部数据源的连接字符串 |
|
包含创建 UDTF 所需配置的字典 |
|
包含建立 JDBC 连接过程中所需的键值对的字典 |
|
源数据库中的表 |
|
包装为用于读取数据的子查询的 SQL 查询 |
|
用于并行读取的分区列 |
|
分区的下限 |
|
分区的上限 |
|
用于并行处理的分区数量 |
|
SQL 执行的超时时长(以秒为单位)。 |
|
每次往返提取的行数 |
|
用于从外部数据库提取数据的自定义架构 |
|
WHERE 子句分区的条件列表 |
|
在会话初始化时执行 SQL 或 PL/SQL 语句 |
了解并行¶
Python JDBC 目前有一种形式的底层引入机制:
- UDTF 引入
所有工作负载都在 Snowflake 服务器上运行。Snowpark 创建一个 Java UDTF 并并行调用它,以将数据引入到 Snowflake 临时表中。因此,此功能需要
udtf_configs参数。
Snowpark Python JDBC 有两种并行化和加速引入的方法:
- 分区列
该方法在用户调用
jdbc()时,根据四个参数将源数据划分为多个分区:columnlower_boundupper_boundnum_partitions
这四个参数必须同时设置,且
column必须为数字或日期类型。- 谓词
此方法根据参数谓词将源数据划分为多个分区,这些谓词是适合包含在
WHERE子句中的表达式列表,其中每个表达式定义一个分区。谓词提供了一种更灵活的分区划分方式;例如,您可以在布尔列或非数字列上划分分区。
Snowpark Python JDBC 还允许调整分区内的并行度级别:
- Fetch_size
在分区内,API 按由
fetch_size定义的区块提取行。这些行在提取时会并行写入 Snowflake,从而允许读取和写入重叠进行,并最大限度地提高吞吐量。
使用 JDBC 从外部数据源引入数据¶
使用 JDBC 从 Snowpark 客户端引入数据¶
使用 Snowpark 或 Snowsight 将 JDBC 驱动程序 jar 文件上传到 Snowflake 暂存区
使用 Snowpark 上传。
在 Snowpark 中,创建会话后运行以下代码:
session.file.put("<your directory>/<your file name>", "@<your stage name>/<stage path>")
按照以下步骤所述,使用 Snowsight 进行上传。
在 Snowsight 中,点击 Catalog -> Database Explorer。
在数据库左侧搜索栏中,点击 [your database name] -> [your schema name] -> stages -> [your stage name]。
点击暂存区页面的右上角的 “+File” 按钮。
配置密钥、网络规则和外部访问集成。
-- Configure a secret to allow egress to the source endpoint CREATE OR REPLACE SECRET <your secret> TYPE = PASSWORD USERNAME = '<your username>' PASSWORD = '<your password>'; -- Configure a network rule to allow egress to the source endpoint CREATE OR REPLACE NETWORK RULE <your network rule> TYPE = HOST_PORT MODE = EGRESS VALUE_LIST = ('<your host>:<your port>'); -- Configure an external access integration CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION <your integration> ALLOWED_NETWORK_RULES = (<your network rule>) ALLOWED_AUTHENTICATION_SECRETS = (<your secret>) ENABLED = true;
在 Snowpark 客户端使用 Snowpark JDBC 从目标提取数据。
connection_str=f"jdbc:<your dbms>://<your host>:<your port>/<your db>" udtf_configs = { "external_access_integration": "<your integration>", "secret": "<your secret>", "imports": ["<your stage path to jdbc jar file>"] } # Call jdbc to pull data from target table df_table = session.read.jdbc( url=connection_str, udtf_configs=udtf_configs, table="<your table>", ) # Call jdbc to pull data from target query df_query = session.read.jdbc( url=connection_str, udtf_configs=udtf_configs, query="select * from <your table>", ) # Pull data from target table with parallelism using partition column df_table_partition_column = session.read.jdbc( url=connection_str, udtf_configs=udtf_configs, table="<your table>", fetch_size=100000, num_partitions=4, column="ID", upper_bound=10000, lower_bound=0 ) # Pull data from target table with parallelism using predicates df_table_predicates = session.read.jdbc( url=connection_str, udtf_configs=udtf_configs, table="<your table>", fetch_size=100000, predicates = [ "ID < 3", "ID >= 3" ] )
使用 JDBC 从存储过程引入数据¶
使用 Snowsight 将 JDBC 驱动程序 jar 文件上传到 Snowflake 暂存区
在 Snowsight 中,点击 Catalog -> Database Explorer
数据库左侧搜索栏中,点击 [your database name] -> [your schema name] -> stages -> [your stage name]。
点击暂存区页面的右上角的 “+File” 按钮。
配置密钥、网络规则和外部访问集成。
-- Configure a secret to allow egress to the source endpoint CREATE OR REPLACE SECRET <your secret> TYPE = PASSWORD USERNAME = '<your username>' PASSWORD = '<your password>'; -- Configure a network rule to allow egress to the source endpoint CREATE OR REPLACE NETWORK RULE <your network rule> TYPE = HOST_PORT MODE = EGRESS VALUE_LIST = ('<your host>:<your port>'); -- Configure an external access integration CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION <your integration> ALLOWED_NETWORK_RULES = (<your network rule>) ALLOWED_AUTHENTICATION_SECRETS = (<your secret>) ENABLED = true;
在存储过程中使用 Snowpark JDBC 从目标提取数据。
CREATE OR REPLACE PROCEDURE sp_jdbc() RETURNS STRING LANGUAGE PYTHON RUNTIME_VERSION = '3.10' PACKAGES = ('snowflake-snowpark-python') HANDLER = 'run' AS $$ import time def run(session): connection_str=f"jdbc:<your dbms>://<your host>:<your port>/<your db>" udtf_configs = { "external_access_integration": "<your integration>", "secret": "<your secret>", "imports": ["<your stage path to jdbc jar file>"] } # Call jdbc to pull data from target table df_table = session.read.jdbc( url=connection_str, udtf_configs=udtf_configs, table="<your table>", ) # Call jdbc to pull data from target query df_query = session.read.jdbc( url=connection_str, udtf_configs=udtf_configs, query="select * from <your table>", ) # Pull data from target table with parallelism using partition column df_table_partition_column = session.read.jdbc( url=connection_str, udtf_configs=udtf_configs, table="<your table>", fetch_size=100000, num_partitions=4, column="ID", upper_bound=10000, lower_bound=0 ) # Pull data from target table with parallelism using predicates df_table_predicates = session.read.jdbc( url=connection_str, udtf_configs=udtf_configs, table="<your table>", fetch_size=100000, predicates = [ "ID < 3", "ID >= 3" ] ) df_table.write.save_as_table("snowflake_table", mode="overwrite") return f"success" $$ ; call sp_jdbc(); select * from snowflake_table ;
使用 JDBC 从 Snowflake 笔记本引入数据¶
使用 Snowsight 将 JDBC 驱动程序 jar 文件上传到 Snowflake 暂存区
在 Snowsight 中,点击 Catalog -> Database Explorer
数据库左侧搜索栏中,点击 [your database name] -> [your schema name] -> stages -> [your stage name]。
点击暂存区页面的右上角的 “+File” 按钮。
配置密钥、网络规则和外部访问集成。
-- Configure a secret to allow egress to the source endpoint CREATE OR REPLACE SECRET <your secret> TYPE = PASSWORD USERNAME = '<your username>' PASSWORD = '<your password>'; -- Configure a network rule to allow egress to the source endpoint CREATE OR REPLACE NETWORK RULE <your network rule> TYPE = HOST_PORT MODE = EGRESS VALUE_LIST = ('<your host>:<your port>'); -- Configure an external access integration CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION <your integration> ALLOWED_NETWORK_RULES = (<your network rule>) ALLOWED_AUTHENTICATION_SECRETS = (<your secret>) ENABLED = true;
在 Snowflake 笔记本中使用 Snowpark JDBC 从目标提取数据。
import snowflake.snowpark.context session = snowflake.snowpark.context.get_active_session() connection_str=f"jdbc:<your dbms>://<your host>:<your port>/<your db>" udtf_configs = { "external_access_integration": "<your integration>", "secret": "<your secret>", "imports": ["<your stage path to jdbc jar file>"] } # Call jdbc to pull data from target table df_table = session.read.jdbc( url=connection_str, udtf_configs=udtf_configs, table="<your table>", ) # Call jdbc to pull data from target query df_query = session.read.jdbc( url=connection_str, udtf_configs=udtf_configs, query="select * from <your table>", ) # Pull data from target table with parallelism using partition column df_table_partition_column = session.read.jdbc( url=connection_str, udtf_configs=udtf_configs, table="<your table>", fetch_size=100000, num_partitions=4, column="ID", upper_bound=10000, lower_bound=0 ) # Pull data from target table with parallelism using predicates df_table_predicates = session.read.jdbc( url=connection_str, udtf_configs=udtf_configs, table="<your table>", fetch_size=100000, predicates = [ "ID < 3", "ID >= 3" ] )
源跟踪¶
使用 Snowpark JDBC 连接到 MySQL 时的源跟踪¶
在创建连接的函数中包含一个 Snowpark 标签:
connection_str="jdbc:mysql://<your host>:<your port>/<your db>?applicationName=snowflake-snowpark-python" udtf_configs = { "external_access_integration": "<your integration>", "secret": "<your secret>", "imports": ["<your stage path to jdbc jar file>"] } # Call dbapi to pull data from target table df_table = session.read.jdbc( url=connection_str, udtf_configs=udtf_configs, table="<your table>", )
在数据源中运行以下 SQL,以捕获来自 Snowpark 且仍处于活动状态的查询:
SELECT * FROM performance_schema.events_statements_history_long WHERE THREAD_ID = ( SELECT THREAD_ID, NAME FROM performance_schema.threads WHERE NAME LIKE '%snowflake-snowpark-python%'; )
使用 Snowpark JDBC 连接到 SQL Server 时的源跟踪¶
在创建连接的函数中包含一个 Snowpark 标签:
connection_str="jdbc:mysql://<your host>:<your port>/<your db>?applicationName=snowflake-snowpark-python" udtf_configs = { "external_access_integration": "<your integration>", "secret": "<your secret>", "imports": ["<your stage path to jdbc jar file>"] } # Call dbapi to pull data from target table df_table = session.read.jdbc( url=connection_str, udtf_configs=udtf_configs, table="<your table>", )
在数据源中运行以下 SQL,以捕获来自 Snowpark 且仍处于活动状态的查询:
SELECT s.session_id, s.program_name, r.status, t.text AS sql_text FROM sys.dm_exec_sessions s JOIN sys.dm_exec_requests r ON s.session_id = r.session_id CROSS APPLY sys.dm_exec_sql_text(r.sql_handle) AS t WHERE s.program_name = 'snowflake-snowpark-python';
使用 Snowpark JDBC 连接到 PostgresSQL 时的源跟踪¶
在创建连接的函数中包含一个 Snowpark 标签:
connection_str="jdbc:mysql://<your host>:<your port>/<your db>?applicationName=snowflake-snowpark-python" udtf_configs = { "external_access_integration": "<your integration>", "secret": "<your secret>", "imports": ["<your stage path to jdbc jar file>"] } # Call dbapi to pull data from target table df_table = session.read.jdbc( url=connection_str, udtf_configs=udtf_configs, table="<your table>", )
在数据源中运行以下 SQL,以捕获来自 Snowpark 且仍处于活动状态的查询:
SELECT pid, usename AS username, datname AS database, application_name, client_addr, state, query_start, query FROM pg_stat_activity WHERE application_name = 'snowflake-snowpark-python';
使用 Snowpark JDBC 连接到 Oracle 时的源跟踪¶
在创建连接的函数中包含一个 Snowpark 标签:
connection_str="jdbc:mysql://<your host>:<your port>/<your db>?applicationName=snowflake-snowpark-python" udtf_configs = { "external_access_integration": "<your integration>", "secret": "<your secret>", "imports": ["<your stage path to jdbc jar file>"] } # Call dbapi to pull data from target table df_table = session.read.jdbc( url=connection_str, udtf_configs=udtf_configs, table="<your table>", )
在数据源中运行以下 SQL,以捕获来自 Snowpark 且仍处于活动状态的查询:
SELECT sid, serial#, username, program, module, action, client_identifier, client_info, osuser, machine FROM v$session WHERE program = 'snowflake-snowpark-python';
常用 DBMS 和类型支持¶
以下是不同 DBMS 系统的认证数据类型列表。如果源数据涉及其他数据类型,Snowpark Python JDBC 将尝试将其映射为尽力而为的 Snowflake 数据类型,或回退为字符串。
Oracle¶
INTEGER
NUMBER
BINARY_FLOAT
BINARY_DOUBLE
VARCHAR2
CHAR
CLOB
NCHAR
NVARCHAR2
NCLOB
DATE
TIMESTAMP
TIMESTAMP WITH TIME ZONE
TIMESTAMP WITH LOCAL TIME ZONE
RAW
PostgresSQL¶
BIGINT
BIGSERIAL
BIT
BIT VARYING
BOOLEAN
BOX
BYTEA
CHAR
VARCHAR
CIDR
CIRCLE
DATE
DOUBLE PRECISION
INET
INTEGER
INTERVAL
JSON
JSONB
LINE
LSEG
MACADDR
POINT
POLYGON
REAL
SMALLINT
SMALLSERIAL
SERIAL
TEXT
TIME
TIMESTAMP
TIMESTAMPTZ
TSQUERY
TSVECTOR
TXID_SNAPSHOT
UUID
XML
MySQL¶
INT
DECIMAL
INT
TINYINT
SMALLINT
MEDIUMINT
BIGINT
YEAR
FLOAT
DOUBLE
CHAR
VARCHAR
TINYTEXT
TEXT
MEDIUMTEXT
LONGTEXT
ENUM
SET
BIT
BINARY
VARBINARY
TINYBLOB
BLOB
MEDIUMBLOB
LONGBLOB
DATE
DATETIME
TIMESTAMP
TIME
JSON
SQL Server¶
INT
BIGINT
INT
SMALLINT
TINYINT
BIT
DECIMAL
NUMERIC
MONEY
SMALLMONEY
FLOAT
REAL
DATE
TIME
DATETIME
DATETIME2
SMALLDATETIME
CHAR
VARCHAR
VARCHAR(MAX)
TEXT
NCHAR
NVARCHAR
NVARCHAR(MAX)
NTEXT
BINARY
VARBINARY
VARBINARY(MAX)
IMAGE
UNIQUEIDENTIFIER
TIMESTAMP
Databricks¶
目前不支持使用 Snowpark Python JDBC 连接到 Databricks。