教程 1:创建 Snowpark Container Services 服务¶
简介¶
完成 通用设置 后,您就可以创建服务了。在本教程中,您将创建一项服务(名为 echo_service
),该服务仅回显您以输入形式提供的文本。例如,如果输入字符串为“Hello World”,则服务将返回“I said, Hello World”。
本教程分为两个部分:
第 1 部分:创建并测试服务。 您将下载为本教程提供的代码,并按照分步说明进行操作:
下载为本教程提供的服务代码。
为 Snowpark Container Services 构建 Docker 镜像,并将该镜像上传到账户的仓库中。
提供服务规范文件和用于运行服务的计算池,以创建服务。
创建服务函数,与服务进行通信。
使用服务。向服务发送回显请求并验证响应。
第 2 部分:了解服务。本部分概述服务代码,并重点介绍不同组件的协作方式。
1:下载服务代码¶
提供用于创建 Echo 服务的代码(Python 应用程序)。
下载:download:
SnowparkContainerServices -Tutorials.zip </samples/spcs/SnowparkContainerServices-Tutorials.zip>
。解压内容,其中包含每个教程的一个目录。
Tutorial-1
目录包含以下文件:Dockerfile
echo_service.py
templates/basic_ui.html
2:构建镜像并上传¶
为 Snowpark Container Services 支持的 linux/amd64 平台构建镜像,然后将镜像上传到账户中的镜像仓库(请参阅 通用设置)。
您将需要有关仓库(仓库 URL 和注册表主机名)的信息,才能构建和上传镜像。有关更多信息,请参阅 注册表和镜像仓库。
获取有关仓库的信息
要获取镜像仓库 URL,请执行 SHOW IMAGE REPOSITORIES SQL 命令。
SHOW IMAGE REPOSITORIES;
输出中的
repository_url
列提供 URL。示例如下:<orgname>-<acctname>.registry.snowflakecomputing.cn/tutorial_db/data_schema/tutorial_repository
仓库 URL 中的主机名是注册表主机名。示例如下:
<orgname>-<acctname>.registry.snowflakecomputing.cn
构建镜像并将其上传到仓库
打开终端窗口,然后切换到包含解压文件的目录。
要构建 Docker 镜像,请使用 Docker CLI 执行以下
docker build
命令。请注意,该命令指定当前工作目录 (.
),作为用于构建镜像的文件的PATH
。docker build --rm --platform linux/amd64 -t <repository_url>/<image_name> .
对于
image_name
,请使用my_echo_service_image:latest
。
示例
docker build --rm --platform linux/amd64 -t myorg-myacct.registry.snowflakecomputing.cn/tutorial_db/data_schema/tutorial_repository/my_echo_service_image:latest .
将镜像上传到 Snowflake 账户中的仓库。要使 Docker 代表您将镜像上传到仓库,您必须首先 使用注册表对 Docker 进行身份验证。
要使用镜像注册表对 Docker 进行身份验证,请执行以下命令。
docker login <registry_hostname> -u <username>
对于
username
,请指定您的 Snowflake 用户名。Docker 将提示您输入密码。
要上传镜像,请执行以下命令:
docker push <repository_url>/<image_name>
示例
docker push myorg-myacct.registry.snowflakecomputing.cn/tutorial_db/data_schema/tutorial_repository/my_echo_service_image:latest
3:创建服务¶
在本部分中,您将创建一项服务,并创建服务函数以与服务进行通信。
要创建服务,您需要满足以下条件:
计算池。Snowflake 在指定的计算池中运行您的服务。您在通用设置中创建了一个计算池。
服务规范。此规范为 Snowflake 提供了配置和运行服务所需的信息。有关更多信息,请参阅 Snowpark Container Services:使用服务。在本教程中,您将在 CREATE SERVICE 命令以内联方式提供规范。您还可以将规范保存到 Snowflake 暂存区的文件中,并在 CREATE SERVICE 命令中提供文件信息,如教程 2 中所示。
服务函数是用于与服务进行通信的方法之一。服务函数是用户定义的函数 (UDF),与服务端点关联。当执行服务函数时,服务函数会向服务端点发送请求并接收响应。
验证计算池是否已准备就绪,以及您是否处于创建服务的正确上下文中。
之前,您在 通用设置 步骤中设置了上下文。为确保您处于针对本步骤中 SQL 语句的正确上下文中,请执行以下命令:
USE ROLE test_role; USE DATABASE tutorial_db; USE SCHEMA data_schema; USE WAREHOUSE tutorial_warehouse;
要确保您在 通用设置 中创建的计算池已准备就绪,请执行
DESCRIBE COMPUTE POOL
,并且验证state
是ACTIVE
,还是IDLE
。如果state
是STARTING
,您需要等到state
更改为ACTIVE
或IDLE
。
DESCRIBE COMPUTE POOL tutorial_compute_pool;
要创建服务,请使用
test_role
执行以下命令:CREATE SERVICE echo_service IN COMPUTE POOL tutorial_compute_pool FROM SPECIFICATION $$ spec: containers: - name: echo image: /tutorial_db/data_schema/tutorial_repository/my_echo_service_image:latest env: SERVER_PORT: 8000 CHARACTER_NAME: Bob readinessProbe: port: 8000 path: /healthcheck endpoints: - name: echoendpoint port: 8000 public: true $$ MIN_INSTANCES=1 MAX_INSTANCES=1;
备注
如果已存在具有该名称的服务,请使用 DROP SERVICE 命令删除之前创建的服务,然后再创建此服务。
执行以下 SQL 命令,获取有关您刚刚创建的服务的详细信息。有关更多信息,请参阅 Snowpark Container Services:使用服务。
要列出账户中的服务,请执行 SHOW SERVICES 命令:
SHOW SERVICES;
要获取服务的状态,请调用系统函数 SYSTEM$GET_SERVICE_STATUS:
SELECT SYSTEM$GET_SERVICE_STATUS('echo_service');
要获取服务的相关信息,请执行 DESCRIBE SERVICE 命令:
DESCRIBE SERVICE echo_service;
要创建服务函数,请执行以下命令:
CREATE FUNCTION my_echo_udf (InputText varchar) RETURNS varchar SERVICE=echo_service ENDPOINT=echoendpoint AS '/echo';
请注意以下事项:
SERVICE 属性将 UDF 与
echo_service
服务关联。ENDPOINT 属性将 UDF 与
echoendpoint
服务内的端点关联。AS '/echo' 指定 Echo 服务器的 HTTP 路径。您可以在服务代码 (
echo_service.py
) 中找到此路径。
4:使用服务¶
首先,为本部分中的 SQL 语句设置上下文,执行以下命令:
USE ROLE test_role;
USE DATABASE tutorial_db;
USE SCHEMA data_schema;
USE WAREHOUSE tutorial_warehouse;
现在,您可以与 Echo 服务进行通信。
使用服务函数: 您可以在查询中调用服务函数。示例服务函数 (
my_echo_udf
) 可以将单个字符串或字符串列表作为输入。示例 1.1:传递单个字符串
要调用
my_echo_udf
服务函数,请执行以下 SELECT 语句,传递一个输入字符串 ('hello'
):SELECT my_echo_udf('hello!');
Snowflake 向服务端点 (
echoendpoint
) 发送 POST 请求。收到请求后,服务会在响应中回显输入字符串。+--------------------------+ | **MY_ECHO_UDF('HELLO!')**| |------------------------- | | Bob said hello! | +--------------------------+
示例 1.2:传递字符串列表
当您向服务函数传递字符串列表时,Snowflake 会对这些输入字符串进行批处理,并向服务发送一系列 POST 请求。在服务处理完所有字符串后,Snowflake 将合并结果并返回它们。
以下示例将表列作为输入传递给服务函数。
创建包含多个字符串的表:
CREATE TABLE messages (message_text VARCHAR) AS (SELECT * FROM (VALUES ('Thank you'), ('Hello'), ('Hello World')));
验证是否已创建表:
SELECT * FROM messages;
要调用服务函数,请执行以下 SELECT 语句,以输入的形式传递表行:
SELECT my_echo_udf(message_text) FROM messages;
输出:
+---------------------------+ | MY_ECHO_UDF(MESSAGE_TEXT) | |---------------------------| | Bob said Thank you | | Bob said Hello | | Bob said Hello World | +---------------------------+
使用 Web 浏览器: 服务会公开端点(请参阅 CREATE SERVICE 命令中提供的内联规范)。因此,您可以登录服务向互联网公开的 Web UI,然后从 Web 浏览器向该服务发送请求。
找到服务公开的公众端点的 URL:
SHOW ENDPOINTS IN SERVICE echo_service;
响应中的
ingress_url
列提供 URL。示例
p6bye-myorg-myacct.snowflakecomputing.app
将
/ui
追加到端点 URL,并将其粘贴到 Web 浏览器中。这样会导致服务执行ui()
函数(请参阅echo_service.py
)。请注意,首次访问端点 URL 时,系统将要求您登录 Snowflake。对于此测试,请使用用于创建服务的同一用户,以确保用户具有必要的权限。
在 输入 框中输入字符串“Hello”,然后按 Return。
备注
您可以通过编程方式访问公共端点。有关示例代码,请参阅 来自 Snowflake 外部的公共端点访问及身份验证。请注意,您需要将
/ui
追加到代码中的端点 URL,以便 Snowflake 可以将请求路由到服务代码中的ui()
函数。
5:(可选)以编程方式访问公共端点¶
在上一部分中,您使用 Web 浏览器测试了 Echo 服务。在浏览器中,您访问了公共端点(入口端点)并使用服务公开的 Web UI 发送了请求。在本部分中,您将以编程方式测试同一公共端点。
该示例使用 密钥对身份验证。使用您提供的密钥对,示例代码首先生成一个 JSON Web 令牌 (JWT),然后用 Snowflake 交换令牌,获得 OAuth 令牌。然后,代码使用 OAuth 令牌,在与 Echo 服务公共端点通信时进行身份验证。
先决条件¶
确保您拥有以下信息:
公共端点的入口 URL。 执行 SHOW ENDPOINTS IN SERVICE 命令来获取 URL:
SHOW ENDPOINTS IN SERVICE echo_service;
您的 Snowflake 账户名称。 有关更多信息,请参阅 常见设置:确认您已准备好继续。
您的 Snowflake 账户 URL: 是
<acctname>.snowflakecomputing.cn
。Snowflake 账户中的用户名。 这是您在 常见设置:创建 Snowflake 对象 中选择的用户。您以该用户身份登录 Snowflake 并测试编程访问。
角色名称: 您创建了一个角色 (
test_role
),作为常见设置的一部分。用户使用此角色来执行操作。
设置¶
按照步骤,以编程方式与 Echo 服务通信。使用提供的 Python 代码,您可以向 Echo 服务公开的公共端点发送请求。
在命令提示符处,创建目录并导航到该目录。
为用户配置密钥对身份验证。
生成 密钥对:
生成私钥。为了简化练习步骤,生成一个未加密的私钥。您也可以使用加密私钥,但需要您输入密码。
openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8 -nocrypt
通过引用您创建的私钥生成公钥 (
rsa_key.pub
)。openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub
验证您是否已在目录中生成私钥和公钥。
将公钥分配给您用来测试编程访问的用户。这使用户可以指定用于身份验证的密钥。
ALTER USER <user-name> SET RSA_PUBLIC_KEY='MIIBIjANBgkqh...';
将提供的示例代码保存在 Python 文件中。
将以下代码保存到
generateJWT.py
。# To run this on the command line, enter: # python3 generateJWT.py --account=<account_identifier> --user=<username> --private_key_file_path=<path_to_private_key_file> from cryptography.hazmat.primitives.serialization import load_pem_private_key from cryptography.hazmat.primitives.serialization import Encoding from cryptography.hazmat.primitives.serialization import PublicFormat from cryptography.hazmat.backends import default_backend from datetime import timedelta, timezone, datetime import argparse import base64 from getpass import getpass import hashlib import logging import sys # This class relies on the PyJWT module (https://pypi.org/project/PyJWT/). import jwt logger = logging.getLogger(__name__) try: from typing import Text except ImportError: logger.debug('# Python 3.5.0 and 3.5.1 have incompatible typing modules.', exc_info=True) from typing_extensions import Text ISSUER = "iss" EXPIRE_TIME = "exp" ISSUE_TIME = "iat" SUBJECT = "sub" # If you generated an encrypted private key, implement this method to return # the passphrase for decrypting your private key. As an example, this function # prompts the user for the passphrase. def get_private_key_passphrase(): return getpass('Passphrase for private key: ') class JWTGenerator(object): """ Creates and signs a JWT with the specified private key file, username, and account identifier. The JWTGenerator keeps the generated token and only regenerates the token if a specified period of time has passed. """ LIFETIME = timedelta(minutes=59) # The tokens will have a 59-minute lifetime RENEWAL_DELTA = timedelta(minutes=54) # Tokens will be renewed after 54 minutes ALGORITHM = "RS256" # Tokens will be generated using RSA with SHA256 def __init__(self, account: Text, user: Text, private_key_file_path: Text, lifetime: timedelta = LIFETIME, renewal_delay: timedelta = RENEWAL_DELTA): """ __init__ creates an object that generates JWTs for the specified user, account identifier, and private key. :param account: Your Snowflake account identifier. See https://docs.snowflake.com/en/user-guide/admin-account-identifier.html. Note that if you are using the account locator, exclude any region information from the account locator. :param user: The Snowflake username. :param private_key_file_path: Path to the private key file used for signing the JWTs. :param lifetime: The number of minutes (as a timedelta) during which the key will be valid. :param renewal_delay: The number of minutes (as a timedelta) from now after which the JWT generator should renew the JWT. """ logger.info( """Creating JWTGenerator with arguments account : %s, user : %s, lifetime : %s, renewal_delay : %s""", account, user, lifetime, renewal_delay) # Construct the fully qualified name of the user in uppercase. self.account = self.prepare_account_name_for_jwt(account) self.user = user.upper() self.qualified_username = self.account + "." + self.user self.lifetime = lifetime self.renewal_delay = renewal_delay self.private_key_file_path = private_key_file_path self.renew_time = datetime.now(timezone.utc) self.token = None # Load the private key from the specified file. with open(self.private_key_file_path, 'rb') as pem_in: pemlines = pem_in.read() try: # Try to access the private key without a passphrase. self.private_key = load_pem_private_key(pemlines, None, default_backend()) except TypeError: # If that fails, provide the passphrase returned from get_private_key_passphrase(). self.private_key = load_pem_private_key(pemlines, get_private_key_passphrase().encode(), default_backend()) def prepare_account_name_for_jwt(self, raw_account: Text) -> Text: """ Prepare the account identifier for use in the JWT. For the JWT, the account identifier must not include the subdomain or any region or cloud provider information. :param raw_account: The specified account identifier. :return: The account identifier in a form that can be used to generate the JWT. """ account = raw_account if not '.global' in account: # Handle the general case. idx = account.find('.') if idx > 0: account = account[0:idx] else: # Handle the replication case. idx = account.find('-') if idx > 0: account = account[0:idx] # Use uppercase for the account identifier. return account.upper() def get_token(self) -> Text: """ Generates a new JWT. If a JWT has already been generated earlier, return the previously generated token unless the specified renewal time has passed. :return: the new token """ now = datetime.now(timezone.utc) # Fetch the current time # If the token has expired or doesn't exist, regenerate the token. if self.token is None or self.renew_time <= now: logger.info("Generating a new token because the present time (%s) is later than the renewal time (%s)", now, self.renew_time) # Calculate the next time we need to renew the token. self.renew_time = now + self.renewal_delay # Prepare the fields for the payload. # Generate the public key fingerprint for the issuer in the payload. public_key_fp = self.calculate_public_key_fingerprint(self.private_key) # Create our payload payload = { # Set the issuer to the fully qualified username concatenated with the public key fingerprint. ISSUER: self.qualified_username + '.' + public_key_fp, # Set the subject to the fully qualified username. SUBJECT: self.qualified_username, # Set the issue time to now. ISSUE_TIME: now, # Set the expiration time, based on the lifetime specified for this object. EXPIRE_TIME: now + self.lifetime } # Regenerate the actual token token = jwt.encode(payload, key=self.private_key, algorithm=JWTGenerator.ALGORITHM) # If you are using a version of PyJWT prior to 2.0, jwt.encode returns a byte string instead of a string. # If the token is a byte string, convert it to a string. if isinstance(token, bytes): token = token.decode('utf-8') self.token = token logger.info("Generated a JWT with the following payload: %s", jwt.decode(self.token, key=self.private_key.public_key(), algorithms=[JWTGenerator.ALGORITHM])) return self.token def calculate_public_key_fingerprint(self, private_key: Text) -> Text: """ Given a private key in PEM format, return the public key fingerprint. :param private_key: private key string :return: public key fingerprint """ # Get the raw bytes of public key. public_key_raw = private_key.public_key().public_bytes(Encoding.DER, PublicFormat.SubjectPublicKeyInfo) # Get the sha256 hash of the raw bytes. sha256hash = hashlib.sha256() sha256hash.update(public_key_raw) # Base64-encode the value and prepend the prefix 'SHA256:'. public_key_fp = 'SHA256:' + base64.b64encode(sha256hash.digest()).decode('utf-8') logger.info("Public key fingerprint is %s", public_key_fp) return public_key_fp def main(): logging.basicConfig(stream=sys.stdout, level=logging.INFO) cli_parser = argparse.ArgumentParser() cli_parser.add_argument('--account', required=True, help='The account identifier (e.g. "myorganization-myaccount" for "myorganization-myaccount.snowflakecomputing.cn").') cli_parser.add_argument('--user', required=True, help='The user name.') cli_parser.add_argument('--private_key_file_path', required=True, help='Path to the private key file used for signing the JWT.') cli_parser.add_argument('--lifetime', type=int, default=59, help='The number of minutes that the JWT should be valid for.') cli_parser.add_argument('--renewal_delay', type=int, default=54, help='The number of minutes before the JWT generator should produce a new JWT.') args = cli_parser.parse_args() token = JWTGenerator(args.account, args.user, args.private_key_file_path, timedelta(minutes=args.lifetime), timedelta(minutes=args.renewal_delay)).get_token() print('JWT:') print(token) if __name__ == "__main__": main()
将以下代码保存到
access-via-keypair.py
。from generateJWT import JWTGenerator from datetime import timedelta import argparse import logging import sys import requests logger = logging.getLogger(__name__) def main(): args = _parse_args() token = _get_token(args) snowflake_jwt = token_exchange(token,endpoint=args.endpoint, role=args.role, snowflake_account_url=args.snowflake_account_url, snowflake_account=args.account) spcs_url=f'https://{args.endpoint}{args.endpoint_path}' connect_to_spcs(snowflake_jwt, spcs_url) def _get_token(args): token = JWTGenerator(args.account, args.user, args.private_key_file_path, timedelta(minutes=args.lifetime), timedelta(minutes=args.renewal_delay)).get_token() logger.info("Key Pair JWT: %s" % token) return token def token_exchange(token, role, endpoint, snowflake_account_url, snowflake_account): scope_role = f'session:role:{role}' if role is not None else None scope = f'{scope_role} {endpoint}' if scope_role is not None else endpoint data = { 'grant_type': 'urn:ietf:params:oauth:grant-type:jwt-bearer', 'scope': scope, 'assertion': token, } logger.info(data) url = f'https://{snowflake_account}.snowflakecomputing.cn/oauth/token' if snowflake_account_url: url = f'{snowflake_account_url}/oauth/token' logger.info("oauth url: %s" %url) response = requests.post(url, data=data) logger.info("snowflake jwt : %s" % response.text) assert 200 == response.status_code, "unable to get snowflake token" return response.text def connect_to_spcs(token, url): # Create a request to the ingress endpoint with authz. headers = {'Authorization': f'Snowflake Token="{token}"'} response = requests.post(f'{url}', headers=headers) logger.info("return code %s" % response.status_code) logger.info(response.text) def _parse_args(): logging.basicConfig(stream=sys.stdout, level=logging.INFO) cli_parser = argparse.ArgumentParser() cli_parser.add_argument('--account', required=True, help='The account identifier (for example, "myorganization-myaccount" for ' '"myorganization-myaccount.snowflakecomputing.cn").') cli_parser.add_argument('--user', required=True, help='The user name.') cli_parser.add_argument('--private_key_file_path', required=True, help='Path to the private key file used for signing the JWT.') cli_parser.add_argument('--lifetime', type=int, default=59, help='The number of minutes that the JWT should be valid for.') cli_parser.add_argument('--renewal_delay', type=int, default=54, help='The number of minutes before the JWT generator should produce a new JWT.') cli_parser.add_argument('--role', help='The role we want to use to create and maintain a session for. If a role is not provided, ' 'use the default role.') cli_parser.add_argument('--endpoint', required=True, help='The ingress endpoint of the service') cli_parser.add_argument('--endpoint-path', default='/', help='The url path for the ingress endpoint of the service') cli_parser.add_argument('--snowflake_account_url', default=None, help='The account url of the account for which we want to log in. Type of ' 'https://myorganization-myaccount.snowflakecomputing.cn') args = cli_parser.parse_args() return args if __name__ == "__main__": main()
以编程方式向服务端点发送请求¶
执行 access-via-keypair.py
Python 代码,对 Echo 服务公共端点进行入口调用。
python3 access-via-keypair.py \ --account <account-identifier> \ --user <user-name> \ --role TEST_ROLE \ --private_key_file_path rsa_key.p8 \ --endpoint <ingress-hostname> \ --endpoint-path /ui
有关 account-identifier
的更多信息,请参阅 账户标识符。
身份验证的工作原理¶
代码首先将提供的密钥对转换为 JWT 令牌。然后发送 JWT 令牌到 Snowflake 以获取 OAuth 令牌。最后,代码使用 OAuth 令牌连接到 Snowflake 并访问公共端点。具体而言,代码执行以下操作:
调用
_get_token(args)
函数,从您提供的密钥对生成 JWT 令牌。函数实施如图所示:def _get_token(args): token = JWTGenerator(args.account, args.user, args.private_key_file_path, timedelta(minutes=args.lifetime), timedelta(minutes=args.renewal_delay)).get_token() logger.info("Key Pair JWT: %s" % token) return token
JWTGenerator
是一个向您提供的辅助函数类。请注意创建此对象时提供的以下参数:args.account
和args.user
参数:JWT 令牌有几个字段(请参阅 令牌格式),iss
是其中一个字段。该字段值包括 Snowflake 账户名称和用户名。因此,您提供这些值作为参数。这两个
timedelta
参数提供以下信息:lifetime
指定密钥有效的分钟数(60 分钟)。renewal_delay
指定从现在起多少分钟后,JWT 生成器应续期 JWT。
调用
token_exchange()
函数以连接到 Snowflake 并交换 JWT 令牌,以获取 OAuth 令牌。scope_role = f'session:role:{role}' if role is not None else None scope = f'{scope_role} {endpoint}' if scope_role is not None else endpoint data = { 'grant_type': 'urn:ietf:params:oauth:grant-type:jwt-bearer', 'scope': scope, 'assertion': token, }
上述代码构造了一个 JSON,设置了 OAuth 令牌的范围,以及使用指定角色可以访问的公共端点。然后,此代码会向 Snowflake 发出 POST 请求,传递 JSON 以交换 JWT 令牌,获取 OAuth 令牌(请参阅 令牌交换),如图所示:
url = f'{snowflake_account_url}/oauth/token' response = requests.post(url, data=data) assert 200 == response.status_code, "unable to get Snowflake token" return response.text
然后代码调用
connect_to_spcs()
函数以连接到 Echo 服务的公共端点。它提供了端点 URL (https://<ingress-URL>/ui
) 和用于身份验证的 OAuth 令牌。headers = {'Authorization': f'Snowflake Token="{token}"'} response = requests.post(f'{url}', headers=headers)
url
是您提供给程序的spcs_url
,token
是 OAuth 令牌。本示例中的 Echo 服务提供 HTML 页面(如上一部分所述)。此示例代码在响应中仅打印 HTML。
6:清理¶
如果您不打算继续学习 教程 2 或 教程 3,则应移除您创建的计费资源。有关更多信息,请参阅 教程 3 中的第 5 步。
7:查看服务代码¶
本部分包括以下主题:
检查教程 1 代码:查看用于实施 Echo 服务的代码文件。
了解服务函数:本部分介绍本教程中的服务函数如何与服务链接。
在本地构建和测试镜像。本部分介绍如何本地测试 Docker 镜像,然后将其上传到 Snowflake 账户的仓库。
检查教程 1 代码¶
您在第 1 步中下载的 Zip 文件包含以下文件:
Dockerfile
echo_service.py
templates/basic_ui.html
在创建服务时,您还可以使用服务规范。以下部分介绍这些代码组件如何协同工作以创建服务。
echo_service.py 文件¶
此 Python 文件包含用于实施极简 HTTP 服务器的代码,该服务器会返回(回显)输入文本。该代码主要执行两项任务:处理来自 Snowflake 服务函数的回显请求,以及提供 Web 用户界面 (UI) 以提交 Echo 请求。
from flask import Flask
from flask import request
from flask import make_response
from flask import render_template
import logging
import os
import sys
SERVICE_HOST = os.getenv('SERVER_HOST', '0.0.0.0')
SERVER_PORT = os.getenv('SERVER_PORT', 8080)
CHARACTER_NAME = os.getenv('CHARACTER_NAME', 'I')
def get_logger(logger_name):
logger = logging.getLogger(logger_name)
logger.setLevel(logging.DEBUG)
handler = logging.StreamHandler(sys.stdout)
handler.setLevel(logging.DEBUG)
handler.setFormatter(
logging.Formatter(
'%(name)s [%(asctime)s] [%(levelname)s] %(message)s'))
logger.addHandler(handler)
return logger
logger = get_logger('echo-service')
app = Flask(__name__)
@app.get("/healthcheck")
def readiness_probe():
return "I'm ready!"
@app.post("/echo")
def echo():
'''
Main handler for input data sent by Snowflake.
'''
message = request.json
logger.debug(f'Received request: {message}')
if message is None or not message['data']:
logger.info('Received empty message')
return {}
# input format:
# {"data": [
# [row_index, column_1_value, column_2_value, ...],
# ...
# ]}
input_rows = message['data']
logger.info(f'Received {len(input_rows)} rows')
# output format:
# {"data": [
# [row_index, column_1_value, column_2_value, ...}],
# ...
# ]}
output_rows = [[row[0], get_echo_response(row[1])] for row in input_rows]
logger.info(f'Produced {len(output_rows)} rows')
response = make_response({"data": output_rows})
response.headers['Content-type'] = 'application/json'
logger.debug(f'Sending response: {response.json}')
return response
@app.route("/ui", methods=["GET", "POST"])
def ui():
'''
Main handler for providing a web UI.
'''
if request.method == "POST":
# getting input in HTML form
input_text = request.form.get("input")
# display input and output
return render_template("basic_ui.html",
echo_input=input_text,
echo_reponse=get_echo_response(input_text))
return render_template("basic_ui.html")
def get_echo_response(input):
return f'{CHARACTER_NAME} said {input}'
if __name__ == '__main__':
app.run(host=SERVICE_HOST, port=SERVER_PORT)
在代码中:
echo
函数使 Snowflake 服务函数能够与服务进行通信。此函数指定@app.post()
装饰,如下所示:@app.post("/echo") def echo():
当 Echo 服务器收到带
/echo
路径的 HTTP POST 请求时,该服务器回将请求路由到此函数。此函数在响应中执行并回显请求正文中的字符串。为支持来自 Snowflake 服务函数的通信,此服务器会实施外部函数。也就是说,服务器实施遵循特定的输入/输出数据格式,以便提供 SQL 函数,这是 External Functions 使用的相同 输入/输出数据格式。
ui
代码的函数部分显示 Web 表单,并处理从 Web 表单提交的回显请求。此函数使用@app.route()
装饰器,以指定/ui
的请求由此函数处理:@app.route("/ui", methods=["GET", "POST"]) def ui():
Echo 服务公开
echoendpoint
端点(请参阅服务规范),允许通过 Web 与服务进行通信。当您在浏览器中加载已追加 /ui 的公共端点的 URL 时,浏览器会为此路径发送 HTTP GET 请求,然后服务器将请求路由到此函数。函数执行并返回简单的 HTML 表单,供用户在其中输入字符串。用户输入字符串并提交表单后,浏览器会为此路径发送 HTTP POST 请求,然后服务器将请求路由到此同一函数。函数执行并返回包含原始字符串的 HTTP 响应。
readiness_probe
函数使用@app.get()
装饰器,以指定/healthcheck
的请求由此函数处理:@app.get("/healthcheck") def readiness_probe():
此函数使 Snowflake 能够检查服务的就绪情况。当容器启动时,Snowflake 希望确认应用程序正在运行,并且服务已准备好处理请求。Snowflake 为此路径发送 HTTP GET 请求(作为运行状况探测、就绪情况探测),以确保只有正常运行的容器才能提供流量。该函数可以做任何您想做的事。
get_logger
函数有助于设置日志记录。
Dockerfile¶
此文件包含使用 Docker 构建镜像时的所有命令。
ARG BASE_IMAGE=python:3.10-slim-buster
FROM $BASE_IMAGE
COPY echo_service.py ./
COPY templates/ ./templates/
RUN pip install --upgrade pip && \\
pip install flask
CMD ["python", "echo_service.py"]
Dockerfile 包含在 Docker 容器中安装 Flask 库的说明。echo_service.py
中的代码依赖 Flask 库来处理 HTTP 请求。
/template/basic_ui.html¶
Echo 服务公开 echoendpoint
端点(请参阅服务规范),允许通过 Web 与服务进行通信。当您在浏览器中加载已追加 /ui
的公共端点 URL 时,Echo 服务将显示此表单。您可以在表单中输入字符串并提交表单,该服务会在 HTTP 响应中返回字符串。
<!DOCTYPE html>
<html lang="en">
<head>
<title>Welcome to echo service!</title>
</head>
<body>
<h1>Welcome to echo service!</h1>
<form action="{{ url_for("ui") }}" method="post">
<label for="input">Input:<label><br>
<input type="text" id="input" name="input"><br>
</form>
<h2>Input:</h2>
{{ echo_input }}
<h2>Output:</h2>
{{ echo_reponse }}
</body>
</html>
服务规范¶
Snowflake 使用您在此规范中提供的信息来配置和运行服务。
spec:
containers:
- name: echo
image: /tutorial_db/data_schema/tutorial_repository/my_echo_service_image:latest
env:
SERVER_PORT: 8000
CHARACTER_NAME: Bob
readinessProbe:
port: 8000
path: /healthcheck
endpoints:
- name: echoendpoint
port: 8000
public: true
在服务规范中:
containers.image
指定镜像以便 Snowflake 启动容器。可选的
endpoints
字段指定服务公开的端点。name
为容器正在侦听的 TCP 网络端口指定用户友好的名称。您可以使用此用户友好的端点名称,将请求发送到相应的端口。请注意,env.SERVER_PORT
控制此端口号。端点也被配置为
public
。这允许从公共 Web 到此端点的流量。
添加可选的
containers.env
字段,以说明如何替换 Snowflake 传递给容器中所有进程的环境变量。例如,服务代码 (echo_service.py
) 读取具有默认值的环境变量,如下所示:CHARACTER_NAME = os.getenv('CHARACTER_NAME', 'I') SERVER_PORT = os.getenv('SERVER_PORT', 8080)
它的工作原理如下:
当 Echo 服务收到请求正文中带字符串(例如,“Hello”)的 HTTP POST 请求时,服务默认返回“I said Hello”。代码使用
CHARACTER_NAME
环境变量,确定“said”之前的单词。默认情况下,CHARACTER_NAME
设置为“I”。您可以覆盖服务规范中的 CHARACTER_NAME 默认值。例如,如果将该值设置为“Bob”,则 Echo 服务返回“Bob said Hello”响应。
同样,服务规范将服务侦听的端口 (SERVER_PORT) 替换到 8000,覆盖默认端口 8080。
readinessProbe
字段标识port
和path
,Snowflake 可以用来将 HTTP GET 请求发送到就绪情况探测,以验证服务是否已准备好处理流量。服务代码 (
echo_python.py
) 按如下方式实施就绪情况探测:@app.get("/healthcheck") def readiness_probe():
因此,规范文件相应地包括
container.readinessProbe
字段。
有关服务规范的更多信息,请参阅 服务规范参考。
了解服务函数¶
服务函数是用于与服务进行通信的方法之一(请参阅 使用服务)。服务函数是用户定义的函数 (UDF),与服务端点关联。当执行服务函数时,它会向关联的服务端点发送请求并接收响应。
您可以执行具有以下参数的 CREATE FUNCTION 命令,从而创建以下服务函数:
CREATE FUNCTION my_echo_udf (InputText VARCHAR)
RETURNS VARCHAR
SERVICE=echo_service
ENDPOINT=echoendpoint
AS '/echo';
请注意以下事项:
my_echo_udf
函数将字符串作为输入并返回字符串。SERVICE 属性标识服务 (
echo_service
),ENDPOINT 属性标识用户友好的端点名称 (echoendpoint
)。AS '/echo' 指定服务的路径。在
echo_service.py
中,@app.post
装饰器将此路径与echo
函数关联。
此函数连接指定 SERVICE 的特定 ENDPOINT。当您调用此函数时,Snowflake 会向服务容器内的 /echo
路径发送请求。
在本地构建和测试镜像¶
您可以在本地测试 Docker 镜像,然后再将其上传到 Snowflake 账户中的仓库。在本地测试中,容器独立运行(它不是 Snowflake 运行的服务)。
要测试教程 1 Docker 镜像,请执行以下步骤:
要在 Docker CLI 中创建 Docker 镜像,请执行以下命令:
docker build --rm -t my_service:local .
要启动代码,请执行以下命令:
docker run --rm -p 8080:8080 my_service:local
使用以下方法之一,向服务发送回显请求:
使用 cURL 命令:
在另一个终端窗口中,使用 cURL,将以下 POST 请求发送到端口 8080:
curl -X POST http://localhost:8080/echo \ -H "Content-Type: application/json" \ -d '{"data":[[0, "Hello friend"], [1, "Hello World"]]}'
请注意,请求正文包含两个字符串。cURL 命令将 POST 请求发送到服务正在侦听的端口 8080。数据中的 0 是列表中输入字符串的索引。Echo 服务在响应中回显输入字符串,如下所示:
{"data":[[0,"I said Hello Friend"],[1,"I said Hello World"]]}
使用 Web 浏览器:
下一步是什么?¶
您现在可以测试执行作业的 教程 2。