外部网络访问示例¶
本主题提供从用户定义函数和过程访问外部网络位置的示例。
访问 PyPi 以在 Snowpark Container 中安装包¶
您可以通过创建外部访问集成来访问 PyPi 包仓库。当您希望允许 Container Runtime 上的 Notebook 用户使用 pip install
命令安装 pip
包时,您可以执行此操作。通过这种集成,您还可以允许 Snowpark Container Services 安装 pip 包。
此示例使用 snowflake.external_access.pypi_rule
中所述的 Snowflake 管理的网络规则 Snowflake 管理的网络规则。
使用
snowflake.external_access.pypi_rule
网络规则创建外部访问集成。CREATE [OR REPLACE] EXTERNAL ACCESS INTEGRATION pypi_access ALLOWED_NETWORK_RULES = (snowflake.external_access.pypi_rule) ENABLED = true;
为需要在 Container Runtime 上的 Snowpark Container 或 Notebook 中使用
developer
的用户创建pip install
角色。CREATE OR REPLACE ROLE developer;
向
developer
角色授予必要权限,以使用您创建的外部访问集成。GRANT USAGE ON INTEGRATION pypi_access TO ROLE developer;
使用 OAuth 访问 Google Translate API¶
以下步骤包括创建外部访问集成以访问 Google Translation API 的代码。这些步骤添加了安全集成和执行语句所需的权限。
创建代表外部位置的网络规则。
有关网络规则在外部访问中的作用(包括所需的权限)的更多信息,请参阅 创建网络规则以表示外部网络位置。
CREATE OR REPLACE NETWORK RULE google_apis_network_rule MODE = EGRESS TYPE = HOST_PORT VALUE_LIST = ('translation.googleapis.com');
创建一个安全集成,用于保存通过
google_apis_network_rule
网络规则指定的外部网络位置进行身份验证所需的 OAuth 凭据。有关该命令的参考信息(包括所需的权限),请参阅 CREATE SECURITY INTEGRATION(外部 API 身份验证)。
CREATE OR REPLACE SECURITY INTEGRATION google_translate_oauth TYPE = API_AUTHENTICATION AUTH_TYPE = OAUTH2 OAUTH_CLIENT_ID = 'my-client-id' OAUTH_CLIENT_SECRET = 'my-client-secret' OAUTH_TOKEN_ENDPOINT = 'https://oauth2.googleapis.com/token' OAUTH_AUTHORIZATION_ENDPOINT = 'https://accounts.google.com/o/oauth2/auth' OAUTH_ALLOWED_SCOPES = ('https://www.googleapis.com/auth/cloud-platform') ENABLED = TRUE;
创建一个密钥,代表
google_translate_oauth
安全集成所包含的凭据。有关密钥在外部访问中的作用(包括所需的权限)的更多信息,请参阅 创建表示凭据的密钥。
密钥必须使用 OAUTH_REFRESH_TOKEN 参数指定刷新令牌。要从服务提供商(本例中为 Google Cloud Translation API 服务)处获取刷新令牌,可以使用提供商提供的方式或使用 Snowflake 系统函数。
要使用刷新令牌创建密钥,要么 使用 Google OAuth Playground,要么使用 Snowflake 系统函数,具体方法如下:
Snowflake 系统函数
执行 CREATE SECRET 创建一个密钥。将在后面的步骤中用刷新令牌更新该密钥。
USE DATABASE my_db; USE SCHEMA secret_schema; CREATE OR REPLACE SECRET oauth_token TYPE = oauth2 API_AUTHENTICATION = google_translate_oauth;
执行 SYSTEM$START_OAUTH_FLOW 函数以检索 URL,并在其实参中指定之前创建的密钥名称,从而获取刷新令牌。
CALL SYSTEM$START_OAUTH_FLOW( 'my_db.secret_schema.oauth_token' );
该函数生成一个 URL,可以用它来完成 OAuth 意见征求流程。
在浏览器中访问生成的 URL 并完成 OAuth2 意见征求流程。完成后,让浏览器保持打开,直到流程的最后一页。
从浏览器地址栏中,复制意见征求流程最后一页 URL 中问号后面的所有文本。
执行 SYSTEM$FINISH_OAUTH_FLOW 函数,将刚刚从浏览器地址栏复制的参数指定为实参,以使用刷新令牌更新密钥。
请务必在 SYSTEM$START_OAUTH_FLOW 的同一会话中执行 SYSTEM$FINISH_OAUTH_FLOW。SYSTEM$FINISH_OAUTH_FLOW 使用它从 OAuth 服务器获取的访问令牌和刷新令牌,更新您在 SYSTEM$START_OAUTH_FLOW 中指定的密钥。
CALL SYSTEM$FINISH_OAUTH_FLOW( 'state=<remaining_url_text>' );
Google OAuth Playground
在 Google OAuth Playground (https://developers.google.com/oauthplayground/) 中,选择并授权第 1 步中指定的 Cloud Translation API。
在第 2 步中,点击 exchange authorization code for tokens,然后复制 refresh token 令牌值。
执行 CREATE SECRET 创建一个密钥,指定复制的刷新令牌值。
有关密钥在外部访问中的作用(包括所需的权限)的更多信息,请参阅 创建表示凭据的密钥。
CREATE OR REPLACE SECRET oauth_token TYPE = oauth2 API_AUTHENTICATION = google_translate_oauth OAUTH_REFRESH_TOKEN = 'my-refresh-token';
使用网络规则和密钥创建外部访问集成。
有关外部访问集成的作用(包括所需的权限)的更多信息,请参阅 创建外部访问集成。
CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION google_apis_access_integration ALLOWED_NETWORK_RULES = (google_apis_network_rule) ALLOWED_AUTHENTICATION_SECRETS = (oauth_token) ENABLED = TRUE;
创建
developer
角色,分配给需要创建 UDF 的用户或使用集成的过程。CREATE OR REPLACE ROLE developer; CREATE OR REPLACE ROLE user;
授予
developer
角色创建使用对象进行外部访问的 UDF 所需的权限。这包括以下内容:对密钥的 READ 权限。
对包含密钥的架构的 USAGE 权限。
对集成的 USAGE 权限。
GRANT READ ON SECRET oauth_token TO ROLE developer; GRANT USAGE ON SCHEMA secret_schema TO ROLE developer; GRANT USAGE ON INTEGRATION google_apis_access_integration TO ROLE developer;
创建一个 UDF
google_translate_python
,将指定文本翻译成指定语言的短语。有关更多信息,请参阅 在函数或过程中使用外部访问集成。USE ROLE developer; CREATE OR REPLACE FUNCTION google_translate_python(sentence STRING, language STRING) RETURNS STRING LANGUAGE PYTHON RUNTIME_VERSION = 3.9 HANDLER = 'get_translation' EXTERNAL_ACCESS_INTEGRATIONS = (google_apis_access_integration) PACKAGES = ('snowflake-snowpark-python','requests') SECRETS = ('cred' = oauth_token ) AS $$ import _snowflake import requests import json session = requests.Session() def get_translation(sentence, language): token = _snowflake.get_oauth_access_token('cred') url = "https://translation.googleapis.com/language/translate/v2" data = {'q': sentence,'target': language} response = session.post(url, json = data, headers = {"Authorization": "Bearer " + token}) return response.json()['data']['translations'][0]['translatedText'] $$;
授予对
google_translate_python
函数的 USAGE 权限,以便拥有user
角色的用户可以调用该函数。GRANT USAGE ON FUNCTION google_translate_python(string, string) TO ROLE user;
执行
google_translate_python
函数来翻译短语。USE ROLE user; SELECT google_translate_python('Happy Thursday!', 'zh-CN');
这会生成以下输出。
------------------------------------------------------- | GOOGLE_TRANSLATE_PYTHON('HAPPY THURSDAY!', 'ZH-CN') | ------------------------------------------------------- | 快乐星期四! | -------------------------------------------------------
使用基本身份验证访问外部 Lambda 函数¶
以下步骤包括创建外部访问集成的示例代码,用于访问 Snowflake 外部的 lambda 函数。示例中使用的外部端点本身就是一个占位符,但它也可以是可在 REST 服务端点使用的一个函数。
外部访问用于 矢量化 Python UDF,它接收包含数据的 Pandas DataFrame。
创建
lambda_network_rule
代表外部位置my_external_service
的网络规则(这里是外部端点位置的占位符值)。有关网络规则在外部访问中作用的更多信息,请参阅 创建网络规则以表示外部网络位置。
CREATE OR REPLACE NETWORK RULE lambda_network_rule MODE = EGRESS TYPE = HOST_PORT VALUE_LIST = ('my_external_service');
创建一个密钥,代表外部服务所需的凭据。
本示例稍后的处理程序代码将使用用于 Python 的 Snowflake API 从密钥中检索凭据。
有关密钥在外部访问中作用的更多信息,请参阅 创建表示凭据的密钥。
CREATE OR REPLACE SECRET secret_password TYPE = PASSWORD USERNAME = 'my_user_name' PASSWORD = 'my_password';
创建一个
developer
角色,并授予其对密钥的 READ 权限。该角色将分配给需要创建使用该密钥的 UDF 或过程的用户。此外,创建用户将用来调用该函数的角色。
CREATE OR REPLACE ROLE developer; CREATE OR REPLACE ROLE user;
授予
developer
角色创建使用对象进行外部访问的 UDF 所需的权限。这包括以下内容:对密钥的 READ 权限。
对包含密钥的架构的 USAGE 权限。
GRANT READ ON SECRET secret_password TO ROLE developer; GRANT USAGE ON SCHEMA secret_schema TO ROLE developer;
创建外部访问集成,通过创建的网络规则和密钥指定外部端点和凭据。
有关外部访问集成的作用(包括所需的权限)的更多信息,请参阅 创建外部访问集成。
CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION lambda_external_access_integration ALLOWED_NETWORK_RULES = (lambda_network_rule) ALLOWED_AUTHENTICATION_SECRETS = (secret_password) ENABLED = TRUE;
创建一个 矢量化 Python UDF
return_double_column
,访问外部网络位置,处理以 Pandas DataFrame (https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html) 的形式接收的数据。有关在 UDF 中使用外部访问的更多信息,请参阅 在函数或过程中使用外部访问集成。
CREATE OR REPLACE FUNCTION return_double_column(x int) RETURNS INT LANGUAGE PYTHON EXTERNAL_ACCESS_INTEGRATIONS = (lambda_external_access_integration) SECRETS = ('cred' = secret_password) RUNTIME_VERSION = 3.9 HANDLER = 'return_first_column' PACKAGES = ('pandas', 'requests') AS $$ import pandas import numpy as np import json import requests import base64 import _snowflake from _snowflake import vectorized from requests.auth import HTTPBasicAuth from requests.adapters import HTTPAdapter from requests.packages.urllib3.util.retry import Retry session = requests.Session() retries = Retry(total=10, backoff_factor=1, status_forcelist=[429, 500, 502, 503, 504], allowed_methods = None) session.mount('https://', HTTPAdapter(max_retries=retries)) @vectorized(input=pandas.DataFrame) def return_first_column(df): request_rows = [] df.iloc[:,0] = df.iloc[:,0].astype(int) request_rows = np.column_stack([df.index, df.iloc[:,0]]).tolist() request_payload = {"data" : request_rows} username_password_object = _snowflake.get_username_password('cred'); basic = HTTPBasicAuth(username_password_object.username, username_password_object.password) url = 'my_external_service' response = session.post(url, json=request_payload, auth=basic) response.raise_for_status() response_payload = json.loads(response.text) response_rows = response_payload["data"] return pandas.DataFrame(response_rows)[1] $$;
授予对
return_double_column
函数的 USAGE 权限,以便拥有user
角色的用户可以调用该函数。GRANT USAGE ON FUNCTION return_double_column(int) TO ROLE user;
执行
return_double_column
函数,向外部端点发出请求。以下示例中的代码创建了一个双列表,并插入了 100,000,000 行包含 4 字节整数的数据。然后,代码执行
return_double_column
函数,从a
列传递值,供外部端点处理。CREATE OR REPLACE TABLE t1 (a INT, b INT); INSERT INTO t1 SELECT SEQ4(), SEQ4() FROM TABLE(GENERATOR(ROWCOUNT => 100000000)); SELECT return_double_column(a) AS retval FROM t1 ORDER BY retval;
使用 AWS IAM 访问 Amazon S3¶
以下步骤包括示例代码,以使用 IAM 连接到 AWS S3 桶。
有关 AWS IAM 的更多信息,请参阅 AWS IAM 文档 (https://docs.aws.amazon.com/IAM/latest/UserGuide/reference_identifiers.html)。
创建网络规则
aws_s3_network_rule
,表示 VALUE_LIST 参数指定位置的 AWS S3 桶。有关网络规则在外部访问中作用的更多信息,请参阅 创建网络规则以表示外部网络位置。
CREATE OR REPLACE NETWORK RULE aws_s3_network_rule MODE = EGRESS TYPE = PRIVATE_HOST_PORT VALUE_LIST = ('external-access-iam-bucket.s3.us-west-2.amazonaws.com');
创建一个安全集成,用于保存在
aws_s3_network_rule
网络规则指定的外部网络位置进行身份验证时所需的 AWS IAM Amazon Resource Name (ARN) 凭据。有关该命令的参考信息(包括所需的权限),请参阅 CREATE SECURITY INTEGRATION(AWS IAM 身份验证)。
CREATE OR REPLACE SECURITY INTEGRATION aws_s3_security_integration TYPE = API_AUTHENTICATION AUTH_TYPE = AWS_IAM ENABLED = TRUE AWS_ROLE_ARN = 'arn:aws:iam::736112632310:role/external-access-iam-bucket';
获取 IAM USER 的 ARN 和 ID。
在您创建的安全集成上执行 DESC 命令。
DESC SECURITY INTEGRATION aws_s3_security_integration;
从显示的输出中,复制以下属性的值以在下一步中使用:
API_AWS_IAM_USER_ARN
API_AWS_EXTERNAL_ID
授予必要的 IAM 用户权限以访问桶。
按照 选项 1:配置 Snowflake 存储集成以访问 Amazon S3 的“第 5 步”中所述配置信任策略时,使用 ARN 和 ID 值。
创建类型 CLOUD_PROVIDER_TOKEN 的 密钥,表示外部服务所需的凭据。
本示例后面的处理程序代码使用 Snowflake API 从密钥中检索凭据。
有关密钥在外部访问中作用的更多信息,请参阅 创建表示凭据的密钥。
CREATE OR REPLACE SECRET aws_s3_access_token TYPE = CLOUD_PROVIDER_TOKEN API_AUTHENTICATION = aws_s3_security_integration;
创建一个
developer
角色,并授予其对密钥的 READ 权限。该角色将分配给需要创建使用该密钥的 UDF 或过程的用户。此外,创建用户将用来调用该函数的角色。
CREATE OR REPLACE ROLE developer; CREATE OR REPLACE ROLE user;
向
developer
角色授予必要权限以创建 UDF,使用对象进行外部访问。这包括以下内容:对密钥的 READ 权限。
对包含密钥的架构的 USAGE 权限。
GRANT READ ON SECRET aws_s3_access_token TO ROLE developer; GRANT USAGE ON SCHEMA secret_schema TO ROLE developer;
创建 外部访问集成,通过创建的网络规则和密钥指定外部端点和凭据。
有关外部访问集成的作用(包括所需的权限)的更多信息,请参阅 创建外部访问集成。
CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION aws_s3_external_access_integration ALLOWED_NETWORK_RULES = (aws_s3_network_rule) ALLOWED_AUTHENTICATION_SECRETS = (aws_s3_access_token) ENABLED = TRUE COMMENT = 'Testing S3 connectivity';
创建 UDF,使用外部访问集成来连接您创建的网络规则中指定的 Amazon S3 桶。
处理程序代码使用 Snowflake APIs 从您创建的密钥中检索令牌。通过此令牌,您可以使用 Snowflake APIs 检索必要的值,以创建与 Amazon S3 连接的会话,包括访问密钥 ID、秘密访问密钥和会话令牌。
有关在 UDF 中使用外部访问的更多信息,请参阅 在函数或过程中使用外部访问集成。
CREATE OR REPLACE FUNCTION aws_s3_python_function() RETURNS VARCHAR LANGUAGE PYTHON EXTERNAL_ACCESS_INTEGRATIONS = (aws_s3_external_access_integration) RUNTIME_VERSION = '3.9' SECRETS = ('cred' = aws_s3_access_token) PACKAGES = ('boto3') HANDLER = 'main_handler' AS $$ import boto3 import _snowflake from botocore.config import Config def main_handler(): # Get token object cloud_provider_object = _snowflake.get_cloud_provider_token('cred') # Boto3 configuration config = Config( retries=dict(total_max_attempts=9), connect_timeout=30, read_timeout=30, max_pool_connections=50 ) # Connect to S3 using boto3 s3 = boto3.client( 's3', region_name='us-west-2', aws_access_key_id=cloud_provider_object.access_key_id, aws_secret_access_key=cloud_provider_object.secret_access_key, aws_session_token=cloud_provider_object.token, config=config ) # Use S3 object to upload/download return 'Successfully connected with S3' $$;
CREATE OR REPLACE FUNCTION aws_s3_java_function() RETURNS STRING LANGUAGE JAVA EXTERNAL_ACCESS_INTEGRATIONS = (aws_s3_external_access_integration) SECRETS = ('cred' = aws_s3_access_token) HANDLER = 'AWSTokenProvider.handle' AS $$ import com.snowflake.snowpark_java.types.CloudProviderToken; import com.snowflake.snowpark_java.types.SnowflakeSecrets; public class AWSTokenProvider { public static String handle() { // Get token object SnowflakeSecrets sfSecret = SnowflakeSecrets.newInstance(); CloudProviderToken cloudProviderToken = sfSecret.getCloudProviderToken("cred"); // Create variables for AWS session credentials String accessKeyId = cloudProviderToken.getAccessKeyId(); String secretAccessKey = cloudProviderToken.getSecretAccessKey(); String token = cloudProviderToken.getToken(); // Create S3 client using AWS APIs. return "Successfully connected with S3 with temp access token: " + token; } } $$;
授予对 UDF 的 USAGE 权限,以便具有
user
角色的用户可以调用它。GRANT USAGE ON FUNCTION return_double_column(int) TO ROLE user;
执行函数以连接到外部端点。
SELECT aws_s3_python_function();
SELECT aws_s3_java_function();