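JDBC data source setup for Snowpark Connect for Spark

This section provides guidance and sample code for using the Snowpark Connect JDBC data source feature to read data from, and write data to, external databases such as MySQL and PostgreSQL. It covers both client-side setup and Snowflake notebook setup.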

Part 1: Client-side setup (MySQL)

This setup is required when you run Snowpark Connect from a local client application, such as a Python script or an IDE.

Prerequisites

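  1. Java Runtime Environment (JRE) or Java Development Kit (JDK):

    • Install a JRE or JDK. The architecture of the Java installation (for example, 64-bit) **must** match the architecture of your Python installation.

    • *Example installation source:* Adoptium Temurin releases (https://adoptium.net/temurin/releases/?version=11), if you use Java 11.

  2. Set the ``JAVA_HOME`` environment variable:

    • Set the JAVA_HOME environment variable to point to the root directory of your Java installation.

    • Example (macOS/Linux):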

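export JAVA_HOME=/path/to/your/jdk/home

  3. Set the ``CLASSPATH`` environment variable:

    • Add the path to the JDBC driver .jar file for your database to the CLASSPATH environment variable. This allows the Java environment to find the required driver.

    • Example (for the MySQL driver):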

export CLASSPATH=$CLASSPATH:/path/to/your/driver/mysql-connector-j-9.2.0.jar
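The three prerequisites above can be checked from Python before starting a session. The following is a minimal sketch, not part of Snowpark Connect: the function names `check_jdbc_prerequisites` and `python_bitness` are hypothetical, and the check only inspects environment variables and files. It does not start Java, so comparing the reported Python architecture with your Java installation remains a manual step.

```python
import os
import struct
from pathlib import Path


def python_bitness():
    """Return the pointer size of the running Python interpreter in bits (32 or 64)."""
    return struct.calcsize("P") * 8


def check_jdbc_prerequisites(env, driver_jar_name):
    """Return a list of problems found; an empty list means every check passed."""
    problems = []

    # JAVA_HOME should point to an existing directory (the root of the Java installation).
    java_home = env.get("JAVA_HOME")
    if not java_home:
        problems.append("JAVA_HOME is not set")
    elif not Path(java_home).is_dir():
        problems.append(f"JAVA_HOME is not a directory: {java_home}")

    # CLASSPATH entries are separated by os.pathsep (':' on macOS/Linux, ';' on Windows).
    entries = [e for e in env.get("CLASSPATH", "").split(os.pathsep) if e]
    jars = [e for e in entries if Path(e).name == driver_jar_name]
    if not jars:
        problems.append(f"{driver_jar_name} is not on CLASSPATH")
    elif not any(Path(j).is_file() for j in jars):
        problems.append(f"{driver_jar_name} is on CLASSPATH but the file does not exist")

    return problems


if __name__ == "__main__":
    print(f"Python is {python_bitness()}-bit; the Java installation should match.")
    for problem in check_jdbc_prerequisites(dict(os.environ), "mysql-connector-j-9.2.0.jar"):
        print("Problem:", problem)
```

Passing the environment as a dictionary keeps the check easy to run against a hypothetical configuration without changing the real shell environment.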

Example client code (reading from MySQL)

This example shows how to read a table from a MySQL database with spark_session.read.jdbc().

from pyspark.sql import Row

# Adjust the URL for your server host, port, and database name
MYSQL_JDBC_URL = "jdbc:mysql://localhost/test_db"

# Ensure this driver name matches your version of the JDBC driver
MYSQL_JDBC_DRIVER = "com.mysql.cj.jdbc.Driver"

def test_jdbc_read_from_mysql(self, spark_session):
    # This code snippet uses the Snowpark Connect Spark session
    jdbc_df = spark_session.read.jdbc(
        MYSQL_JDBC_URL,
        "my_schema.my_table",  # Specify your table name in MySQL
        properties={
            "user": "root",           # Your MySQL user name
            "password": "****",       # Your password for MySQL
            "driver": MYSQL_JDBC_DRIVER,
        },
    )

    # After reading via JDBC, the data is loaded into a temporary table in Snowflake.
    # You can now perform any standard DataFrame operations supported by Snowpark Connect.
    rows = jdbc_df.collect()  # Materialize the result as a list of Row objects
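
The comment in the example asks you to adjust the URL for your host, port, and database name. As a minimal sketch of how such a URL is assembled, the hypothetical helper `build_jdbc_url` below produces the `jdbc:<subprotocol>://<host>[:<port>]/<database>` form used by the MySQL and PostgreSQL examples in this section. It is an illustration only; other drivers may use a different URL layout.

```python
def build_jdbc_url(subprotocol, host, database, port=None):
    """Assemble jdbc:<subprotocol>://<host>[:<port>]/<database>.

    When port is None, it is omitted and the driver falls back to its default port.
    """
    if not host or not database:
        raise ValueError("host and database are required")
    authority = host if port is None else f"{host}:{int(port)}"
    return f"jdbc:{subprotocol}://{authority}/{database}"


# The two URLs used in this section:
print(build_jdbc_url("mysql", "localhost", "test_db"))
print(build_jdbc_url("postgresql", "hh-pgsql-public.ebi.ac.uk", "pfmegrnargs", 5432))
```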

Example client code (writing to MySQL)

This example shows how to write data to a MySQL database with the DataFrame's write.jdbc() method.

from pyspark.sql import Row

# Adjust the URL for your server host, port, and database name
MYSQL_JDBC_URL = "jdbc:mysql://localhost/test_db"

# Ensure this driver name matches your version of the JDBC driver
MYSQL_JDBC_DRIVER = "com.mysql.cj.jdbc.Driver"

def test_jdbc_write_overwrite_to_mysql(self, spark_session):
    # This code snippet uses the Snowpark Connect Spark session
    jdbc_df = spark_session.createDataFrame(
        [
            Row(a=1, b=2.0, c="test1"),
            Row(a=2, b=3.0, c="test2"),
            Row(a=4, b=5.0, c="test3"),
        ]
    )

    jdbc_df.write.jdbc(
        MYSQL_JDBC_URL,
        "my_schema.my_table2",  # Specify your table name in MySQL
        mode="overwrite",
        properties={
            "user": "root",        # Your MySQL user name
            "password": "****",    # Your password for MySQL
            "driver": MYSQL_JDBC_DRIVER,
        },
    )
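
Both client examples place the user name and password directly in the `properties` dictionary. One common alternative is to read them from environment variables. The sketch below assumes hypothetical variable names of the form `<PREFIX>_USER` and `<PREFIX>_PASSWORD` (for example `MYSQL_USER`); neither the names nor the helper `jdbc_properties_from_env` come from Snowpark Connect.

```python
import os


def jdbc_properties_from_env(driver, prefix, env=None):
    """Build the JDBC `properties` dictionary from <PREFIX>_USER and <PREFIX>_PASSWORD."""
    env = os.environ if env is None else env
    names = (f"{prefix}_USER", f"{prefix}_PASSWORD")
    missing = [name for name in names if not env.get(name)]
    if missing:
        raise KeyError(f"missing environment variables: {', '.join(missing)}")
    return {
        "user": env[names[0]],
        "password": env[names[1]],
        "driver": driver,
    }


# Demonstration with an explicit dictionary instead of the real environment.
demo_env = {"MYSQL_USER": "root", "MYSQL_PASSWORD": "example-only"}
props = jdbc_properties_from_env("com.mysql.cj.jdbc.Driver", "MYSQL", demo_env)
print(sorted(props))
```

The returned dictionary has the same three keys as the `properties` argument in the examples above, so it can be passed to `read.jdbc()` or `write.jdbc()` unchanged.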

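Part 2: Snowflake warehouse notebook setup (PostgreSQL)

This setup applies when you run Snowpark Connect directly in a Snowflake notebook environment.

Setup steps

  • Add the ``snowpark-connect`` package:

    • Make sure the snowflake-snowpark-connect package has been added to your notebook environment.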

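(Figure: Adding the snowflake-snowpark-connect package in a Snowflake notebook)

  • Download and upload the JDBC driver:

    • Download the JDBC driver .jar file for your target external database, for example the PostgreSQL JDBC driver (https://jdbc.postgresql.org/download/postgresql-42.7.8.jar).

    • Upload the downloaded .jar file directly to your notebook environment.

(Figure: Uploading the JDBC driver JAR file in a Snowflake notebook)

  • Activate the external integration (network rule and integration):

    • Snowflake requires an external access integration to allow the notebook to communicate with external network locations. You must define a network rule for the host and port of your external database.

(Figure: Configuring network rule settings in a Snowflake notebook)
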
-- 1. Create a Network Rule for the external database host and port
CREATE OR REPLACE NETWORK RULE JDBC_READ_NETWORK_RULE
  MODE = EGRESS
  TYPE = HOST_PORT
  VALUE_LIST = ('hh-pgsql-public.ebi.ac.uk:5432'); -- REPLACE with your host:port

-- 2. Create the External Access Integration using the new Network Rule
CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION JDBC_READ_ACCESS_INTEGRATION
  ALLOWED_NETWORK_RULES = (JDBC_READ_NETWORK_RULE)
  ENABLED = true;

-- NOTE: This integration must be referenced/activated within your notebook's settings.
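
The network rule must list the same host and port that appear in the JDBC URL. As a minimal sketch of keeping the two in sync, the hypothetical helpers below extract `host:port` from a JDBC URL and render the CREATE NETWORK RULE statement shown above. The default ports (3306 for MySQL, 5432 for PostgreSQL) are assumptions used only when the URL omits a port, and the parsing only handles URLs of the `jdbc:<subprotocol>://host[:port]/database` form.

```python
from urllib.parse import urlsplit

# Assumed default ports, used only when the JDBC URL does not specify one.
DEFAULT_PORTS = {"mysql": 3306, "postgresql": 5432}


def host_port_from_jdbc_url(jdbc_url):
    """Return 'host:port' for a URL such as jdbc:postgresql://host:5432/db."""
    if not jdbc_url.startswith("jdbc:"):
        raise ValueError(f"not a JDBC URL: {jdbc_url}")
    parts = urlsplit(jdbc_url[len("jdbc:"):])
    port = parts.port or DEFAULT_PORTS.get(parts.scheme)
    if not parts.hostname or port is None:
        raise ValueError(f"cannot determine host and port from: {jdbc_url}")
    return f"{parts.hostname}:{port}"


def network_rule_sql(rule_name, jdbc_url):
    """Render the CREATE NETWORK RULE statement for the host and port in jdbc_url."""
    if not rule_name.replace("_", "").isalnum():
        raise ValueError(f"unexpected characters in rule name: {rule_name}")
    return (
        f"CREATE OR REPLACE NETWORK RULE {rule_name}\n"
        "  MODE = EGRESS\n"
        "  TYPE = HOST_PORT\n"
        f"  VALUE_LIST = ('{host_port_from_jdbc_url(jdbc_url)}');"
    )


print(network_rule_sql(
    "JDBC_READ_NETWORK_RULE",
    "jdbc:postgresql://hh-pgsql-public.ebi.ac.uk:5432/pfmegrnargs",
))
```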

Example warehouse notebook code (reading from PostgreSQL)

This example shows the Python code needed to initialize the session, load the driver, and read data from PostgreSQL.

from snowflake import snowpark_connect
import jpype

# Initialize the Spark session for Snowpark Connect
spark = snowpark_connect.server.init_spark_session()
df = spark.sql("show schemas").limit(2)
df.show()

# Add the uploaded JDBC driver JAR to the Java Classpath using jpype
# Adjust the path to match the name of the JAR file you uploaded
jpype.addClassPath('/tmp/appRoot/postgresql-42.7.8.jar')

# Using public PostgreSQL DB as an example: https://rnacentral.org/help/public-database
jdbc_df = spark.read.jdbc(
    # Adjust this URL as per your server host, port, and database
    "jdbc:postgresql://hh-pgsql-public.ebi.ac.uk:5432/pfmegrnargs",
    "",  # Empty string for table name when providing a custom query
    properties={
        "user": "reader",                # Your PostgreSQL user name
        "password": "***",               # Your password for PostgreSQL
        "driver": "org.postgresql.Driver",
        # Use the "query" property for a custom SQL statement
        "query": """SELECT
  upi,     -- RNAcentral URS identifier
  taxid,   -- NCBI taxid
  ac       -- external accession
FROM xref
WHERE ac IN ('OTTHUMT00000106564.1', 'OTTHUMT00000416802.1')"""
    },
)

jdbc_df.show()
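
The example asks you to adjust the JAR path passed to `jpype.addClassPath` to match the file you uploaded. A wrong path may only show up later, when the driver class cannot be loaded, so it can help to check the path first. The helper `require_driver_jar` below is a hypothetical sketch that fails early with a clear message.

```python
from pathlib import Path


def require_driver_jar(jar_path):
    """Return jar_path as a string if it names an existing .jar file, else raise."""
    path = Path(jar_path)
    if path.suffix != ".jar":
        raise ValueError(f"expected a .jar file, got: {path.name}")
    if not path.is_file():
        raise FileNotFoundError(f"JDBC driver JAR not found: {path}")
    return str(path)


# Intended use in the notebook (requires jpype, so it is shown as a comment):
# jpype.addClassPath(require_driver_jar('/tmp/appRoot/postgresql-42.7.8.jar'))
```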

Example warehouse notebook code (writing to PostgreSQL)

This example shows the Python code needed to initialize the session, load the driver, and write data to PostgreSQL.

from snowflake import snowpark_connect
from pyspark.sql import Row
import jpype

# Initialize the Spark session for Snowpark Connect
spark = snowpark_connect.server.init_spark_session()
df = spark.sql("show schemas").limit(2)
df.show()

# Add the uploaded JDBC driver JAR to the Java Classpath using jpype
# Adjust the path to match the name of the JAR file you uploaded
jpype.addClassPath('/tmp/appRoot/postgresql-42.7.8.jar')

# Create dataframe
jdbc_df = spark.createDataFrame(
    [
        Row(a=1, b=2.0, c="test1"),
        Row(a=2, b=3.0, c="test2"),
        Row(a=4, b=5.0, c="test3"),
    ]
)

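# The URL below reuses the public RNAcentral host from the read example as a placeholder
# (https://rnacentral.org/help/public-database). For writes, point it at a database
# where your user has write privileges.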
jdbc_df.write.jdbc(
    # Adjust this URL as per your server host, port, and database
    "jdbc:postgresql://hh-pgsql-public.ebi.ac.uk:5432/pfmegrnargs",
    "public.my_table2",  # Specify your table name in PostgreSQL
    mode="overwrite",
    properties={
        "user": "writer",                # Your PostgreSQL user name
        "password": "***",               # Your password for PostgreSQL
        "driver": "org.postgresql.Driver",
    },
)

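Part 3: Snowflake Workspace Notebook setup (PostgreSQL)

This setup applies when you run Snowpark Connect directly in a Snowflake Workspace Notebook environment.

Setup steps

  • The snowpark-connect package is included in Workspace Notebooks by default.

  • Download and upload the JDBC driver:

    • Download the JDBC driver .jar file for your target external database, for example the PostgreSQL JDBC driver (https://jdbc.postgresql.org/download/postgresql-42.7.8.jar).

    • Upload the downloaded .jar file directly to your notebook environment.

(Figure: Uploading the JDBC driver in a Snowflake Workspace Notebook)

  • Create the external integration: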

-- 1. Create a Network Rule for the external database host and port
CREATE OR REPLACE NETWORK RULE JDBC_READ_NETWORK_RULE
  MODE = EGRESS
  TYPE = HOST_PORT
  VALUE_LIST = ('hh-pgsql-public.ebi.ac.uk:5432'); -- REPLACE with your host:port

-- 2. Create the External Access Integration using the new Network Rule
CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION JDBC_READ_ACCESS_INTEGRATION
  ALLOWED_NETWORK_RULES = (JDBC_READ_NETWORK_RULE)
  ENABLED = true;

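-- NOTE: This integration must be referenced/activated within your notebook's settings.

  • Activate the external integration (network rule and integration):

    • Snowflake requires an external access integration to allow the notebook to communicate with external network locations. You must define a network rule for the host and port of your external database.

(Figure: Activating the external access integration in Workspace Notebook settings)

Example Workspace Notebook code (reading from PostgreSQL)

This example shows the Python code needed to initialize the session, load the driver, and read data from PostgreSQL.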

from snowflake import snowpark_connect
import jpype
import os

# Initialize the Spark session for Snowpark Connect
spark = snowpark_connect.server.init_spark_session()
df = spark.sql("show schemas").limit(2)
df.show()

# Add the uploaded JDBC driver JAR to the Java Classpath using jpype
# Adjust the path to match the name of the JAR file you uploaded
# Copy the driver to /tmp directory
os.system("cp ./postgresql-42.7.8.jar /tmp/postgresql-42.7.8.jar")
jpype.addClassPath('/tmp/postgresql-42.7.8.jar')

# Using public PostgreSQL DB as an example: https://rnacentral.org/help/public-database
jdbc_df = spark.read.jdbc(
    # Adjust this URL as per your server host, port, and database
    "jdbc:postgresql://hh-pgsql-public.ebi.ac.uk:5432/pfmegrnargs",
    "",  # Empty string for table name when providing a custom query
    properties={
        "user": "reader",                # Your PostgreSQL user name
        "password": "***",               # Your password for PostgreSQL
        "driver": "org.postgresql.Driver",
        # Use the "query" property for a custom SQL statement
        "query": """SELECT
  upi,     -- RNAcentral URS identifier
  taxid,   -- NCBI taxid
  ac       -- external accession
FROM xref
WHERE ac IN ('OTTHUMT00000106564.1', 'OTTHUMT00000416802.1')"""
    },
)

jdbc_df.show()
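
The example copies the driver to /tmp with `os.system("cp ...")`, which does not raise a Python exception if the copy fails. As a hedged alternative, the hypothetical helper `stage_driver_jar` below does the same copy with `shutil` so that a missing source file raises immediately. The /tmp target mirrors the example above.

```python
import shutil
from pathlib import Path


def stage_driver_jar(source, target_dir="/tmp"):
    """Copy the uploaded driver JAR into target_dir and return the new path."""
    src = Path(source)
    if not src.is_file():
        raise FileNotFoundError(f"JDBC driver JAR not found: {src}")
    target = Path(target_dir) / src.name
    shutil.copy2(src, target)  # Raises OSError if the copy cannot be completed
    return str(target)


# Intended use in the notebook (requires jpype, so it is shown as a comment):
# jpype.addClassPath(stage_driver_jar("./postgresql-42.7.8.jar"))
```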

Example Workspace Notebook code (writing to PostgreSQL)

This example shows the Python code needed to initialize the session, load the driver, and write data to PostgreSQL.

from snowflake import snowpark_connect
from pyspark.sql import Row
import jpype
import os

# Initialize the Spark session for Snowpark Connect
spark = snowpark_connect.server.init_spark_session()
df = spark.sql("show schemas").limit(2)
df.show()

# Add the uploaded JDBC driver JAR to the Java Classpath using jpype
# Adjust the path to match the name of the JAR file you uploaded
# Copy the driver to /tmp directory
os.system("cp ./postgresql-42.7.8.jar /tmp/postgresql-42.7.8.jar")
jpype.addClassPath('/tmp/postgresql-42.7.8.jar')

# Create dataframe
jdbc_df = spark.createDataFrame(
    [
        Row(a=1, b=2.0, c="test1"),
        Row(a=2, b=3.0, c="test2"),
        Row(a=4, b=5.0, c="test3"),
    ]
)

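# The URL below reuses the public RNAcentral host from the read example as a placeholder
# (https://rnacentral.org/help/public-database). For writes, point it at a database
# where your user has write privileges.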
jdbc_df.write.jdbc(
    # Adjust this URL as per your server host, port, and database
    "jdbc:postgresql://hh-pgsql-public.ebi.ac.uk:5432/pfmegrnargs",
    "public.my_table2",  # Specify your table name in PostgreSQL
    mode="overwrite",
    properties={
        "user": "writer",                # Your PostgreSQL user name
        "password": "***",               # Your password for PostgreSQL
        "driver": "org.postgresql.Driver",
    },
)

Supported data sources

  • SQL Server

  • MySQL

  • PostgreSQL
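
Each supported data source needs its own value for the `driver` property. The sketch below maps a JDBC URL to a driver class name. The MySQL and PostgreSQL class names are the ones used in the examples in this section; the SQL Server class name is an assumption based on Microsoft's JDBC driver and does not appear in this document, so verify it against the driver version you install. The helper `driver_class_for` is hypothetical.

```python
# Driver class names keyed by JDBC subprotocol.
# The "sqlserver" entry is an assumption; it is not taken from this document.
JDBC_DRIVER_CLASSES = {
    "mysql": "com.mysql.cj.jdbc.Driver",
    "postgresql": "org.postgresql.Driver",
    "sqlserver": "com.microsoft.sqlserver.jdbc.SQLServerDriver",
}


def driver_class_for(jdbc_url):
    """Return the driver class name for a URL of the form jdbc:<subprotocol>:..."""
    prefix, _, rest = jdbc_url.partition(":")
    subprotocol = rest.split(":", 1)[0]
    if prefix != "jdbc" or subprotocol not in JDBC_DRIVER_CLASSES:
        raise ValueError(f"unsupported JDBC URL: {jdbc_url}")
    return JDBC_DRIVER_CLASSES[subprotocol]


print(driver_class_for("jdbc:mysql://localhost/test_db"))
print(driver_class_for("jdbc:postgresql://hh-pgsql-public.ebi.ac.uk:5432/pfmegrnargs"))
```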