外部网络访问示例¶
本主题提供从用户定义函数和过程访问外部网络位置的示例。
访问 Google Translate API¶
以下步骤包括创建外部访问集成以访问 Google Translation API 的代码。这些步骤添加了安全集成和执行语句所需的权限。
创建代表外部位置的网络规则。
有关网络规则在外部访问中的作用(包括所需的权限)的更多信息,请参阅 创建网络规则以表示外部网络位置。
CREATE OR REPLACE NETWORK RULE google_apis_network_rule MODE = EGRESS TYPE = HOST_PORT VALUE_LIST = ('translation.googleapis.com');
创建一个安全集成,用于保存通过
google_apis_network_rule
网络规则指定的外部网络位置进行身份验证所需的 OAuth 凭据。有关该命令的参考信息(包括所需的权限),请参阅 CREATE SECURITY INTEGRATION(外部 API 身份验证)。
CREATE OR REPLACE SECURITY INTEGRATION google_translate_oauth TYPE = API_AUTHENTICATION AUTH_TYPE = OAUTH2 OAUTH_CLIENT_ID = 'my-client-id' OAUTH_CLIENT_SECRET = 'my-client-secret' OAUTH_TOKEN_ENDPOINT = 'https://oauth2.googleapis.com/token' OAUTH_AUTHORIZATION_ENDPOINT = 'https://accounts.google.com/o/oauth2/auth' OAUTH_ALLOWED_SCOPES = ('https://www.googleapis.com/auth/cloud-platform') ENABLED = TRUE;
创建一个密钥,代表
google_translate_oauth
安全集成所包含的凭据。有关密钥在外部访问中的作用(包括所需的权限)的更多信息,请参阅 创建表示凭据的密钥。
密钥必须使用 OAUTH_REFRESH_TOKEN 参数指定刷新令牌。要从服务提供商(本例中为 Google Cloud Translation API 服务)处获取刷新令牌,可以使用提供商提供的方式或使用 Snowflake 系统函数。
要使用刷新令牌创建密钥,要么 使用 Google OAuth Playground,要么使用 Snowflake 系统函数,具体方法如下:
Snowflake 系统函数
执行 CREATE SECRET 创建一个密钥。将在后面的步骤中用刷新令牌更新该密钥。
USE DATABASE my_db; USE SCHEMA secret_schema; CREATE OR REPLACE SECRET oauth_token TYPE = oauth2 API_AUTHENTICATION = google_translate_oauth;
执行 SYSTEM$START_OAUTH_FLOW 函数以检索 URL,并在其实参中指定之前创建的密钥名称,从而获取刷新令牌。
CALL SYSTEM$START_OAUTH_FLOW( 'my_db.secret_schema.oauth_token' );
该函数将生成一个 URL,可以用它来完成 OAuth 意见征求流程。
在浏览器中访问生成的 URL 并完成 OAuth2 意见征求流程。完成后,让浏览器保持打开,直到流程的最后一页。
从浏览器地址栏中,复制意见征求流程最后一页 URL 中问号后面的所有文本。
执行 SYSTEM$FINISH_OAUTH_FLOW 函数,将刚才从浏览器地址栏复制的参数指定为实参。
这会用刷新令牌更新密钥。
请务必在 SYSTEM$START_OAUTH_FLOW 的同一会话中执行 SYSTEM$FINISH_OAUTH_FLOW。SYSTEM$FINISH_OAUTH_FLOW 将使用从 OAuth 服务器获得的访问令牌和刷新令牌更新在 SYSTEM$START_OAUTH_FLOW 中指定的密钥。
CALL SYSTEM$FINISH_OAUTH_FLOW( 'state=<remaining_url_text>' );
Google OAuth Playground
在 Google OAuth Playground (https://developers.google.com/oauthplayground/) 中,选择并授权第 1 步中指定的 Cloud Translation API。
在第 2 步中,点击 exchange authorization code for tokens,然后复制 refresh token 令牌值。
执行 CREATE SECRET 创建一个密钥,指定复制的刷新令牌值。
有关密钥在外部访问中的作用(包括所需的权限)的更多信息,请参阅 创建表示凭据的密钥。
CREATE OR REPLACE SECRET oauth_token TYPE = oauth2 API_AUTHENTICATION = google_translate_oauth OAUTH_REFRESH_TOKEN = 'my-refresh-token';
使用网络规则和密钥创建外部访问集成。
有关外部访问集成的作用(包括所需的权限)的更多信息,请参阅 创建外部访问集成。
CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION google_apis_access_integration ALLOWED_NETWORK_RULES = (google_apis_network_rule) ALLOWED_AUTHENTICATION_SECRETS = (oauth_token) ENABLED = TRUE;
创建
developer
角色,分配给需要创建 UDF 的用户或使用集成的过程。CREATE OR REPLACE ROLE developer; CREATE OR REPLACE ROLE user;
授予
developer
角色创建使用对象进行外部访问的 UDF 所需的权限。这包括以下内容:对密钥的 READ 权限。
对包含密钥的架构的 USAGE 权限。
对集成的 USAGE 权限。
GRANT READ ON SECRET oauth_token TO ROLE developer; GRANT USAGE ON SCHEMA secret_schema TO ROLE developer; GRANT USAGE ON INTEGRATION google_apis_access_integration TO ROLE developer;
创建一个 UDF
google_translate_python
,将指定文本翻译成指定语言的短语。有关更多信息,请参阅 在函数或过程中使用外部访问集成。USE ROLE developer; CREATE OR REPLACE FUNCTION google_translate_python(sentence STRING, language STRING) RETURNS STRING LANGUAGE PYTHON RUNTIME_VERSION = 3.8 HANDLER = 'get_translation' EXTERNAL_ACCESS_INTEGRATIONS = (google_apis_access_integration) PACKAGES = ('snowflake-snowpark-python','requests') SECRETS = ('cred' = oauth_token ) AS $$ import _snowflake import requests import json session = requests.Session() def get_translation(sentence, language): token = _snowflake.get_oauth_access_token('cred') url = "https://translation.googleapis.com/language/translate/v2" data = {'q': sentence,'target': language} response = session.post(url, json = data, headers = {"Authorization": "Bearer " + token}) return response.json()['data']['translations'][0]['translatedText'] $$;
授予对
google_translate_python
函数的 USAGE 权限,以便拥有user
角色的用户可以调用该函数。GRANT USAGE ON FUNCTION google_translate_python(string, string) TO ROLE user;
执行
google_translate_python
函数来翻译短语。USE ROLE user; SELECT google_translate_python('Happy Thursday!', 'zh-CN');
这会生成以下输出。
------------------------------------------------------- | GOOGLE_TRANSLATE_PYTHON('HAPPY THURSDAY!', 'ZH-CN') | ------------------------------------------------------- | 快乐星期四! | -------------------------------------------------------
访问外部 lambda 函数¶
以下步骤包括创建外部访问集成的示例代码,用于访问 Snowflake 外部的 lambda 函数。示例中使用的外部端点本身就是一个占位符,但它也可以是可在 REST 服务端点使用的一个函数。
外部访问用于 矢量化 Python UDF,它接收包含数据的 Pandas DataFrame。
创建
lambda_network_rule
代表外部位置my_external_service
的网络规则(这里是外部端点位置的占位符值)。有关网络规则在外部访问中作用的更多信息,请参阅 创建网络规则以表示外部网络位置。
CREATE OR REPLACE NETWORK RULE lambda_network_rule MODE = EGRESS TYPE = HOST_PORT VALUE_LIST = ('my_external_service');
创建一个密钥,代表外部服务所需的凭据。
本示例稍后的处理程序代码将使用用于 Python 的 Snowflake API 从密钥中检索凭据。
有关密钥在外部访问中作用的更多信息,请参阅 创建表示凭据的密钥。
CREATE OR REPLACE SECRET secret_password TYPE = PASSWORD USERNAME = 'my_user_name' PASSWORD = 'my_password';
创建一个
developer
角色,并授予其对密钥的 READ 权限。该角色将分配给需要创建使用该密钥的 UDF 或过程的用户。此外,创建用户将用来调用该函数的角色。
CREATE OR REPLACE ROLE developer; CREATE OR REPLACE ROLE user;
授予
developer
角色创建使用对象进行外部访问的 UDF 所需的权限。这包括以下内容:对密钥的 READ 权限。
对包含密钥的架构的 USAGE 权限。
GRANT READ ON SECRET secret_password TO ROLE developer; GRANT USAGE ON SCHEMA secret_schema TO ROLE developer;
创建外部访问集成,通过创建的网络规则和密钥指定外部端点和凭据。
有关外部访问集成的作用(包括所需的权限)的更多信息,请参阅 创建外部访问集成。
CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION lambda_external_access_integration ALLOWED_NETWORK_RULES = (lambda_network_rule) ALLOWED_AUTHENTICATION_SECRETS = (secret_password) ENABLED = TRUE;
创建一个 矢量化 Python UDF
return_double_column
,访问外部网络位置,处理以 Pandas DataFrame (https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html) 的形式接收的数据。有关在 UDF 中使用外部访问的更多信息,请参阅 在函数或过程中使用外部访问集成。
CREATE OR REPLACE FUNCTION return_double_column(x int) RETURNS INT LANGUAGE PYTHON EXTERNAL_ACCESS_INTEGRATIONS = (lambda_external_access_integration) SECRETS = ('cred' = secret_password) RUNTIME_VERSION = 3.8 HANDLER = 'return_first_column' PACKAGES = ('pandas', 'requests') AS $$ import pandas import numpy as np import json import requests import base64 import _snowflake from _snowflake import vectorized from requests.auth import HTTPBasicAuth from requests.adapters import HTTPAdapter from requests.packages.urllib3.util.retry import Retry session = requests.Session() retries = Retry(total=10, backoff_factor=1, status_forcelist=[429, 500, 502, 503, 504], allowed_methods = None) session.mount('https://', HTTPAdapter(max_retries=retries)) @vectorized(input=pandas.DataFrame) def return_first_column(df): request_rows = [] df.iloc[:,0] = df.iloc[:,0].astype(int) request_rows = np.column_stack([df.index, df.iloc[:,0]]).tolist() request_payload = {"data" : request_rows} username_password_object = _snowflake.get_username_password('cred'); basic = HTTPBasicAuth(username_password_object.username, username_password_object.password) url = 'my_external_service' response = session.post(url, json=request_payload, auth=basic) response.raise_for_status() response_payload = json.loads(response.text) response_rows = response_payload["data"] return pandas.DataFrame(response_rows)[1] $$;
授予对
return_double_column
函数的 USAGE 权限,以便拥有user
角色的用户可以调用该函数。GRANT USAGE ON FUNCTION return_double_column(int) TO ROLE user;
执行
return_double_column
函数,向外部端点发出请求。以下示例中的代码创建了一个双列表,并插入了 100,000,000 行包含 4 字节整数的数据。然后,代码执行
return_double_column
函数,从a
列传递值,供外部端点处理。CREATE OR REPLACE TABLE t1 (a INT, b INT); INSERT INTO t1 SELECT SEQ4(), SEQ4() FROM TABLE(GENERATOR(ROWCOUNT => 100000000)); SELECT return_double_column(a) AS retval FROM t1 ORDER BY retval;