教程 1:创建 Snowpark Container Services 服务

简介

完成 通用设置 后,您就可以创建服务了。在本教程中,您将创建一项服务(名为 echo_service),该服务仅回显您以输入形式提供的文本。例如,如果输入字符串为“Hello World”,则服务将返回“I said, Hello World”。

本教程分为两个部分:

第 1 部分:创建并测试服务。 您将下载为本教程提供的代码,并按照分步说明进行操作:

  1. 下载为本教程提供的服务代码。

  2. 为 Snowpark Container Services 构建 Docker 镜像,并将该镜像上传到账户的仓库中。

  3. 提供服务规范文件和用于运行服务的计算池,以创建服务。

  4. 创建服务函数,与服务进行通信。

  5. 使用服务。向服务发送回显请求并验证响应。

第 2 部分:了解服务。本部分概述服务代码,并重点介绍不同组件的协作方式。

1:下载服务代码

提供用于创建 Echo 服务的代码(Python 应用程序)。

  1. 下载:download: SnowparkContainerServices -Tutorials.zip </samples/spcs/SnowparkContainerServices-Tutorials.zip>

  2. 解压内容,其中包含每个教程的一个目录。Tutorial-1 目录包含以下文件:

    • Dockerfile

    • echo_service.py

    • templates/basic_ui.html

2:构建镜像并上传

为 Snowpark Container Services 支持的 linux/amd64 平台构建镜像,然后将镜像上传到账户中的镜像仓库(请参阅 通用设置)。

您将需要有关仓库(仓库 URL 和注册表主机名)的信息,才能构建和上传镜像。有关更多信息,请参阅 注册表和镜像仓库

获取有关仓库的信息

  1. 要获取镜像仓库 URL,请执行 SHOW IMAGE REPOSITORIES SQL 命令。

    SHOW IMAGE REPOSITORIES;
    
    Copy
    • 输出中的 repository_url 列提供 URL。示例如下:

      <orgname>-<acctname>.registry.snowflakecomputing.cn/tutorial_db/data_schema/tutorial_repository
      
    • 仓库 URL 中的主机名是注册表主机名。示例如下:

      <orgname>-<acctname>.registry.snowflakecomputing.cn
      

构建镜像并将其上传到仓库

  1. 打开终端窗口,然后切换到包含解压文件的目录。

  2. 要构建 Docker 镜像,请使用 Docker CLI 执行以下 docker build 命令。请注意,该命令指定当前工作目录 (.),作为用于构建镜像的文件的 PATH

    docker build --rm --platform linux/amd64 -t <repository_url>/<image_name> .
    
    Copy
    • 对于 image_name,请使用 my_echo_service_image:latest

    示例

    docker build --rm --platform linux/amd64 -t myorg-myacct.registry.snowflakecomputing.cn/tutorial_db/data_schema/tutorial_repository/my_echo_service_image:latest .
    
    Copy
  3. 将镜像上传到 Snowflake 账户中的仓库。要使 Docker 代表您将镜像上传到仓库,您必须首先 使用注册表对 Docker 进行身份验证

    1. 要使用镜像注册表对 Docker 进行身份验证,请执行以下命令。

      docker login <registry_hostname> -u <username>
      
      Copy
      • 对于 username,请指定您的 Snowflake 用户名。Docker 将提示您输入密码。

    2. 要上传镜像,请执行以下命令:

      docker push <repository_url>/<image_name>
      
      Copy

      示例

      docker push myorg-myacct.registry.snowflakecomputing.cn/tutorial_db/data_schema/tutorial_repository/my_echo_service_image:latest
      
      Copy

3:创建服务

在本部分中,您将创建一项服务,并创建服务函数以与服务进行通信。

要创建服务,您需要满足以下条件:

  • 计算池。Snowflake 在指定的计算池中运行您的服务。您在通用设置中创建了一个计算池。

  • 服务规范。此规范为 Snowflake 提供了配置和运行服务所需的信息。有关更多信息,请参阅 Snowpark Container Services:使用服务。在本教程中,您将在 CREATE SERVICE 命令以内联方式提供规范。您还可以将规范保存到 Snowflake 暂存区的文件中,并在 CREATE SERVICE 命令中提供文件信息,如教程 2 中所示。

