外部网络访问示例

本主题提供从用户定义函数和过程访问外部网络位置的示例。

访问 PyPi 以在 Snowpark Container 中安装包

您可以通过创建外部访问集成来访问 PyPi 包仓库。当您希望允许 Container Runtime 上的 Notebook 用户使用 pip install 命令安装 pip 包时,您可以执行此操作。通过这种集成,您还可以允许 Snowpark Container Services 安装 pip 包。

此示例使用 snowflake.external_access.pypi_rule 中所述的 Snowflake 管理的网络规则 Snowflake 管理的网络规则

  1. 使用 snowflake.external_access.pypi_rule 网络规则创建外部访问集成。

    CREATE [OR REPLACE] EXTERNAL ACCESS INTEGRATION pypi_access
      ALLOWED_NETWORK_RULES = (snowflake.external_access.pypi_rule)
      ENABLED = true;
    
    Copy
  2. 为需要在 Container Runtime 上的 Snowpark Container 或 Notebook 中使用 developer 的用户创建 pip install 角色。

    CREATE OR REPLACE ROLE developer;
    
    Copy
  3. developer 角色授予必要权限,以使用您创建的外部访问集成。

    GRANT USAGE ON INTEGRATION pypi_access TO ROLE developer;
    
    Copy

使用 OAuth 访问 Google Translate API

