使用 Snowpark Python DB-API 从外部数据源读取数据¶
使用 Snowpark Python DB-API,Snowpark Python 用户能以编程方式将数据从外部数据库提取到 Snowflake 中。它包括:
Python DB-API 支持: 使用 Python 的标准 DB-API 2.0 驱动程序连接到外部数据库。
简化设置: 使用
pip
安装必要的驱动程序,无需管理其他依赖关系。
利用这些 APIs,您可以无缝地将数据提取到 Snowflake 表中,并使用 Snowpark DataFrames 进行转换,以便实现高级分析。
使用 Snowpark Python DB-API¶
DB-API 的使用方式与 Spark JDBC API (https://spark.apache.org/docs/3.5.4/sql-data-sources-jdbc.html) 类似。大多数参数的设计保持相同或相似,以提升一致性。同时,Snowpark 重视采用直观命名惯例的 Python 优先设计,避免了 JDBC 特定配置。这为 Python 开发人员提供了熟悉的体验。有关将 Snowpark Python DB-API 与 Spark JDBC API 进行比较的更多信息,请参阅 DB-API 参数。
DB-API 参数¶
参数 |
Snowpark DB-API |
---|---|
|
创建 Python DB-API 连接的函数。 |
|
指定源数据库中的表。 |
|
将 SQL 查询作为子查询包装起来,用于读取数据。 |
|
用于并行读取的分区列。 |
|
分区的下限。 |
|
分区的上限。 |
|
用于并行处理的分区数量。 |
|
SQL 执行超时(以秒为单位)。 |
|
每次往返提取的行数。 |
|
用于从外部数据库提取数据的自定义架构。 |
|
用于从外部数据库并行获取数据的工作线程数。 |
|
WHERE 子句分区的条件列表。 |
|
在会话初始化时执行 SQL 或 PL/SQL 语句。 |
|
使用 Snowflake UDTF 执行工作负载以获得更好的性能。 |
|
上传之前要合并到单个 Parquet 文件中的已提取批次的数量。 |
了解并行¶
Snowpark Python DB-API 根据用户输入使用两种独立的并行形式:
基于分区的并行
当用户指定分区信息(例如,
column
、num_partitions
、lower_bound
、upper_bound
)或谓词时,Snowflake 会将查询拆分为多个分区。这些分区使用多处理进行并行处理,每个工作线程独立提取和写入其所属分区。每个分区内基于提取大小的并行
在分区内,API 按由
fetch_size
定义的区块提取行。这些行在提取时会并行写入 Snowflake,从而允许读取和写入能够重叠进行并最大限度地提高吞吐量。
这两种形式的并行是独立运行的。如果分区和 fetch_size
均未指定,则该函数会在写入 Snowflake 之前将整个源表加载到内存中。这会增加内存使用量并降低大型数据集的性能。
SQL Server¶
使用 DB-API 从 Snowpark 客户端连接到 SQL Server¶
要从 Snowpark 连接到 SQL Server,您需要以下三个软件包:
Snowpark:snowflake-snowpark-python[pandas] (https://pypi.org/project/snowflake-snowpark-python/)
SQL Server ODBC Driver: Microsoft ODBC Driver for SQL Server (https://learn.microsoft.com/en-us/sql/connect/odbc/microsoft-odbc-driver-for-sql-server)。安装驱动程序即表示,您同意 Microsoft 的 EULA。
开源 pyodbc 库:pyodbc (https://pypi.org/project/pyodbc/)
以下是从 Snowpark 客户端和存储过程连接到 SQL Server 所需的代码示例。
安装 Python SQL 驱动程序
/bin/bash -c "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/master/install.sh)" brew tap microsoft/mssql-release https://github.com/Microsoft/homebrew-mssql-release brew update HOMEBREW_ACCEPT_EULA=Y brew install msodbcsql18 mssql-tools18
安装
snowflake-snowpark-python[pandas]
和pyodbc
pip install snowflake-snowpark-python[pandas] pip install pyodbc
定义用于创建 SQL Server 连接的工厂方法
def create_sql_server_connection(): import pyodbc HOST = "mssql_host" PORT = "mssql_port" USERNAME = "mssql_username" PASSWORD = "mssql_password" DATABASE = "mssql_db" connection_str = ( f"DRIVER={{ODBC Driver 18 for SQL Server}};" f"SERVER={HOST},{PORT};" f"DATABASE={DATABASE};" f"UID={USERNAME};" f"PWD={PASSWORD};" ) connection = pyodbc.connect(connection_str) return connection # Call dbapi to pull data from mssql_table df = session.read.dbapi( create_sql_server_connection, table="mssql_table")
使用 DB-API 从存储过程连接到 SQL Server¶
配置外部访问集成,这是允许 Snowflake 连接到源端点所必需的。
备注
建议使用 PrivateLink 实现安全的数据传输,尤其是在处理敏感信息时。确保您的 Snowflake 账户启用了必要的 PrivateLink 权限,并且 PrivateLink 功能已在 Snowflake 笔记本环境中配置并处于活动状态。
配置密钥、允许出口到源端点的网络规则以及外部访问集成。
CREATE OR REPLACE SECRET mssql_secret TYPE = PASSWORD USERNAME = 'mssql_username' PASSWORD = 'mssql_password'; -- Configure a network rule to allow egress to the source endpoint CREATE OR REPLACE NETWORK RULE mssql_network_rule MODE = EGRESS TYPE = HOST_PORT VALUE_LIST = ('mssql_host:mssql_port'); -- Configure an external access integration CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION mssql_access_integration ALLOWED_NETWORK_RULES = (mssql_network_rule) ALLOWED_AUTHENTICATION_SECRETS = (mssql_secret) ENABLED = true; -- Create or replace a Python stored procedure CREATE OR REPLACE PROCEDURE sp_mssql_dbapi() RETURNS TABLE() LANGUAGE PYTHON RUNTIME_VERSION='3.11' HANDLER='run' PACKAGES=('snowflake-snowpark-python', 'pyodbc', 'msodbcsql') EXTERNAL_ACCESS_INTEGRATIONS = (mssql_access_integration) SECRETS = ('cred' = mssql_secret ) AS $$ # Get user name and password from mssql_secret import _snowflake username_password_object = _snowflake.get_username_password('cred') USER = username_password_object.username PASSWORD = username_password_object.password # Define a method to connect to SQL server_hostname from snowflake.snowpark import Session def create_sql_server_connection(): import pyodbc host = "mssql_host" port = mssql_port username = USER password = PASSWORD database = "mssql_database" connection_str = ( f"DRIVER={{ODBC Driver 18 for SQL Server}};" f"SERVER={host},{port};" f"DATABASE={database};" f"UID={username};" f"PWD={password};" ) connection = pyodbc.connect(connection_str) return connection def run(session: Session): df = session.read.dbapi( create_sql_server_connection, table="mssql_table" ) return df $$; CALL sp_mssql_dbapi();
Oracle¶
要从 Snowpark 连接到 Oracle,您需要以下两个软件包:
Snowpark:snowflake-snowpark-python[pandas] (https://pypi.org/project/snowflake-snowpark-python/)
开源 oracledb 库:oracledb (https://pypi.org/project/oracledb/)
以下是从 Snowpark 客户端、存储过程和 Snowflake Notebooks 连接到 Oracle 所需的代码示例。
使用 DB-API 从 Snowpark 客户端连接到 Oracle¶
安装
snowflake-snowpark-python[pandas]
和oracledb
pip install snowflake-snowpark-python[pandas] pip install oradb
使用 DB-API 从 Oracle 提取数据并定义用于创建 Oracle 连接的工厂方法
def create_oracle_db_connection(): import oracledb HOST = "myhost" PORT = "myport" SERVICE_NAME = "myservice" USER = "myuser" PASSWORD = "mypassword" DSN = f"{HOST}:{PORT}/{SERVICE_NAME}" connection = oracledb.connect( user=USER, password=PASSWORD, dsn=DSN ) return connection # Call dbapi to pull data from mytable df = session.read.dbapi( create_oracle_db_connection, table="mytable")
使用 DB-API 从存储过程连接到 Oracle¶
需要外部访问集成才能允许 Snowflake 连接到源端点。
备注
建议使用 PrivateLink 实现安全的数据传输,尤其是在处理敏感信息时。确保您的 Snowflake 账户启用了必要的 PrivateLink 权限,并且 PrivateLink 功能已在 Snowflake 笔记本环境中配置并处于活动状态。
配置密钥、允许出口到源端点的网络规则以及外部访问集成。
-- Configure the secret, a network rule to allow egress to the source endpoint and external access integration. CREATE OR REPLACE SECRET ora_secret TYPE = PASSWORD USERNAME = 'ora_username' PASSWORD = 'ora_password'; -- Configure a network rule to allow egress to the source endpoint CREATE OR REPLACE NETWORK RULE ora_network_rule MODE = EGRESS TYPE = HOST_PORT VALUE_LIST = ('ora_host:ora_port'); -- Configure an external access integration CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION ora_access_integration ALLOWED_NETWORK_RULES = (ora_network_rule) ALLOWED_AUTHENTICATION_SECRETS = (ora_secret) ENABLED = true;
使用 Snowpark Python DB-API 在 Python 存储过程中从 Oracle 提取数据¶
CREATE OR REPLACE PROCEDURE sp_ora_dbapi()
RETURNS TABLE()
LANGUAGE PYTHON
RUNTIME_VERSION='3.11'
HANDLER='run'
PACKAGES=('snowflake-snowpark-python', 'oracledb')
EXTERNAL_ACCESS_INTEGRATIONS = (ora_access_integration)
SECRETS = ('cred' = ora_secret )
AS $$
# Get user name and password from ora_secret
import _snowflake
username_password_object = _snowflake.get_username_password('cred')
USER = username_password_object.username
PASSWORD = username_password_object.password
# Define the factory method for creating a connection to Oracle
from snowflake.snowpark import Session
def create_oracle_db_connection():
import oracledb
host = "ora_host"
port = "ora_port"
service_name = "ora_service"
user = USER
password = PASSWORD
DSN = f"{host}:{port}/{service_name}"
connection = oracledb.connect(
user=USER,
password=PASSWORD,
dsn=DSN
)
return connection
def run(session: Session):
df = session.read.dbapi(
create_ora_connection,
table="ora_table"
)
return df
$$;
CALL sp_ora_dbapi();
使用 DB-API 从 Snowflake 笔记本连接到 Oracle¶
从笔记本软件包中选择
snowflake-snowpark-python
和oracledb
。配置密钥、允许出口到源端点的网络规则以及外部访问集成。
CREATE OR REPLACE SECRET ora_secret TYPE = PASSWORD USERNAME = 'ora_username' PASSWORD = 'ora_password'; ALTER NOTEBOOK mynotebook SET SECRETS = ('snowflake-secret-object' = ora_secret); -- Configure a network rule to allow egress to the source endpoint CREATE OR REPLACE NETWORK RULE ora_network_rule MODE = EGRESS TYPE = HOST_PORT VALUE_LIST = ('ora_host:ora_port'); -- Configure an external access integration. CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION ora_access_integration ALLOWED_NETWORK_RULES = (ora_network_rule) ALLOWED_AUTHENTICATION_SECRETS = (ora_secret) ENABLED = true;
为 Snowflake Notebooks 设置外部访问,然后重新启动笔记本会话。
使用 Snowpark Python DB-API 在 Snowflake 笔记本的 Python 单元中从 Oracle 提取数据¶
# Get user name and password from ora_secret import _snowflake username_password_object = _snowflake.get_username_password('snowflake-secret-object') USER = username_password_object.username PASSWORD = username_password_object.password import snowflake.snowpark.context session = snowflake.snowpark.context.get_active_session() # Define the factory method for creating a connection to Oracle def create_oracle_db_connection(): import oracledb host = "ora_host" port = "ora_port" service_name = "ora_service" user = USER password = PASSWORD DSN = f"{host}:{port}/{service_name}" connection = oracledb.connect( user=USER, password=PASSWORD, dsn=DSN ) return connection # Use dbapi to read data from ora_table df_ora = session.read.dbapi( create_oracle_db_connection, table='ora_table' ) # Save data into sf_table df_ora.write.mode("overwrite").save_as_table('sf_table')
PostgreSQL¶
要从 Snowpark 连接 PostgreSQL,您将需要以下两个软件包:
Snowpark:snowflake-snowpark-python[pandas] (https://pypi.org/project/snowflake-snowpark-python/)
开源 psycopg2 库:psycopg2 (https://pypi.org/project/psycopg2/)
以下是从 Snowpark 客户端、存储过程和 Snowflake 笔记本连接到 PostgreSQL 所需的代码示例。
使用 DB-API 从 Snowpark 客户端连接到 PostgreSQL¶
安装
psycopg2
pip install psycopg2
定义用于创建 PostgreSQL 连接的工厂方法
def create_pg_connection(): import psycopg2 connection = psycopg2.connect( host="pg_host", port=pg_port, dbname="pg_dbname", user="pg_user", password="pg_password", ) return connection # Call dbapi to pull data from pg_table df = session.read.dbapi( create_pg_connection, table="pg_table")
使用 DB-API 从存储过程连接到 PostgreSQL¶
配置外部访问集成,这是允许 Snowflake 连接到源端点所必需的。
备注
建议使用 PrivateLink 实现安全的数据传输,尤其是在处理敏感信息时。确保您的 Snowflake 账户启用了必要的 PrivateLink 权限,并且 PrivateLink 功能已在 Snowflake 笔记本环境中配置并处于活动状态。
配置密钥、允许出口到源端点的网络规则以及外部访问集成。
CREATE OR REPLACE SECRET pg_secret
TYPE = PASSWORD
USERNAME = 'pg_username'
PASSWORD = 'pg_password';
-- Configure a network rule.
CREATE OR REPLACE NETWORK RULE pg_network_rule
MODE = EGRESS
TYPE = HOST_PORT
VALUE_LIST = ('pg_host:pg_port');
-- Configure an external access integration.
CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION pg_access_integration
ALLOWED_NETWORK_RULES = (pg_network_rule)
ALLOWED_AUTHENTICATION_SECRETS = (pg_secret)
ENABLED = true;
使用 Snowpark Python DB-API 在 Python 存储过程中从 PostgreSQL 提取数据
CREATE OR REPLACE PROCEDURE sp_pg_dbapi()
RETURNS TABLE()
LANGUAGE PYTHON
RUNTIME_VERSION='3.11'
HANDLER='run'
PACKAGES=('snowflake-snowpark-python', 'psycopg2')
EXTERNAL_ACCESS_INTEGRATIONS = (pg_access_integration)
SECRETS = ('cred' = pg_secret )
AS $$
# Get user name and password from pg_secret
import _snowflake
username_password_object = _snowflake.get_username_password('cred')
USER = username_password_object.username
PASSWORD = username_password_object.password
# Define the factory method for creating a connection to PostgreSQL
from snowflake.snowpark import Session
def create_pg_connection():
import psycopg2
connection = psycopg2.connect(
host="pg_host",
port=pg_port,
dbname="pg_dbname",
user=USER,
password=PASSWORD,
)
return connection
def run(session: Session):
df = session.read.dbapi(
create_pg_connection,
table="pg_table"
)
return df
$$;
CALL sp_pg_dbapi();
使用 DB-API 从 Snowflake 笔记本连接到 PostgreSQL¶
从 Snowflake 笔记本软件包 中选择
snowflake-snowpark-python
和psycopg2
。配置外部访问集成,这是允许 Snowflake 连接到源端点所必需的。
备注
建议使用 PrivateLink 实现安全的数据传输,尤其是在处理敏感信息时。确保您的 Snowflake 账户启用了必要的 PrivateLink 权限,并且 PrivateLink 功能已在 Snowflake 笔记本环境中配置并处于活动状态。
配置密钥、允许出口到源端点的网络规则以及外部访问集成。
-- Configure the secret
CREATE OR REPLACE SECRET pg_secret
TYPE = PASSWORD
USERNAME = 'pg_username'
PASSWORD = 'pg_password';
ALTER NOTEBOOK pg_notebook SET SECRETS = ('snowflake-secret-object' = pg_secret);
-- Configure the network rule to allow egress to the source endpoint
CREATE OR REPLACE NETWORK RULE pg_network_rule
MODE = EGRESS
TYPE = HOST_PORT
VALUE_LIST = ('pg_host:pg_port');
-- Configure external access integration
CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION pg_access_integration
ALLOWED_NETWORK_RULES = (pg_network_rule)
ALLOWED_AUTHENTICATION_SECRETS = (pg_secret)
ENABLED = true;
为 Snowflake Notebooks 设置外部访问,然后重新启动笔记本会话。
# Get the user name and password from :code:`pg_secret` import _snowflake username_password_object = _snowflake.get_username_password('snowflake-secret-object') USER = username_password_object.username PASSWORD = username_password_object.password import snowflake.snowpark.context session = snowflake.snowpark.context.get_active_session() # Define the factory method for creating a connection to PostgreSQL def create_pg_connection(): import psycopg2 connection = psycopg2.connect( host="pg_host", port=pg_port, dbname="pg_dbname", user=USER, password=PASSWORD, ) return connection # Use dbapi to read and save data from pg_table df = session.read.dbapi( create_pg_connection, table="pg_table" ) # Save data into sf_table df.write.mode("overwrite").save_as_table('sf_table')
MySQL¶
要从 Snowpark 连接 MySQL,您将需要以下两个软件包:
Snowpark:snowflake-snowpark-python[pandas] (https://pypi.org/project/snowflake-snowpark-python/)
开源 pymysql 库:PyMySQL (https://pypi.org/project/PyMySQL/)
以下是从 Snowpark 客户端、存储过程和 Snowflake 笔记本连接到 MySQL 所需的代码示例。
使用 DB-API 从 Snowpark 客户端连接到 MySQL¶
安装 pymysql
pip install snowflake-snowpark-python[pandas] pip install pymysql
定义用于创建 MySQL 连接的工厂方法
def create_mysql_connection():
import pymysql
connection = pymysql.connect(
host="mysql_host",
port=mysql_port,
database="mysql_db",
user="mysql_user",
password="mysql_password"
)
return connection
# Call dbapi to pull data from mysql_table
df = session.read.dbapi(
create_mysql_connection,
table="mysql_table"
)
使用 DB-API 从存储过程连接到 MySQL¶
配置外部访问集成,这是允许 Snowflake 连接到源端点所必需的。
备注
建议使用 PrivateLink 实现安全的数据传输,尤其是在处理敏感信息时。确保您的 Snowflake 账户启用了必要的 PrivateLink 权限,并且 PrivateLink 功能已在 Snowflake 笔记本环境中配置并处于活动状态。
配置密钥、允许出口到源端点的网络规则以及外部访问集成。
CREATE OR REPLACE SECRET mysql_secret
TYPE = PASSWORD
USERNAME = 'mysql_username'
PASSWORD = 'mysql_password';
-- Configure a network rule.
CREATE OR REPLACE NETWORK RULE mysql_network_rule
MODE = EGRESS
TYPE = HOST_PORT
VALUE_LIST = ('mysql_host:mysql_port');
-- Configure an external access integration
CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION mysql_access_integration
ALLOWED_NETWORK_RULES = (mysql_network_rule)
ALLOWED_AUTHENTICATION_SECRETS = (mysql_secret)
ENABLED = true;
使用 Snowpark Python DB-API 在 Python 存储过程中从 MySQL 提取数据。
CREATE OR REPLACE PROCEDURE sp_mysql_dbapi()
RETURNS TABLE()
LANGUAGE PYTHON
RUNTIME_VERSION='3.11'
HANDLER='run'
PACKAGES=('snowflake-snowpark-python', 'pymysql')
EXTERNAL_ACCESS_INTEGRATIONS = (mysql_access_integration)
SECRETS = ('cred' = mysql_secret )
AS $$
# Get user name and password from mysql_secret
import _snowflake
username_password_object = _snowflake.get_username_password('cred')
USER = username_password_object.username
PASSWORD = username_password_object.password
# Define the factory method for creating a connection to MySQL
from snowflake.snowpark import session
def create_mysql_connection():
import pymysql
connection = pymysql.connect(
host="mysql_host",
port=mysql_port,
dbname="mysql_dbname",
user=USER,
password=PASSWORD,
)
return connection
def run(session: Session):
df = session.read.dbapi(
create_mysql_connection,
table="mysql_table"
)
return df
$$;
CALL sp_mysql_dbapi();
使用 DB-API 从 Snowflake 笔记本连接到 MySQL¶
从 Snowflake 笔记本软件包 中选择
snowflake-snowpark-python
和pymysql
。配置外部访问集成,这是允许 Snowflake 连接到源端点所必需的。
备注
建议使用 PrivateLink 实现安全的数据传输,尤其是在处理敏感信息时。确保您的 Snowflake 账户启用了必要的 PrivateLink 权限,并且 PrivateLink 功能已在 Snowflake 笔记本环境中配置并处于活动状态。
配置密钥并将其添加到 Snowflake 笔记本 中。
CREATE OR REPLACE SECRET mysql_secret
TYPE = PASSWORD
USERNAME = 'mysql_username'
PASSWORD = 'mysql_password';
ALTER NOTEBOOK mynotebook SET SECRETS = ('snowflake-secret-object' = mysql_secret);
配置网络规则和外部访问集成。
CREATE OR REPLACE NETWORK RULE mysql_network_rule
MODE = EGRESS
TYPE = HOST_PORT
VALUE_LIST = ('mysql_host:mysql_port');
CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION mysql_access_integration
ALLOWED_NETWORK_RULES = (mysql_network_rule)
ALLOWED_AUTHENTICATION_SECRETS = (mysql_secret)
ENABLED = true;
为 Snowflake Notebooks 设置外部访问,然后重新启动笔记本会话。
# Get user name and password from mysql_secret
import _snowflake
username_password_object = _snowflake.get_username_password('snowflake-secret-object')
USER = username_password_object.username
PASSWORD = username_password_object.password
import snowflake.snowpark.context
session = snowflake.snowpark.context.get_active_session()
# Define the factory method for creating a connection to MySQL
def create_mysql_connection():
import pymysql
connection = pymysql.connect(
host="mysql_host",
port=mysql_port,
dbname="mysql_dbname",
user=USER,
password=PASSWORD,
)
return connection
# Call dbapi to pull data from mysql_table
df = session.read.dbapi(
create_mysql_connection,
table="mysql_table")
# Save data into sf_table
df.write.mode("overwrite").save_as_table('sf_table')
Databricks¶
要从 Snowpark 连接到 Databricks,您需要以下两个软件包:
Snowpark:snowflake-snowpark-python[pandas] (https://pypi.org/project/snowflake-snowpark-python/)
开源 psycopg2 库:databricks-sql-connector (https://pypi.org/project/databricks-sql-connector/)
以下是从 Snowpark 客户端、存储过程和 Snowflake 笔记本连接到 Databricks 所需的代码示例。
使用 DB-API 从 Snowpark 客户端连接到 Databricks¶
安装
databricks-sql-connector
:
pip install snowflake-snowpark-python[pandas]
pip install databricks-sql-connector
定义用于创建 Databricks 连接的工厂方法
def create_dbx_connection():
import databricks.sql
connection = databricks.sql.connect(
server_hostname=HOST,
http_path=PATH,
access_token=ACCESS_TOKEN
)
return connection
#Call dbapi to pull data from mytable
df = session.read.dbapi(
create_dbx_connection,
table="dbx_table")
使用 DB-API 从存储过程连接到 Databricks¶
从 Snowflake 笔记本软件包 中选择
snowflake-snowpark-python
和pymysql
。需要外部访问集成才能允许 Snowflake 连接到源端点。
备注
建议使用 PrivateLink 实现安全的数据传输,尤其是在处理敏感信息时。确保您的 Snowflake 账户启用了必要的 PrivateLink 权限,并且 PrivateLink 功能已在 Snowflake 笔记本环境中配置并处于活动状态。
配置密钥、允许出口到源端点的网络规则以及外部访问集成。
CREATE OR REPLACE SECRET dbx_secret
TYPE = GENERIC_STRING
SECRET_STRING = 'dbx_access_token';
-- Configure a network rule
CREATE OR REPLACE NETWORK RULE dbx_network_rule
MODE = EGRESS
TYPE = HOST_PORT
VALUE_LIST = ('dbx_host:dbx_port');
-- Configure an external access integration
CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION dbx_access_integration
ALLOWED_NETWORK_RULES = (dbx_network_rule)
ALLOWED_AUTHENTICATION_SECRETS = (dbx_secret)
ENABLED = true;
使用 Snowpark Python DB-API 在 Python 存储过程中从 Databricks 提取数据
CREATE OR REPLACE PROCEDURE sp_dbx_dbapi() RETURNS TABLE() LANGUAGE PYTHON RUNTIME_VERSION='3.11' HANDLER='run' PACKAGES=('snowflake-snowpark-python', 'databricks-sql-connector') EXTERNAL_ACCESS_INTEGRATIONS = (dbx_access_integration) SECRETS = ('cred' = dbx_secret ) AS $$ # Get user name and password from dbx_secret import _snowflake ACCESS_TOKEN = _snowflake.get_generic_secret_string('cred') from snowflake.snowpark import Session # define the method for creating a connection to Databricks def create_dbx_connection(): import databricks.sql connection = databricks.sql.connect( server_hostname="dbx_host", http_path="dbx_path", access_token=ACCESS_TOKEN, ) return connection def run(session: Session): df = session.read.dbapi( create_dbx_connection, table="dbx_table" ) return df $$; CALL sp_dbx_dbapi();
使用 DB-API 从 Snowflake 笔记本连接到 Databricks¶
从 Snowflake 笔记本软件包 中选择
snowflake-snowpark-python
和pymysql
。配置外部访问集成,这是允许 Snowflake 连接到源端点所必需的。
备注
建议使用 PrivateLink 实现安全的数据传输,尤其是在处理敏感信息时。确保您的 Snowflake 账户启用了必要的 PrivateLink 权限,并且 PrivateLink 功能已在 Snowflake 笔记本环境中配置并处于活动状态。
配置密钥并将其添加到 Snowflake 笔记本 中。
CREATE OR REPLACE SECRET dbx_secret TYPE = GENERIC_STRING SECRET_STRING = 'dbx_access_token'; ALTER NOTEBOOK mynotebook SET SECRETS = ('snowflake-secret-object' = dbx_secret);
配置
CREATE OR REPLACE NETWORK RULE dbx_network_rule MODE = EGRESS TYPE = HOST_PORT VALUE_LIST = ('dbx_host:dbx_port'); CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION dbx_access_integration ALLOWED_NETWORK_RULES = (dbx_network_rule) ALLOWED_AUTHENTICATION_SECRETS = (dbx_secret) ENABLED = true;
为 Snowflake Notebooks 设置外部访问,然后重新启动笔记本会话。
# Get user name and password from dbx_secret import _snowflake ACCESS_TOKEN = _snowflake.get_generic_secret_string('cred') import snowflake.snowpark.context session = snowflake.snowpark.context.get_active_session() # Define the factory method for creating a connection to Databricks def create_dbx_connection(): import databricks.sql connection = databricks.sql.connect( server_hostname="dbx_host", http_path="dbx_path", access_token=ACCESS_TOKEN, ) return connection # use dbapi to read data from dbx_table df = session.read.dbapi( create_dbx_connection, table="dbx_table" ) # save data into sf_table df.write.mode("overwrite").save_as_table('sf_table')
限制¶
驱动程序¶
Snowpark Python DB-API 仅支持符合 Python DB-API 2.0 的驱动程序(例如 pyodbc
、oracledb
)。此版本不支持 JDBC 驱动程序。