服务函数是用于与服务进行通信的方法之一。服务函数是用户定义的函数 (UDF),与服务端点关联。当执行服务函数时,服务函数会向服务端点发送请求并接收响应。

  1. 验证计算池是否已准备就绪,以及您是否处于创建服务的正确上下文中。

    1. 之前,您在 通用设置 步骤中设置了上下文。为确保您处于针对本步骤中 SQL 语句的正确上下文中,请执行以下命令:

    USE ROLE test_role;
    USE DATABASE tutorial_db;
    USE SCHEMA data_schema;
    USE WAREHOUSE tutorial_warehouse;
    
    Copy
    1. 要确保您在 通用设置 中创建的计算池已准备就绪,请执行 DESCRIBE COMPUTE POOL,并且验证 stateACTIVE,还是 IDLE。如果 stateSTARTING,您需要等到 state 更改为 ACTIVEIDLE

    DESCRIBE COMPUTE POOL tutorial_compute_pool;
    
    Copy
  2. 要创建服务,请使用 test_role 执行以下命令:

    CREATE SERVICE echo_service
      IN COMPUTE POOL tutorial_compute_pool
      FROM SPECIFICATION $$
        spec:
          containers:
          - name: echo
            image: /tutorial_db/data_schema/tutorial_repository/my_echo_service_image:latest
            env:
              SERVER_PORT: 8000
              CHARACTER_NAME: Bob
            readinessProbe:
              port: 8000
              path: /healthcheck
          endpoints:
          - name: echoendpoint
            port: 8000
            public: true
          $$
       MIN_INSTANCES=1
       MAX_INSTANCES=1;
    
    Copy

    备注

    如果已存在具有该名称的服务,请使用 DROP SERVICE 命令删除之前创建的服务,然后再创建此服务。

  3. 执行以下 SQL 命令,获取有关您刚刚创建的服务的详细信息。有关更多信息,请参阅 Snowpark Container Services:使用服务

    • 要列出账户中的服务,请执行 SHOW SERVICES 命令:

      SHOW SERVICES;
      
      Copy
    • 要获取服务的状态,请调用系统函数 SYSTEM$GET_SERVICE_STATUS:

      SELECT SYSTEM$GET_SERVICE_STATUS('echo_service');
      
      Copy
    • 要获取服务的相关信息,请执行 DESCRIBE SERVICE 命令:

      DESCRIBE SERVICE echo_service;
      
      Copy
  4. 要创建服务函数,请执行以下命令:

    CREATE FUNCTION my_echo_udf (InputText varchar)
      RETURNS varchar
      SERVICE=echo_service
      ENDPOINT=echoendpoint
      AS '/echo';
    
    Copy

    请注意以下事项:

    • SERVICE 属性将 UDF 与 echo_service 服务关联。

    • ENDPOINT 属性将 UDF 与 echoendpoint 服务内的端点关联。

    • AS '/echo' 指定 Echo 服务器的 HTTP 路径。您可以在服务代码 (echo_service.py) 中找到此路径。

4:使用服务

首先,为本部分中的 SQL 语句设置上下文,执行以下命令:

USE ROLE test_role;
USE DATABASE tutorial_db;
USE SCHEMA data_schema;
USE WAREHOUSE tutorial_warehouse;
Copy

现在,您可以与 Echo 服务进行通信。

  1. 使用服务函数: 您可以在查询中调用服务函数。示例服务函数 (my_echo_udf) 可以将单个字符串或字符串列表作为输入。

    示例 1.1:传递单个字符串

    • 要调用 my_echo_udf 服务函数,请执行以下 SELECT 语句,传递一个输入字符串 ('hello'):

      SELECT my_echo_udf('hello!');
      
      Copy

      Snowflake 向服务端点 (echoendpoint) 发送 POST 请求。收到请求后,服务会在响应中回显输入字符串。

      +--------------------------+
      | **MY_ECHO_UDF('HELLO!')**|
      |------------------------- |
      | Bob said hello!          |
      +--------------------------+
      

    示例 1.2:传递字符串列表

    当您向服务函数传递字符串列表时,Snowflake 会对这些输入字符串进行批处理,并向服务发送一系列 POST 请求。在服务处理完所有字符串后,Snowflake 将合并结果并返回它们。

    以下示例将表列作为输入传递给服务函数。

    1. 创建包含多个字符串的表:

      CREATE TABLE messages (message_text VARCHAR)
        AS (SELECT * FROM (VALUES ('Thank you'), ('Hello'), ('Hello World')));
      
      Copy
    2. 验证是否已创建表:

      SELECT * FROM messages;
      
      Copy
    3. 要调用服务函数,请执行以下 SELECT 语句,以输入的形式传递表行:

      SELECT my_echo_udf(message_text) FROM messages;
      
      Copy

      输出:

      +---------------------------+
      | MY_ECHO_UDF(MESSAGE_TEXT) |
      |---------------------------|
      | Bob said Thank you        |
      | Bob said Hello            |
      | Bob said Hello World      |
      +---------------------------+
      
  2. 使用 Web 浏览器: 服务会公开端点(请参阅 CREATE SERVICE 命令中提供的内联规范)。因此,您可以登录服务向互联网公开的 Web UI,然后从 Web 浏览器向该服务发送请求。

    1. 找到服务公开的公众端点的 URL:

      SHOW ENDPOINTS IN SERVICE echo_service;
      
      Copy

      响应中的 ingress_url 列提供 URL。

      示例

      p6bye-myorg-myacct.snowflakecomputing.app
      
    2. /ui 追加到端点 URL,并将其粘贴到 Web 浏览器中。这样会导致服务执行 ui() 函数(请参阅 echo_service.py)。

      请注意,首次访问端点 URL 时,系统将要求您登录 Snowflake。对于此测试,请使用用于创建服务的同一用户,以确保用户具有必要的权限。

      用于与 Echo 服务进行通信的 Web 表单。
    3. 输入 框中输入字符串“Hello”,然后按 Return

      用于显示 Echo 服务的响应的 Web 表单。

    备注

    您可以通过编程方式访问公共端点。有关示例代码,请参阅 来自 Snowflake 外部的公共端点访问及身份验证。请注意,您需要将 /ui 追加到代码中的端点 URL,以便 Snowflake 可以将请求路由到服务代码中的 ui() 函数。