以下步骤包括创建外部访问集成以访问 Google Translation API 的代码。这些步骤添加了安全集成和执行语句所需的权限。

  1. 创建代表外部位置的网络规则。

    有关网络规则在外部访问中的作用(包括所需的权限)的更多信息,请参阅 创建网络规则以表示外部网络位置

    CREATE OR REPLACE NETWORK RULE google_apis_network_rule
      MODE = EGRESS
      TYPE = HOST_PORT
      VALUE_LIST = ('translation.googleapis.com');
    
    Copy
  2. 创建一个安全集成,用于保存通过 google_apis_network_rule 网络规则指定的外部网络位置进行身份验证所需的 OAuth 凭据。

    有关该命令的参考信息(包括所需的权限),请参阅 CREATE SECURITY INTEGRATION(外部 API 身份验证)

    CREATE OR REPLACE SECURITY INTEGRATION google_translate_oauth
      TYPE = API_AUTHENTICATION
      AUTH_TYPE = OAUTH2
      OAUTH_CLIENT_ID = 'my-client-id'
      OAUTH_CLIENT_SECRET = 'my-client-secret'
      OAUTH_TOKEN_ENDPOINT = 'https://oauth2.googleapis.com/token'
      OAUTH_AUTHORIZATION_ENDPOINT = 'https://accounts.google.com/o/oauth2/auth'
      OAUTH_ALLOWED_SCOPES = ('https://www.googleapis.com/auth/cloud-platform')
      ENABLED = TRUE;
    
    Copy
  3. 创建一个密钥,代表 google_translate_oauth 安全集成所包含的凭据。

    有关密钥在外部访问中的作用(包括所需的权限)的更多信息,请参阅 创建表示凭据的密钥

    密钥必须使用 OAUTH_REFRESH_TOKEN 参数指定刷新令牌。要从服务提供商(本例中为 Google Cloud Translation API 服务)处获取刷新令牌,可以使用提供商提供的方式或使用 Snowflake 系统函数。

    要使用刷新令牌创建密钥,要么 使用 Google OAuth Playground,要么使用 Snowflake 系统函数,具体方法如下:

    • Snowflake 系统函数

      1. 执行 CREATE SECRET 创建一个密钥。将在后面的步骤中用刷新令牌更新该密钥。

        USE DATABASE my_db;
        USE SCHEMA secret_schema;
        
        CREATE OR REPLACE SECRET oauth_token
          TYPE = oauth2
          API_AUTHENTICATION = google_translate_oauth;
        
        Copy
      2. 执行 SYSTEM$START_OAUTH_FLOW 函数以检索 URL,并在其实参中指定之前创建的密钥名称,从而获取刷新令牌。

        CALL SYSTEM$START_OAUTH_FLOW( 'my_db.secret_schema.oauth_token' );
        
        Copy

        该函数生成一个 URL,可以用它来完成 OAuth 意见征求流程。

      3. 在浏览器中访问生成的 URL 并完成 OAuth2 意见征求流程。完成后,让浏览器保持打开,直到流程的最后一页。

      4. 从浏览器地址栏中,复制意见征求流程最后一页 URL 中问号后面的所有文本。

      5. 执行 SYSTEM$FINISH_OAUTH_FLOW 函数,将刚刚从浏览器地址栏复制的参数指定为实参,以使用刷新令牌更新密钥。

        请务必在 SYSTEM$START_OAUTH_FLOW 的同一会话中执行 SYSTEM$FINISH_OAUTH_FLOW。SYSTEM$FINISH_OAUTH_FLOW 使用它从 OAuth 服务器获取的访问令牌和刷新令牌,更新您在 SYSTEM$START_OAUTH_FLOW 中指定的密钥。

        CALL SYSTEM$FINISH_OAUTH_FLOW( 'state=<remaining_url_text>' );
        
        Copy
    • Google OAuth Playground

      1. 在 Google OAuth Playground (https://developers.google.com/oauthplayground/) 中,选择并授权第 1 步中指定的 Cloud Translation API。

      2. 在第 2 步中,点击 exchange authorization code for tokens,然后复制 refresh token 令牌值。

      3. 执行 CREATE SECRET 创建一个密钥,指定复制的刷新令牌值。

        有关密钥在外部访问中的作用(包括所需的权限)的更多信息,请参阅 创建表示凭据的密钥

        CREATE OR REPLACE SECRET oauth_token
          TYPE = oauth2
          API_AUTHENTICATION = google_translate_oauth
          OAUTH_REFRESH_TOKEN = 'my-refresh-token';
        
        Copy
  4. 使用网络规则和密钥创建外部访问集成。

    有关外部访问集成的作用(包括所需的权限)的更多信息,请参阅 创建外部访问集成

    CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION google_apis_access_integration
      ALLOWED_NETWORK_RULES = (google_apis_network_rule)
      ALLOWED_AUTHENTICATION_SECRETS = (oauth_token)
      ENABLED = TRUE;
    
    Copy
  5. 创建 developer 角色,分配给需要创建 UDF 的用户或使用集成的过程。

    CREATE OR REPLACE ROLE developer;
    CREATE OR REPLACE ROLE user;
    
    Copy
  6. 授予 developer 角色创建使用对象进行外部访问的 UDF 所需的权限。这包括以下内容:

    • 对密钥的 READ 权限。

    • 对包含密钥的架构的 USAGE 权限。

    • 对集成的 USAGE 权限。

      GRANT READ ON SECRET oauth_token TO ROLE developer;
      GRANT USAGE ON SCHEMA secret_schema TO ROLE developer;
      GRANT USAGE ON INTEGRATION google_apis_access_integration TO ROLE developer;
      
      Copy
  7. 创建一个 UDF google_translate_python,将指定文本翻译成指定语言的短语。有关更多信息,请参阅 在函数或过程中使用外部访问集成

    USE ROLE developer;
    
    CREATE OR REPLACE FUNCTION google_translate_python(sentence STRING, language STRING)
    RETURNS STRING
    LANGUAGE PYTHON
    RUNTIME_VERSION = 3.9
    HANDLER = 'get_translation'
    EXTERNAL_ACCESS_INTEGRATIONS = (google_apis_access_integration)
    PACKAGES = ('snowflake-snowpark-python','requests')
    SECRETS = ('cred' = oauth_token )
    AS
    $$
    import _snowflake
    import requests
    import json
    session = requests.Session()
    def get_translation(sentence, language):
      token = _snowflake.get_oauth_access_token('cred')
      url = "https://translation.googleapis.com/language/translate/v2"
      data = {'q': sentence,'target': language}
      response = session.post(url, json = data, headers = {"Authorization": "Bearer " + token})
      return response.json()['data']['translations'][0]['translatedText']
    $$;
    
    Copy
  8. 授予对 google_translate_python 函数的 USAGE 权限,以便拥有 user 角色的用户可以调用该函数。

    GRANT USAGE ON FUNCTION google_translate_python(string, string) TO ROLE user;
    
    Copy
  9. 执行 google_translate_python 函数来翻译短语。

    USE ROLE user;
    
    SELECT google_translate_python('Happy Thursday!', 'zh-CN');
    
    Copy

    这会生成以下输出。

    -------------------------------------------------------
    | GOOGLE_TRANSLATE_PYTHON('HAPPY THURSDAY!', 'ZH-CN') |
    -------------------------------------------------------
    | 快乐星期四!                                          |
    -------------------------------------------------------
    

使用基本身份验证访问外部 Lambda 函数

以下步骤包括创建外部访问集成的示例代码,用于访问 Snowflake 外部的 lambda 函数。示例中使用的外部端点本身就是一个占位符,但它也可以是可在 REST 服务端点使用的一个函数。

外部访问用于 矢量化 Python UDF,它接收包含数据的 Pandas DataFrame。

  1. 创建 lambda_network_rule 代表外部位置 my_external_service 的网络规则(这里是外部端点位置的占位符值)。

    有关网络规则在外部访问中作用的更多信息,请参阅 创建网络规则以表示外部网络位置

    CREATE OR REPLACE NETWORK RULE lambda_network_rule
      MODE = EGRESS
      TYPE = HOST_PORT
      VALUE_LIST = ('my_external_service');
    
    Copy
  2. 创建一个密钥,代表外部服务所需的凭据。

    本示例稍后的处理程序代码将使用用于 Python 的 Snowflake API 从密钥中检索凭据。

    有关密钥在外部访问中作用的更多信息,请参阅 创建表示凭据的密钥

    CREATE OR REPLACE SECRET secret_password
      TYPE = PASSWORD
      USERNAME = 'my_user_name'
      PASSWORD = 'my_password';
    
    Copy
  3. 创建一个 developer 角色,并授予其对密钥的 READ 权限。该角色将分配给需要创建使用该密钥的 UDF 或过程的用户。

    此外,创建用户将用来调用该函数的角色。

    CREATE OR REPLACE ROLE developer;
    CREATE OR REPLACE ROLE user;
    
    Copy
  4. 授予 developer 角色创建使用对象进行外部访问的 UDF 所需的权限。这包括以下内容:

    • 对密钥的 READ 权限。

    • 对包含密钥的架构的 USAGE 权限。

    GRANT READ ON SECRET secret_password TO ROLE developer;
    GRANT USAGE ON SCHEMA secret_schema TO ROLE developer;
    
    Copy
  5. 创建外部访问集成,通过创建的网络规则和密钥指定外部端点和凭据。

    有关外部访问集成的作用(包括所需的权限)的更多信息,请参阅 创建外部访问集成

    CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION lambda_external_access_integration
      ALLOWED_NETWORK_RULES = (lambda_network_rule)
      ALLOWED_AUTHENTICATION_SECRETS = (secret_password)
      ENABLED = TRUE;
    
    Copy
  6. 创建一个 矢量化 Python UDF return_double_column,访问外部网络位置,处理以 Pandas DataFrame (https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html) 的形式接收的数据。

    有关在 UDF 中使用外部访问的更多信息,请参阅 在函数或过程中使用外部访问集成

    CREATE OR REPLACE FUNCTION return_double_column(x int)
    RETURNS INT
    LANGUAGE PYTHON
    EXTERNAL_ACCESS_INTEGRATIONS = (lambda_external_access_integration)
    SECRETS = ('cred' = secret_password)
    RUNTIME_VERSION = 3.9
    HANDLER = 'return_first_column'
    PACKAGES = ('pandas', 'requests')
    AS $$
    import pandas
    import numpy as np
    import json
    import requests
    import base64
    import _snowflake
    from _snowflake import vectorized
    from requests.auth import HTTPBasicAuth
    from requests.adapters import HTTPAdapter
    from requests.packages.urllib3.util.retry import Retry
    
    session = requests.Session()
    retries = Retry(total=10, backoff_factor=1, status_forcelist=[429, 500, 502, 503, 504], allowed_methods = None)
    
    session.mount('https://', HTTPAdapter(max_retries=retries))
    
    @vectorized(input=pandas.DataFrame)
    def return_first_column(df):
      request_rows = []
    
      df.iloc[:,0] = df.iloc[:,0].astype(int)
      request_rows = np.column_stack([df.index, df.iloc[:,0]]).tolist()
    
      request_payload = {"data" : request_rows}
    
      username_password_object = _snowflake.get_username_password('cred');
      basic = HTTPBasicAuth(username_password_object.username, username_password_object.password)
    
      url = 'my_external_service'
    
      response = session.post(url, json=request_payload, auth=basic)
    
      response.raise_for_status()
      response_payload = json.loads(response.text)
    
      response_rows = response_payload["data"]
    
      return pandas.DataFrame(response_rows)[1]
    $$;
    
    Copy
  7. 授予对 return_double_column 函数的 USAGE 权限,以便拥有 user 角色的用户可以调用该函数。

    GRANT USAGE ON FUNCTION return_double_column(int) TO ROLE user;
    
    Copy
  8. 执行 return_double_column 函数,向外部端点发出请求。

    以下示例中的代码创建了一个双列表,并插入了 100,000,000 行包含 4 字节整数的数据。然后,代码执行 return_double_column 函数,从 a 列传递值,供外部端点处理。

    CREATE OR REPLACE TABLE t1 (a INT, b INT);
    INSERT INTO t1 SELECT SEQ4(), SEQ4() FROM TABLE(GENERATOR(ROWCOUNT => 100000000));
    
    SELECT return_double_column(a) AS retval FROM t1 ORDER BY retval;
    
    Copy

使用 AWS IAM 访问 Amazon S3

以下步骤包括示例代码,以使用 IAM 连接到 AWS S3 桶。

有关 AWS IAM 的更多信息,请参阅 AWS IAM 文档 (https://docs.aws.amazon.com/IAM/latest/UserGuide/reference_identifiers.html)。

  1. 创建网络规则 aws_s3_network_rule,表示 VALUE_LIST 参数指定位置的 AWS S3 桶。

    有关网络规则在外部访问中作用的更多信息,请参阅 创建网络规则以表示外部网络位置

    CREATE OR REPLACE NETWORK RULE aws_s3_network_rule
      MODE = EGRESS
      TYPE = PRIVATE_HOST_PORT
      VALUE_LIST = ('external-access-iam-bucket.s3.us-west-2.amazonaws.com');
    
    Copy
  2. 创建一个安全集成,用于保存在 aws_s3_network_rule 网络规则指定的外部网络位置进行身份验证时所需的 AWS IAM Amazon Resource Name (ARN) 凭据。

    有关该命令的参考信息(包括所需的权限),请参阅 CREATE SECURITY INTEGRATION(AWS IAM 身份验证)

    CREATE OR REPLACE SECURITY INTEGRATION aws_s3_security_integration
      TYPE = API_AUTHENTICATION
      AUTH_TYPE = AWS_IAM
      ENABLED = TRUE
      AWS_ROLE_ARN = 'arn:aws:iam::736112632310:role/external-access-iam-bucket';
    
    Copy
  3. 获取 IAM USER 的 ARN 和 ID。

    1. 在您创建的安全集成上执行 DESC 命令。

      DESC SECURITY INTEGRATION aws_s3_security_integration;
      
      Copy
    2. 从显示的输出中,复制以下属性的值以在下一步中使用:

      • API_AWS_IAM_USER_ARN

      • API_AWS_EXTERNAL_ID

  4. 授予必要的 IAM 用户权限以访问桶。

    按照 选项 1:配置 Snowflake 存储集成以访问 Amazon S3 的“第 5 步”中所述配置信任策略时,使用 ARN 和 ID 值。

  5. 创建类型 CLOUD_PROVIDER_TOKEN 的 密钥,表示外部服务所需的凭据。

    本示例后面的处理程序代码使用 Snowflake API 从密钥中检索凭据。

    有关密钥在外部访问中作用的更多信息,请参阅 创建表示凭据的密钥

    CREATE OR REPLACE SECRET aws_s3_access_token
      TYPE = CLOUD_PROVIDER_TOKEN
      API_AUTHENTICATION = aws_s3_security_integration;
    
    Copy
  6. 创建一个 developer 角色,并授予其对密钥的 READ 权限。该角色将分配给需要创建使用该密钥的 UDF 或过程的用户。

    此外,创建用户将用来调用该函数的角色。

    CREATE OR REPLACE ROLE developer;
    CREATE OR REPLACE ROLE user;
    
    Copy
  7. developer 角色授予必要权限以创建 UDF,使用对象进行外部访问。这包括以下内容:

    • 对密钥的 READ 权限。

    • 对包含密钥的架构的 USAGE 权限。

    GRANT READ ON SECRET aws_s3_access_token TO ROLE developer;
    GRANT USAGE ON SCHEMA secret_schema TO ROLE developer;
    
    Copy
  8. 创建 外部访问集成,通过创建的网络规则和密钥指定外部端点和凭据。

    有关外部访问集成的作用(包括所需的权限)的更多信息,请参阅 创建外部访问集成

    CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION aws_s3_external_access_integration
      ALLOWED_NETWORK_RULES = (aws_s3_network_rule)
      ALLOWED_AUTHENTICATION_SECRETS = (aws_s3_access_token)
      ENABLED = TRUE
      COMMENT = 'Testing S3 connectivity';
    
    Copy
  9. 创建 UDF,使用外部访问集成来连接您创建的网络规则中指定的 Amazon S3 桶。

    处理程序代码使用 Snowflake APIs 从您创建的密钥中检索令牌。通过此令牌,您可以使用 Snowflake APIs 检索必要的值,以创建与 Amazon S3 连接的会话,包括访问密钥 ID、秘密访问密钥和会话令牌。

    有关在 UDF 中使用外部访问的更多信息,请参阅 在函数或过程中使用外部访问集成

    CREATE OR REPLACE FUNCTION aws_s3_python_function()
      RETURNS VARCHAR
      LANGUAGE PYTHON
      EXTERNAL_ACCESS_INTEGRATIONS = (aws_s3_external_access_integration)
      RUNTIME_VERSION = '3.9'
      SECRETS = ('cred' = aws_s3_access_token)
      PACKAGES = ('boto3')
      HANDLER = 'main_handler'
    AS
    $$
    import boto3
    import _snowflake
    from botocore.config import Config
    
    def main_handler():
      # Get token object
      cloud_provider_object = _snowflake.get_cloud_provider_token('cred')
    
      # Boto3 configuration
      config = Config(
        retries=dict(total_max_attempts=9),
        connect_timeout=30,
        read_timeout=30,
        max_pool_connections=50
      )
    
      # Connect to S3 using boto3
      s3 = boto3.client(
        's3',
        region_name='us-west-2',
        aws_access_key_id=cloud_provider_object.access_key_id,
        aws_secret_access_key=cloud_provider_object.secret_access_key,
        aws_session_token=cloud_provider_object.token,
        config=config
      )
    
      # Use S3 object to upload/download
      return 'Successfully connected with S3'
    $$;
    
    Copy
  10. 授予对 UDF 的 USAGE 权限,以便具有 user 角色的用户可以调用它。

    GRANT USAGE ON FUNCTION return_double_column(int) TO ROLE user;
    
    Copy
  11. 执行函数以连接到外部端点。

    SELECT aws_s3_python_function();
    
    Copy
语言: 中文