snowflake.core.stream.StreamCollection

class snowflake.core.stream.StreamCollection(schema: SchemaResource)

Bases: SchemaObjectCollectionParent[StreamResource]

Represents the collection operations on the Snowflake Stream resource.

With this collection, you can create, iterate through, and fetch streams that you have access to in the current context.

Attributes

database
root

Methods

create(stream: str, *, clone_stream: str | Clone, mode: CreateMode = CreateMode.error_if_exists, copy_grants: bool | None = False) StreamResource
create(stream: Stream, *, mode: CreateMode = CreateMode.error_if_exists, copy_grants: bool | None = False) StreamResource

Create a stream in Snowflake.

There are two ways to create a stream: by cloning or by building from scratch.

Cloning an existing stream

Parameters:
  • stream (str) – The new stream’s name

  • clone_stream (str or Clone object) – The name of the stream to be cloned

  • mode (CreateMode, optional) –

    One of the following enum values:

    CreateMode.error_if_exists: Throw a snowflake.core.exceptions.ConflictError if the stream already exists in Snowflake. Equivalent to SQL create stream <name> ....

    CreateMode.or_replace: Replace the stream if it already exists in Snowflake. Equivalent to SQL create or replace stream <name> ....

    CreateMode.if_not_exists: Do nothing if the stream already exists in Snowflake. Equivalent to SQL create stream if not exists <name> ....

    Default is CreateMode.error_if_exists.

  • copy_grants (bool, optional) – Whether to enable copy grants when creating the object. Default is False.
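
The three mode values differ only in what happens when the name is already taken. A minimal in-memory sketch of that behavior follows. It does not call Snowflake or the snowflake.core library: SketchCreateMode, SketchConflictError, and sketch_create are hypothetical stand-ins for CreateMode, ConflictError, and create(), and a plain dict plays the role of the schema.

```python
from enum import Enum, auto


class SketchCreateMode(Enum):
    """Hypothetical stand-in for CreateMode; not the library's enum."""
    error_if_exists = auto()
    or_replace = auto()
    if_not_exists = auto()


class SketchConflictError(Exception):
    """Hypothetical stand-in for snowflake.core.exceptions.ConflictError."""


def sketch_create(catalog, name, definition,
                  mode=SketchCreateMode.error_if_exists):
    """Store `definition` under `name` in `catalog` according to `mode`.

    Returns the definition that is stored under `name` afterwards.
    """
    exists = name in catalog
    if exists and mode is SketchCreateMode.error_if_exists:
        # Mirrors "create stream <name>": a second create fails.
        raise SketchConflictError(f"stream {name!r} already exists")
    if exists and mode is SketchCreateMode.if_not_exists:
        # Mirrors "create stream if not exists <name>": keep the old one.
        return catalog[name]
    # New name, or or_replace on an existing name: (over)write the entry.
    catalog[name] = definition
    return definition


catalog = {}
sketch_create(catalog, "my_stream", "v1")

# if_not_exists leaves the existing entry untouched.
kept = sketch_create(catalog, "my_stream", "v2",
                     SketchCreateMode.if_not_exists)

# or_replace overwrites it.
replaced = sketch_create(catalog, "my_stream", "v3",
                         SketchCreateMode.or_replace)

# The default mode raises because the name is already taken.
try:
    sketch_create(catalog, "my_stream", "v4")
    conflict_raised = False
except SketchConflictError:
    conflict_raised = True
```

In real code, the choice is usually between or_replace for idempotent deployment scripts that should always apply the latest definition, and if_not_exists for setup code that must not disturb an existing stream.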

Examples

Cloning a Stream instance:

>>> streams = schema.streams
>>> streams.create(
...     "new_stream_name",
...     clone_stream="stream_name_to_be_cloned",
...     mode=CreateMode.if_not_exists,
...     copy_grants=True
... )

Cloning a Stream instance in a different database and schema:

>>> streams = schema.streams
>>> streams.create(
...     "new_stream_name",
...     clone_stream="stream_database_name.stream_schema_name.stream_name_to_be_cloned",
...     mode=CreateMode.if_not_exists,
...     copy_grants=True
... )

Creating a stream from scratch

Parameters:
  • stream (Stream) – The details of the Stream object, together with its properties: name; comment (optional); and stream_source, a StreamSource object that is one of StreamSourceStage, StreamSourceTable, or StreamSourceView.

  • mode (CreateMode, optional) –

    One of the following enum values:

    CreateMode.error_if_exists: Throw a snowflake.core.exceptions.ConflictError if the stream already exists in Snowflake. Equivalent to SQL create stream <name> ....

    CreateMode.or_replace: Replace the stream if it already exists in Snowflake. Equivalent to SQL create or replace stream <name> ....

    CreateMode.if_not_exists: Do nothing if the stream already exists in Snowflake. Equivalent to SQL create stream if not exists <name> ....

    Default is CreateMode.error_if_exists.

  • copy_grants (bool, optional) – Whether to enable copy grants when creating the object. Default is False.

Examples

Creating a stream instance from a source table:

>>> streams.create(
...     Stream(
...         name = "new_stream_name",
...         stream_source = StreamSourceTable(
...             point_of_time = PointOfTimeOffset(reference="before", offset="1"),
...             name = "my_source_table_name",
...             append_only = True,
...             show_initial_rows = False,
...             comment = "create stream by table"
...         )
...     ),
...     mode=CreateMode.if_not_exists,
...     copy_grants=True
... )

Creating a stream instance from a source view:

>>> streams.create(
...     Stream(
...         name = "new_stream_name",
...         stream_source = StreamSourceView(
...             point_of_time = PointOfTimeOffset(reference="before", offset="1"),
...             name = "my_source_view_name"
...         )
...     ),
...     mode=CreateMode.if_not_exists,
...     copy_grants=True
... )

Creating a stream instance from a source directory table:

>>> streams.create(
...     Stream(
...         name = "new_stream_name",
...         stream_source = StreamSourceStage(
...             point_of_time = PointOfTimeOffset(reference="before", offset="1"),
...             name = "my_source_directory_table_name"
...         )
...     ),
...     mode=CreateMode.if_not_exists,
...     copy_grants=True
... )

items() ItemsView[str, T]

iter(*, like: Annotated[str, Strict(strict=True)] | None = None, starts_with: Annotated[str, Strict(strict=True)] | None = None, show_limit: Annotated[int, FieldInfo(annotation=NoneType, required=True, metadata=[Strict(strict=True), Ge(ge=1), Le(le=10000)])] | None = None, from_name: Annotated[str, Strict(strict=True)] | None = None) Iterator[Stream]

Iterate through Stream objects from Snowflake, filtering on any optional ‘like’ pattern.

Parameters:
  • like (str, optional) – A case-insensitive string functioning as a filter, with support for SQL wildcard characters (% and _).

  • starts_with (str, optional) – String used to filter the command output based on the string of characters that appear at the beginning of the object name. Uses case-sensitive pattern matching.

  • show_limit (int, optional) – Maximum number of rows returned by iter(). The default is None, which behaves the same as show_limit=10000. The value must be between 1 and 10000.

  • from_name (str, optional) – Fetch rows only following the first row whose object name matches the specified string. This is case-sensitive and does not have to be the full name.
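
The difference between like (case-insensitive, with SQL wildcards) and starts_with (case-sensitive prefix) is easy to miss. The sketch below emulates the two filters locally with the standard library so the matching rules can be tried without a connection. It is an approximation of the semantics described above, not the service's implementation; matches_like, matches_starts_with, and the sample names are hypothetical.

```python
import re


def matches_like(name: str, pattern: str) -> bool:
    """Emulate a case-insensitive SQL LIKE match (hypothetical helper).

    '%' matches any run of characters (including none) and '_' matches
    exactly one character; everything else is matched literally.
    """
    parts = []
    for ch in pattern:
        if ch == "%":
            parts.append(".*")
        elif ch == "_":
            parts.append(".")
        else:
            parts.append(re.escape(ch))
    regex = re.compile("".join(parts), re.IGNORECASE | re.DOTALL)
    # LIKE must match the whole name, not just a substring.
    return regex.fullmatch(name) is not None


def matches_starts_with(name: str, prefix: str) -> bool:
    """Emulate the case-sensitive prefix filter (hypothetical helper)."""
    return name.startswith(prefix)


# Made-up stream names used only for this illustration.
names = ["ORDERS_STREAM", "orders_stream_v2", "ORDER_LOG", "AUDIT_STREAM"]

# Case-insensitive: both spellings of "orders" match; '_' stands for
# any single character here, not only a literal underscore.
liked = [n for n in names if matches_like(n, "orders_%")]

# Case-sensitive: only the upper-case name has this exact prefix.
prefixed = [n for n in names if matches_starts_with(n, "ORDERS")]

print(liked)     # ['ORDERS_STREAM', 'orders_stream_v2']
print(prefixed)  # ['ORDERS_STREAM']
```

A like pattern without wildcards therefore acts as a case-insensitive exact-name match, while a trailing % turns it into a case-insensitive prefix match.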

Examples

Showing all streams that you have access to:

>>> streams = stream_collection.iter()

Showing information about one specific stream:

>>> streams = stream_collection.iter(like="your-stream-name")

Showing streams starting with ‘your-stream-name-’:

>>> streams = stream_collection.iter(like="your-stream-name-%")

Using a for loop to retrieve information from the iterator:

>>> for stream in streams:
...     print(stream.name)

keys() KeysView[str]

values() ValuesView[T]