5:(可选)以编程方式访问公共端点

在上一部分中,您使用 Web 浏览器测试了 Echo 服务。在浏览器中,您访问了公共端点(入口端点)并使用服务公开的 Web UI 发送了请求。在本部分中,您将以编程方式测试同一公共端点。

该示例使用 密钥对身份验证。使用您提供的密钥对,示例代码首先生成一个 JSON Web 令牌 (JWT),然后用 Snowflake 交换令牌,获得 OAuth 令牌。然后,代码使用 OAuth 令牌,在与 Echo 服务公共端点通信时进行身份验证。

先决条件

确保您拥有以下信息:

  • 公共端点的入口 URL。 执行 SHOW ENDPOINTS IN SERVICE 命令来获取 URL:

    SHOW ENDPOINTS IN SERVICE echo_service;
    
    Copy
  • 您的 Snowflake 账户名称。 有关更多信息,请参阅 常见设置:确认您已准备好继续

  • 您的 Snowflake 账户 URL:<acctname>.snowflakecomputing.cn

  • Snowflake 账户中的用户名。 这是您在 常见设置:创建 Snowflake 对象 中选择的用户。您以该用户身份登录 Snowflake 并测试编程访问。

  • 角色名称: 您创建了一个角色 (test_role),作为常见设置的一部分。用户使用此角色来执行操作。

设置

