使用 Snowpark Python DB-API 从外部数据源读取数据

使用 Snowpark Python DB-API,Snowpark Python 用户能以编程方式将数据从外部数据库提取到 Snowflake 中。它包括:

  • Python DB-API 支持: 使用 Python 的标准 DB-API 2.0 驱动程序连接到外部数据库。

  • 简化设置: 使用 pip 安装必要的驱动程序,无需管理其他依赖关系。

利用这些 APIs,您可以无缝地将数据提取到 Snowflake 表中,并使用 Snowpark DataFrames 进行转换,以便实现高级分析。

使用 Snowpark Python DB-API

DB-API 的使用方式与 Spark JDBC API (https://spark.apache.org/docs/3.5.4/sql-data-sources-jdbc.html) 类似。大多数参数的设计保持相同或相似,以提升一致性。同时,Snowpark 重视采用直观命名惯例的 Python 优先设计,避免了 JDBC 特定配置。这为 Python 开发人员提供了熟悉的体验。有关将 Snowpark Python DB-API 与 Spark JDBC API 进行比较的更多信息,请参阅 DB-API 参数

DB-API 参数

参数

Snowpark DB-API

create_connection

创建 Python DB-API 连接的函数。

table

指定源数据库中的表。

query

将 SQL 查询作为子查询包装起来,用于读取数据。

column

用于并行读取的分区列。

lower_bound

分区的下限。

upper_bound

分区的上限。

num_partitions

用于并行处理的分区数量。

query_timeout

SQL 执行超时(以秒为单位)。

fetch_size

每次往返提取的行数。

custom_schema

用于从外部数据库提取数据的自定义架构。

max_workers

用于从外部数据库并行获取数据的工作线程数。

predicates

WHERE 子句分区的条件列表。

session_init_statement

在会话初始化时执行 SQL 或 PL/SQL 语句。

udtf_configs

使用 Snowflake UDTF 执行工作负载以获得更好的性能。

fetch_merge_count

上传之前要合并到单个 Parquet 文件中的已提取批次的数量。

了解并行

Snowpark Python DB-API 根据用户输入使用两种独立的并行形式:

  • 基于分区的并行

    当用户指定分区信息(例如,columnnum_partitionslower_boundupper_bound)或谓词时,Snowflake 会将查询拆分为多个分区。这些分区使用多处理进行并行处理,每个工作线程独立提取和写入其所属分区。

  • 每个分区内基于提取大小的并行

    在分区内,API 按由 fetch_size 定义的区块提取行。这些行在提取时会并行写入 Snowflake,从而允许读取和写入能够重叠进行并最大限度地提高吞吐量。

这两种形式的并行是独立运行的。如果分区和 fetch_size 均未指定,则该函数会在写入 Snowflake 之前将整个源表加载到内存中。这会增加内存使用量并降低大型数据集的性能。

SQL Server

使用 DB-API 从 Snowpark 客户端连接到 SQL Server

要从 Snowpark 连接到 SQL Server,您需要以下三个软件包:

  • Snowpark:snowflake-snowpark-python[pandas] (https://pypi.org/project/snowflake-snowpark-python/)

  • SQL Server ODBC Driver: Microsoft ODBC Driver for SQL Server (https://learn.microsoft.com/en-us/sql/connect/odbc/microsoft-odbc-driver-for-sql-server)。安装驱动程序即表示,您同意 Microsoft 的 EULA。

  • 开源 pyodbc 库:pyodbc (https://pypi.org/project/pyodbc/)

以下是从 Snowpark 客户端和存储过程连接到 SQL Server 所需的代码示例。

  • 安装 Python SQL 驱动程序

/bin/bash -c "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/master/install.sh)"
  brew tap microsoft/mssql-release https://github.com/Microsoft/homebrew-mssql-release
  brew update
  HOMEBREW_ACCEPT_EULA=Y brew install msodbcsql18 mssql-tools18
Copy
  • 安装 snowflake-snowpark-python[pandas]pyodbc

pip install snowflake-snowpark-python[pandas]
pip install pyodbc
Copy
  • 定义用于创建 SQL Server 连接的工厂方法

def create_sql_server_connection():
    import pyodbc
    HOST = "mssql_host"
    PORT = "mssql_port"
    USERNAME = "mssql_username"
    PASSWORD = "mssql_password"
    DATABASE = "mssql_db"
    connection_str = (
        f"DRIVER={{ODBC Driver 18 for SQL Server}};"
        f"SERVER={HOST},{PORT};"
        f"DATABASE={DATABASE};"
        f"UID={USERNAME};"
        f"PWD={PASSWORD};"
    )
    connection = pyodbc.connect(connection_str)
    return connection


# Call dbapi to pull data from mssql_table

df = session.read.dbapi(
  create_sql_server_connection,
    table="mssql_table")
Copy

使用 DB-API 从存储过程连接到 SQL Server

  • 配置外部访问集成,这是允许 Snowflake 连接到源端点所必需的。

    备注

    建议使用 PrivateLink 实现安全的数据传输,尤其是在处理敏感信息时。确保您的 Snowflake 账户启用了必要的 PrivateLink 权限,并且 PrivateLink 功能已在 Snowflake 笔记本环境中配置并处于活动状态。

  • 配置密钥、允许出口到源端点的网络规则以及外部访问集成。

    CREATE OR REPLACE SECRET mssql_secret
      TYPE = PASSWORD
      USERNAME = 'mssql_username'
      PASSWORD = 'mssql_password';
    
    -- Configure a network rule to allow egress to the source endpoint
    
    CREATE OR REPLACE NETWORK RULE mssql_network_rule
      MODE = EGRESS
      TYPE = HOST_PORT
      VALUE_LIST = ('mssql_host:mssql_port');
    
    -- Configure an external access integration
    
    CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION mssql_access_integration
      ALLOWED_NETWORK_RULES = (mssql_network_rule)
      ALLOWED_AUTHENTICATION_SECRETS = (mssql_secret)
      ENABLED = true;
    
    -- Create or replace a Python stored procedure
    
    CREATE OR REPLACE PROCEDURE sp_mssql_dbapi()
      RETURNS TABLE()
      LANGUAGE PYTHON
      RUNTIME_VERSION='3.11'
      HANDLER='run'
      PACKAGES=('snowflake-snowpark-python', 'pyodbc', 'msodbcsql')
            EXTERNAL_ACCESS_INTEGRATIONS = (mssql_access_integration)
            SECRETS = ('cred' = mssql_secret )
    
    AS $$
    
    # Get user name and password from mssql_secret
    
    import _snowflake
    username_password_object = _snowflake.get_username_password('cred')
    USER = username_password_object.username
    PASSWORD = username_password_object.password
    
    # Define a method to connect to SQL server_hostname
    from snowflake.snowpark import Session
    def create_sql_server_connection():
        import pyodbc
    
        host = "mssql_host"
        port = mssql_port
        username = USER
        password = PASSWORD
        database = "mssql_database"
        connection_str = (
          f"DRIVER={{ODBC Driver 18 for SQL Server}};"
          f"SERVER={host},{port};"
          f"DATABASE={database};"
          f"UID={username};"
          f"PWD={password};"
        )
    
        connection = pyodbc.connect(connection_str)
        return connection
    
    def run(session: Session):
        df = session.read.dbapi(
            create_sql_server_connection,
            table="mssql_table"
        )
        return df
    $$;
    
    CALL sp_mssql_dbapi();
    
    Copy

Oracle

要从 Snowpark 连接到 Oracle,您需要以下两个软件包:

  • Snowpark:snowflake-snowpark-python[pandas] (https://pypi.org/project/snowflake-snowpark-python/)

  • 开源 oracledb 库:oracledb (https://pypi.org/project/oracledb/)

以下是从 Snowpark 客户端、存储过程和 Snowflake Notebooks 连接到 Oracle 所需的代码示例。

使用 DB-API 从 Snowpark 客户端连接到 Oracle

  • 安装 snowflake-snowpark-python[pandas]oracledb

    pip install snowflake-snowpark-python[pandas]
    pip install oradb
    
    Copy
  • 使用 DB-API 从 Oracle 提取数据并定义用于创建 Oracle 连接的工厂方法

    def create_oracle_db_connection():
      import oracledb
      HOST = "myhost"
      PORT = "myport"
      SERVICE_NAME = "myservice"
      USER = "myuser"
      PASSWORD = "mypassword"
      DSN = f"{HOST}:{PORT}/{SERVICE_NAME}"
      connection = oracledb.connect(
          user=USER,
          password=PASSWORD,
          dsn=DSN
      )
      return connection
    
    # Call dbapi to pull data from mytable
    
    df = session.read.dbapi(
      create_oracle_db_connection,
      table="mytable")
    
    Copy

使用 DB-API 从存储过程连接到 Oracle

需要外部访问集成才能允许 Snowflake 连接到源端点。

备注

建议使用 PrivateLink 实现安全的数据传输,尤其是在处理敏感信息时。确保您的 Snowflake 账户启用了必要的 PrivateLink 权限,并且 PrivateLink 功能已在 Snowflake 笔记本环境中配置并处于活动状态。

  • 配置密钥、允许出口到源端点的网络规则以及外部访问集成。

    -- Configure the secret, a network rule to allow egress to the source endpoint and external access integration.
    
    CREATE OR REPLACE SECRET ora_secret
      TYPE = PASSWORD
      USERNAME = 'ora_username'
      PASSWORD = 'ora_password';
    
    -- Configure a network rule to allow egress to the source endpoint
    
    CREATE OR REPLACE NETWORK RULE ora_network_rule
      MODE = EGRESS
      TYPE = HOST_PORT
      VALUE_LIST = ('ora_host:ora_port');
    
    -- Configure an external access integration
    
    CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION ora_access_integration
      ALLOWED_NETWORK_RULES = (ora_network_rule)
      ALLOWED_AUTHENTICATION_SECRETS = (ora_secret)
      ENABLED = true;
    
    Copy

使用 Snowpark Python DB-API 在 Python 存储过程中从 Oracle 提取数据

CREATE OR REPLACE PROCEDURE sp_ora_dbapi()
  RETURNS TABLE()
  LANGUAGE PYTHON
  RUNTIME_VERSION='3.11'
  HANDLER='run'
  PACKAGES=('snowflake-snowpark-python', 'oracledb')
  EXTERNAL_ACCESS_INTEGRATIONS = (ora_access_integration)
  SECRETS = ('cred' = ora_secret )
AS $$

# Get user name and password from ora_secret
import _snowflake
username_password_object = _snowflake.get_username_password('cred')
USER = username_password_object.username
PASSWORD = username_password_object.password

# Define the factory method for creating a connection to Oracle

from snowflake.snowpark import Session

def create_oracle_db_connection():
    import oracledb
    host = "ora_host"
    port = "ora_port"
    service_name = "ora_service"
    user = USER
    password = PASSWORD
    DSN = f"{host}:{port}/{service_name}"
    connection = oracledb.connect(
        user=USER,
        password=PASSWORD,
        dsn=DSN
    )
    return connection

def run(session: Session):
    df = session.read.dbapi(
        create_ora_connection,
        table="ora_table"
    )
    return df
$$;

CALL sp_ora_dbapi();
Copy

使用 DB-API 从 Snowflake 笔记本连接到 Oracle

  • 从笔记本软件包中选择 snowflake-snowpark-pythonoracledb

  • 配置密钥、允许出口到源端点的网络规则以及外部访问集成。

    CREATE OR REPLACE SECRET ora_secret
      TYPE = PASSWORD
      USERNAME = 'ora_username'
      PASSWORD = 'ora_password';
      ALTER NOTEBOOK mynotebook SET SECRETS = ('snowflake-secret-object' = ora_secret);
    
    -- Configure a network rule to allow egress to the source endpoint
    
    CREATE OR REPLACE NETWORK RULE ora_network_rule
      MODE = EGRESS
      TYPE = HOST_PORT
      VALUE_LIST = ('ora_host:ora_port');
    
    -- Configure an external access integration.
    
    CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION ora_access_integration
      ALLOWED_NETWORK_RULES = (ora_network_rule)
      ALLOWED_AUTHENTICATION_SECRETS = (ora_secret)
      ENABLED = true;
    
    Copy
  • 为 Snowflake Notebooks 设置外部访问,然后重新启动笔记本会话。

使用 Snowpark Python DB-API 在 Snowflake 笔记本的 Python 单元中从 Oracle 提取数据

# Get user name and password from ora_secret

import _snowflake
username_password_object = _snowflake.get_username_password('snowflake-secret-object')
USER = username_password_object.username
PASSWORD = username_password_object.password

import snowflake.snowpark.context
session = snowflake.snowpark.context.get_active_session()

# Define the factory method for creating a connection to Oracle

def create_oracle_db_connection():
    import oracledb
    host = "ora_host"
    port = "ora_port"
    service_name = "ora_service"
    user = USER
    password = PASSWORD
    DSN = f"{host}:{port}/{service_name}"
    connection = oracledb.connect(
        user=USER,
        password=PASSWORD,
        dsn=DSN
    )
    return connection

# Use dbapi to read data from ora_table

df_ora = session.read.dbapi(
  create_oracle_db_connection,
  table='ora_table'
)

# Save data into sf_table

df_ora.write.mode("overwrite").save_as_table('sf_table')
Copy

PostgreSQL

要从 Snowpark 连接 PostgreSQL,您将需要以下两个软件包:

  • Snowpark:snowflake-snowpark-python[pandas] (https://pypi.org/project/snowflake-snowpark-python/)

  • 开源 psycopg2 库:psycopg2 (https://pypi.org/project/psycopg2/)

以下是从 Snowpark 客户端、存储过程和 Snowflake 笔记本连接到 PostgreSQL 所需的代码示例。

使用 DB-API 从 Snowpark 客户端连接到 PostgreSQL

  • 安装 psycopg2

    pip install psycopg2
    
    Copy
  • 定义用于创建 PostgreSQL 连接的工厂方法

    def create_pg_connection():
        import psycopg2
        connection = psycopg2.connect(
            host="pg_host",
            port=pg_port,
            dbname="pg_dbname",
            user="pg_user",
            password="pg_password",
        )
        return connection
    
    
    # Call dbapi to pull data from pg_table
    
    df = session.read.dbapi(
      create_pg_connection,
      table="pg_table")
    
    Copy

使用 DB-API 从存储过程连接到 PostgreSQL

  • 配置外部访问集成,这是允许 Snowflake 连接到源端点所必需的。

    备注

    建议使用 PrivateLink 实现安全的数据传输,尤其是在处理敏感信息时。确保您的 Snowflake 账户启用了必要的 PrivateLink 权限,并且 PrivateLink 功能已在 Snowflake 笔记本环境中配置并处于活动状态。

  • 配置密钥、允许出口到源端点的网络规则以及外部访问集成。

CREATE OR REPLACE SECRET pg_secret
  TYPE = PASSWORD
  USERNAME = 'pg_username'
  PASSWORD = 'pg_password';

-- Configure a network rule.

CREATE OR REPLACE NETWORK RULE pg_network_rule
  MODE = EGRESS
  TYPE = HOST_PORT
  VALUE_LIST = ('pg_host:pg_port');

-- Configure an external access integration.

CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION pg_access_integration
  ALLOWED_NETWORK_RULES = (pg_network_rule)
  ALLOWED_AUTHENTICATION_SECRETS = (pg_secret)
  ENABLED = true;
Copy
  • 使用 Snowpark Python DB-API 在 Python 存储过程中从 PostgreSQL 提取数据

CREATE OR REPLACE PROCEDURE sp_pg_dbapi()
RETURNS TABLE()
LANGUAGE PYTHON
RUNTIME_VERSION='3.11'
HANDLER='run'
PACKAGES=('snowflake-snowpark-python', 'psycopg2')
EXTERNAL_ACCESS_INTEGRATIONS = (pg_access_integration)
SECRETS = ('cred' = pg_secret )
AS $$

# Get user name and password from pg_secret

import _snowflake
username_password_object = _snowflake.get_username_password('cred')
USER = username_password_object.username
PASSWORD = username_password_object.password

# Define the factory method for creating a connection to PostgreSQL

from snowflake.snowpark import Session

def create_pg_connection():
    import psycopg2
    connection = psycopg2.connect(
        host="pg_host",
        port=pg_port,
        dbname="pg_dbname",
        user=USER,
        password=PASSWORD,
    )
    return connection

def run(session: Session):
    df = session.read.dbapi(
        create_pg_connection,
        table="pg_table"
    )
    return df

$$;
CALL sp_pg_dbapi();
Copy

使用 DB-API 从 Snowflake 笔记本连接到 PostgreSQL

  • Snowflake 笔记本软件包 中选择 snowflake-snowpark-pythonpsycopg2

  • 配置外部访问集成,这是允许 Snowflake 连接到源端点所必需的。

    备注

    建议使用 PrivateLink 实现安全的数据传输,尤其是在处理敏感信息时。确保您的 Snowflake 账户启用了必要的 PrivateLink 权限,并且 PrivateLink 功能已在 Snowflake 笔记本环境中配置并处于活动状态。

  • 配置密钥、允许出口到源端点的网络规则以及外部访问集成。

-- Configure the secret

CREATE OR REPLACE SECRET pg_secret
  TYPE = PASSWORD
  USERNAME = 'pg_username'
  PASSWORD = 'pg_password';
  ALTER NOTEBOOK pg_notebook SET SECRETS = ('snowflake-secret-object' = pg_secret);

 -- Configure the network rule to allow egress to the source endpoint

 CREATE OR REPLACE NETWORK RULE pg_network_rule
    MODE = EGRESS
    TYPE = HOST_PORT
    VALUE_LIST = ('pg_host:pg_port');

 -- Configure external access integration

 CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION pg_access_integration
    ALLOWED_NETWORK_RULES = (pg_network_rule)
    ALLOWED_AUTHENTICATION_SECRETS = (pg_secret)
    ENABLED = true;
Copy
  • 为 Snowflake Notebooks 设置外部访问,然后重新启动笔记本会话。

    # Get the user name and password from :code:`pg_secret`
    
    import _snowflake
    username_password_object = _snowflake.get_username_password('snowflake-secret-object')
    USER = username_password_object.username
    PASSWORD = username_password_object.password
    
    import snowflake.snowpark.context
    session = snowflake.snowpark.context.get_active_session()
    
    # Define the factory method for creating a connection to PostgreSQL
    
    def create_pg_connection():
        import psycopg2
        connection = psycopg2.connect(
            host="pg_host",
            port=pg_port,
            dbname="pg_dbname",
            user=USER,
            password=PASSWORD,
        )
        return connection
    
    # Use dbapi to read and save data from pg_table
    
    df = session.read.dbapi(
        create_pg_connection,
        table="pg_table"
    )
    
    # Save data into sf_table
    
    df.write.mode("overwrite").save_as_table('sf_table')
    
    Copy

MySQL

要从 Snowpark 连接 MySQL,您将需要以下两个软件包:

  • Snowpark:snowflake-snowpark-python[pandas] (https://pypi.org/project/snowflake-snowpark-python/)

  • 开源 pymysql 库:PyMySQL (https://pypi.org/project/PyMySQL/)

以下是从 Snowpark 客户端、存储过程和 Snowflake 笔记本连接到 MySQL 所需的代码示例。

使用 DB-API 从 Snowpark 客户端连接到 MySQL

  • 安装 pymysql

pip install snowflake-snowpark-python[pandas]
pip install pymysql
Copy
  • 定义用于创建 MySQL 连接的工厂方法

def create_mysql_connection():
    import pymysql
    connection = pymysql.connect(
        host="mysql_host",
        port=mysql_port,
        database="mysql_db",
        user="mysql_user",
        password="mysql_password"
    )
    return connection


# Call dbapi to pull data from mysql_table

df = session.read.dbapi(
    create_mysql_connection,
    table="mysql_table"
)
Copy

使用 DB-API 从存储过程连接到 MySQL

  • 配置外部访问集成,这是允许 Snowflake 连接到源端点所必需的。

    备注

    建议使用 PrivateLink 实现安全的数据传输,尤其是在处理敏感信息时。确保您的 Snowflake 账户启用了必要的 PrivateLink 权限,并且 PrivateLink 功能已在 Snowflake 笔记本环境中配置并处于活动状态。

  • 配置密钥、允许出口到源端点的网络规则以及外部访问集成。

CREATE OR REPLACE SECRET mysql_secret
  TYPE = PASSWORD
  USERNAME = 'mysql_username'
  PASSWORD = 'mysql_password';

-- Configure a network rule.

CREATE OR REPLACE NETWORK RULE mysql_network_rule
  MODE = EGRESS
  TYPE = HOST_PORT
  VALUE_LIST = ('mysql_host:mysql_port');

-- Configure an external access integration

CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION mysql_access_integration
  ALLOWED_NETWORK_RULES = (mysql_network_rule)
  ALLOWED_AUTHENTICATION_SECRETS = (mysql_secret)
  ENABLED = true;
Copy
  • 使用 Snowpark Python DB-API 在 Python 存储过程中从 MySQL 提取数据。

CREATE OR REPLACE PROCEDURE sp_mysql_dbapi()
RETURNS TABLE()
LANGUAGE PYTHON
RUNTIME_VERSION='3.11'
HANDLER='run'
PACKAGES=('snowflake-snowpark-python', 'pymysql')
EXTERNAL_ACCESS_INTEGRATIONS = (mysql_access_integration)
SECRETS = ('cred' = mysql_secret )
AS $$

# Get user name and password from mysql_secret

import _snowflake
    username_password_object = _snowflake.get_username_password('cred')
    USER = username_password_object.username
    PASSWORD = username_password_object.password

# Define the factory method for creating a connection to MySQL

from snowflake.snowpark import session

def create_mysql_connection():
    import pymysql
    connection = pymysql.connect(
        host="mysql_host",
        port=mysql_port,
        dbname="mysql_dbname",
        user=USER,
        password=PASSWORD,
    )
    return connection

def run(session: Session):
    df = session.read.dbapi(
        create_mysql_connection,
        table="mysql_table"
    )
    return df
$$;

CALL sp_mysql_dbapi();
Copy

使用 DB-API 从 Snowflake 笔记本连接到 MySQL

  • Snowflake 笔记本软件包 中选择 snowflake-snowpark-pythonpymysql

  • 配置外部访问集成,这是允许 Snowflake 连接到源端点所必需的。

    备注

    建议使用 PrivateLink 实现安全的数据传输,尤其是在处理敏感信息时。确保您的 Snowflake 账户启用了必要的 PrivateLink 权限,并且 PrivateLink 功能已在 Snowflake 笔记本环境中配置并处于活动状态。

  • 配置密钥并将其添加到 Snowflake 笔记本 中。

CREATE OR REPLACE SECRET mysql_secret
  TYPE = PASSWORD
  USERNAME = 'mysql_username'
  PASSWORD = 'mysql_password';

  ALTER NOTEBOOK mynotebook SET SECRETS = ('snowflake-secret-object' = mysql_secret);
Copy
  • 配置网络规则和外部访问集成。

CREATE OR REPLACE NETWORK RULE mysql_network_rule
  MODE = EGRESS
  TYPE = HOST_PORT
  VALUE_LIST = ('mysql_host:mysql_port');

CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION mysql_access_integration
  ALLOWED_NETWORK_RULES = (mysql_network_rule)
  ALLOWED_AUTHENTICATION_SECRETS = (mysql_secret)
  ENABLED = true;
Copy
# Get user name and password from mysql_secret
import _snowflake
username_password_object = _snowflake.get_username_password('snowflake-secret-object')
USER = username_password_object.username
PASSWORD = username_password_object.password

import snowflake.snowpark.context
session = snowflake.snowpark.context.get_active_session()

# Define the factory method for creating a connection to MySQL

def create_mysql_connection():
    import pymysql
    connection = pymysql.connect(
        host="mysql_host",
        port=mysql_port,
        dbname="mysql_dbname",
        user=USER,
        password=PASSWORD,
    )
    return connection

# Call dbapi to pull data from mysql_table

df = session.read.dbapi(
    create_mysql_connection,
    table="mysql_table")

# Save data into sf_table

df.write.mode("overwrite").save_as_table('sf_table')
Copy

Databricks

要从 Snowpark 连接到 Databricks,您需要以下两个软件包:

  • Snowpark:snowflake-snowpark-python[pandas] (https://pypi.org/project/snowflake-snowpark-python/)

  • 开源 psycopg2 库:databricks-sql-connector (https://pypi.org/project/databricks-sql-connector/)

以下是从 Snowpark 客户端、存储过程和 Snowflake 笔记本连接到 Databricks 所需的代码示例。

使用 DB-API 从 Snowpark 客户端连接到 Databricks

  • 安装 databricks-sql-connector

pip install snowflake-snowpark-python[pandas]
pip install databricks-sql-connector
Copy
  • 定义用于创建 Databricks 连接的工厂方法

def create_dbx_connection():
    import databricks.sql
    connection = databricks.sql.connect(
        server_hostname=HOST,
        http_path=PATH,
        access_token=ACCESS_TOKEN
    )
    return connection


#Call dbapi to pull data from mytable

df = session.read.dbapi(
    create_dbx_connection,
    table="dbx_table")
Copy

使用 DB-API 从存储过程连接到 Databricks

  • Snowflake 笔记本软件包 中选择 snowflake-snowpark-pythonpymysql

  • 需要外部访问集成才能允许 Snowflake 连接到源端点。

    备注

    建议使用 PrivateLink 实现安全的数据传输,尤其是在处理敏感信息时。确保您的 Snowflake 账户启用了必要的 PrivateLink 权限,并且 PrivateLink 功能已在 Snowflake 笔记本环境中配置并处于活动状态。

  • 配置密钥、允许出口到源端点的网络规则以及外部访问集成。

CREATE OR REPLACE SECRET dbx_secret
  TYPE = GENERIC_STRING
  SECRET_STRING = 'dbx_access_token';

-- Configure a network rule

CREATE OR REPLACE NETWORK RULE dbx_network_rule
  MODE = EGRESS
  TYPE = HOST_PORT
  VALUE_LIST = ('dbx_host:dbx_port');

--  Configure an external access integration

CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION dbx_access_integration
  ALLOWED_NETWORK_RULES = (dbx_network_rule)
  ALLOWED_AUTHENTICATION_SECRETS = (dbx_secret)
  ENABLED = true;
Copy
  • 使用 Snowpark Python DB-API 在 Python 存储过程中从 Databricks 提取数据

CREATE OR REPLACE PROCEDURE sp_dbx_dbapi()
  RETURNS TABLE()
  LANGUAGE PYTHON
  RUNTIME_VERSION='3.11'
  HANDLER='run'
  PACKAGES=('snowflake-snowpark-python', 'databricks-sql-connector')
  EXTERNAL_ACCESS_INTEGRATIONS = (dbx_access_integration)
  SECRETS = ('cred' = dbx_secret )
AS $$

# Get user name and password from dbx_secret

import _snowflake
ACCESS_TOKEN = _snowflake.get_generic_secret_string('cred')

from snowflake.snowpark import Session

# define the method for creating a connection to Databricks
def create_dbx_connection():
    import databricks.sql
    connection = databricks.sql.connect(
        server_hostname="dbx_host",
        http_path="dbx_path",
        access_token=ACCESS_TOKEN,
    )
    return connection

def run(session: Session):
    df = session.read.dbapi(
        create_dbx_connection,
        table="dbx_table"
    )
    return df

$$;

CALL sp_dbx_dbapi();
Copy

使用 DB-API 从 Snowflake 笔记本连接到 Databricks

  • Snowflake 笔记本软件包 中选择 snowflake-snowpark-pythonpymysql

  • 配置外部访问集成,这是允许 Snowflake 连接到源端点所必需的。

    备注

    建议使用 PrivateLink 实现安全的数据传输,尤其是在处理敏感信息时。确保您的 Snowflake 账户启用了必要的 PrivateLink 权限,并且 PrivateLink 功能已在 Snowflake 笔记本环境中配置并处于活动状态。

  • 配置密钥并将其添加到 Snowflake 笔记本 中。

    CREATE OR REPLACE SECRET dbx_secret
      TYPE = GENERIC_STRING
      SECRET_STRING = 'dbx_access_token';
      ALTER NOTEBOOK mynotebook SET SECRETS = ('snowflake-secret-object' = dbx_secret);
    
    Copy
  • 配置

    CREATE OR REPLACE NETWORK RULE dbx_network_rule
      MODE = EGRESS
      TYPE = HOST_PORT
      VALUE_LIST = ('dbx_host:dbx_port');
    
    CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION dbx_access_integration
      ALLOWED_NETWORK_RULES = (dbx_network_rule)
      ALLOWED_AUTHENTICATION_SECRETS = (dbx_secret)
      ENABLED = true;
    
    Copy
  • 为 Snowflake Notebooks 设置外部访问,然后重新启动笔记本会话。

    # Get user name and password from dbx_secret
    
    import _snowflake
    ACCESS_TOKEN = _snowflake.get_generic_secret_string('cred')
    
    import snowflake.snowpark.context
    session = snowflake.snowpark.context.get_active_session()
    
    # Define the factory method for creating a connection to Databricks
    
    def create_dbx_connection():
        import databricks.sql
        connection = databricks.sql.connect(
            server_hostname="dbx_host",
            http_path="dbx_path",
            access_token=ACCESS_TOKEN,
        )
        return connection
    
    # use dbapi to read data from dbx_table
    
    df = session.read.dbapi(
        create_dbx_connection,
        table="dbx_table"
    )
    
    # save data into sf_table
    
    df.write.mode("overwrite").save_as_table('sf_table')
    
    Copy

限制

驱动程序

Snowpark Python DB-API 仅支持符合 Python DB-API 2.0 的驱动程序(例如 pyodbcoracledb)。此版本不支持 JDBC 驱动程序。

语言: 中文