External network access examples

This topic provides examples of accessing external network locations from user-defined functions and procedures.

Accessing PyPI to install packages in a Snowpark Container

You can access the PyPI package repository by creating an external access integration. You might do this to allow Notebook users on Container Runtime to install packages with the pip install command. The same kind of integration also allows Snowpark Container Services to install pip packages.

This example uses the Snowflake-managed network rule snowflake.external_access.pypi_rule described in Privileges and commands.

  1. Create an external access integration using the snowflake.external_access.pypi_rule network rule.

    CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION pypi_access
      ALLOWED_NETWORK_RULES = (snowflake.external_access.pypi_rule)
      ENABLED = true;
  2. Create a developer role for users who need to use pip install in a Snowpark Container or Notebook on Container Runtime.

    CREATE OR REPLACE ROLE developer;
  3. Grant to the developer role the privileges needed to use the external access integration you created.

    GRANT USAGE ON INTEGRATION pypi_access TO ROLE developer;
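
Once the integration is available to a notebook or container, installing a package comes down to an ordinary pip invocation. The following is a minimal sketch, not part of the Snowflake API: the helper names build_pip_command and install are hypothetical, and the install function only succeeds where outbound access to PyPI is allowed.

```python
import subprocess
import sys


def build_pip_command(package: str) -> list[str]:
    """Return the argv list that installs `package` with the current interpreter's pip."""
    return [sys.executable, "-m", "pip", "install", package]


def install(package: str) -> None:
    """Run pip. This needs network access to PyPI, which the integration is meant to allow."""
    subprocess.check_call(build_pip_command(package))


# Print the command instead of running it, so the sketch has no side effects.
print(" ".join(build_pip_command("requests")))
```

Running pip through sys.executable keeps the installation tied to the interpreter that the notebook or container is actually using.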

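Accessing the Google Translate API with OAuth

The following steps include code to create an external access integration for access to the Google Translation API. The steps add the security integration and the permissions needed to execute the statements.

  1. Create a network rule representing the external location.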

    For more information about the role of a network rule in external access, including privileges required, see Creating a network rule to represent the external network location.

    CREATE OR REPLACE NETWORK RULE google_apis_network_rule
      MODE = EGRESS
      TYPE = HOST_PORT
      VALUE_LIST = ('translation.googleapis.com');
  2. Create a security integration to hold the OAuth credentials required to authenticate with the external network location specified by the google_apis_network_rule network rule.

    For reference information on the command, including privileges required, see CREATE SECURITY INTEGRATION (External API Authentication).

    CREATE OR REPLACE SECURITY INTEGRATION google_translate_oauth
      TYPE = API_AUTHENTICATION
      AUTH_TYPE = OAUTH2
      OAUTH_CLIENT_ID = 'my-client-id'
      OAUTH_CLIENT_SECRET = 'my-client-secret'
      OAUTH_TOKEN_ENDPOINT = 'https://oauth2.googleapis.com/token'
      OAUTH_AUTHORIZATION_ENDPOINT = 'https://accounts.google.com/o/oauth2/auth'
      OAUTH_ALLOWED_SCOPES = ('https://www.googleapis.com/auth/cloud-platform')
      ENABLED = TRUE;
  3. Create a secret to represent the credentials contained by the google_translate_oauth security integration.

    For more information about the role of the secret in external access, including privileges required, see Creating a secret to represent credentials.

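    The secret must specify a refresh token with the OAUTH_REFRESH_TOKEN parameter. To obtain a refresh token from the service provider (in this case, the Google Cloud Translation API service), you can use a means the provider supplies or use Snowflake system functions.

    To create a secret with a refresh token, use either the Google OAuth Playground or Snowflake system functions, as described in the following:

  • Snowflake system functions
    1. Execute CREATE SECRET to create a secret. You will update it with a refresh token in a later step.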

      USE DATABASE my_db;
      USE SCHEMA secret_schema;
      
      CREATE OR REPLACE SECRET oauth_token
        TYPE = oauth2
        API_AUTHENTICATION = google_translate_oauth;
    2. Execute the SYSTEM$START_OAUTH_FLOW function to retrieve a URL with which you can obtain a refresh token, specifying as its argument the name of the secret you created previously.

      CALL SYSTEM$START_OAUTH_FLOW( 'my_db.secret_schema.oauth_token' );

      The function generates a URL that you can use to complete the OAuth consent process.

    3. In a browser, visit the generated URL and complete the OAuth2 consent process. When you’ve finished, leave the browser open to the last page of the process.
    4. From the browser address bar, copy all of the text after the question mark in the URL of the last page of the consent process.
    5. Execute the SYSTEM$FINISH_OAUTH_FLOW function, specifying as an argument the parameters you just copied from the browser address bar, to update the secret with a refresh token.

      Be sure to execute SYSTEM$FINISH_OAUTH_FLOW in the same session as SYSTEM$START_OAUTH_FLOW. SYSTEM$FINISH_OAUTH_FLOW updates the secret you specified in SYSTEM$START_OAUTH_FLOW with the access token and refresh token it obtains from the OAuth server.

      CALL SYSTEM$FINISH_OAUTH_FLOW( 'state=<remaining_url_text>' );
  • Google OAuth Playground
    1. In Google OAuth Playground (https://developers.google.com/oauthplayground/), select and authorize the Cloud Translation API as specified in Step 1.

    2. In Step 2, click Exchange authorization code for tokens, then copy the refresh token value.

    3. Execute CREATE SECRET to create a secret that specifies the refresh token value you copied.

      For more information about the role of a secret in external access, including privileges required, see Creating a secret to represent credentials.

      CREATE OR REPLACE SECRET oauth_token
        TYPE = oauth2
        API_AUTHENTICATION = google_translate_oauth
        OAUTH_REFRESH_TOKEN = 'my-refresh-token';
  4. Create an external access integration using the network rule and secret.

    For more information about the role of an external access integration, including privileges required, see Creating an external access integration.

    CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION google_apis_access_integration
      ALLOWED_NETWORK_RULES = (google_apis_network_rule)
      ALLOWED_AUTHENTICATION_SECRETS = (oauth_token)
      ENABLED = TRUE;
  5. Create a developer role that will be assigned to users who need to create a UDF or procedure that uses the integration. Also create a user role for users who will call the function.

    CREATE OR REPLACE ROLE developer;
    CREATE OR REPLACE ROLE user;
  6. Grant to the developer role the privileges needed to create a UDF that uses the objects for external access. This includes the following:

    • The READ privilege on the secret.
    • The USAGE privilege on the schema containing the secret.
    • The USAGE privilege on the integration.
      GRANT READ ON SECRET oauth_token TO ROLE developer;
      GRANT USAGE ON SCHEMA secret_schema TO ROLE developer;
      GRANT USAGE ON INTEGRATION google_apis_access_integration TO ROLE developer;
  7. Create a UDF google_translate_python that translates the specified text into the specified language. For more information, see Using the external access integration in a function or procedure.

    USE ROLE developer;
    
    CREATE OR REPLACE FUNCTION google_translate_python(sentence STRING, language STRING)
      RETURNS STRING
      LANGUAGE PYTHON
      RUNTIME_VERSION = 3.12
      HANDLER = 'get_translation'
      EXTERNAL_ACCESS_INTEGRATIONS = (google_apis_access_integration)
      PACKAGES = ('snowflake-snowpark-python','requests')
      SECRETS = ('cred' = oauth_token )
    AS $$
    import _snowflake
    import requests
    import json
    session = requests.Session()
    def get_translation(sentence, language):
      token = _snowflake.get_oauth_access_token('cred')
      url = "https://translation.googleapis.com/language/translate/v2"
      data = {'q': sentence,'target': language}
      response = session.post(url, json = data, headers = {"Authorization": "Bearer " + token})
      return response.json()['data']['translations'][0]['translatedText']
    $$;
  8. Grant the USAGE privilege on the google_translate_python function so that those with the user role can call it.

    GRANT USAGE ON FUNCTION google_translate_python(string, string) TO ROLE user;
  9. Execute the google_translate_python function to translate a phrase.

    USE ROLE user;
    
    SELECT google_translate_python('Happy Thursday!', 'zh-CN');

This generates the following output.

-------------------------------------------------------
| GOOGLE_TRANSLATE_PYTHON('HAPPY THURSDAY!', 'ZH-CN') |
-------------------------------------------------------
|                                           |
-------------------------------------------------------
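
In the Snowflake system function path above, the argument to SYSTEM$FINISH_OAUTH_FLOW is the text after the question mark in the URL of the last consent page. The following sketch shows one way to extract that text with the Python standard library. The function names and the example URL are hypothetical, and the sketch assumes the values you need are in the URL's query string rather than in a fragment.

```python
from urllib.parse import urlsplit


def oauth_flow_arguments(redirect_url: str) -> str:
    """Return the query string, that is, the text after the question mark."""
    query = urlsplit(redirect_url).query
    if not query:
        raise ValueError("URL has no query string; copy the address of the last consent page")
    return query


def finish_oauth_flow_statement(redirect_url: str) -> str:
    """Build the CALL statement, doubling single quotes for the SQL string literal."""
    args = oauth_flow_arguments(redirect_url).replace("'", "''")
    return f"CALL SYSTEM$FINISH_OAUTH_FLOW('{args}');"


# Hypothetical final-page URL; real values come from your browser's address bar.
example_url = "https://example.com/oauth/complete?state=abc123&code=4%2F0Ab"
print(finish_oauth_flow_statement(example_url))
```

The generated statement still has to run in the same session as SYSTEM$START_OAUTH_FLOW, as noted in the steps above.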

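Accessing an external lambda function using basic authentication

The following steps include example code for creating an external access integration to access a lambda function external to Snowflake. The external endpoint used in the example is itself a placeholder, but it could instead be, for example, a function available at a REST service endpoint.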

The external access is used in a vectorized Python UDF that receives a Pandas DataFrame containing the data.
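
The handler in the following steps passes the username and password from the secret to requests.auth.HTTPBasicAuth. As a rough illustration of what HTTP basic authentication sends on the wire, the sketch below builds the equivalent Authorization header by hand. The function name basic_auth_header is hypothetical, and the credentials are the placeholder values used in this example.

```python
import base64


def basic_auth_header(username: str, password: str) -> dict[str, str]:
    """Build the Authorization header for HTTP basic authentication."""
    # The scheme joins the two values with a colon and Base64-encodes the result.
    credentials = f"{username}:{password}".encode("utf-8")
    token = base64.b64encode(credentials).decode("ascii")
    return {"Authorization": f"Basic {token}"}


header = basic_auth_header("my_user_name", "my_password")
print(header["Authorization"])
```

Base64 is an encoding, not encryption, so the credentials are only protected when the request is sent over HTTPS.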

  1. Create a network rule lambda_network_rule representing the external location my_external_service (here, a placeholder value for the location of an external endpoint).

    For more information about the role of a network rule in external access, see Creating a network rule to represent the external network location.

    CREATE OR REPLACE NETWORK RULE lambda_network_rule
      MODE = EGRESS
      TYPE = HOST_PORT
      VALUE_LIST = ('my_external_service');
  2. Create a secret representing the credentials required by the external service.

    Handler code later in this example retrieves the credentials from the secret using a Snowflake API for Python.

    For more information about the role of the secret in external access, see Creating a secret to represent credentials.

    CREATE OR REPLACE SECRET secret_password
      TYPE = PASSWORD
      USERNAME = 'my_user_name'
      PASSWORD = 'my_password';
  3. Create a developer role. This role will be assigned to users who need to create a UDF or procedure that uses the secret, and it receives READ privileges on the secret in the next step.

    Also create the role that users will use to call the function.

    CREATE OR REPLACE ROLE developer;
    CREATE OR REPLACE ROLE user;
  4. Grant to the developer role the privileges needed to create a UDF that uses the objects for external access. This includes the following:

    • The READ privilege on the secret.
    • The USAGE privilege on the schema containing the secret.
    GRANT READ ON SECRET secret_password TO ROLE developer;
    GRANT USAGE ON SCHEMA secret_schema TO ROLE developer;
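  5. Create an external access integration to specify the external endpoint and credentials through the network rule and secret you created.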

    For more information about the role of an external access integration, including privileges required, see Creating an external access integration.

    CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION lambda_external_access_integration
      ALLOWED_NETWORK_RULES = (lambda_network_rule)
      ALLOWED_AUTHENTICATION_SECRETS = (secret_password)
      ENABLED = TRUE;
  6. Create a vectorized Python UDF return_double_column that accesses an external network location to process data received as a Pandas DataFrame (https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html).

    For more information on using external access in a UDF, see Using the external access integration in a function or procedure.

    CREATE OR REPLACE FUNCTION return_double_column(x int)
      RETURNS INT
      LANGUAGE PYTHON
      EXTERNAL_ACCESS_INTEGRATIONS = (lambda_external_access_integration)
      SECRETS = ('cred' = secret_password)
      RUNTIME_VERSION = 3.12
      HANDLER = 'return_first_column'
      PACKAGES = ('pandas', 'requests')
    AS $$
    import pandas
    import numpy as np
    import json
    import requests
    import base64
    import _snowflake
    from _snowflake import vectorized
    from requests.auth import HTTPBasicAuth
    from requests.adapters import HTTPAdapter
    from requests.packages.urllib3.util.retry import Retry
    
    session = requests.Session()
    retries = Retry(total=10, backoff_factor=1, status_forcelist=[429, 500, 502, 503, 504], allowed_methods = None)
    
    session.mount('https://', HTTPAdapter(max_retries=retries))
    
    @vectorized(input=pandas.DataFrame)
    def return_first_column(df):
      request_rows = []
    
      df.iloc[:,0] = df.iloc[:,0].astype(int)
      request_rows = np.column_stack([df.index, df.iloc[:,0]]).tolist()
    
      request_payload = {"data" : request_rows}
    
      username_password_object = _snowflake.get_username_password('cred')
      basic = HTTPBasicAuth(username_password_object.username, username_password_object.password)
    
      url = 'my_external_service'
    
      response = session.post(url, json=request_payload, auth=basic)
    
      response.raise_for_status()
      response_payload = json.loads(response.text)
    
      response_rows = response_payload["data"]
    
      return pandas.DataFrame(response_rows)[1]
    $$;
  7. Grant the USAGE privilege on the return_double_column function so that those with the user role can call it.

    GRANT USAGE ON FUNCTION return_double_column(int) TO ROLE user;
  8. Execute the return_double_column function, making a request to the external endpoint.

    Code in the following example creates a two-column table and inserts 100,000,000 rows containing 4-byte integers. The code then executes the return_double_column function, passing values from column a for processing by the external endpoint.

    CREATE OR REPLACE TABLE t1 (a INT, b INT);
    INSERT INTO t1 SELECT SEQ4(), SEQ4() FROM TABLE(GENERATOR(ROWCOUNT => 100000000));
    
    SELECT return_double_column(a) AS retval FROM t1 ORDER BY retval;
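
Because my_external_service is a placeholder, it can help to see what a compatible endpoint might compute. The handler above posts a payload of the form {"data": [[row_index, value], ...]} and reads column 1 of the "data" array in the response. The sketch below is a hypothetical stand-in for the endpoint's logic; the doubling behavior is an assumption inferred from the function name return_double_column, not something the example specifies.

```python
def double_rows(payload: dict) -> dict:
    """Return a response in the shape the handler reads: {"data": [[row_index, value], ...]}."""
    doubled = [[row_index, value * 2] for row_index, value in payload["data"]]
    return {"data": doubled}


# The handler sends one [row_index, value] pair per input row.
request_payload = {"data": [[0, 0], [1, 1], [2, 2]]}
response_payload = double_rows(request_payload)
print(response_payload)

# The handler keeps column 1 of the response, that is, the processed values.
values = [row[1] for row in response_payload["data"]]
print(values)
```

Echoing the row index back with each value lets the caller match results to input rows even when the service processes a batch.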

Accessing Amazon S3 using AWS IAM

The following steps include example code for connecting to an AWS S3 bucket using IAM.

For more information about AWS IAM, see AWS IAM documentation (https://docs.aws.amazon.com/IAM/latest/UserGuide/reference_identifiers.html).

  1. Create a network rule, aws_s3_network_rule, that represents the AWS S3 bucket at the location specified by the VALUE_LIST parameter.

    For more information about the role of a network rule in external access, see Creating a network rule to represent the external network location.

    CREATE OR REPLACE NETWORK RULE aws_s3_network_rule
      MODE = EGRESS
      TYPE = PRIVATE_HOST_PORT
      VALUE_LIST = ('external-access-iam-bucket.s3.us-west-2.amazonaws.com');
  2. Create a security integration to hold the AWS IAM Amazon Resource Name (ARN) credentials required to authenticate with the external network location specified by the aws_s3_network_rule network rule.

    For reference information on the command, including privileges required, see CREATE SECURITY INTEGRATION (AWS IAM Authentication).

    CREATE OR REPLACE SECURITY INTEGRATION aws_s3_security_integration
      TYPE = API_AUTHENTICATION
      AUTH_TYPE = AWS_IAM
      ENABLED = TRUE
      AWS_ROLE_ARN = 'arn:aws:iam::736112632310:role/external-access-iam-bucket';
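  3. Get the ARN and ID of the IAM user.

    1. Execute the DESC command on the security integration you created.

      DESC SECURITY INTEGRATION aws_s3_security_integration;
    2. From the output displayed, copy the values of the following properties for use in the next step:

      • API_AWS_IAM_USER_ARN
      • API_AWS_EXTERNAL_ID
  4. Grant the IAM user the permissions required to access the bucket.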

    Use the ARN and ID values when configuring a trust policy as described in Step 5 of Option 1: Configure a Snowflake storage integration to access Amazon S3.

  5. Create a secret of type CLOUD_PROVIDER_TOKEN to represent credentials required by the external service.

    Handler code later in this example retrieves the credentials from the secret using a Snowflake API.

    For more information about the role of the secret in external access, see Creating a secret to represent credentials.

    CREATE OR REPLACE SECRET aws_s3_access_token
      TYPE = CLOUD_PROVIDER_TOKEN
      API_AUTHENTICATION = aws_s3_security_integration;
  6. Create a developer role. This role will be assigned to users who need to create a UDF or procedure that uses the secret, and it receives READ privileges on the secret in the next step.

    Also create the role that users will use to call the function.

    CREATE OR REPLACE ROLE developer;
    CREATE OR REPLACE ROLE user;
  7. Grant to the developer role the privileges needed to create a UDF that uses the objects for external access. This includes the following:

    • The READ privilege on the secret.
    • The USAGE privilege on the schema containing the secret.
    GRANT READ ON SECRET aws_s3_access_token TO ROLE developer;
    GRANT USAGE ON SCHEMA secret_schema TO ROLE developer;
  8. Create an external access integration to specify the external endpoint and credentials through the network rule and secret you created.

    For more information about the role of an external access integration, including privileges required, see Creating an external access integration.

    CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION aws_s3_external_access_integration
      ALLOWED_NETWORK_RULES = (aws_s3_network_rule)
      ALLOWED_AUTHENTICATION_SECRETS = (aws_s3_access_token)
      ENABLED = TRUE
      COMMENT = 'Testing S3 connectivity';
  9. Create a UDF that uses the external access integration to connect to the Amazon S3 bucket specified in the network rule you created.

    The handler code uses Snowflake APIs to retrieve a token from the secret you created. From this token, you can use Snowflake APIs to retrieve values needed to create a session for connecting with Amazon S3, including an access key ID, secret access key, and session token.

    For more information on using external access in a UDF, see Using the external access integration in a function or procedure.

    CREATE OR REPLACE FUNCTION aws_s3_python_function()
      RETURNS VARCHAR
      LANGUAGE PYTHON
      EXTERNAL_ACCESS_INTEGRATIONS = (aws_s3_external_access_integration)
      RUNTIME_VERSION = '3.12'
      SECRETS = ('cred' = aws_s3_access_token)
      PACKAGES = ('boto3')
      HANDLER = 'main_handler'
    AS $$
    import boto3
    import _snowflake
    from botocore.config import Config
    
    def main_handler():
      # Get token object
      cloud_provider_object = _snowflake.get_cloud_provider_token('cred')
    
      # Boto3 configuration
      config = Config(
        retries=dict(total_max_attempts=9),
        connect_timeout=30,
        read_timeout=30,
        max_pool_connections=50
      )
    
      # Connect to S3 using boto3
      s3 = boto3.client(
        's3',
        region_name='us-west-2',
        aws_access_key_id=cloud_provider_object.access_key_id,
        aws_secret_access_key=cloud_provider_object.secret_access_key,
        aws_session_token=cloud_provider_object.token,
        config=config
      )
    
      # Use S3 object to upload/download
      return 'Successfully connected with S3'
    $$;
  10. Grant the USAGE privilege on the UDF so that those with the user role can call it.

    GRANT USAGE ON FUNCTION aws_s3_python_function() TO ROLE user;
  11. Execute the function to connect to the external endpoint.

    SELECT aws_s3_python_function();
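
The trust policy step above uses the API_AWS_IAM_USER_ARN and API_AWS_EXTERNAL_ID values copied from the DESC output. As a rough sketch, an IAM trust policy for this purpose typically allows that IAM user to assume the role only when the external ID matches. The helper name build_trust_policy and the placeholder arguments are hypothetical; treat the storage integration instructions referenced in that step as the authoritative source for the policy.

```python
import json


def build_trust_policy(iam_user_arn: str, external_id: str) -> str:
    """Return an IAM trust policy document as a JSON string."""
    policy = {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                # The IAM user reported by DESC SECURITY INTEGRATION.
                "Principal": {"AWS": iam_user_arn},
                "Action": "sts:AssumeRole",
                # The role can be assumed only when this external ID is supplied.
                "Condition": {"StringEquals": {"sts:ExternalId": external_id}},
            }
        ],
    }
    return json.dumps(policy, indent=2)


# Placeholder values; substitute the ones copied from the DESC output.
print(build_trust_policy("<API_AWS_IAM_USER_ARN value>", "<API_AWS_EXTERNAL_ID value>"))
```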