按照步骤,以编程方式与 Echo 服务通信。使用提供的 Python 代码,您可以向 Echo 服务公开的公共端点发送请求。

  1. 在命令提示符处,创建目录并导航到该目录。

  2. 为用户配置密钥对身份验证。

    1. 生成 密钥对

      1. 生成私钥。为了简化练习步骤,生成一个未加密的私钥。您也可以使用加密私钥,但需要您输入密码。

        openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8 -nocrypt
        
        Copy
      2. 通过引用您创建的私钥生成公钥 (rsa_key.pub)。

        openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub
        
        Copy
    2. 验证您是否已在目录中生成私钥和公钥。

    3. 将公钥分配给您用来测试编程访问的用户。这使用户可以指定用于身份验证的密钥。

      ALTER USER <user-name> SET RSA_PUBLIC_KEY='MIIBIjANBgkqh...';
      
      Copy
  3. 将提供的示例代码保存在 Python 文件中。

    1. 将以下代码保存到 generateJWT.py

      # To run this on the command line, enter:
      #   python3 generateJWT.py --account=<account_identifier> --user=<username> --private_key_file_path=<path_to_private_key_file>
      
      from cryptography.hazmat.primitives.serialization import load_pem_private_key
      from cryptography.hazmat.primitives.serialization import Encoding
      from cryptography.hazmat.primitives.serialization import PublicFormat
      from cryptography.hazmat.backends import default_backend
      from datetime import timedelta, timezone, datetime
      import argparse
      import base64
      from getpass import getpass
      import hashlib
      import logging
      import sys
      
      # This class relies on the PyJWT module (https://pypi.org/project/PyJWT/).
      import jwt
      
      logger = logging.getLogger(__name__)
      
      try:
          from typing import Text
      except ImportError:
          logger.debug('# Python 3.5.0 and 3.5.1 have incompatible typing modules.', exc_info=True)
          from typing_extensions import Text
      
      ISSUER = "iss"
      EXPIRE_TIME = "exp"
      ISSUE_TIME = "iat"
      SUBJECT = "sub"
      
      # If you generated an encrypted private key, implement this method to return
      # the passphrase for decrypting your private key. As an example, this function
      # prompts the user for the passphrase.
      def get_private_key_passphrase():
          return getpass('Passphrase for private key: ')
      
      class JWTGenerator(object):
          """
          Creates and signs a JWT with the specified private key file, username, and account identifier. The JWTGenerator keeps the
          generated token and only regenerates the token if a specified period of time has passed.
          """
          LIFETIME = timedelta(minutes=59)  # The tokens will have a 59-minute lifetime
          RENEWAL_DELTA = timedelta(minutes=54)  # Tokens will be renewed after 54 minutes
          ALGORITHM = "RS256"  # Tokens will be generated using RSA with SHA256
      
          def __init__(self, account: Text, user: Text, private_key_file_path: Text,
                      lifetime: timedelta = LIFETIME, renewal_delay: timedelta = RENEWAL_DELTA):
              """
              __init__ creates an object that generates JWTs for the specified user, account identifier, and private key.
              :param account: Your Snowflake account identifier. See https://docs.snowflake.com/en/user-guide/admin-account-identifier.html. Note that if you are using the account locator, exclude any region information from the account locator.
              :param user: The Snowflake username.
              :param private_key_file_path: Path to the private key file used for signing the JWTs.
              :param lifetime: The number of minutes (as a timedelta) during which the key will be valid.
              :param renewal_delay: The number of minutes (as a timedelta) from now after which the JWT generator should renew the JWT.
              """
      
              logger.info(
                  """Creating JWTGenerator with arguments
                  account : %s, user : %s, lifetime : %s, renewal_delay : %s""",
                  account, user, lifetime, renewal_delay)
      
              # Construct the fully qualified name of the user in uppercase.
              self.account = self.prepare_account_name_for_jwt(account)
              self.user = user.upper()
              self.qualified_username = self.account + "." + self.user
      
              self.lifetime = lifetime
              self.renewal_delay = renewal_delay
              self.private_key_file_path = private_key_file_path
              self.renew_time = datetime.now(timezone.utc)
              self.token = None
      
              # Load the private key from the specified file.
              with open(self.private_key_file_path, 'rb') as pem_in:
                  pemlines = pem_in.read()
                  try:
                      # Try to access the private key without a passphrase.
                      self.private_key = load_pem_private_key(pemlines, None, default_backend())
                  except TypeError:
                      # If that fails, provide the passphrase returned from get_private_key_passphrase().
                      self.private_key = load_pem_private_key(pemlines, get_private_key_passphrase().encode(), default_backend())
      
          def prepare_account_name_for_jwt(self, raw_account: Text) -> Text:
              """
              Prepare the account identifier for use in the JWT.
              For the JWT, the account identifier must not include the subdomain or any region or cloud provider information.
              :param raw_account: The specified account identifier.
              :return: The account identifier in a form that can be used to generate the JWT.
              """
              account = raw_account
              if not '.global' in account:
                  # Handle the general case.
                  idx = account.find('.')
                  if idx > 0:
                      account = account[0:idx]
              else:
                  # Handle the replication case.
                  idx = account.find('-')
                  if idx > 0:
                      account = account[0:idx]
              # Use uppercase for the account identifier.
              return account.upper()
      
          def get_token(self) -> Text:
              """
              Generates a new JWT. If a JWT has already been generated earlier, return the previously generated token unless the
              specified renewal time has passed.
              :return: the new token
              """
              now = datetime.now(timezone.utc)  # Fetch the current time
      
              # If the token has expired or doesn't exist, regenerate the token.
              if self.token is None or self.renew_time <= now:
                  logger.info("Generating a new token because the present time (%s) is later than the renewal time (%s)",
                              now, self.renew_time)
                  # Calculate the next time we need to renew the token.
                  self.renew_time = now + self.renewal_delay
      
                  # Prepare the fields for the payload.
                  # Generate the public key fingerprint for the issuer in the payload.
                  public_key_fp = self.calculate_public_key_fingerprint(self.private_key)
      
                  # Create our payload
                  payload = {
                      # Set the issuer to the fully qualified username concatenated with the public key fingerprint.
                      ISSUER: self.qualified_username + '.' + public_key_fp,
      
                      # Set the subject to the fully qualified username.
                      SUBJECT: self.qualified_username,
      
                      # Set the issue time to now.
                      ISSUE_TIME: now,
      
                      # Set the expiration time, based on the lifetime specified for this object.
                      EXPIRE_TIME: now + self.lifetime
                  }
      
                  # Regenerate the actual token
                  token = jwt.encode(payload, key=self.private_key, algorithm=JWTGenerator.ALGORITHM)
                  # If you are using a version of PyJWT prior to 2.0, jwt.encode returns a byte string instead of a string.
                  # If the token is a byte string, convert it to a string.
                  if isinstance(token, bytes):
                    token = token.decode('utf-8')
                  self.token = token
                  logger.info("Generated a JWT with the following payload: %s", jwt.decode(self.token, key=self.private_key.public_key(), algorithms=[JWTGenerator.ALGORITHM]))
      
              return self.token
      
          def calculate_public_key_fingerprint(self, private_key: Text) -> Text:
              """
              Given a private key in PEM format, return the public key fingerprint.
              :param private_key: private key string
              :return: public key fingerprint
              """
              # Get the raw bytes of public key.
              public_key_raw = private_key.public_key().public_bytes(Encoding.DER, PublicFormat.SubjectPublicKeyInfo)
      
              # Get the sha256 hash of the raw bytes.
              sha256hash = hashlib.sha256()
              sha256hash.update(public_key_raw)
      
              # Base64-encode the value and prepend the prefix 'SHA256:'.
              public_key_fp = 'SHA256:' + base64.b64encode(sha256hash.digest()).decode('utf-8')
              logger.info("Public key fingerprint is %s", public_key_fp)
      
              return public_key_fp
      
      def main():
          logging.basicConfig(stream=sys.stdout, level=logging.INFO)
          cli_parser = argparse.ArgumentParser()
          cli_parser.add_argument('--account', required=True, help='The account identifier (e.g. "myorganization-myaccount" for "myorganization-myaccount.snowflakecomputing.cn").')
          cli_parser.add_argument('--user', required=True, help='The user name.')
          cli_parser.add_argument('--private_key_file_path', required=True, help='Path to the private key file used for signing the JWT.')
          cli_parser.add_argument('--lifetime', type=int, default=59, help='The number of minutes that the JWT should be valid for.')
          cli_parser.add_argument('--renewal_delay', type=int, default=54, help='The number of minutes before the JWT generator should produce a new JWT.')
          args = cli_parser.parse_args()
      
          token = JWTGenerator(args.account, args.user, args.private_key_file_path, timedelta(minutes=args.lifetime), timedelta(minutes=args.renewal_delay)).get_token()
          print('JWT:')
          print(token)
      
      if __name__ == "__main__":
          main()
      
      Copy
    2. 将以下代码保存到 access-via-keypair.py

      from generateJWT import JWTGenerator
      from datetime import timedelta
      import argparse
      import logging
      import sys
      import requests
      logger = logging.getLogger(__name__)
      
      def main():
        args = _parse_args()
        token = _get_token(args)
        snowflake_jwt = token_exchange(token,endpoint=args.endpoint, role=args.role,
                        snowflake_account_url=args.snowflake_account_url,
                        snowflake_account=args.account)
        spcs_url=f'https://{args.endpoint}{args.endpoint_path}'
        connect_to_spcs(snowflake_jwt, spcs_url)
      
      def _get_token(args):
        token = JWTGenerator(args.account, args.user, args.private_key_file_path, timedelta(minutes=args.lifetime),
                  timedelta(minutes=args.renewal_delay)).get_token()
        logger.info("Key Pair JWT: %s" % token)
        return token
      
      def token_exchange(token, role, endpoint, snowflake_account_url, snowflake_account):
        scope_role = f'session:role:{role}' if role is not None else None
        scope = f'{scope_role} {endpoint}' if scope_role is not None else endpoint
        data = {
          'grant_type': 'urn:ietf:params:oauth:grant-type:jwt-bearer',
          'scope': scope,
          'assertion': token,
        }
        logger.info(data)
        url = f'https://{snowflake_account}.snowflakecomputing.cn/oauth/token'
        if snowflake_account_url:
          url =       f'{snowflake_account_url}/oauth/token'
        logger.info("oauth url: %s" %url)
        response = requests.post(url, data=data)
        logger.info("snowflake jwt : %s" % response.text)
        assert 200 == response.status_code, "unable to get snowflake token"
        return response.text
      
      def connect_to_spcs(token, url):
        # Create a request to the ingress endpoint with authz.
        headers = {'Authorization': f'Snowflake Token="{token}"'}
        response = requests.post(f'{url}', headers=headers)
        logger.info("return code %s" % response.status_code)
        logger.info(response.text)
      
      def _parse_args():
        logging.basicConfig(stream=sys.stdout, level=logging.INFO)
        cli_parser = argparse.ArgumentParser()
        cli_parser.add_argument('--account', required=True,
                    help='The account identifier (for example, "myorganization-myaccount" for '
                      '"myorganization-myaccount.snowflakecomputing.cn").')
        cli_parser.add_argument('--user', required=True, help='The user name.')
        cli_parser.add_argument('--private_key_file_path', required=True,
                    help='Path to the private key file used for signing the JWT.')
        cli_parser.add_argument('--lifetime', type=int, default=59,
                    help='The number of minutes that the JWT should be valid for.')
        cli_parser.add_argument('--renewal_delay', type=int, default=54,
                    help='The number of minutes before the JWT generator should produce a new JWT.')
        cli_parser.add_argument('--role',
                    help='The role we want to use to create and maintain a session for. If a role is not provided, '
                      'use the default role.')
        cli_parser.add_argument('--endpoint', required=True,
                    help='The ingress endpoint of the service')
        cli_parser.add_argument('--endpoint-path', default='/',
                    help='The url path for the ingress endpoint of the service')
        cli_parser.add_argument('--snowflake_account_url', default=None,
                    help='The account url of the account for which we want to log in. Type of '
                      'https://myorganization-myaccount.snowflakecomputing.cn')
        args = cli_parser.parse_args()
        return args
      
      if __name__ == "__main__":
        main()
      
      Copy

