外部网络访问示例

本主题提供从用户定义函数和过程访问外部网络位置的示例。

访问 Google Translate API

以下步骤包括创建外部访问集成以访问 Google Translation API 的代码。这些步骤添加了安全集成和执行语句所需的权限。

  1. 创建代表外部位置的网络规则。

    有关网络规则在外部访问中的作用(包括所需的权限)的更多信息,请参阅 创建网络规则以表示外部网络位置

    CREATE OR REPLACE NETWORK RULE google_apis_network_rule
      MODE = EGRESS
      TYPE = HOST_PORT
      VALUE_LIST = ('translation.googleapis.com');
    
    Copy
  2. 创建一个安全集成,用于保存通过 google_apis_network_rule 网络规则指定的外部网络位置进行身份验证所需的 OAuth 凭据。

    有关该命令的参考信息(包括所需的权限),请参阅 CREATE SECURITY INTEGRATION(外部 API 身份验证)

    CREATE OR REPLACE SECURITY INTEGRATION google_translate_oauth
      TYPE = API_AUTHENTICATION
      AUTH_TYPE = OAUTH2
      OAUTH_CLIENT_ID = 'my-client-id'
      OAUTH_CLIENT_SECRET = 'my-client-secret'
      OAUTH_TOKEN_ENDPOINT = 'https://oauth2.googleapis.com/token'
      OAUTH_AUTHORIZATION_ENDPOINT = 'https://accounts.google.com/o/oauth2/auth'
      OAUTH_ALLOWED_SCOPES = ('https://www.googleapis.com/auth/cloud-platform')
      ENABLED = TRUE;
    
    Copy
  3. 创建一个密钥,代表 google_translate_oauth 安全集成所包含的凭据。

    有关密钥在外部访问中的作用(包括所需的权限)的更多信息,请参阅 创建表示凭据的密钥

    密钥必须使用 OAUTH_REFRESH_TOKEN 参数指定刷新令牌。要从服务提供商(本例中为 Google Cloud Translation API 服务)处获取刷新令牌,可以使用提供商提供的方式或使用 Snowflake 系统函数。

    要使用刷新令牌创建密钥,要么 使用 Google OAuth Playground,要么使用 Snowflake 系统函数,具体方法如下:

    • Snowflake 系统函数

      1. 执行 CREATE SECRET 创建一个密钥。将在后面的步骤中用刷新令牌更新该密钥。

        USE DATABASE my_db;
        USE SCHEMA secret_schema;
        
        CREATE OR REPLACE SECRET oauth_token
          TYPE = oauth2
          API_AUTHENTICATION = google_translate_oauth;
        
        Copy
      2. 执行 SYSTEM$START_OAUTH_FLOW 函数以检索 URL,并在其实参中指定之前创建的密钥名称,从而获取刷新令牌。

        CALL SYSTEM$START_OAUTH_FLOW( 'my_db.secret_schema.oauth_token' );
        
        Copy

        该函数将生成一个 URL,可以用它来完成 OAuth 意见征求流程。

      3. 在浏览器中访问生成的 URL 并完成 OAuth2 意见征求流程。完成后,让浏览器保持打开,直到流程的最后一页。

      4. 从浏览器地址栏中,复制意见征求流程最后一页 URL 中问号后面的所有文本。

      5. 执行 SYSTEM$FINISH_OAUTH_FLOW 函数,将刚才从浏览器地址栏复制的参数指定为实参。

        这会用刷新令牌更新密钥。

        请务必在 SYSTEM$START_OAUTH_FLOW 的同一会话中执行 SYSTEM$FINISH_OAUTH_FLOW。SYSTEM$FINISH_OAUTH_FLOW 将使用从 OAuth 服务器获得的访问令牌和刷新令牌更新在 SYSTEM$START_OAUTH_FLOW 中指定的密钥。

        CALL SYSTEM$FINISH_OAUTH_FLOW( 'state=<remaining_url_text>' );
        
        Copy
    • Google OAuth Playground

      1. 在 Google OAuth Playground (https://developers.google.com/oauthplayground/) 中,选择并授权第 1 步中指定的 Cloud Translation API。

      2. 在第 2 步中,点击 exchange authorization code for tokens,然后复制 refresh token 令牌值。

      3. 执行 CREATE SECRET 创建一个密钥,指定复制的刷新令牌值。

        有关密钥在外部访问中的作用(包括所需的权限)的更多信息,请参阅 创建表示凭据的密钥

        CREATE OR REPLACE SECRET oauth_token
          TYPE = oauth2
          API_AUTHENTICATION = google_translate_oauth
          OAUTH_REFRESH_TOKEN = 'my-refresh-token';
        
        Copy
  4. 使用网络规则和密钥创建外部访问集成。

    有关外部访问集成的作用(包括所需的权限)的更多信息,请参阅 创建外部访问集成

    CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION google_apis_access_integration
      ALLOWED_NETWORK_RULES = (google_apis_network_rule)
      ALLOWED_AUTHENTICATION_SECRETS = (oauth_token)
      ENABLED = TRUE;
    
    Copy
  5. 创建 developer 角色,分配给需要创建 UDF 的用户或使用集成的过程。

    CREATE OR REPLACE ROLE developer;
    CREATE OR REPLACE ROLE user;
    
    Copy
  6. 授予 developer 角色创建使用对象进行外部访问的 UDF 所需的权限。这包括以下内容:

    • 对密钥的 READ 权限。

    • 对包含密钥的架构的 USAGE 权限。

    • 对集成的 USAGE 权限。

      GRANT READ ON SECRET oauth_token TO ROLE developer;
      GRANT USAGE ON SCHEMA secret_schema TO ROLE developer;
      GRANT USAGE ON INTEGRATION google_apis_access_integration TO ROLE developer;
      
      Copy
  7. 创建一个 UDF google_translate_python,将指定文本翻译成指定语言的短语。有关更多信息,请参阅 在函数或过程中使用外部访问集成

    USE ROLE developer;
    
    CREATE OR REPLACE FUNCTION google_translate_python(sentence STRING, language STRING)
    RETURNS STRING
    LANGUAGE PYTHON
    RUNTIME_VERSION = 3.8
    HANDLER = 'get_translation'
    EXTERNAL_ACCESS_INTEGRATIONS = (google_apis_access_integration)
    PACKAGES = ('snowflake-snowpark-python','requests')
    SECRETS = ('cred' = oauth_token )
    AS
    $$
    import _snowflake
    import requests
    import json
    session = requests.Session()
    def get_translation(sentence, language):
      token = _snowflake.get_oauth_access_token('cred')
      url = "https://translation.googleapis.com/language/translate/v2"
      data = {'q': sentence,'target': language}
      response = session.post(url, json = data, headers = {"Authorization": "Bearer " + token})
      return response.json()['data']['translations'][0]['translatedText']
    $$;
    
    Copy
  8. 授予对 google_translate_python 函数的 USAGE 权限,以便拥有 user 角色的用户可以调用该函数。

    GRANT USAGE ON FUNCTION google_translate_python(string, string) TO ROLE user;
    
    Copy
  9. 执行 google_translate_python 函数来翻译短语。

    USE ROLE user;
    
    SELECT google_translate_python('Happy Thursday!', 'zh-CN');
    
    Copy

    这会生成以下输出。

    -------------------------------------------------------
    | GOOGLE_TRANSLATE_PYTHON('HAPPY THURSDAY!', 'ZH-CN') |
    -------------------------------------------------------
    | 快乐星期四!                                          |
    -------------------------------------------------------
    

访问外部 lambda 函数

以下步骤包括创建外部访问集成的示例代码,用于访问 Snowflake 外部的 lambda 函数。示例中使用的外部端点本身就是一个占位符,但它也可以是可在 REST 服务端点使用的一个函数。

外部访问用于 矢量化 Python UDF,它接收包含数据的 Pandas DataFrame。

  1. 创建 lambda_network_rule 代表外部位置 my_external_service 的网络规则(这里是外部端点位置的占位符值)。

    有关网络规则在外部访问中作用的更多信息,请参阅 创建网络规则以表示外部网络位置

    CREATE OR REPLACE NETWORK RULE lambda_network_rule
      MODE = EGRESS
      TYPE = HOST_PORT
      VALUE_LIST = ('my_external_service');
    
    Copy
  2. 创建一个密钥,代表外部服务所需的凭据。

    本示例稍后的处理程序代码将使用用于 Python 的 Snowflake API 从密钥中检索凭据。

    有关密钥在外部访问中作用的更多信息,请参阅 创建表示凭据的密钥

    CREATE OR REPLACE SECRET secret_password
      TYPE = PASSWORD
      USERNAME = 'my_user_name'
      PASSWORD = 'my_password';
    
    Copy
  3. 创建一个 developer 角色,并授予其对密钥的 READ 权限。该角色将分配给需要创建使用该密钥的 UDF 或过程的用户。

    此外,创建用户将用来调用该函数的角色。

    CREATE OR REPLACE ROLE developer;
    CREATE OR REPLACE ROLE user;
    
    Copy
  4. 授予 developer 角色创建使用对象进行外部访问的 UDF 所需的权限。这包括以下内容:

    • 对密钥的 READ 权限。

    • 对包含密钥的架构的 USAGE 权限。

    GRANT READ ON SECRET secret_password TO ROLE developer;
    GRANT USAGE ON SCHEMA secret_schema TO ROLE developer;
    
    Copy
  5. 创建外部访问集成,通过创建的网络规则和密钥指定外部端点和凭据。

    有关外部访问集成的作用(包括所需的权限)的更多信息,请参阅 创建外部访问集成

    CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION lambda_external_access_integration
      ALLOWED_NETWORK_RULES = (lambda_network_rule)
      ALLOWED_AUTHENTICATION_SECRETS = (secret_password)
      ENABLED = TRUE;
    
    Copy
  6. 创建一个 矢量化 Python UDF return_double_column,访问外部网络位置,处理以 Pandas DataFrame (https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html) 的形式接收的数据。

    有关在 UDF 中使用外部访问的更多信息,请参阅 在函数或过程中使用外部访问集成

    CREATE OR REPLACE FUNCTION return_double_column(x int)
    RETURNS INT
    LANGUAGE PYTHON
    EXTERNAL_ACCESS_INTEGRATIONS = (lambda_external_access_integration)
    SECRETS = ('cred' = secret_password)
    RUNTIME_VERSION = 3.8
    HANDLER = 'return_first_column'
    PACKAGES = ('pandas', 'requests')
    AS $$
    import pandas
    import numpy as np
    import json
    import requests
    import base64
    import _snowflake
    from _snowflake import vectorized
    from requests.auth import HTTPBasicAuth
    from requests.adapters import HTTPAdapter
    from requests.packages.urllib3.util.retry import Retry
    
    session = requests.Session()
    retries = Retry(total=10, backoff_factor=1, status_forcelist=[429, 500, 502, 503, 504], allowed_methods = None)
    
    session.mount('https://', HTTPAdapter(max_retries=retries))
    
    @vectorized(input=pandas.DataFrame)
    def return_first_column(df):
      request_rows = []
    
      df.iloc[:,0] = df.iloc[:,0].astype(int)
      request_rows = np.column_stack([df.index, df.iloc[:,0]]).tolist()
    
      request_payload = {"data" : request_rows}
    
      username_password_object = _snowflake.get_username_password('cred');
      basic = HTTPBasicAuth(username_password_object.username, username_password_object.password)
    
      url = 'my_external_service'
    
      response = session.post(url, json=request_payload, auth=basic)
    
      response.raise_for_status()
      response_payload = json.loads(response.text)
    
      response_rows = response_payload["data"]
    
      return pandas.DataFrame(response_rows)[1]
    $$;
    
    Copy
  7. 授予对 return_double_column 函数的 USAGE 权限,以便拥有 user 角色的用户可以调用该函数。

    GRANT USAGE ON FUNCTION return_double_column(int) TO ROLE user;
    
    Copy
  8. 执行 return_double_column 函数,向外部端点发出请求。

    以下示例中的代码创建了一个双列表,并插入了 100,000,000 行包含 4 字节整数的数据。然后,代码执行 return_double_column 函数,从 a 列传递值,供外部端点处理。

    CREATE OR REPLACE TABLE t1 (a INT, b INT);
    INSERT INTO t1 SELECT SEQ4(), SEQ4() FROM TABLE(GENERATOR(ROWCOUNT => 100000000));
    
    SELECT return_double_column(a) AS retval FROM t1 ORDER BY retval;
    
    Copy
语言: 中文