以编程方式向服务端点发送请求

执行 access-via-keypair.py Python 代码,对 Echo 服务公共端点进行入口调用。

python3 access-via-keypair.py \
  --account <account-identifier> \
  --user <user-name> \
  --role TEST_ROLE \
  --private_key_file_path rsa_key.p8 \
  --endpoint <ingress-hostname> \
  --endpoint-path /ui
Copy

有关 account-identifier 的更多信息,请参阅 账户标识符

身份验证的工作原理

代码首先将提供的密钥对转换为 JWT 令牌。然后发送 JWT 令牌到 Snowflake 以获取 OAuth 令牌。最后,代码使用 OAuth 令牌连接到 Snowflake 并访问公共端点。具体而言,代码执行以下操作:

  1. 调用 _get_token(args) 函数,从您提供的密钥对生成 JWT 令牌。函数实施如图所示:

    def _get_token(args):
        token = JWTGenerator(args.account,
                            args.user,
                            args.private_key_file_path,
                            timedelta(minutes=args.lifetime),
                            timedelta(minutes=args.renewal_delay)).get_token()
        logger.info("Key Pair JWT: %s" % token)
        return token
    
    Copy

    JWTGenerator 是一个向您提供的辅助函数类。请注意创建此对象时提供的以下参数:

    • args.accountargs.user 参数:JWT 令牌有几个字段(请参阅 令牌格式),iss 是其中一个字段。该字段值包括 Snowflake 账户名称和用户名。因此,您提供这些值作为参数。

    • 这两个 timedelta 参数提供以下信息:

      • lifetime 指定密钥有效的分钟数(60 分钟)。

      • renewal_delay 指定从现在起多少分钟后,JWT 生成器应续期 JWT。

  2. 调用 token_exchange() 函数以连接到 Snowflake 并交换 JWT 令牌,以获取 OAuth 令牌。

    scope_role = f'session:role:{role}' if role is not None else None
    scope = f'{scope_role} {endpoint}' if scope_role is not None else endpoint
    
    data = {
        'grant_type': 'urn:ietf:params:oauth:grant-type:jwt-bearer',
        'scope': scope,
        'assertion': token,
    }
    
    Copy

    上述代码构造了一个 JSON,设置了 OAuth 令牌的范围,以及使用指定角色可以访问的公共端点。然后,此代码会向 Snowflake 发出 POST 请求,传递 JSON 以交换 JWT 令牌,获取 OAuth 令牌(请参阅 令牌交换),如图所示:

    url = f'{snowflake_account_url}/oauth/token'
    response = requests.post(url, data=data)
    assert 200 == response.status_code, "unable to get Snowflake token"
    return response.text
    
    Copy
  3. 然后代码调用 connect_to_spcs() 函数以连接到 Echo 服务的公共端点。它提供了端点 URL (https://<ingress-URL>/ui) 和用于身份验证的 OAuth 令牌。

    headers = {'Authorization': f'Snowflake Token="{token}"'}
    response = requests.post(f'{url}', headers=headers)
    
    Copy

    url 是您提供给程序的 spcs_urltoken 是 OAuth 令牌。

    本示例中的 Echo 服务提供 HTML 页面(如上一部分所述)。此示例代码在响应中仅打印 HTML。

6:清理

如果您不打算继续学习 教程 2教程 3,则应移除您创建的计费资源。有关更多信息,请参阅 教程 3 中的第 5 步。

7:查看服务代码

本部分包括以下主题:

检查教程 1 代码

您在第 1 步中下载的 Zip 文件包含以下文件:

  • Dockerfile

  • echo_service.py

  • templates/basic_ui.html

在创建服务时,您还可以使用服务规范。以下部分介绍这些代码组件如何协同工作以创建服务。

echo_service.py 文件

此 Python 文件包含用于实施极简 HTTP 服务器的代码,该服务器会返回(回显)输入文本。该代码主要执行两项任务:处理来自 Snowflake 服务函数的回显请求,以及提供 Web 用户界面 (UI) 以提交 Echo 请求。

from flask import Flask
from flask import request
from flask import make_response
from flask import render_template
import logging
import os
import sys

SERVICE_HOST = os.getenv('SERVER_HOST', '0.0.0.0')
SERVER_PORT = os.getenv('SERVER_PORT', 8080)
CHARACTER_NAME = os.getenv('CHARACTER_NAME', 'I')


def get_logger(logger_name):
  logger = logging.getLogger(logger_name)
  logger.setLevel(logging.DEBUG)
  handler = logging.StreamHandler(sys.stdout)
  handler.setLevel(logging.DEBUG)
  handler.setFormatter(
    logging.Formatter(
      '%(name)s [%(asctime)s] [%(levelname)s] %(message)s'))
  logger.addHandler(handler)
  return logger


logger = get_logger('echo-service')

app = Flask(__name__)


@app.get("/healthcheck")
def readiness_probe():
  return "I'm ready!"


@app.post("/echo")
def echo():
  '''
  Main handler for input data sent by Snowflake.
  '''
  message = request.json
  logger.debug(f'Received request: {message}')

  if message is None or not message['data']:
    logger.info('Received empty message')
    return {}

  # input format:
  #   {"data": [
  #     [row_index, column_1_value, column_2_value, ...],
  #     ...
  #   ]}
  input_rows = message['data']
  logger.info(f'Received {len(input_rows)} rows')

  # output format:
  #   {"data": [
  #     [row_index, column_1_value, column_2_value, ...}],
  #     ...
  #   ]}
  output_rows = [[row[0], get_echo_response(row[1])] for row in input_rows]
  logger.info(f'Produced {len(output_rows)} rows')

  response = make_response({"data": output_rows})
  response.headers['Content-type'] = 'application/json'
  logger.debug(f'Sending response: {response.json}')
  return response


@app.route("/ui", methods=["GET", "POST"])
def ui():
  '''
  Main handler for providing a web UI.
  '''
  if request.method == "POST":
    # getting input in HTML form
    input_text = request.form.get("input")
    # display input and output
    return render_template("basic_ui.html",
      echo_input=input_text,
      echo_reponse=get_echo_response(input_text))
  return render_template("basic_ui.html")


def get_echo_response(input):
  return f'{CHARACTER_NAME} said {input}'

if __name__ == '__main__':
  app.run(host=SERVICE_HOST, port=SERVER_PORT)
Copy

在代码中:

  • echo 函数使 Snowflake 服务函数能够与服务进行通信。此函数指定 @app.post() 装饰,如下所示:

    @app.post("/echo")
    def echo():
    
    Copy

    当 Echo 服务器收到带 /echo 路径的 HTTP POST 请求时,该服务器回将请求路由到此函数。此函数在响应中执行并回显请求正文中的字符串。

    为支持来自 Snowflake 服务函数的通信,此服务器会实施外部函数。也就是说,服务器实施遵循特定的输入/输出数据格式,以便提供 SQL 函数,这是 External Functions 使用的相同 输入/输出数据格式

  • ui 代码的函数部分显示 Web 表单,并处理从 Web 表单提交的回显请求。此函数使用 @app.route() 装饰器,以指定 /ui 的请求由此函数处理:

    @app.route("/ui", methods=["GET", "POST"])
    def ui():
    
    Copy

    Echo 服务公开 echoendpoint 端点(请参阅服务规范),允许通过 Web 与服务进行通信。当您在浏览器中加载已追加 /ui 的公共端点的 URL 时,浏览器会为此路径发送 HTTP GET 请求,然后服务器将请求路由到此函数。函数执行并返回简单的 HTML 表单,供用户在其中输入字符串。

    用户输入字符串并提交表单后,浏览器会为此路径发送 HTTP POST 请求,然后服务器将请求路由到此同一函数。函数执行并返回包含原始字符串的 HTTP 响应。

  • readiness_probe 函数使用 @app.get() 装饰器,以指定 /healthcheck 的请求由此函数处理:

    @app.get("/healthcheck")
    def readiness_probe():
    
    Copy

    此函数使 Snowflake 能够检查服务的就绪情况。当容器启动时,Snowflake 希望确认应用程序正在运行,并且服务已准备好处理请求。Snowflake 为此路径发送 HTTP GET 请求(作为运行状况探测、就绪情况探测),以确保只有正常运行的容器才能提供流量。该函数可以做任何您想做的事。

  • get_logger 函数有助于设置日志记录。

Dockerfile

此文件包含使用 Docker 构建镜像时的所有命令。

ARG BASE_IMAGE=python:3.10-slim-buster
FROM $BASE_IMAGE
COPY echo_service.py ./
COPY templates/ ./templates/
RUN pip install --upgrade pip && \\
pip install flask
CMD ["python", "echo_service.py"]
Copy

Dockerfile 包含在 Docker 容器中安装 Flask 库的说明。echo_service.py 中的代码依赖 Flask 库来处理 HTTP 请求。

/template/basic_ui.html

Echo 服务公开 echoendpoint 端点(请参阅服务规范),允许通过 Web 与服务进行通信。当您在浏览器中加载已追加 /ui 的公共端点 URL 时,Echo 服务将显示此表单。您可以在表单中输入字符串并提交表单,该服务会在 HTTP 响应中返回字符串。

<!DOCTYPE html>
<html lang="en">
  <head>
    <title>Welcome to echo service!</title>
  </head>
  <body>
    <h1>Welcome to echo service!</h1>
    <form action="{{ url_for("ui") }}" method="post">
      <label for="input">Input:<label><br>
      <input type="text" id="input" name="input"><br>
    </form>
    <h2>Input:</h2>
    {{ echo_input }}
    <h2>Output:</h2>
    {{ echo_reponse }}
  </body>
</html>
Copy

服务规范

Snowflake 使用您在此规范中提供的信息来配置和运行服务。

spec:
  containers:
  - name: echo
    image: /tutorial_db/data_schema/tutorial_repository/my_echo_service_image:latest
    env:
      SERVER_PORT: 8000
      CHARACTER_NAME: Bob
    readinessProbe:
      port: 8000
      path: /healthcheck
  endpoints:
  - name: echoendpoint
    port: 8000
    public: true
Copy

在服务规范中:

  • containers.image 指定镜像以便 Snowflake 启动容器。

  • 可选的 endpoints 字段指定服务公开的端点。

    • name 为容器正在侦听的 TCP 网络端口指定用户友好的名称。您可以使用此用户友好的端点名称,将请求发送到相应的端口。请注意,env.SERVER_PORT 控制此端口号。

    • 端点也被配置为 public。这允许从公共 Web 到此端点的流量。

  • 添加可选的 containers.env 字段,以说明如何替换 Snowflake 传递给容器中所有进程的环境变量。例如,服务代码 (echo_service.py) 读取具有默认值的环境变量,如下所示:

    CHARACTER_NAME = os.getenv('CHARACTER_NAME', 'I')
    SERVER_PORT = os.getenv('SERVER_PORT', 8080)
    
    Copy

    它的工作原理如下:

    • 当 Echo 服务收到请求正文中带字符串(例如,“Hello”)的 HTTP POST 请求时,服务默认返回“I said Hello”。代码使用 CHARACTER_NAME 环境变量,确定“said”之前的单词。默认情况下,CHARACTER_NAME 设置为“I”。

      您可以覆盖服务规范中的 CHARACTER_NAME 默认值。例如,如果将该值设置为“Bob”,则 Echo 服务返回“Bob said Hello”响应。

    • 同样,服务规范将服务侦听的端口 (SERVER_PORT) 替换到 8000,覆盖默认端口 8080。

  • readinessProbe 字段标识 portpath,Snowflake 可以用来将 HTTP GET 请求发送到就绪情况探测,以验证服务是否已准备好处理流量。

    服务代码 (echo_python.py) 按如下方式实施就绪情况探测:

    @app.get("/healthcheck")
    def readiness_probe():
    
    Copy

    因此,规范文件相应地包括 container.readinessProbe 字段。

有关服务规范的更多信息,请参阅 服务规范参考

了解服务函数

服务函数是用于与服务进行通信的方法之一(请参阅 使用服务)。服务函数是用户定义的函数 (UDF),与服务端点关联。当执行服务函数时,它会向关联的服务端点发送请求并接收响应。

您可以执行具有以下参数的 CREATE FUNCTION 命令,从而创建以下服务函数:

CREATE FUNCTION my_echo_udf (InputText VARCHAR)
  RETURNS VARCHAR
  SERVICE=echo_service
  ENDPOINT=echoendpoint
  AS '/echo';
Copy

请注意以下事项:

  • my_echo_udf 函数将字符串作为输入并返回字符串。

  • SERVICE 属性标识服务 (echo_service),ENDPOINT 属性标识用户友好的端点名称 (echoendpoint)。

  • AS '/echo' 指定服务的路径。在 echo_service.py 中,@app.post 装饰器将此路径与 echo 函数关联。

此函数连接指定 SERVICE 的特定 ENDPOINT。当您调用此函数时,Snowflake 会向服务容器内的 /echo 路径发送请求。

在本地构建和测试镜像

您可以在本地测试 Docker 镜像,然后再将其上传到 Snowflake 账户中的仓库。在本地测试中,容器独立运行(它不是 Snowflake 运行的服务)。

要测试教程 1 Docker 镜像,请执行以下步骤:

  1. 要在 Docker CLI 中创建 Docker 镜像,请执行以下命令:

    docker build --rm -t my_service:local .
    
    Copy
  2. 要启动代码,请执行以下命令:

    docker run --rm -p 8080:8080 my_service:local
    
    Copy
  3. 使用以下方法之一,向服务发送回显请求:

    • 使用 cURL 命令:

      在另一个终端窗口中,使用 cURL,将以下 POST 请求发送到端口 8080:

      curl -X POST http://localhost:8080/echo \
        -H "Content-Type: application/json" \
        -d '{"data":[[0, "Hello friend"], [1, "Hello World"]]}'
      
      Copy

      请注意,请求正文包含两个字符串。cURL 命令将 POST 请求发送到服务正在侦听的端口 8080。数据中的 0 是列表中输入字符串的索引。Echo 服务在响应中回显输入字符串,如下所示:

      {"data":[[0,"I said Hello Friend"],[1,"I said Hello World"]]}
      
    • 使用 Web 浏览器:

      1. 在在同一台计算机上的浏览器中,打开 http://localhost:8080/ui

        这会将 GET 请求发送到服务正在侦听的端口 8080。服务执行 ui() 函数,该函数呈现 HTML 表单,如下所示:

        用于与 Echo 服务进行通信的 Web 表单。
      2. 输入 框中输入字符串“Hello”,然后按 Return

        用于显示 Echo 服务的响应的 Web 表单。

下一步是什么?

您现在可以测试执行作业的 教程 2

语言: 中文