Full Reference

quixstreams

quixstreams.logging

configure_logging

def configure_logging(loglevel: Optional[LogLevel]) -> bool

Configure "quixstreams" logger.

NOTE: If the "quixstreams" logger already has pre-defined handlers (e.g. logging has already been configured via the logging module, or the function is called twice), it will skip configuration and return False.

Arguments:

  • loglevel: a valid log level as a string or None. If None is passed, this function is a no-op and no logging will be configured.

Returns:

True if logging config has been updated, otherwise False.
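
The contract described above (no-op on None, skip when handlers already exist) can be sketched with the standard logging module alone. This is an illustrative stand-in, not the library's implementation: the logger name, the function name configure_logging_sketch, and the log format are assumptions made for the example.

```python
import logging
from typing import Optional

# Hypothetical logger name for this sketch; the library configures "quixstreams".
LOGGER_NAME = "demo_quixstreams_sketch"


def configure_logging_sketch(loglevel: Optional[str]) -> bool:
    """Mimic the documented contract: no-op on None, skip if handlers exist."""
    if loglevel is None:
        return False  # nothing is configured
    logger = logging.getLogger(LOGGER_NAME)
    if logger.handlers:
        return False  # already configured elsewhere, or called twice
    handler = logging.StreamHandler()
    # The format string is an arbitrary choice for the sketch
    handler.setFormatter(logging.Formatter("[%(levelname)s] %(name)s: %(message)s"))
    logger.addHandler(handler)
    logger.setLevel(loglevel)
    return True


first = configure_logging_sketch("INFO")    # configures the logger
second = configure_logging_sketch("DEBUG")  # skipped: a handler is already attached
skipped = configure_logging_sketch(None)    # skipped: None means "do nothing"
print(first, second, skipped)
```

Because the second call is skipped, the level stays at the value set by the first call.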

quixstreams.error_callbacks

quixstreams.platforms

quixstreams.platforms.quix.config

strip_workspace_id_prefix

def strip_workspace_id_prefix(workspace_id: str, s: str) -> str

Remove the workspace ID from a given string if it starts with it, typically a topic or consumer group id.

Arguments:

  • workspace_id: the workspace id
  • s: the string to strip the prefix from

Returns:

the string with workspace_id prefix removed

prepend_workspace_id

def prepend_workspace_id(workspace_id: str, s: str) -> str

Add the workspace ID as a prefix to a given string if it does not have it, typically a topic or consumer group id.

Arguments:

  • workspace_id: the workspace id
  • s: the string to prepend to

Returns:

the string with workspace_id prepended
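
A minimal sketch of the two prefix helpers described above, showing that both are safe to call repeatedly. The hyphen separator and the function names strip_prefix and prepend_prefix are assumptions for illustration; the library's own functions may join or match the prefix differently.

```python
SEPARATOR = "-"  # assumption: the prefix is joined to the name with a hyphen


def strip_prefix(workspace_id: str, s: str) -> str:
    """Remove "<workspace_id><SEPARATOR>" from the start of s, if present."""
    prefix = workspace_id + SEPARATOR
    return s[len(prefix):] if s.startswith(prefix) else s


def prepend_prefix(workspace_id: str, s: str) -> str:
    """Add "<workspace_id><SEPARATOR>" to s unless it is already there."""
    prefix = workspace_id + SEPARATOR
    return s if s.startswith(prefix) else prefix + s


full_name = prepend_prefix("my-workspace", "orders")
print(full_name)                                  # my-workspace-orders
print(prepend_prefix("my-workspace", full_name))  # unchanged: already prefixed
print(strip_prefix("my-workspace", full_name))    # orders
```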

QuixApplicationConfig

@dataclasses.dataclass
class QuixApplicationConfig()

A convenience container class for Quix Application configs.

QuixKafkaConfigsBuilder

class QuixKafkaConfigsBuilder()

Retrieves all the necessary information from the Quix API and builds all the objects required to connect a confluent-kafka client to the Quix Platform.

If not executed within the Quix platform directly, you must provide a Quix "streaming" (aka "sdk") token, or Personal Access Token.

Ideally you also know your workspace name or id. If not, you can search for it using a known topic name, but note the search space is limited to the access level of your token.

It also currently handles the app_auto_create_topics setting for Application.Quix.

QuixKafkaConfigsBuilder.__init__

def __init__(quix_sdk_token: Optional[str] = None,
             workspace_id: Optional[str] = None,
             quix_portal_api_service: Optional[QuixPortalApiService] = None,
             timeout: float = 30,
             topic_create_timeout: float = 60)

Arguments:

  • quix_portal_api_service: A QuixPortalApiService instance (else generated)
  • workspace_id: A valid Quix Workspace ID (else searched for)

QuixKafkaConfigsBuilder.strip_workspace_id_prefix

def strip_workspace_id_prefix(s: str) -> str

Remove the workspace ID from a given string if it starts with it, typically a topic or consumer group id.

Arguments:

  • s: the string to strip the prefix from

Returns:

the string with workspace_id prefix removed

QuixKafkaConfigsBuilder.prepend_workspace_id

def prepend_workspace_id(s: str) -> str

Add the workspace ID as a prefix to a given string if it does not have it, typically a topic or consumer group id.

Arguments:

  • s: the string to prepend to

Returns:

the string with workspace_id prepended

QuixKafkaConfigsBuilder.search_for_workspace

def search_for_workspace(workspace_name_or_id: Optional[str] = None,
                         timeout: Optional[float] = None) -> Optional[dict]

Search for a workspace given an expected workspace name or id.

Arguments:

  • workspace_name_or_id: the expected name or id of a workspace
  • timeout: response timeout (seconds); Default 30

Returns:

the workspace data dict if search success, else None

QuixKafkaConfigsBuilder.get_workspace_info

def get_workspace_info(known_workspace_topic: Optional[str] = None,
                       timeout: Optional[float] = None) -> dict

Query the Quix API for workspace data, regardless of the instance cache, and update instance attributes from the query result.

Arguments:

  • known_workspace_topic: a topic you know to exist in some workspace
  • timeout: response timeout (seconds); Default 30

QuixKafkaConfigsBuilder.search_workspace_for_topic

def search_workspace_for_topic(
        workspace_id: str,
        topic: str,
        timeout: Optional[float] = None) -> Optional[str]

Search through all the topics in the given workspace id to see if there is a match with the provided topic.

Arguments:

  • workspace_id: the workspace to search in
  • topic: the topic to search for
  • timeout: response timeout (seconds); Default 30

Returns:

the workspace_id if success, else None

QuixKafkaConfigsBuilder.search_for_topic_workspace

def search_for_topic_workspace(topic: str,
                               timeout: Optional[float] = None
                               ) -> Optional[dict]

Find what workspace a topic belongs to.

If there is only one workspace altogether, it is assumed to be the workspace. More than one means each workspace will be searched until the first hit.

Arguments:

  • topic: the topic to search for
  • timeout: response timeout (seconds); Default 30

Returns:

workspace data dict if topic search success, else None
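
The lookup rule described above (a single workspace is taken as the answer, otherwise workspaces are scanned until the first hit) can be sketched over in-memory data. The workspace dicts, their "id" and "topics" keys, and the function name find_topic_workspace are hypothetical; the real method queries the Quix API instead.

```python
from typing import List, Optional

# Hypothetical stand-in for the workspaces visible to a token
WORKSPACES: List[dict] = [
    {"id": "org-alpha", "topics": ["clicks", "orders"]},
    {"id": "org-beta", "topics": ["payments", "orders"]},
]


def find_topic_workspace(topic: str, workspaces: List[dict]) -> Optional[dict]:
    """Return the workspace assumed to own the topic, or None if no match."""
    if len(workspaces) == 1:
        return workspaces[0]  # a single workspace is assumed to be the match
    for workspace in workspaces:
        if topic in workspace["topics"]:
            return workspace  # first hit wins; later workspaces are not checked
    return None


print(find_topic_workspace("orders", WORKSPACES)["id"])     # first workspace listing it
print(find_topic_workspace("unknown", WORKSPACES))          # no match
print(find_topic_workspace("unknown", WORKSPACES[:1])["id"])  # sole workspace is assumed
```

Note the consequence of "first hit": when the same topic name exists in several workspaces, the result depends on search order, which is one reason to pass a workspace id explicitly when it is known.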

QuixKafkaConfigsBuilder.create_topics

def create_topics(topics: List[Topic],
                  timeout: Optional[float] = None,
                  finalize_timeout: Optional[float] = None)

Create topics in a Quix cluster.

Arguments:

  • topics: a list of Topic objects
  • timeout: response timeout (seconds); Default 30
  • finalize_timeout: topic finalization timeout (seconds), i.e. how long to wait for topics to be marked as "Ready" (and thus ready to produce to/consume from); Default 60

QuixKafkaConfigsBuilder.get_topic

def get_topic(topic_name: str,
              timeout: Optional[float] = None) -> Optional[dict]

Return the topic (identified by its topic ID, the actual cluster topic name) if it exists, else None.

NOTE: if the name registered in Quix is instead the workspace-prefixed version, this returns None unless that exact name was created WITHOUT the Quix API.

Arguments:

  • topic_name: name of the topic
  • timeout: response timeout (seconds); Default 30

Returns:

response dict of the topic info if topic found, else None

QuixKafkaConfigsBuilder.confirm_topics_exist

def confirm_topics_exist(topics: Union[List[Topic], List[str]],
                         timeout: Optional[float] = None)

Confirm whether the desired set of topics exists in the Quix workspace.

Arguments:

  • topics: a list of Topic or topic names
  • timeout: response timeout (seconds); Default 30

QuixKafkaConfigsBuilder.get_application_config

def get_application_config(consumer_group_id: str) -> QuixApplicationConfig

Get all the necessary attributes for an Application to run on Quix Cloud.

Arguments:

  • consumer_group_id: consumer group id, if needed

Returns:

a QuixApplicationConfig instance

quixstreams.platforms.quix.env

QuixEnvironment

class QuixEnvironment()

Class to access various Quix platform environment settings

QuixEnvironment.state_management_enabled

@property
def state_management_enabled() -> bool

Check whether "State management" is enabled for the current deployment

Returns:

True if state management is enabled, otherwise False

QuixEnvironment.deployment_id

@property
def deployment_id() -> Optional[str]

Return current Quix deployment id.

This variable is meant to be set only by Quix Platform and only when the application is deployed.

Returns:

deployment id or None

QuixEnvironment.workspace_id

@property
def workspace_id() -> Optional[str]

Return Quix workspace id if set

Returns:

workspace id or None

QuixEnvironment.portal_api

@property
def portal_api() -> Optional[str]

Return Quix Portal API url if set

Returns:

portal API URL or None

QuixEnvironment.state_dir

@property
def state_dir() -> str

Return application state directory on Quix.

Returns:

path to state dir

quixstreams.platforms.quix.checks

check_state_management_enabled

def check_state_management_enabled()

Check if the State Management feature is enabled for the current deployment on the Quix platform. If it is disabled, an exception is raised.

check_state_dir

def check_state_dir(state_dir: str)

Check if Application "state_dir" matches the state dir on Quix platform.

If it doesn't match, a warning will be logged.

Arguments:

  • state_dir: application state_dir path

quixstreams.platforms.quix

quixstreams.platforms.quix.api

QuixPortalApiService

class QuixPortalApiService()

A light wrapper around the Quix Portal API. If used in the Quix Platform, it will use that workspace's auth token and portal endpoint; otherwise you must provide them.

Function names closely reflect the respective API endpoint, each starting with the method [GET, POST, etc.] followed by the endpoint path.

Results will be returned in the form of the requests library's Response.json(), unless something else is required. Non-200 responses will raise exceptions.

See the swagger documentation for more info about the endpoints.

QuixPortalApiService.get_workspace_certificate

def get_workspace_certificate(workspace_id: Optional[str] = None,
                              timeout: float = 30) -> Optional[bytes]

Get a workspace TLS certificate if available.

Returns None if certificate is not specified.

Arguments:

  • workspace_id: workspace id, optional
  • timeout: request timeout; Default 30

Returns:

certificate as bytes if present, or None

quixstreams.platforms.quix.exceptions

quixstreams.platforms.quix.topic_manager

QuixTopicManager

class QuixTopicManager(TopicManager)

The source of all topic management with quixstreams.

This is specifically for Applications using the Quix platform.

Generally initialized and managed automatically by an Application.Quix, but allows a user to work with it directly when needed, such as using it alongside a plain Producer to create its topics.

See methods for details.

QuixTopicManager.__init__

def __init__(topic_admin: TopicAdmin,
             consumer_group: str,
             quix_config_builder: QuixKafkaConfigsBuilder,
             timeout: float = 30,
             create_timeout: float = 60)

Arguments:

  • topic_admin: an Admin instance
  • quix_config_builder: A QuixKafkaConfigsBuilder instance, else one is generated for you.
  • timeout: response timeout (seconds)
  • create_timeout: timeout for topic creation

quixstreams.dataframe.dataframe

StreamingDataFrame

class StreamingDataFrame(BaseStreaming)

StreamingDataFrame is the main object you will use for ETL work.

Typically created with an app = quixstreams.app.Application() instance, via sdf = app.dataframe().

What it Does:

  • Builds a data processing pipeline, declaratively (not executed immediately)
    • Executes this pipeline on inputs at runtime (Kafka message values)
  • Provides functions/interface similar to Pandas Dataframes/Series
  • Enables stateful processing (and manages everything related to it)

How to Use:

Define various operations while continuously reassigning to itself (or new fields).

These operations will generally transform your data, access/update state, or produce to kafka topics.

We recommend your data structure to be "columnar" (aka a dict/JSON) in nature so that it works with the entire interface, but simple types like ints, str, etc. are also supported.

See the various methods and classes for more specifics, or for a deep dive into usage, see streamingdataframe.md under the docs/ folder.

NOTE: column referencing like sdf["a_column"] and various methods often create other object types (typically quixstreams.dataframe.StreamingSeries), which is expected; type hinting should alert you to any issues should you attempt invalid operations with said objects (however, we cannot infer whether an operation is valid with respect to your data!).

Example Snippet:

sdf = StreamingDataFrame()
sdf = sdf.apply(a_func)
sdf = sdf.filter(another_func)
sdf = sdf.to_topic(topic_obj)

StreamingDataFrame.apply

def apply(func: Union[
    ApplyCallback,
    ApplyCallbackStateful,
    ApplyWithMetadataCallback,
    ApplyWithMetadataCallbackStateful,
],
          *,
          stateful: bool = False,
          expand: bool = False,
          metadata: bool = False) -> Self

Apply a function to transform the value and return a new value.

The result will be passed downstream as an input value.

Example Snippet:

# This stores a string in state and capitalizes every column with a string value.
# A second apply then keeps only the string value columns (shows non-stateful).
from quixstreams import State

def func(d: dict, state: State):
    value = d["store_field"]
    if value != state.get("my_store_key"):
        state.set("my_store_key", value)
    return {k: v.upper() if isinstance(v, str) else v for k, v in d.items()}

sdf = StreamingDataFrame()
sdf = sdf.apply(func, stateful=True)
sdf = sdf.apply(lambda d: {k: v for k, v in d.items() if isinstance(v, str)})

Arguments:

  • func: a function to apply
  • stateful: if True, the function will be provided with a second argument of type State to perform stateful operations.
  • expand: if True, expand the returned iterable into individual values downstream. If returned value is not iterable, TypeError will be raised. Default - False.
  • metadata: if True, the callback will receive key, timestamp and headers along with the value. Default - False.
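
To make the effect of expand concrete, here is a library-free sketch of what flows downstream with and without it. The helper run_apply is hypothetical and only imitates the documented behaviour (including the TypeError on a non-iterable result); it is not how StreamingDataFrame is implemented.

```python
from typing import Any, Callable, Iterable, List


def run_apply(func: Callable[[Any], Any], values: Iterable[Any],
              expand: bool = False) -> List[Any]:
    """Feed each value through func and collect what would flow downstream."""
    downstream: List[Any] = []
    for value in values:
        result = func(value)
        if expand:
            # iter() raises TypeError when the result is not iterable
            downstream.extend(iter(result))
        else:
            downstream.append(result)
    return downstream


def split_words(d: dict) -> list:
    return d["text"].split()


messages = [{"text": "a b"}, {"text": "c"}]
print(run_apply(split_words, messages))               # [['a', 'b'], ['c']]
print(run_apply(split_words, messages, expand=True))  # ['a', 'b', 'c']
```

Without expand, each returned list travels downstream as one value; with expand, every element becomes its own value.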

StreamingDataFrame.update

def update(func: Union[
    UpdateCallback,
    UpdateCallbackStateful,
    UpdateWithMetadataCallback,
    UpdateWithMetadataCallbackStateful,
],
           *,
           stateful: bool = False,
           metadata: bool = False) -> Self

Apply a function to mutate value in-place or to perform a side effect

(e.g., printing a value to the console).

The result of the function will be ignored, and the original value will be passed downstream.

This operation occurs in-place, meaning reassignment is entirely OPTIONAL: the original StreamingDataFrame is returned for chaining (sdf.update().print()).

Example Snippet:

# Stores a value and mutates a list by appending a new item to it.
# A second update appends another item without reassigning.

from quixstreams import State

def func(values: list, state: State):
    value = values[0]
    if value != state.get("my_store_key"):
        state.set("my_store_key", value)
    values.append("new_item")

sdf = StreamingDataFrame()
sdf = sdf.update(func, stateful=True)
# does not require reassigning
sdf.update(lambda v: v.append(1))

Arguments:

  • func: function to update value
  • stateful: if True, the function will be provided with a second argument of type State to perform stateful operations.
  • metadata: if True, the callback will receive key, timestamp and headers along with the value. Default - False.

Returns:

the updated StreamingDataFrame instance (reassignment NOT required).
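
The difference from apply is that the callback's return value is discarded and the same (possibly mutated) object continues downstream. A minimal sketch of that rule, using a hypothetical run_update helper rather than the library itself:

```python
from typing import Any, Callable


def run_update(func: Callable[[Any], Any], value: Any) -> Any:
    """Call func for its side effect, discard its result, pass value on."""
    func(value)  # return value intentionally ignored
    return value


record = {"items": [1, 2]}
# list.append returns None, which would wipe out the value under apply semantics
passed_on = run_update(lambda v: v["items"].append(3), record)
print(passed_on)            # {'items': [1, 2, 3]}
print(passed_on is record)  # True: the original object is passed downstream
```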

StreamingDataFrame.filter

def filter(func: Union[
    FilterCallback,
    FilterCallbackStateful,
    FilterWithMetadataCallback,
    FilterWithMetadataCallbackStateful,
],
           *,
           stateful: bool = False,
           metadata: bool = False) -> Self

Filter value using provided function.

If the function returns a truthy value, the original value will be passed downstream; otherwise, it is filtered out.

Example Snippet:

# Stores a value and allows further processing only if the value is greater than
# what was previously stored.

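from quixstreams import State

def func(d: dict, state: State):
    value = d["my_value"]
    previous = state.get("my_store_key")
    # Nothing is stored before the first message, so only compare when a value exists
    if previous is None or value > previous:
        state.set("my_store_key", value)
        return True
    return False

sdf = StreamingDataFrame()
sdf = sdf.filter(func, stateful=True)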

Arguments:

  • func: function to filter value
  • stateful: if True, the function will be provided with a second argument of type State to perform stateful operations.
  • metadata: if True, the callback will receive key, timestamp and headers along with the value. Default - False.
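
A stateful filter callback can be exercised without Kafka by passing it a small stand-in for the state object. The DictState class below is hypothetical and only mirrors the get/set calls used in the examples above; the real State is provided by the library at runtime.

```python
class DictState:
    """Hypothetical stand-in exposing get/set like the State used above."""

    def __init__(self):
        self._data = {}

    def get(self, key, default=None):
        return self._data.get(key, default)

    def set(self, key, value):
        self._data[key] = value


def is_new_maximum(d: dict, state: DictState) -> bool:
    """Keep a message only if its value exceeds everything seen so far."""
    previous = state.get("max_seen")
    if previous is None or d["my_value"] > previous:
        state.set("max_seen", d["my_value"])
        return True
    return False


state = DictState()
stream = [{"my_value": v} for v in (3, 1, 5, 5, 8)]
kept = [d for d in stream if is_new_maximum(d, state)]
print(kept)  # [{'my_value': 3}, {'my_value': 5}, {'my_value': 8}]
```

The repeated 5 is dropped because the comparison is strict; only values that raise the stored maximum pass through.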

StreamingDataFrame.group_by

def group_by(key: Union[str, Callable[[Any], Any]],
             name: Optional[str] = None,
             value_deserializer: Optional[DeserializerType] = "json",
             key_deserializer: Optional[DeserializerType] = "json",
             value_serializer: Optional[SerializerType] = "json",
             key_serializer: Optional[SerializerType] = "json") -> Self

"Groups" messages by re-keying them via the provided group_by operation on their message values.

This enables things like aggregations on messages with non-matching keys.

You can provide a column name (uses the column's value) or a custom function to generate this new key.

.group_by() can only be performed once per StreamingDataFrame instance.

NOTE: group_by generates a topic that copies the original topic's settings.

Example Snippet:

# We have customer purchase events where the message key is the "store_id",
# but we want to calculate sales per customer (by "customer_account_id").

from quixstreams import State

def func(d: dict, state: State):
    current_total = state.get("customer_sum", 0)
    new_total = current_total + d["customer_spent"]
    state.set("customer_sum", new_total)
    d["customer_total"] = new_total
    return d

sdf = StreamingDataFrame()
sdf = sdf.group_by("customer_account_id")
sdf = sdf.apply(func, stateful=True)

Arguments:

  • key: how the new key should be generated from the message value; requires a column name (string) or a callable that takes the message value.
  • name: a name for the op (must be unique per group-by), required if key is a custom callable.
  • value_deserializer: a deserializer type for values; default - JSON
  • key_deserializer: a deserializer type for keys; default - JSON
  • value_serializer: a serializer type for values; default - JSON
  • key_serializer: a serializer type for keys; default - JSON

Returns:

a clone with this operation added (assign to keep its effect).
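
Conceptually, re-keying replaces each message key with a value derived from the message, so that later per-key aggregations group by the new key. The sketch below shows only that idea on an in-memory list; the rekey helper and sample data are made up for illustration, and the intermediate topic that group_by generates is deliberately left out.

```python
from collections import defaultdict

# (message key, message value) pairs, originally keyed by store
messages = [
    ("store_1", {"customer_account_id": "c1", "customer_spent": 10}),
    ("store_2", {"customer_account_id": "c2", "customer_spent": 5}),
    ("store_2", {"customer_account_id": "c1", "customer_spent": 7}),
]


def rekey(pairs, key_func):
    """Replace each message key with key_func(value)."""
    return [(key_func(value), value) for _, value in pairs]


rekeyed = rekey(messages, lambda value: value["customer_account_id"])

# Per-key aggregation now groups by customer instead of by store
totals = defaultdict(int)
for key, value in rekeyed:
    totals[key] += value["customer_spent"]
print(dict(totals))  # {'c1': 17, 'c2': 5}
```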

StreamingDataFrame.contains

@staticmethod
def contains(key: str) -> StreamingSeries

Check if the key is present in the Row value.

Example Snippet:

# Add new column 'has_column' which contains a boolean indicating
# the presence of 'column_x'

sdf = StreamingDataFrame()
sdf['has_column'] = sdf.contains('column_x')

Arguments:

  • key: a column name to check.

Returns:

a StreamingSeries object that evaluates to True if the key is present or False otherwise.

StreamingDataFrame.to_topic

def to_topic(topic: Topic, key: Optional[Callable[[Any], Any]] = None) -> Self

Produce current value to a topic. You can optionally specify a new key.

This operation occurs in-place, meaning reassignment is entirely OPTIONAL: the original StreamingDataFrame is returned for chaining (sdf.update().print()).

Example Snippet:

from quixstreams import Application

# Produce to two different topics, changing the key for one of them.

app = Application()
input_topic = app.topic("input_x")
output_topic_0 = app.topic("output_a")
output_topic_1 = app.topic("output_b")

sdf = app.dataframe(input_topic)
sdf = sdf.to_topic(output_topic_0)
# does not require reassigning
sdf.to_topic(output_topic_1, key=lambda data: data["a_field"])

Arguments:

  • topic: instance of Topic
  • key: a callable to generate a new message key, optional. If passed, the return type of this callable must be serializable by key_serializer defined for this Topic object. By default, the current message key will be used.

Returns:

the updated StreamingDataFrame instance (reassignment NOT required).

StreamingDataFrame.set_timestamp

def set_timestamp(func: Callable[[Any, Any, int, Any], int]) -> Self

Set a new timestamp based on the current message value and its metadata.

The new timestamp will be used in windowed aggregations and when producing messages to the output topics.

The new timestamp must be in milliseconds to conform to Kafka requirements.

Example Snippet:

from quixstreams import Application


app = Application()
input_topic = app.topic("data")

sdf = app.dataframe(input_topic)
# Updating the record's timestamp based on the value
sdf = sdf.set_timestamp(lambda value, key, timestamp, headers: value['new_timestamp'])

Arguments:

  • func: callable accepting the current value, key, timestamp, and headers. It's expected to return a new timestamp as integer in milliseconds.

Returns:

a new StreamingDataFrame instance
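
Since the callback must return integer milliseconds, a common need is converting a datetime string carried in the message. The following sketch shows one way to write such a callback with the (value, key, timestamp, headers) signature; the "event_time" field name and the choice to treat naive datetimes as UTC are assumptions for the example.

```python
from datetime import datetime, timezone
from typing import Any


def event_time_ms(value: dict, key: Any, timestamp: int, headers: Any) -> int:
    """Return the value's ISO-8601 "event_time" as epoch milliseconds.

    Falls back to the incoming timestamp when the field is missing.
    """
    raw = value.get("event_time")
    if raw is None:
        return timestamp
    parsed = datetime.fromisoformat(raw)
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=timezone.utc)  # assumption: naive times are UTC
    return round(parsed.timestamp() * 1000)


print(event_time_ms({"event_time": "2024-01-01T00:00:00+00:00"}, None, 0, None))
print(event_time_ms({}, None, 1234, None))  # no field: keeps the original timestamp
```

A function like this could then be passed directly, as in sdf.set_timestamp(event_time_ms).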

StreamingDataFrame.set_headers

def set_headers(
    func: Callable[
        [Any, Any, int, List[Tuple[str, HeaderValue]]],
        Collection[Tuple[str, HeaderValue]],
    ]
) -> Self

Set new message headers based on the current message value and metadata.

The new headers will be used when producing messages to the output topics.

The provided callback must accept value, key, timestamp, and headers, and return a new collection of (header, value) tuples.

Example Snippet:

from quixstreams import Application


app = Application()
input_topic = app.topic("data")

sdf = app.dataframe(input_topic)
# Updating the record's headers based on the value and metadata
sdf = sdf.set_headers(lambda value, key, timestamp, headers: [('id', value['id'])])

Arguments:

  • func: callable accepting the current value, key, timestamp, and headers. It's expected to return a new set of headers as a collection of (header, value) tuples.

Returns:

a new StreamingDataFrame instance
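
Because the callback returns the complete new collection, any existing headers that should survive have to be carried over explicitly. A small sketch of a callback that keeps the incoming headers and adds or replaces one; the "source" header name and the corresponding value field are hypothetical.

```python
from typing import Any, List, Optional, Tuple


def add_source_header(value: dict, key: Any, timestamp: int,
                      headers: Optional[List[Tuple[str, Any]]]) -> List[Tuple[str, Any]]:
    """Keep existing headers and set "source" from the message value."""
    existing = list(headers or [])  # headers may be None or empty
    kept = [(name, val) for name, val in existing if name != "source"]
    return kept + [("source", str(value["source"]))]


incoming = [("trace", "abc"), ("source", "old")]
print(add_source_header({"source": "sensor-7"}, "k", 0, incoming))
print(add_source_header({"source": "sensor-7"}, "k", 0, None))
```

Returning only [("source", ...)] instead would silently drop the "trace" header.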

StreamingDataFrame.print

def print(pretty: bool = True, metadata: bool = False) -> Self

Print the current message value (and, optionally, the message metadata) to stdout (the console), like the built-in print function.

Can also output a more dict-friendly format with pretty=True.

This operation occurs in-place, meaning reassignment is entirely OPTIONAL: the original StreamingDataFrame is returned for chaining (sdf.update().print()).

NOTE: prints the current (edited) values, not the original values.

Example Snippet:

from quixstreams import Application


app = Application()
input_topic = app.topic("data")

sdf = app.dataframe(input_topic)
sdf["edited_col"] = sdf["orig_col"] + "edited"
# print the updated message value with the newly added column
sdf.print()

Arguments:

  • pretty: Whether to use "pprint" formatting, which uses new-lines and indents for easier console reading (but might be worse for log parsing).
  • metadata: Whether to additionally print the key, timestamp, and headers

Returns:

the updated StreamingDataFrame instance (reassignment NOT required).

StreamingDataFrame.compose

def compose(
    sink: Optional[Callable[[Any, Any, int, Any], None]] = None
) -> Dict[str, VoidExecutor]

Compose all functions of this StreamingDataFrame into one big closure.

Closures are more performant than calling all the functions in the StreamingDataFrame one-by-one.

Generally not required by users; the quixstreams.app.Application class will do this automatically.

Example Snippet:

from quixstreams import Application

app = Application(...)
sdf = app.dataframe()
sdf = sdf.apply(apply_func)
sdf = sdf.filter(filter_func)
sdf = sdf.compose()

result_0 = sdf({"my": "record"})
result_1 = sdf({"other": "record"})

Arguments:

  • sink: callable to accumulate the results of the execution, optional.

Returns:

a function that accepts "value" and returns a result of StreamingDataFrame
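
The idea of folding a list of steps into a single closure, with an optional sink that collects results, can be illustrated without the library. This is a simplified sketch: compose_steps is a hypothetical helper, steps take one argument, and the sink here receives only the final value rather than the value plus metadata.

```python
from functools import reduce
from typing import Any, Callable, List, Optional

Step = Callable[[Any], Any]


def compose_steps(steps: List[Step],
                  sink: Optional[Callable[[Any], None]] = None) -> Step:
    """Fold single-argument functions into one closure, applied left to right."""

    def chain(composed: Step, step: Step) -> Step:
        return lambda value: step(composed(value))

    composed = reduce(chain, steps, lambda value: value)

    def run(value: Any) -> Any:
        result = composed(value)
        if sink is not None:
            sink(result)  # accumulate results outside the pipeline
        return result

    return run


collected: list = []
pipeline = compose_steps(
    [lambda d: {**d, "x2": d["x"] * 2}, lambda d: d["x2"] + 1],
    sink=collected.append,
)
pipeline({"x": 4})
pipeline({"x": 1})
print(collected)  # [9, 3]
```

The composition happens once, up front; each message then costs a single call into the prebuilt closure.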

StreamingDataFrame.test

def test(value: Any,
         key: Any,
         timestamp: int,
         headers: Optional[Any] = None,
         ctx: Optional[MessageContext] = None,
         topic: Optional[Topic] = None) -> List[Any]

A shorthand to test a StreamingDataFrame with the provided value and MessageContext.

Arguments:

  • value: value to pass through StreamingDataFrame
  • key: key to pass through StreamingDataFrame
  • timestamp: timestamp to pass through StreamingDataFrame
  • ctx: instance of MessageContext, optional. Provide it if the StreamingDataFrame instance calls to_topic(), has stateful functions or windows. Default - None.
  • topic: optionally, a topic branch to test with

Returns:

result of StreamingDataFrame

StreamingDataFrame.tumbling_window

def tumbling_window(duration_ms: Union[int, timedelta],
                    grace_ms: Union[int, timedelta] = 0,
                    name: Optional[str] = None) -> TumblingWindowDefinition

Create a tumbling window transformation on this StreamingDataFrame.

Tumbling windows divide time into fixed-sized, non-overlapping windows.

They allow performing stateful aggregations like sum, reduce, etc. on top of the data and emit results downstream.

Notes:

  • The timestamp of the aggregation result is set to the window start timestamp.
  • Every window is grouped by the current Kafka message key.
  • Messages with None key will be ignored.
  • The time windows always use the current event time.

Example Snippet:

from datetime import timedelta

from quixstreams import Application

app = Application()
sdf = app.dataframe(...)

sdf = (
    # Define a tumbling window of 60s and grace period of 10s
    sdf.tumbling_window(
        duration_ms=timedelta(seconds=60), grace_ms=timedelta(seconds=10.0)
    )

    # Specify the aggregation function
    .sum()

    # Specify how the results should be emitted downstream.
    # "all()" will emit results as they come for each updated window,
    # possibly producing multiple messages per key-window pair
    # "final()" will emit windows only when they are closed and cannot
    # receive any updates anymore.
    .all()
)

Arguments:

  • duration_ms: The length of each window. Can be specified as either an int representing milliseconds or a timedelta object.

    NOTE: timedelta objects will be rounded to the closest millisecond value.

  • grace_ms: The grace period for data arrival. It allows late-arriving data (data arriving after the window has theoretically closed) to be included in the window. Can be specified as either an int representing milliseconds or as a timedelta object.

    NOTE: timedelta objects will be rounded to the closest millisecond value.

  • name: The unique identifier for the window. If not provided, it will be automatically generated based on the window's properties.

Returns:

TumblingWindowDefinition instance representing the tumbling window configuration. This object can be further configured with aggregation functions like sum, count, etc. and applied to the StreamingDataFrame.
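
To see how a timestamp maps to a fixed, non-overlapping window, the sketch below computes window bounds by hand. It assumes windows are aligned to the epoch and are half-open, [start, end); both are assumptions of this illustration, and the helper names to_ms and tumbling_window_bounds are hypothetical.

```python
from datetime import timedelta
from typing import Tuple, Union


def to_ms(duration: Union[int, timedelta]) -> int:
    """Accept int milliseconds or a timedelta rounded to the closest millisecond."""
    if isinstance(duration, timedelta):
        return round(duration.total_seconds() * 1000)
    return duration


def tumbling_window_bounds(timestamp_ms: int,
                           duration: Union[int, timedelta]) -> Tuple[int, int]:
    """Return the (start, end) of the single window containing timestamp_ms."""
    duration_ms = to_ms(duration)
    start = timestamp_ms - (timestamp_ms % duration_ms)  # snap down to a multiple
    return start, start + duration_ms


print(tumbling_window_bounds(125_000, timedelta(seconds=60)))  # (120000, 180000)
print(tumbling_window_bounds(120_000, 60_000))                 # (120000, 180000)
print(tumbling_window_bounds(119_999, 60_000))                 # (60000, 120000)
```

Every timestamp falls into exactly one window, which is what distinguishes tumbling windows from hopping ones.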

StreamingDataFrame.hopping_window

def hopping_window(duration_ms: Union[int, timedelta],
                   step_ms: Union[int, timedelta],
                   grace_ms: Union[int, timedelta] = 0,
                   name: Optional[str] = None) -> HoppingWindowDefinition

Create a hopping window transformation on this StreamingDataFrame.

Hopping windows divide the data stream into overlapping windows based on time. The overlap is controlled by the step_ms parameter.

They allow performing stateful aggregations like sum, reduce, etc. on top of the data and emit results downstream.

Notes:

  • The timestamp of the aggregation result is set to the window start timestamp.
  • Every window is grouped by the current Kafka message key.
  • Messages with None key will be ignored.
  • The time windows always use the current event time.

Example Snippet:

from datetime import timedelta

from quixstreams import Application

app = Application()
sdf = app.dataframe(...)

sdf = (
    # Define a hopping window of 60s with step 30s and grace period of 10s
    sdf.hopping_window(
        duration_ms=timedelta(seconds=60),
        step_ms=timedelta(seconds=30),
        grace_ms=timedelta(seconds=10)
    )

    # Specify the aggregation function
    .sum()

    # Specify how the results should be emitted downstream.
    # "all()" will emit results as they come for each updated window,
    # possibly producing multiple messages per key-window pair
    # "final()" will emit windows only when they are closed and cannot
    # receive any updates anymore.
    .all()
)

Arguments:

  • duration_ms: The length of each window. It defines the time span for which each window aggregates data. Can be specified as either an int representing milliseconds or a timedelta object.

    NOTE: timedelta objects will be rounded to the closest millisecond value.

  • step_ms: The step size for the window. It determines how much each successive window moves forward in time. Can be specified as either an int representing milliseconds or a timedelta object.

    NOTE: timedelta objects will be rounded to the closest millisecond value.

  • grace_ms: The grace period for data arrival. It allows late-arriving data to be included in the window, even if it arrives after the window has theoretically moved forward. Can be specified as either an int representing milliseconds or a timedelta object.

    NOTE: timedelta objects will be rounded to the closest millisecond value.

  • name: The unique identifier for the window. If not provided, it will be automatically generated based on the window's properties.

Returns:

HoppingWindowDefinition instance representing the hopping window configuration. This object can be further configured with aggregation functions like sum, count, etc. and applied to the StreamingDataFrame.
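
With overlapping windows, one timestamp belongs to several windows at once. The sketch below enumerates them, assuming window starts are multiples of the step (epoch-aligned) and windows are half-open, [start, end); the function name hopping_windows and these alignment choices are assumptions of the illustration.

```python
from typing import List, Tuple


def hopping_windows(timestamp_ms: int, duration_ms: int,
                    step_ms: int) -> List[Tuple[int, int]]:
    """List every [start, end) window that contains timestamp_ms, oldest first."""
    windows = []
    start = timestamp_ms - (timestamp_ms % step_ms)  # latest start <= timestamp
    # Walk backwards one step at a time while the window still covers the timestamp
    while start > timestamp_ms - duration_ms and start >= 0:
        windows.append((start, start + duration_ms))
        start -= step_ms
    return list(reversed(windows))


# 60s windows hopping every 30s: the timestamp lands in two windows
print(hopping_windows(125_000, 60_000, 30_000))  # [(90000, 150000), (120000, 180000)]
# When the step equals the duration, this reduces to a tumbling window
print(hopping_windows(125_000, 60_000, 60_000))  # [(120000, 180000)]
```

In general a timestamp is covered by about duration_ms / step_ms windows, so a smaller step means more aggregation updates per message.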

StreamingDataFrame.drop

def drop(columns: Union[str, List[str]]) -> Self

Drop column(s) from the message value (value must support del, like a dict).

This operation occurs in-place, meaning reassignment is entirely OPTIONAL: the original StreamingDataFrame is returned for chaining (sdf.update().print()).

Example Snippet:

# Remove columns "x" and "y" from the value.
# This would transform {"x": 1, "y": 2, "z": 3} to {"z": 3}

sdf = StreamingDataFrame()
sdf.drop(["x", "y"])

Arguments:

  • columns: a single column name or a list of names, where names are str

Returns:

the updated StreamingDataFrame instance (reassignment NOT required).
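
On a plain dict, the described operation amounts to deleting keys in place. The sketch below shows that effect with a hypothetical drop_columns helper; raising KeyError for a missing column is a property of this sketch, not a statement about how the library handles missing columns.

```python
from typing import List, Union


def drop_columns(value: dict, columns: Union[str, List[str]]) -> dict:
    """Delete one or more keys from value in place and return it."""
    names = [columns] if isinstance(columns, str) else columns
    for name in names:
        del value[name]  # in this sketch, a missing key raises KeyError
    return value


row = {"x": 1, "y": 2, "z": 3}
result = drop_columns(row, ["x", "y"])
print(result)         # {'z': 3}
print(result is row)  # True: the same dict was modified
print(drop_columns({"a": 1, "b": 2}, "a"))  # a single name also works
```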

quixstreams.dataframe.series

StreamingSeries

class StreamingSeries(BaseStreaming)

StreamingSeries are typically generated by StreamingDataFrames when getting elements from, or performing certain operations on, a StreamingDataFrame, thus acting as a representation of a "column" value.

They share some operations with the StreamingDataFrame, but also provide some additional functionality.

Most column value operations are handled by this class, and StreamingSeries can generate other StreamingSeries as a result of said operations.

What it Does:

  • Allows simple operations on dataframe "column"/dictionary values:
    • Basic ops like add, subtract, modulo, etc.
  • Enables comparisons/inequalities:
    • Greater than, equals, etc.
    • and/or, is/not operations
  • Can check for existence of columns in StreamingDataFrames
  • Enables chaining of various operations together

How to Use:

For the most part, you may not even notice this class exists! Instances are created naturally as a result of typical StreamingDataFrame use.

Auto-complete should help you with valid methods and type-checking should alert you to invalid operations between StreamingSeries.

In general, any typical Pandas dataframe operation between columns should be valid with StreamingSeries, and you shouldn't have to think about them explicitly.

Example Snippet:

# Random methods for example purposes. More detailed explanations found under
# various methods or in the docs folder.

sdf = StreamingDataFrame()
sdf["new_col"] = sdf["column_a"].apply(a_func).apply(diff_func)
sdf["my_new_bool_field"] = sdf["column_b"].contains("this_string")
sdf["new_sum_field"] = sdf["column_c"] + sdf["column_d"] + 2
sdf = sdf[sdf["column_a"] & (sdf["new_sum_field"] >= 10)]
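
The way column expressions can be built first and evaluated later is easiest to see in a toy version. The LazyColumn class below is a hypothetical stand-in written for this illustration: it wraps a function from a row dict to a value and overloads a few operators so that expressions chain. It is not the StreamingSeries implementation.

```python
import operator
from typing import Any, Callable


class LazyColumn:
    """Hypothetical stand-in: wraps a function from a row dict to a value."""

    def __init__(self, func: Callable[[dict], Any]):
        self._func = func

    @classmethod
    def from_key(cls, key: str) -> "LazyColumn":
        return cls(lambda row: row[key])

    def _combine(self, other: Any, op: Callable[[Any, Any], Any]) -> "LazyColumn":
        # The right-hand side may be another lazy column or a plain constant
        if isinstance(other, LazyColumn):
            return LazyColumn(lambda row: op(self._func(row), other._func(row)))
        return LazyColumn(lambda row: op(self._func(row), other))

    def __add__(self, other: Any) -> "LazyColumn":
        return self._combine(other, operator.add)

    def __ge__(self, other: Any) -> "LazyColumn":
        return self._combine(other, operator.ge)

    def __and__(self, other: Any) -> "LazyColumn":
        return self._combine(other, lambda a, b: bool(a and b))

    def apply(self, func: Callable[[Any], Any]) -> "LazyColumn":
        return LazyColumn(lambda row: func(self._func(row)))

    def evaluate(self, row: dict) -> Any:
        return self._func(row)


# Nothing runs here: the expression is only a description of the computation
expr = (LazyColumn.from_key("c") + LazyColumn.from_key("d") + 2) >= 10
print(expr.evaluate({"c": 3, "d": 5}))  # True
print(expr.evaluate({"c": 1, "d": 1}))  # False
```

Each operator returns a new object, which is why operations can be chained and why intermediate objects appear without being created explicitly.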

StreamingSeries.from_apply_callback

@classmethod
def from_apply_callback(cls, func: ApplyWithMetadataCallback) -> Self

Create a StreamingSeries from a function.

The provided function will be wrapped into Apply

Arguments:

  • func: a function to apply

Returns:

instance of StreamingSeries

StreamingSeries.apply

def apply(func: ApplyCallback) -> Self

[VIEW SOURCE]

Add a callable to the execution list for this series.

The provided callable should accept a single argument, which will be its input. It should similarly return one output, or None.

They can be chained together or included with other operations.

Example Snippet:

# The `StreamingSeries` are generated when `sdf["COLUMN_NAME"]` is called.
# This stores a string in state and capitalizes the column value; the result is
# assigned to a new column.
#  Another apply converts a str column to an int, assigning it to a new column.

def func(value: str, state: State):
    if value != state.get("my_store_key"):
        state.set("my_store_key", value)
    return value.upper()

sdf = StreamingDataFrame()
sdf["new_col"] = sdf["a_column"]["nested_dict_key"].apply(func, stateful=True)
sdf["new_col_2"] = sdf["str_col"].apply(lambda v: int(v)) + sdf["str_col2"] + 2

Arguments:

  • func: a callable with one argument and one output

Returns:

a new StreamingSeries with the new callable added

StreamingSeries.compose_returning

def compose_returning() -> ReturningExecutor

[VIEW SOURCE]

Compose a list of functions from this StreamingSeries and its parents into one big closure that always returns the transformed record.

This closure is to be used to execute the functions in the stream and to get the result of the transformations.

Stream may only contain simple "apply" functions to be able to compose itself into a returning function.

Returns:

a callable accepting value, key and timestamp and returning a tuple "(value, key, timestamp)"

StreamingSeries.compose

def compose(
    sink: Optional[Callable[[Any, Any, int, Any],
                            None]] = None) -> VoidExecutor

[VIEW SOURCE]

Compose all functions of this StreamingSeries into one big closure.

Generally not required by users; the quixstreams.app.Application class will do this automatically.

Example Snippet:

from quixstreams import Application

app = Application(...)

sdf = app.dataframe()
sdf = sdf["column_a"].apply(apply_func)
sdf = sdf["column_b"].contains(filter_func)
sdf = sdf.compose()

result_0 = sdf({"my": "record"})
result_1 = sdf({"other": "record"})

Arguments:

  • sink: callable to accumulate the results of the execution.

Raises:

  • ValueError: if disallowed functions are present in the tree of underlying Stream.

Returns:

a callable accepting value, key and timestamp and returning None

StreamingSeries.test

def test(value: Any,
         key: Any,
         timestamp: int,
         headers: Optional[Any] = None,
         ctx: Optional[MessageContext] = None) -> Any

[VIEW SOURCE]

A shorthand to test StreamingSeries with provided value and MessageContext.

Arguments:

  • value: value to pass through StreamingSeries
  • ctx: instance of MessageContext, optional. Provide it if the StreamingSeries instance has functions calling get_current_key(). Default - None.

Returns:

result of StreamingSeries

StreamingSeries.isin

def isin(other: Container) -> Self

[VIEW SOURCE]

Check if series value is in "other".

Same as "StreamingSeries in other".

Runtime result will be a bool.

Example Snippet:

from quixstreams import Application

# Check if "str_column" is contained in a column with a list of strings and
# assign the resulting `bool` to a new column: "has_my_str".

sdf = app.dataframe()
sdf["has_my_str"] = sdf["str_column"].isin(sdf["column_with_list_of_strs"])

Arguments:

  • other: a container to check

Returns:

new StreamingSeries

StreamingSeries.contains

def contains(other: Union[Self, object]) -> Self

[VIEW SOURCE]

Check if series value contains "other"

Same as "other in StreamingSeries".

Runtime result will be a bool.

Example Snippet:

from quixstreams import Application

# Check if "column_a" contains "my_substring" and assign the resulting
# `bool` to a new column: "has_my_substr"

sdf = app.dataframe()
sdf["has_my_substr"] = sdf["column_a"].contains("my_substring")

Arguments:

  • other: object to check

Returns:

new StreamingSeries

StreamingSeries.is_

def is_(other: Union[Self, object]) -> Self

[VIEW SOURCE]

Check if series value refers to the same object as other

Runtime result will be a bool.

Example Snippet:

# Check if "column_a" is the same as "column_b" and assign the resulting `bool`
#  to a new column: "is_same"

from quixstreams import Application
sdf = app.dataframe()
sdf["is_same"] = sdf["column_a"].is_(sdf["column_b"])

Arguments:

  • other: object to check for "is"

Returns:

new StreamingSeries

StreamingSeries.isnot

def isnot(other: Union[Self, object]) -> Self

[VIEW SOURCE]

Check if series value does not refer to the same object as other

Runtime result will be a bool.

Example Snippet:

from quixstreams import Application

# Check if "column_a" is not the same as "column_b" and assign the resulting `bool`
# to a new column: "is_not_same"

sdf = app.dataframe()
sdf["is_not_same"] = sdf["column_a"].isnot(sdf["column_b"])

Arguments:

  • other: object to check for "is_not"

Returns:

new StreamingSeries

StreamingSeries.isnull

def isnull() -> Self

[VIEW SOURCE]

Check if series value is None.

Runtime result will be a bool.

Example Snippet:

from quixstreams import Application

# Check if "column_a" is null and assign the resulting `bool` to a new column:
# "is_null"

sdf = app.dataframe()
sdf["is_null"] = sdf["column_a"].isnull()

Returns:

new StreamingSeries

StreamingSeries.notnull

def notnull() -> Self

[VIEW SOURCE]

Check if series value is not None.

Runtime result will be a bool.

Example Snippet:

from quixstreams import Application

# Check if "column_a" is not null and assign the resulting `bool` to a new column:
# "is_not_null"

sdf = app.dataframe()
sdf["is_not_null"] = sdf["column_a"].notnull()

Returns:

new StreamingSeries

StreamingSeries.abs

def abs() -> Self

[VIEW SOURCE]

Get absolute value of the series value.

Example Snippet:

from quixstreams import Application

# Get absolute value of "int_col" and add it to "other_int_col".
# Finally, assign the result to a new column: "abs_col_sum".

sdf = app.dataframe()
sdf["abs_col_sum"] = sdf["int_col"].abs() + sdf["other_int_col"]

Returns:

new StreamingSeries

quixstreams.dataframe

quixstreams.dataframe.utils

ensure_milliseconds

def ensure_milliseconds(delta: Union[int, timedelta]) -> int

[VIEW SOURCE]

Convert timedelta to milliseconds.

This function will also round the value to the closest millisecond in case of higher precision.

Arguments:

  • delta: timedelta object

Returns:

timedelta value in milliseconds as int
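The conversion described above can be sketched as follows. This is only an illustration, not the library's implementation: the helper name `to_milliseconds` is hypothetical, and passing integers through unchanged is an assumption based on the `Union[int, timedelta]` signature.

```python
from datetime import timedelta
from typing import Union


def to_milliseconds(delta: Union[int, timedelta]) -> int:
    """Hypothetical stand-in for a timedelta-to-milliseconds helper."""
    if isinstance(delta, int):
        # Assumption: an int is already a number of milliseconds.
        return delta
    # Dividing two timedeltas yields a float number of milliseconds;
    # round() then snaps sub-millisecond precision to the nearest integer.
    return round(delta / timedelta(milliseconds=1))


print(to_milliseconds(timedelta(seconds=1.5)))
print(to_milliseconds(timedelta(microseconds=1600)))
print(to_milliseconds(250))
```

Rounding (rather than truncating) means a delta of 1.6 ms becomes 2 ms in this sketch.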

quixstreams.dataframe.exceptions

quixstreams.dataframe.windows.definitions

FixedTimeWindowDefinition

class FixedTimeWindowDefinition(abc.ABC)

[VIEW SOURCE]

FixedTimeWindowDefinition.sum

def sum() -> "FixedTimeWindow"

[VIEW SOURCE]

Configure the window to aggregate data by summing up values within each window period.

Returns:

an instance of FixedTimeWindow configured to perform sum aggregation.

FixedTimeWindowDefinition.count

def count() -> "FixedTimeWindow"

[VIEW SOURCE]

Configure the window to aggregate data by counting the number of values within each window period.

Returns:

an instance of FixedTimeWindow configured to perform record count.

FixedTimeWindowDefinition.mean

def mean() -> "FixedTimeWindow"

[VIEW SOURCE]

Configure the window to aggregate data by calculating the mean of the values within each window period.

Returns:

an instance of FixedTimeWindow configured to calculate the mean of the values.

FixedTimeWindowDefinition.reduce

def reduce(reducer: Callable[[Any, Any], Any],
           initializer: Callable[[Any], Any]) -> "FixedTimeWindow"

[VIEW SOURCE]

Configure the window to perform a custom aggregation using reducer and initializer functions.

Example Snippet:

sdf = StreamingDataFrame(...)

# Using "reduce()" to calculate multiple aggregates at once
def reducer(agg: dict, current: int):
    aggregated = {
        'min': min(agg['min'], current),
        'max': max(agg['max'], current),
        'count': agg['count'] + 1
    }
    return aggregated

def initializer(current) -> dict:
    return {'min': current, 'max': current, 'count': 1}

window = (
    sdf.tumbling_window(duration_ms=1000)
    .reduce(reducer=reducer, initializer=initializer)
    .final()
)

Arguments:

  • reducer: A function that takes two arguments (the accumulated value and a new value) and returns a single value. The returned value will be saved to the state store and sent downstream.
  • initializer: A function to call for every first element of the window. This function is used to initialize the aggregation within a window.

Returns:

A window configured to perform custom reduce aggregation on the data.
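To see how the reducer and initializer interact, the sketch below folds the values of a single window in plain Python. The helper `aggregate_window` is hypothetical and only mimics the described contract (initializer for the first element, reducer for every later one); it does not involve state stores or real windows.

```python
from typing import Any, Callable, Iterable


def aggregate_window(
    values: Iterable[Any],
    reducer: Callable[[Any, Any], Any],
    initializer: Callable[[Any], Any],
) -> Any:
    """Hypothetical helper: fold one window's values as reduce() is described."""
    iterator = iter(values)
    # The first value of the window goes through the initializer ...
    aggregated = initializer(next(iterator))
    # ... and every later value is combined with the accumulated result.
    for value in iterator:
        aggregated = reducer(aggregated, value)
    return aggregated


def reducer(agg: dict, current: int) -> dict:
    return {
        "min": min(agg["min"], current),
        "max": max(agg["max"], current),
        "count": agg["count"] + 1,
    }


def initializer(current: int) -> dict:
    return {"min": current, "max": current, "count": 1}


result = aggregate_window([3, 1, 5], reducer, initializer)
print(result)
```

Because the initializer receives the first value, the aggregate never needs a sentinel such as infinity for the minimum.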

FixedTimeWindowDefinition.max

def max() -> "FixedTimeWindow"

[VIEW SOURCE]

Configure a window to aggregate the maximum value within each window period.

Returns:

an instance of FixedTimeWindow configured to calculate the maximum value within each window period.

FixedTimeWindowDefinition.min

def min() -> "FixedTimeWindow"

[VIEW SOURCE]

Configure a window to aggregate the minimum value within each window period.

Returns:

an instance of FixedTimeWindow configured to calculate the minimum value within each window period.

quixstreams.dataframe.windows

quixstreams.dataframe.windows.time_based

FixedTimeWindow

class FixedTimeWindow()

[VIEW SOURCE]

FixedTimeWindow.final

def final() -> "StreamingDataFrame"

[VIEW SOURCE]

Apply the window aggregation and return results only when the windows are closed.

The format of returned windows:

{
    "start": <window start time in milliseconds>,
    "end": <window end time in milliseconds>,
    "value": <aggregated window value>,
}

The individual window is closed when the event time (the maximum observed timestamp across the partition) passes its end timestamp + grace period. The closed windows cannot receive updates anymore and are considered final.

NOTE: Windows can be closed only within the same message key. If some message keys appear irregularly in the stream, the latest windows can remain unprocessed until a message with the same key is received.

FixedTimeWindow.current

def current() -> "StreamingDataFrame"

[VIEW SOURCE]

Apply the window transformation to the StreamingDataFrame to return results for each updated window.

The format of returned windows:

{
    "start": <window start time in milliseconds>,
    "end": <window end time in milliseconds>,
    "value": <aggregated window value>,
}

This method processes streaming data and returns results as they come, regardless of whether the window is closed or not.

quixstreams.dataframe.windows.base

get_window_ranges

def get_window_ranges(timestamp_ms: int,
                      duration_ms: int,
                      step_ms: Optional[int] = None) -> List[Tuple[int, int]]

[VIEW SOURCE]

Get a list of window ranges for the given timestamp.

Arguments:

  • timestamp_ms: timestamp in milliseconds
  • duration_ms: window duration in milliseconds
  • step_ms: window step in milliseconds for hopping windows, optional.

Returns:

a list of (<start>, <end>) tuples
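One way to compute such ranges is sketched below. It is an assumption-laden illustration rather than the library's code: the function name `window_ranges` is hypothetical, windows are treated as half-open intervals [start, end) aligned to multiples of the step, and window starts are not allowed to go below zero.

```python
from typing import List, Optional, Tuple


def window_ranges(
    timestamp_ms: int, duration_ms: int, step_ms: Optional[int] = None
) -> List[Tuple[int, int]]:
    """Hypothetical sketch: all [start, end) windows containing the timestamp."""
    # Without a step, windows do not overlap (tumbling windows).
    step = step_ms or duration_ms
    # Latest window start aligned to the step that is <= the timestamp.
    start = timestamp_ms - (timestamp_ms % step)
    ranges = []
    # Walk backwards while the window still covers the timestamp.
    while start > timestamp_ms - duration_ms and start >= 0:
        ranges.append((start, start + duration_ms))
        start -= step
    ranges.reverse()  # oldest window first
    return ranges


print(window_ranges(1500, 1000))       # tumbling
print(window_ranges(1500, 1000, 500))  # hopping
```

With a step smaller than the duration, one timestamp falls into several overlapping windows, which is why a list is returned.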

quixstreams.dataframe.base

quixstreams.rowproducer

RowProducer

class RowProducer()

[VIEW SOURCE]

A producer class that is capable of serializing Rows to bytes and sending them to Kafka.

The serialization is performed according to the Topic serialization settings.

Arguments:

  • broker_address: Connection settings for Kafka. Accepts string with Kafka broker host and port formatted as <host>:<port>, or a ConnectionConfig object if authentication is required.
  • extra_config: A dictionary with additional options that will be passed to confluent_kafka.Producer as is. Note: values passed as arguments override values in extra_config.
  • on_error: a callback triggered when RowProducer.produce_row() or RowProducer.poll() fail. If the producer fails and the callback returns True, the exception will be logged but not propagated. The default callback logs an exception and returns False.
  • flush_timeout: The time the producer is waiting for all messages to be delivered.
  • transactional: whether to use Kafka transactions or not. Note this changes which underlying Producer class is used.

RowProducer.produce_row

def produce_row(row: Row,
                topic: Topic,
                key: Optional[Any] = _KEY_UNSET,
                partition: Optional[int] = None,
                timestamp: Optional[int] = None)

[VIEW SOURCE]

Serialize Row to bytes according to the Topic serialization settings and produce it to Kafka.

If this method fails, it will trigger the provided "on_error" callback.

Arguments:

  • row: Row object
  • topic: Topic object
  • key: message key, optional
  • partition: partition number, optional
  • timestamp: timestamp in milliseconds, optional

RowProducer.poll

def poll(timeout: float = None)

[VIEW SOURCE]

Polls the producer for events and calls on_delivery callbacks.

If poll() fails, it will trigger the provided "on_error" callback

Arguments:

  • timeout: timeout in seconds

RowProducer.abort_transaction

def abort_transaction(timeout: Optional[float] = None)

[VIEW SOURCE]

Attempt an abort if there is an active transaction.

Otherwise, skip it, since aborting throws an exception if at least one transaction was successfully completed at some point.

This avoids polluting the stack trace in the case where a transaction was not active as expected (because of some other exception already raised) and a cleanup abort is attempted.

NOTE: under normal circumstances a transaction will be open due to how the Checkpoint inits another immediately after committing.

quixstreams.core.stream.functions

StreamFunction

class StreamFunction(abc.ABC)

[VIEW SOURCE]

A base class for all the streaming operations in Quix Streams.

It provides a get_executor method to return a closure to be called with the input values.

StreamFunction.get_executor

@abc.abstractmethod
def get_executor(child_executor: VoidExecutor) -> VoidExecutor

[VIEW SOURCE]

Returns a wrapper to be called on a value, key and timestamp.

ApplyFunction

class ApplyFunction(StreamFunction)

[VIEW SOURCE]

Wrap a function into "Apply" function.

The provided callback is expected to return a new value based on input, and its result will always be passed downstream.

ApplyWithMetadataFunction

class ApplyWithMetadataFunction(StreamFunction)

[VIEW SOURCE]

Wrap a function into "Apply" function.

The provided function is expected to accept value, key, and timestamp and return a new value based on input, and its result will always be passed downstream.

FilterFunction

class FilterFunction(StreamFunction)

[VIEW SOURCE]

Wraps a function into a "Filter" function. The result of a Filter function is interpreted as boolean. If it's True, the input will be passed downstream. If it's False, the Filtered exception will be raised to signal that the value is filtered out.

FilterWithMetadataFunction

class FilterWithMetadataFunction(StreamFunction)

[VIEW SOURCE]

Wraps a function into a "Filter" function.

The passed callback must accept value, key, and timestamp, and it's expected to return a boolean-like result.

If the result is True, the input will be passed downstream. Otherwise, the value will be filtered out.

UpdateFunction

class UpdateFunction(StreamFunction)

[VIEW SOURCE]

Wrap a function into an "Update" function.

The provided function must accept a value, and it's expected to mutate it or to perform some side effect.

The result of the callback is always ignored, and the original input is passed downstream.

UpdateWithMetadataFunction

class UpdateWithMetadataFunction(StreamFunction)

[VIEW SOURCE]

Wrap a function into an "Update" function.

The provided function must accept a value, a key, and a timestamp. The callback is expected to mutate the value or to perform some side effect with it.

The result of the callback is always ignored, and the original input is passed downstream.

TransformFunction

class TransformFunction(StreamFunction)

[VIEW SOURCE]

Wrap a function into a "Transform" function.

The provided callback must accept a value, a key and a timestamp. It's expected to return a new value, new key and new timestamp.

This function must be used with caution, because it can technically change the key. It's supposed to be used by the library internals and not be a part of the public API.

The result of the callback will always be passed downstream.

quixstreams.core.stream

quixstreams.core.stream.stream

Stream

class Stream()

[VIEW SOURCE]

Stream.__init__

def __init__(func: Optional[StreamFunction] = None,
             parent: Optional[Self] = None)

[VIEW SOURCE]

A base class for all streaming operations.

Stream is an abstraction of a function pipeline. Each Stream has a function and a parent (None by default). When adding a new function to the stream, it creates a new Stream object and sets "parent" to the previous Stream to maintain an order of execution.

Stream supports four types of functions:

  • "Apply" - generate new values based on a previous one. The result of an Apply function is passed downstream to the next functions. If "expand=True" is passed and the function returns an Iterable, each item of it will be treated as a separate value downstream.
  • "Update" - update values in-place. The result of an Update function is always ignored, and its input is passed downstream.
  • "Filter" - to filter values from the Stream. The result of a Filter function is interpreted as boolean. If it's True, the input will be passed downstream. If it's False, the record will be filtered from the stream.
  • "Transform" - to transform keys and timestamps along with the values. "Transform" functions may change the keys and should be used with caution. The result of the Transform function is passed downstream to the next functions. If "expand=True" is passed and the function returns an Iterable, each item of it will be treated as a separate value downstream.

To execute the functions on the Stream, call .compose() method, and it will return a closure to execute all the functions accumulated in the Stream and its parents.

Arguments:

  • func: a function to be called on the stream. It is expected to be wrapped into one of "Apply", "Filter", "Update" or "Transform" from quixstreams.core.stream.functions package. Default - "ApplyFunction(lambda value: value)".
  • parent: a parent Stream
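The parent-chain idea described above can be sketched in a few lines of plain Python. Everything here is a simplified, hypothetical model (`MiniStream` and its local `Filtered` exception are not the library's classes): functions receive only the value, there is no "expand" or "transform" support, and the composed executor returns the value instead of passing it to a sink.

```python
from typing import Any, Callable, List, Optional


class Filtered(Exception):
    """Signals that a value was filtered out of the pipeline."""


class MiniStream:
    """Hypothetical model of a Stream: one function plus a parent link."""

    def __init__(self, func: Optional[Callable[[Any], Any]] = None,
                 parent: Optional["MiniStream"] = None):
        self.func = func or (lambda value: value)
        self.parent = parent

    def add_apply(self, func: Callable[[Any], Any]) -> "MiniStream":
        # The result of an "apply" function replaces the value downstream.
        return MiniStream(func, parent=self)

    def add_update(self, func: Callable[[Any], Any]) -> "MiniStream":
        # The result of an "update" function is ignored; the input moves on.
        def wrapper(value: Any) -> Any:
            func(value)
            return value
        return MiniStream(wrapper, parent=self)

    def add_filter(self, func: Callable[[Any], Any]) -> "MiniStream":
        # A falsy result drops the value by raising Filtered.
        def wrapper(value: Any) -> Any:
            if not func(value):
                raise Filtered()
            return value
        return MiniStream(wrapper, parent=self)

    def tree(self) -> List["MiniStream"]:
        # Walk up the parent links, then order from root to current node.
        nodes, node = [], self
        while node is not None:
            nodes.append(node)
            node = node.parent
        return nodes[::-1]

    def compose(self) -> Callable[[Any], Any]:
        funcs = [node.func for node in self.tree()]

        def executor(value: Any) -> Any:
            for func in funcs:
                value = func(value)
            return value
        return executor


stream = (
    MiniStream()
    .add_apply(lambda v: {**v, "total": v["a"] + v["b"]})
    .add_update(lambda v: v.update(seen=True))
    .add_filter(lambda v: v["total"] >= 10)
)
run = stream.compose()
print(run({"a": 4, "b": 7}))
```

Each `add_*` call returns a new node rather than mutating the existing one, so earlier nodes can be reused as the starting point of several branches.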

Stream.add_filter

def add_filter(func: Union[FilterCallback, FilterWithMetadataCallback],
               *,
               metadata: bool = False) -> Self

[VIEW SOURCE]

Add a function to filter values from the Stream.

The return value of the function will be interpreted as bool. If the function returns a False-like result, the Stream will raise a Filtered exception during execution.

Arguments:

  • func: a function to filter values from the stream
  • metadata: if True, the callback will receive key and timestamp along with the value. Default - False.

Returns:

a new Stream derived from the current one

Stream.add_apply

def add_apply(func: Union[
    ApplyCallback,
    ApplyExpandedCallback,
    ApplyWithMetadataCallback,
    ApplyWithMetadataExpandedCallback,
],
              *,
              expand: bool = False,
              metadata: bool = False) -> Self

[VIEW SOURCE]

Add an "apply" function to the Stream.

The function is supposed to return a new value, which will be passed further during execution.

Arguments:

  • func: a function to generate a new value
  • expand: if True, expand the returned iterable into individual values downstream. If returned value is not iterable, TypeError will be raised. Default - False.
  • metadata: if True, the callback will receive key and timestamp along with the value. Default - False.

Returns:

a new Stream derived from the current one

Stream.add_update

def add_update(func: Union[UpdateCallback, UpdateWithMetadataCallback],
               *,
               metadata: bool = False) -> Self

[VIEW SOURCE]

Add an "update" function to the Stream, that will mutate the input value.

The return of this function will be ignored and its input will be passed downstream.

Arguments:

  • func: a function to mutate the value
  • metadata: if True, the callback will receive key and timestamp along with the value. Default - False.

Returns:

a new Stream derived from the current one

Stream.add_transform

def add_transform(func: Union[TransformCallback, TransformExpandedCallback],
                  *,
                  expand: bool = False) -> Self

[VIEW SOURCE]

Add a "transform" function to the Stream, that will mutate the input value.

The callback must accept a value, a key, and a timestamp. It's expected to return a new value, new key and new timestamp.

The result of the callback will be passed downstream during execution.

Arguments:

  • func: a function to mutate the value
  • expand: if True, expand the returned iterable into individual items downstream. If returned value is not iterable, TypeError will be raised. Default - False.

Returns:

a new Stream derived from the current one

Stream.diff

def diff(other: "Stream") -> Self

[VIEW SOURCE]

Takes the difference between Streams self and other based on their last common parent, and returns a new Stream that includes only this difference.

It's impossible to calculate a diff when:

  • Streams don't have a common parent.
  • The self Stream already includes all the nodes from the other Stream, and the resulting diff is empty.

Arguments:

  • other: a Stream to take a diff from.

Raises:

  • ValueError: if Streams don't have a common parent or if the diff is empty.

Returns:

new Stream instance including all the Streams from the diff

Stream.tree

def tree() -> List[Self]

[VIEW SOURCE]

Return a list of all parent Streams including the node itself.

The tree is ordered from parent to child (current node comes last).

Returns:

a list of Stream objects

Stream.compose_returning

def compose_returning() -> ReturningExecutor

[VIEW SOURCE]

Compose a list of functions from this Stream and its parents into one big closure that always returns the transformed record.

This closure is to be used to execute the functions in the stream and to get the result of the transformations.

Stream may only contain simple "apply" functions to be able to compose itself into a returning function.

Stream.compose

def compose(
    allow_filters: bool = True,
    allow_updates: bool = True,
    allow_expands: bool = True,
    allow_transforms: bool = True,
    sink: Optional[Callable[[Any, Any, int, Any],
                            None]] = None) -> VoidExecutor

[VIEW SOURCE]

Compose a list of functions from this Stream and its parents into one big closure using a "composer" function.

This "executor" closure is to be used to execute all functions in the stream for the given key, value and timestamps.

By default, executor doesn't return the result of the execution. To accumulate the results, pass the sink parameter.

Arguments:

  • allow_filters: If False, this function will fail with ValueError if the stream has filter functions in the tree. Default - True.
  • allow_updates: If False, this function will fail with ValueError if the stream has update functions in the tree. Default - True.
  • allow_expands: If False, this function will fail with ValueError if the stream has functions with "expand=True" in the tree. Default - True.
  • allow_transforms: If False, this function will fail with ValueError if the stream has transform functions in the tree. Default - True.
  • sink: callable to accumulate the results of the execution, optional.

Raises:

  • ValueError: if disallowed functions are present in the stream tree.
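The role of the sink can be illustrated with a small stand-in. The `compose` function below is hypothetical and much simpler than the real method (no filters, updates, expands or transforms), and the sink argument order (value, key, timestamp, headers) is an assumption inferred from the four-argument type hint.

```python
from typing import Any, Callable, List, Optional, Tuple

Sink = Callable[[Any, Any, int, Any], None]


def compose(funcs: List[Callable[[Any], Any]], sink: Optional[Sink] = None):
    """Hypothetical composer: chain value-only functions, then call the sink."""

    def executor(value: Any, key: Any, timestamp: int, headers: Any = None) -> None:
        for func in funcs:
            value = func(value)
        if sink is not None:
            sink(value, key, timestamp, headers)
        # Nothing is returned: results are only observable through the sink.

    return executor


results: List[Tuple[Any, Any, int, Any]] = []
executor = compose(
    [lambda v: v + 1, lambda v: v * 2],
    sink=lambda value, key, timestamp, headers: results.append(
        (value, key, timestamp, headers)
    ),
)
returned = executor(1, "key-1", 1700000000000)
print(returned, results)
```

Collecting results through a callback instead of a return value lets a single input produce zero, one, or many outputs without changing the executor's signature.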

quixstreams.core

quixstreams.processing_context

ProcessingContext

@dataclasses.dataclass
class ProcessingContext()

[VIEW SOURCE]

A class to share processing-related objects between Application and StreamingDataFrame instances.

ProcessingContext.store_offset

def store_offset(topic: str, partition: int, offset: int)

[VIEW SOURCE]

Store the offset of the processed message to the checkpoint.

Arguments:

  • topic: topic name
  • partition: partition number
  • offset: message offset

ProcessingContext.init_checkpoint

def init_checkpoint()

[VIEW SOURCE]

Initialize a new checkpoint

ProcessingContext.commit_checkpoint

def commit_checkpoint(force: bool = False)

[VIEW SOURCE]

Attempts to finalize the current Checkpoint only if it is "expired" or force=True is passed; otherwise does nothing.

To finalize: the Checkpoint will be committed if it has any stored offsets, else just close it. A new Checkpoint is then created.

Arguments:

  • force: if True, commit the Checkpoint before its expiration deadline.
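The decision logic described above can be sketched with stand-in classes. `SketchCheckpoint` and `SketchContext` are hypothetical names, and the time-based expiration is an assumption; the real Checkpoint also deals with state stores and Kafka commits, which are omitted here.

```python
import time
from typing import Dict, Tuple


class SketchCheckpoint:
    """Hypothetical checkpoint: tracks offsets and an expiration deadline."""

    def __init__(self, commit_interval: float):
        self._deadline = time.monotonic() + commit_interval
        self.offsets: Dict[Tuple[str, int], int] = {}
        self.committed = False
        self.closed = False

    def expired(self) -> bool:
        return time.monotonic() >= self._deadline

    def store_offset(self, topic: str, partition: int, offset: int) -> None:
        self.offsets[(topic, partition)] = offset

    def commit(self) -> None:
        self.committed = True

    def close(self) -> None:
        self.closed = True


class SketchContext:
    def __init__(self, commit_interval: float):
        self._commit_interval = commit_interval
        self.checkpoint = SketchCheckpoint(commit_interval)

    def commit_checkpoint(self, force: bool = False) -> None:
        if not (self.checkpoint.expired() or force):
            return  # not expired and not forced: do nothing
        if self.checkpoint.offsets:
            self.checkpoint.commit()  # there is something to commit
        else:
            self.checkpoint.close()   # nothing stored: just close it
        # A fresh checkpoint always replaces the finalized one.
        self.checkpoint = SketchCheckpoint(self._commit_interval)


ctx = SketchContext(commit_interval=3600.0)
first = ctx.checkpoint
first.store_offset("topic", 0, 42)
ctx.commit_checkpoint()            # no-op: the deadline is an hour away
unchanged = ctx.checkpoint is first
ctx.commit_checkpoint(force=True)  # forced: commit and start a new checkpoint
print(unchanged, first.committed, ctx.checkpoint is first)
```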

quixstreams.utils

quixstreams.utils.dicts

dict_values

def dict_values(d: object) -> List

[VIEW SOURCE]

Recursively unpacks a set of nested dicts to get a flattened list of leaves, where "leaves" are the first non-dict item.

For example, {"a": {"b": {"c": 1}, "d": 2}, "e": 3} becomes [1, 2, 3].

Arguments:

  • d: initially, a dict (with potentially nested dicts)

Returns:

a list with all the leaves of the various contained dicts
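A recursive flattening of this kind can be written as follows. The function name `flatten_leaves` is hypothetical; this is a sketch of the described behaviour, not the library's source.

```python
from typing import Any, List


def flatten_leaves(d: Any) -> List[Any]:
    """Hypothetical sketch: collect the non-dict leaves of nested dicts."""
    if isinstance(d, dict):
        leaves: List[Any] = []
        for value in d.values():
            # Recurse into each value; nested dicts contribute their own leaves.
            leaves.extend(flatten_leaves(value))
        return leaves
    # Any non-dict item is a leaf.
    return [d]


print(flatten_leaves({"a": {"b": {"c": 1}, "d": 2}, "e": 3}))
```

Leaves appear in dictionary insertion order, and non-dict containers such as lists are treated as single leaves.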

quixstreams.utils.json

dumps

def dumps(value: Any) -> bytes

[VIEW SOURCE]

Serialize to JSON using orjson package.

Arguments:

  • value: value to serialize to JSON

Returns:

bytes

loads

def loads(value: bytes) -> Any

[VIEW SOURCE]

Deserialize from JSON using orjson package.

Main differences:

  • It returns bytes
  • It doesn't allow non-str keys in dictionaries

Arguments:

  • value: value to deserialize from

Returns:

object

quixstreams.types

quixstreams.models.timestamps

TimestampType

class TimestampType(enum.IntEnum)

[VIEW SOURCE]

TIMESTAMP_NOT_AVAILABLE

timestamps not supported by broker

TIMESTAMP_CREATE_TIME

message creation time (or source / producer time)

TIMESTAMP_LOG_APPEND_TIME

broker receive time

MessageTimestamp

class MessageTimestamp()

[VIEW SOURCE]

Represents a timestamp of incoming Kafka message.

It is made pseudo-immutable (i.e. public attributes don't have setters), and it should not be mutated during message processing.

MessageTimestamp.create

@classmethod
def create(cls, timestamp_type: int, milliseconds: int) -> Self

[VIEW SOURCE]

Create a Timestamp object based on data from confluent_kafka.Message.timestamp().

If timestamp type is "TIMESTAMP_NOT_AVAILABLE", the milliseconds are set to None

Arguments:

  • timestamp_type: a timestamp type represented as a number. Can be one of:
    • "0" - TIMESTAMP_NOT_AVAILABLE, timestamps not supported by broker.
    • "1" - TIMESTAMP_CREATE_TIME, message creation time (or source / producer time).
    • "2" - TIMESTAMP_LOG_APPEND_TIME, broker receive time.
  • milliseconds: the number of milliseconds since the epoch (UTC).

Returns:

Timestamp object
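The factory behaviour and the pseudo-immutability can be sketched with stand-in classes. `SketchTimestampType` and `SketchTimestamp` are hypothetical names that only mirror the description above; the input pair imitates the `(type, milliseconds)` tuple returned by `confluent_kafka.Message.timestamp()`.

```python
import enum
from typing import Optional


class SketchTimestampType(enum.IntEnum):
    TIMESTAMP_NOT_AVAILABLE = 0
    TIMESTAMP_CREATE_TIME = 1
    TIMESTAMP_LOG_APPEND_TIME = 2


class SketchTimestamp:
    """Hypothetical pseudo-immutable timestamp: properties without setters."""

    __slots__ = ("_type", "_milliseconds")

    def __init__(self, type: SketchTimestampType, milliseconds: Optional[int]):
        self._type = type
        self._milliseconds = milliseconds

    @property
    def type(self) -> SketchTimestampType:
        return self._type

    @property
    def milliseconds(self) -> Optional[int]:
        return self._milliseconds

    @classmethod
    def create(cls, timestamp_type: int, milliseconds: int) -> "SketchTimestamp":
        ts_type = SketchTimestampType(timestamp_type)
        if ts_type is SketchTimestampType.TIMESTAMP_NOT_AVAILABLE:
            # No usable timestamp: discard whatever number was supplied.
            return cls(ts_type, None)
        return cls(ts_type, milliseconds)


created = SketchTimestamp.create(1, 1692703362840)
missing = SketchTimestamp.create(0, -1)
print(created.type.name, created.milliseconds, missing.milliseconds)
```

Assigning to `created.milliseconds` raises `AttributeError` because the property has no setter, which is what "pseudo-immutable" means here; the private attributes can still be changed deliberately.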

quixstreams.models

quixstreams.models.messagecontext

MessageContext

class MessageContext()

[VIEW SOURCE]

An object with Kafka message properties.

It is made pseudo-immutable (i.e. public attributes don't have setters), and it should not be mutated during message processing.

quixstreams.models.types

ConfluentKafkaMessageProto

class ConfluentKafkaMessageProto(Protocol)

[VIEW SOURCE]

An interface of confluent_kafka.Message.

Use it to not depend on exact implementation and simplify testing.

Instances of confluent_kafka.Message cannot be directly created from Python, see https://github.com/confluentinc/confluent-kafka-python/issues/1535.

quixstreams.models.serializers

quixstreams.models.serializers.exceptions

IgnoreMessage

class IgnoreMessage(exceptions.QuixException)

[VIEW SOURCE]

Raise this exception from Deserializer.__call__ in order to ignore the processing of the particular message.

quixstreams.models.serializers.quix

QuixDeserializer

class QuixDeserializer(JSONDeserializer)

[VIEW SOURCE]

Handles Deserialization for any Quix-formatted topic.

Parses JSON data from either TimeseriesData or EventData (ignores the rest).

QuixDeserializer.__init__

def __init__(loads: Callable[[Union[bytes, bytearray]], Any] = default_loads)

[VIEW SOURCE]

Arguments:

  • loads: function to parse json from bytes. Default - quixstreams.utils.json.loads.

QuixDeserializer.split_values

@property
def split_values() -> bool

[VIEW SOURCE]

Each Quix message might contain data for multiple Rows. This property informs the downstream processors about that, so they can expect an Iterable instead of Mapping.

QuixDeserializer.deserialize

def deserialize(model_key: str, value: Union[List[Mapping],
                                             Mapping]) -> Iterable[Mapping]

[VIEW SOURCE]

Deserialization function for particular data types (Timeseries or EventData).

Arguments:

  • model_key: value of "__Q_ModelKey" message header
  • value: deserialized JSON value of the message, list or dict

Returns:

Iterable of dicts

QuixSerializer

class QuixSerializer(JSONSerializer)

[VIEW SOURCE]

QuixSerializer.__init__

def __init__(as_legacy: bool = True,
             dumps: Callable[[Any], Union[str, bytes]] = default_dumps)

[VIEW SOURCE]

Serializer that returns data in json format.

Arguments:

  • as_legacy: parse as the legacy format; Default = True
  • dumps: a function to serialize objects to json. Default - quixstreams.utils.json.dumps

QuixTimeseriesSerializer

class QuixTimeseriesSerializer(QuixSerializer)

[VIEW SOURCE]

Serialize data to JSON formatted according to Quix Timeseries format.

The serializable object must be a dictionary, and each item must be of str, int, float, bytes or bytearray type. Otherwise, the SerializationError will be raised.

Input:

{'a': 1, 'b': 1.1, 'c': "string", 'd': b'bytes', 'Tags': {'tag1': 'tag'}}

Output:

{
    "Timestamps": [123123123],
    "NumericValues": {"a": [1], "b": [1.1]},
    "StringValues": {"c": ["string"]},
    "BinaryValues": {"d": ["Ynl0ZXM="]},
    "TagValues": {"tag1": ["tag"]}
}
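The mapping from the input dictionary to the output structure can be sketched as below. The function `to_timeseries` is hypothetical, the timestamp is passed in explicitly because the example above does not show where it comes from, and `ValueError` is only a stand-in for the SerializationError mentioned in the description.

```python
import base64
from typing import Any, Dict


def to_timeseries(value: Dict[str, Any], timestamp: int) -> Dict[str, Any]:
    """Hypothetical converter mirroring the Input/Output pair shown above."""
    result: Dict[str, Any] = {
        "Timestamps": [timestamp],
        "NumericValues": {},
        "StringValues": {},
        "BinaryValues": {},
        "TagValues": {},
    }
    for key, item in value.items():
        if key == "Tags":
            for tag, tag_value in item.items():
                result["TagValues"][tag] = [tag_value]
        elif isinstance(item, (int, float)):
            result["NumericValues"][key] = [item]
        elif isinstance(item, str):
            result["StringValues"][key] = [item]
        elif isinstance(item, (bytes, bytearray)):
            # Binary values are base64-encoded so they fit into JSON.
            result["BinaryValues"][key] = [base64.b64encode(item).decode("ascii")]
        else:
            # Stand-in for the SerializationError mentioned above.
            raise ValueError(f"Unsupported type for {key!r}: {type(item).__name__}")
    return result


row = {"a": 1, "b": 1.1, "c": "string", "d": b"bytes", "Tags": {"tag1": "tag"}}
print(to_timeseries(row, timestamp=123123123))
```

Every value is wrapped in a one-element list because the Timeseries layout is columnar: each column holds one entry per timestamp.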

QuixEventsSerializer

class QuixEventsSerializer(QuixSerializer)

[VIEW SOURCE]

Serialize data to JSON formatted according to Quix EventData format.

The input value is expected to be a dictionary with the following keys:

  • "Id" (type str, default - "")
  • "Value" (type str, default - "")
  • "Tags" (type dict, default - {})

NOTE: All the other fields will be ignored.

Input:

{
    "Id": "an_event",
    "Value": "any_string",
    "Tags": {"tag1": "tag"}
}

Output:

{
    "Id": "an_event",
    "Value": "any_string",
    "Tags": {"tag1": "tag"},
    "Timestamp": 1692703362840389000
}

quixstreams.models.serializers.simple_types

BytesDeserializer

class BytesDeserializer(Deserializer)

[VIEW SOURCE]

A deserializer to bypass bytes without any changes

BytesSerializer

class BytesSerializer(Serializer)

[VIEW SOURCE]

A serializer to bypass bytes without any changes

StringDeserializer

class StringDeserializer(Deserializer)

[VIEW SOURCE]

StringDeserializer.__init__

def __init__(codec: str = "utf_8")

[VIEW SOURCE]

Deserializes bytes to strings using the specified encoding.

Arguments:

  • codec: string encoding

A wrapper around confluent_kafka.serialization.StringDeserializer.

IntegerDeserializer

class IntegerDeserializer(Deserializer)

[VIEW SOURCE]

Deserializes bytes to integers.

A wrapper around confluent_kafka.serialization.IntegerDeserializer.

DoubleDeserializer

class DoubleDeserializer(Deserializer)

[VIEW SOURCE]

Deserializes IEEE 754 binary64 bytes to floats.

A wrapper around confluent_kafka.serialization.DoubleDeserializer.
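For reference, IEEE 754 binary64 is the 8-byte double-precision format, which Python's standard `struct` module can pack and unpack. The two helpers below are hypothetical and the big-endian byte order is an assumption; they illustrate the wire format, not the wrapped class itself.

```python
import struct


def serialize_double(value: float) -> bytes:
    # ">d" = big-endian IEEE 754 binary64 (byte order is an assumption here).
    return struct.pack(">d", value)


def deserialize_double(data: bytes) -> float:
    # unpack() returns a tuple; a single "d" yields exactly one float.
    return struct.unpack(">d", data)[0]


payload = serialize_double(1.5)
print(len(payload), deserialize_double(payload))
```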

StringSerializer

class StringSerializer(Serializer)

[VIEW SOURCE]

StringSerializer.__init__

def __init__(codec: str = "utf_8")

[VIEW SOURCE]

Serializes strings to bytes using the specified encoding.

Arguments:

  • codec: string encoding

IntegerSerializer

class IntegerSerializer(Serializer)

[VIEW SOURCE]

Serializes integers to bytes

DoubleSerializer

class DoubleSerializer(Serializer)

[VIEW SOURCE]

Serializes floats to bytes

quixstreams.models.serializers.json

JSONSerializer

class JSONSerializer(Serializer)

[VIEW SOURCE]

JSONSerializer.__init__

def __init__(dumps: Callable[[Any], Union[str, bytes]] = default_dumps)

[VIEW SOURCE]

Serializer that returns data in json format.

Arguments:

  • dumps: a function to serialize objects to json. Default - quixstreams.utils.json.dumps

JSONDeserializer

class JSONDeserializer(Deserializer)

[VIEW SOURCE]

JSONDeserializer.__init__

def __init__(loads: Callable[[Union[bytes, bytearray]], Any] = default_loads)

[VIEW SOURCE]

Deserializer that parses data from JSON

Arguments:

  • loads: function to parse json from bytes. Default - quixstreams.utils.json.loads.
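Custom `dumps` and `loads` callables only need to match the signatures shown above. The following sketch defines a pair with the standard `json` module; the function names are hypothetical, and the wiring into the serializer classes is shown only as a comment so the block runs without the library installed.

```python
import json
from typing import Any, Union


def compact_dumps(value: Any) -> bytes:
    """Serialize to compact, key-sorted JSON bytes."""
    return json.dumps(value, separators=(",", ":"), sort_keys=True).encode("utf-8")


def plain_loads(data: Union[bytes, bytearray]) -> Any:
    """Parse JSON from bytes; json.loads accepts bytes and bytearray directly."""
    return json.loads(data)


# Hypothetical wiring, following the __init__ signatures documented above:
#   JSONSerializer(dumps=compact_dumps)
#   JSONDeserializer(loads=plain_loads)

encoded = compact_dumps({"b": 1, "a": [1, 2]})
decoded = plain_loads(bytearray(encoded))
```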

quixstreams.models.serializers.base

SerializationContext

class SerializationContext()

[VIEW SOURCE]

Provides additional context for message serialization/deserialization.

Every Serializer and Deserializer receives an instance of SerializationContext

SerializationContext.to_confluent_ctx

def to_confluent_ctx(field: MessageField) -> _SerializationContext

[VIEW SOURCE]

Convert SerializationContext to confluent_kafka.SerializationContext in order to re-use serialization already provided by confluent_kafka library.

Arguments:

  • field: instance of confluent_kafka.serialization.MessageField

Returns:

instance of confluent_kafka.serialization.SerializationContext

Deserializer

class Deserializer(abc.ABC)

[VIEW SOURCE]

Deserializer.__init__

def __init__(*args, **kwargs)

[VIEW SOURCE]

A base class for all Deserializers

Deserializer.split_values

@property
def split_values() -> bool

[VIEW SOURCE]

Return True if the deserialized message should be considered as Iterable and each item in it should be processed as a separate message.
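The effect of `split_values` can be sketched as a small helper. This illustrates the described semantics only; `expand_message` is a hypothetical name and not part of the library.

```python
from typing import Any, List


def expand_message(value: Any, split_values: bool) -> List[Any]:
    """Return the values to process as separate messages."""
    if split_values:
        # The deserialized value is treated as an iterable of messages.
        return list(value)
    # Otherwise the value is a single message, even if it is a list.
    return [value]


batch = [{"id": 1}, {"id": 2}]
as_separate = expand_message(batch, split_values=True)
as_single = expand_message(batch, split_values=False)
```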

Serializer

class Serializer(abc.ABC)

[VIEW SOURCE]

A base class for all Serializers

Serializer.extra_headers

@property
def extra_headers() -> MessageHeadersMapping

[VIEW SOURCE]

Informs producer to set additional headers for the message it will be serializing.

Must return a dictionary with headers. Keys must be strings, and values must be strings, bytes or None.

Returns:

dict with headers

quixstreams.models.messages

quixstreams.models.rows

quixstreams.models.topics

quixstreams.models.topics.admin

convert_topic_list

def convert_topic_list(topics: List[Topic]) -> List[ConfluentTopic]

[VIEW SOURCE]

Converts Topics to ConfluentTopics as required for Confluent's AdminClient.create_topics().

Arguments:

  • topics: list of Topics

Returns:

list of confluent_kafka ConfluentTopics

TopicAdmin

class TopicAdmin()

[VIEW SOURCE]

For performing "admin"-level operations on a Kafka cluster, mostly around topics.

Primarily used to create and inspect topic configurations.

TopicAdmin.__init__

def __init__(broker_address: Union[str, ConnectionConfig],
             logger: logging.Logger = logger,
             extra_config: Optional[Mapping] = None)

[VIEW SOURCE]

Arguments:

  • broker_address: Connection settings for Kafka. Accepts string with Kafka broker host and port formatted as <host>:<port>, or a ConnectionConfig object if authentication is required.
  • logger: a Logger instance to attach librdkafka logging to
  • extra_config: optional configs (generally accepts producer configs)

TopicAdmin.list_topics

def list_topics(timeout: float = -1) -> Dict[str, ConfluentTopicMetadata]

[VIEW SOURCE]

Get a list of topics and their metadata from a Kafka cluster

Arguments:

  • timeout: response timeout (seconds); Default infinite (-1)

Returns:

a dict of topic names and their metadata objects

TopicAdmin.inspect_topics

def inspect_topics(topic_names: List[str],
                   timeout: float = 30) -> Dict[str, Optional[TopicConfig]]

[VIEW SOURCE]

A simplified way of getting the topic configurations of the provided topics from the cluster (if they exist).

Arguments:

  • topic_names: a list of topic names
  • timeout: response timeout (seconds)

    NOTE: timeout must be > 0 here (a non-negative value is expected, and 0 does not mean infinite).

Returns:

a dict with topic names and their respective TopicConfig

TopicAdmin.create_topics

def create_topics(topics: List[Topic],
                  timeout: float = 30,
                  finalize_timeout: float = 60)

[VIEW SOURCE]

Create the given list of topics and confirm they are ready.

Also raises an exception with detailed printout should the creation fail (it ignores issues for a topic already existing).

Arguments:

  • topics: a list of Topic
  • timeout: creation acknowledge timeout (seconds)
  • finalize_timeout: topic finalization timeout (seconds)

    NOTE: timeout must be > 0 here (a non-negative value is expected, and 0 does not mean infinite).

quixstreams.models.topics.utils

merge_headers

def merge_headers(original: Optional[MessageHeadersTuples],
                  other: MessageHeadersMapping) -> MessageHeadersTuples

[VIEW SOURCE]

Merge two sets of Kafka message headers, overwriting headers in "original" by the values from "other".

Arguments:

  • original: original headers as a list of (key, value) tuples.
  • other: headers to merge as a dictionary.

Returns:

a list of (key, value) tuples.
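The merge semantics can be sketched as follows. This is an illustration of the described behaviour, not the library's code: the name `merge_headers_sketch` is hypothetical, and the ordering of the result (untouched original headers first, then everything from `other`) is an assumption.

```python
from typing import Dict, List, Optional, Tuple, Union

HeaderValue = Union[str, bytes, None]
HeadersTuples = List[Tuple[str, HeaderValue]]


def merge_headers_sketch(
    original: Optional[HeadersTuples], other: Dict[str, HeaderValue]
) -> HeadersTuples:
    if not original:
        return list(other.items())
    # Keep original headers whose keys are not overridden by "other".
    merged = [(key, value) for key, value in original if key not in other]
    # Values from "other" win for any shared key.
    merged.extend(other.items())
    return merged


merged = merge_headers_sketch(
    [("a", b"1"), ("b", b"2")],
    {"b": b"3", "c": None},
)
from_empty = merge_headers_sketch(None, {"x": "y"})
```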

quixstreams.models.topics.topic

TopicConfig

@dataclasses.dataclass(eq=True)
class TopicConfig()

[VIEW SOURCE]

Represents all kafka-level configuration for a kafka topic.

Generally used by Topic and any topic creation procedures.

Topic

class Topic()

[VIEW SOURCE]

A definition of a Kafka topic.

Typically created with an app = quixstreams.app.Application() instance via app.topic(), and used by quixstreams.dataframe.StreamingDataFrame instance.

Topic.__init__

def __init__(
        name: str,
        config: TopicConfig,
        value_deserializer: Optional[DeserializerType] = None,
        key_deserializer: Optional[DeserializerType] = BytesDeserializer(),
        value_serializer: Optional[SerializerType] = None,
        key_serializer: Optional[SerializerType] = BytesSerializer(),
        timestamp_extractor: Optional[TimestampExtractor] = None)

[VIEW SOURCE]

Arguments:

  • name: topic name
  • config: topic configs via TopicConfig (creation/validation)
  • value_deserializer: a deserializer type for values
  • key_deserializer: a deserializer type for keys
  • value_serializer: a serializer type for values
  • key_serializer: a serializer type for keys
  • timestamp_extractor: a callable that returns a timestamp in milliseconds from a deserialized message.
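A timestamp extractor might look like the sketch below. The four-argument shape (value, headers, Kafka timestamp, timestamp type) is an assumption, since this section only states that the callable returns milliseconds from a deserialized message; check the `TimestampExtractor` type before relying on it. The field name `event_time_ms` is hypothetical.

```python
from typing import Any


def extract_timestamp(
    value: Any, headers: Any, timestamp: int, timestamp_type: Any
) -> int:
    """Prefer an event time embedded in the payload, else the Kafka timestamp."""
    if isinstance(value, dict) and isinstance(value.get("event_time_ms"), int):
        return value["event_time_ms"]
    # Fall back to the timestamp carried by the Kafka message itself.
    return timestamp


from_payload = extract_timestamp({"event_time_ms": 1700000000000}, None, 5, None)
from_message = extract_timestamp({"other": 1}, None, 5, None)
```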

Topic.name

@property
def name() -> str

[VIEW SOURCE]

Topic name

Topic.row_serialize

def row_serialize(row: Row, key: Any) -> KafkaMessage

[VIEW SOURCE]

Serialize Row to a Kafka message structure

Arguments:

  • row: Row to serialize
  • key: message key to serialize

Returns:

KafkaMessage object with serialized values

Topic.row_deserialize

def row_deserialize(
        message: ConfluentKafkaMessageProto) -> Union[Row, List[Row], None]

[VIEW SOURCE]

Deserialize incoming Kafka message to a Row.

Arguments:

  • message: an object with interface of confluent_kafka.Message

Returns:

Row, list of Rows or None if the message is ignored.

quixstreams.models.topics.exceptions

quixstreams.models.topics.manager

affirm_ready_for_create

def affirm_ready_for_create(topics: List[Topic])

[VIEW SOURCE]

Validate a list of topics is ready for creation attempt

Arguments:

  • topics: list of Topics

TopicManager

class TopicManager()

[VIEW SOURCE]

The source of all topic management with quixstreams.

Generally initialized and managed automatically by an Application, but allows a user to work with it directly when needed, such as using it alongside a plain Producer to create its topics.

See methods for details.

TopicManager.__init__

def __init__(topic_admin: TopicAdmin,
             consumer_group: str,
             timeout: float = 30,
             create_timeout: float = 60)

[VIEW SOURCE]

Arguments:

  • topic_admin: an Admin instance (required for some functionality)
  • consumer_group: the consumer group (of the Application)
  • timeout: response timeout (seconds)
  • create_timeout: timeout for topic creation

TopicManager.changelog_topics

@property
def changelog_topics() -> Dict[str, Dict[str, Topic]]

[VIEW SOURCE]

Note: Topics are the changelogs.

returns: the changelog topic dict, {topic_name: {suffix: Topic}}

TopicManager.all_topics

@property
def all_topics() -> Dict[str, Topic]

[VIEW SOURCE]

Every registered topic name mapped to its respective Topic.

returns: full topic dict, {topic_name: Topic}

TopicManager.topic_config

def topic_config(num_partitions: Optional[int] = None,
                 replication_factor: Optional[int] = None,
                 extra_config: Optional[dict] = None) -> TopicConfig

[VIEW SOURCE]

Convenience method for generating a TopicConfig with default settings

Arguments:

  • num_partitions: the number of topic partitions
  • replication_factor: the topic replication factor
  • extra_config: other optional configuration settings

Returns:

a TopicConfig object

TopicManager.topic

def topic(name: str,
          value_deserializer: Optional[DeserializerType] = None,
          key_deserializer: Optional[DeserializerType] = "bytes",
          value_serializer: Optional[SerializerType] = None,
          key_serializer: Optional[SerializerType] = "bytes",
          config: Optional[TopicConfig] = None,
          timestamp_extractor: Optional[TimestampExtractor] = None) -> Topic

[VIEW SOURCE]

A convenience method for generating a Topic. Will use default config options as dictated by the TopicManager.

Arguments:

  • name: topic name
  • value_deserializer: a deserializer type for values
  • key_deserializer: a deserializer type for keys
  • value_serializer: a serializer type for values
  • key_serializer: a serializer type for keys
  • config: optional topic configurations (for creation/validation)
  • timestamp_extractor: a callable that returns a timestamp in milliseconds from a deserialized message.

Returns:

Topic object with creation configs

TopicManager.repartition_topic

def repartition_topic(operation: str,
                      topic_name: str,
                      value_deserializer: Optional[DeserializerType] = "json",
                      key_deserializer: Optional[DeserializerType] = "json",
                      value_serializer: Optional[SerializerType] = "json",
                      key_serializer: Optional[SerializerType] = "json",
                      timeout: Optional[float] = None) -> Topic

[VIEW SOURCE]

Create an internal repartition topic.

Arguments:

  • operation: name of the GroupBy operation (column name or user-defined).
  • topic_name: name of the topic the GroupBy is sourced from.
  • value_deserializer: a deserializer type for values; default - JSON
  • key_deserializer: a deserializer type for keys; default - JSON
  • value_serializer: a serializer type for values; default - JSON
  • key_serializer: a serializer type for keys; default - JSON
  • timeout: config lookup timeout (seconds); Default 30

Returns:

Topic object (which is also stored on the TopicManager)

TopicManager.changelog_topic

def changelog_topic(topic_name: str,
                    store_name: str,
                    timeout: Optional[float] = None) -> Topic

[VIEW SOURCE]

Performs all the logic necessary to generate a changelog topic based on a "source topic" (aka input/consumed topic).

Its main goal is to ensure partition counts of the to-be generated changelog match the source topic, and ensure the changelog topic is compacted. Also enforces the serialization type. All Topic objects generated with this are stored on the TopicManager.

If source topic already exists, defers to the existing topic settings, else uses the settings as defined by the Topic (and its defaults) as generated by the TopicManager.

In general, users should NOT need this; an Application knows when/how to generate changelog topics. To turn off changelogs, init an Application with "use_changelog_topics"=False.

Arguments:

  • topic_name: name of consumed topic (app input topic)

    NOTE: normally contains any prefixes added by TopicManager.topic()

  • store_name: name of the store this changelog belongs to (default, rolling10s, etc.)
  • timeout: config lookup timeout (seconds); Default 30

Returns:

Topic object (which is also stored on the TopicManager)

TopicManager.create_topics

def create_topics(topics: List[Topic],
                  timeout: Optional[float] = None,
                  create_timeout: Optional[float] = None)

[VIEW SOURCE]

Creates topics via an explicit list of provided Topics.

Exists as a way to manually specify what topics to create; otherwise, create_all_topics() is generally simpler.

Arguments:

  • topics: list of Topics
  • timeout: creation acknowledge timeout (seconds); Default 30
  • create_timeout: topic finalization timeout (seconds); Default 60

TopicManager.create_all_topics

def create_all_topics(timeout: Optional[float] = None,
                      create_timeout: Optional[float] = None)

[VIEW SOURCE]

A convenience method to create all Topic objects stored on this TopicManager.

Arguments:

  • timeout: creation acknowledge timeout (seconds); Default 30
  • create_timeout: topic finalization timeout (seconds); Default 60

TopicManager.validate_all_topics

def validate_all_topics(timeout: Optional[float] = None)

[VIEW SOURCE]

Validates that all topics exist and that changelogs have the correct topic and replication factor.

Issues are pooled and raised as an Exception once inspections are complete.

quixstreams.state.rocksdb.windowed.store

WindowedRocksDBStore

class WindowedRocksDBStore(RocksDBStore)

[VIEW SOURCE]

RocksDB-based windowed state store.

It keeps track of individual store partitions and provides access to the partitions' transactions.

WindowedRocksDBStore.__init__

def __init__(
        name: str,
        topic: str,
        base_dir: str,
        changelog_producer_factory: Optional[ChangelogProducerFactory] = None,
        options: Optional[RocksDBOptionsType] = None)

[VIEW SOURCE]

Arguments:

  • name: a unique store name
  • topic: a topic name for this store
  • base_dir: path to a directory with the state
  • changelog_producer_factory: a ChangelogProducerFactory instance if using changelogs
  • options: RocksDB options. If None, the default options will be used.

quixstreams.state.rocksdb.windowed.partition

WindowedRocksDBStorePartition

class WindowedRocksDBStorePartition(RocksDBStorePartition)

[VIEW SOURCE]

A base class to access windowed state in RocksDB.

It represents a single RocksDB database.

Besides the data, it keeps track of the latest observed timestamp and stores the expiration index to delete expired windows.

Arguments:

  • path: an absolute path to the RocksDB folder
  • options: RocksDB options. If None, the default options will be used.

quixstreams.state.rocksdb.windowed.metadata

quixstreams.state.rocksdb.windowed.transaction

WindowedRocksDBPartitionTransaction

class WindowedRocksDBPartitionTransaction(RocksDBPartitionTransaction)

[VIEW SOURCE]

WindowedRocksDBPartitionTransaction.expire_windows

def expire_windows(duration_ms: int,
                   prefix: bytes,
                   grace_ms: int = 0) -> List[Tuple[Tuple[int, int], Any]]

[VIEW SOURCE]

Get a list of expired windows from RocksDB considering latest timestamp, window size and grace period.

It marks the latest found window as expired in the expiration index, so calling this method multiple times will yield different results for the same "latest timestamp".

How it works:

  • First, it looks for the start time of the last expired window for the current prefix using the expiration cache. If it's found, it will be used to reduce the search space and to avoid returning already expired windows.
  • Then it goes over window segments and fetches the windows that should be expired.
  • Finally, it updates the expiration cache with the start time of the latest found window.

Returns:

sorted list of tuples in format ((start, end), value)
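The expiration steps described above can be sketched with an in-memory stand-in. This is a simplified model under stated assumptions, not the RocksDB-backed implementation: the class `ToyWindowStore` is hypothetical, prefixes are omitted, a window is treated as expired when `start <= latest_timestamp - duration_ms - grace_ms`, and expired windows are not deleted.

```python
from typing import Any, Dict, List, Optional, Tuple

Window = Tuple[int, int]


class ToyWindowStore:
    def __init__(self) -> None:
        self.windows: Dict[Window, Any] = {}
        self.latest_timestamp_ms = 0
        # Plays the role of the expiration cache for a single prefix.
        self.last_expired_start: Optional[int] = None

    def update_window(self, start_ms: int, end_ms: int, value: Any, timestamp_ms: int) -> None:
        self.windows[(start_ms, end_ms)] = value
        self.latest_timestamp_ms = max(self.latest_timestamp_ms, timestamp_ms)

    def expire_windows(self, duration_ms: int, grace_ms: int = 0) -> List[Tuple[Window, Any]]:
        # Windows starting at or before this point are considered expired.
        start_to = self.latest_timestamp_ms - duration_ms - grace_ms
        # Skip windows already returned by a previous call.
        start_from = -1 if self.last_expired_start is None else self.last_expired_start
        expired = sorted(
            (
                (window, value)
                for window, value in self.windows.items()
                if start_from < window[0] <= start_to
            ),
            key=lambda item: item[0],
        )
        if expired:
            self.last_expired_start = expired[-1][0][0]
        return expired


store = ToyWindowStore()
store.update_window(0, 10, 1, timestamp_ms=5)
store.update_window(10, 20, 2, timestamp_ms=15)
store.update_window(20, 30, 3, timestamp_ms=25)
first = store.expire_windows(duration_ms=10)
second = store.expire_windows(duration_ms=10)
store.update_window(30, 40, 4, timestamp_ms=35)
third = store.expire_windows(duration_ms=10)
```

The second call returns nothing for the same latest timestamp because the first call already recorded the last expired window start.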

quixstreams.state.rocksdb.windowed

quixstreams.state.rocksdb.windowed.serialization

parse_window_key

def parse_window_key(key: bytes) -> Tuple[bytes, int, int]

[VIEW SOURCE]

Parse the window key from Rocksdb into (message_key, start, end) structure.

Expected window key format: <message_key>|<start>|<end>

Arguments:

  • key: a key from Rocksdb

Returns:

a tuple with message key, start timestamp, end timestamp

encode_window_key

def encode_window_key(start_ms: int, end_ms: int) -> bytes

[VIEW SOURCE]

Encode window start and end timestamps into bytes of the following format:

<start>|<end>

Encoding window keys this way makes them sortable in RocksDB within the same prefix.

Arguments:

  • start_ms: window start in milliseconds
  • end_ms: window end in milliseconds

Returns:

window timestamps as bytes

encode_window_prefix

def encode_window_prefix(prefix: bytes, start_ms: int) -> bytes

[VIEW SOURCE]

Encode window prefix and start time to iterate over keys in RocksDB

Format: <prefix>|<start>

Arguments:

  • prefix: transaction prefix
  • start_ms: window start time in milliseconds

Returns:

bytes
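Why these layouts sort correctly can be shown with a short sketch. It assumes timestamps are written as fixed-width big-endian 8-byte integers, which this section does not state; with that encoding, byte-wise order matches numeric order for non-negative timestamps. The `_sketch` function names are hypothetical, and the parser assumes a message key followed by `|<start>|<end>`.

```python
import struct
from typing import Tuple

_SEP = b"|"


def encode_window_key_sketch(start_ms: int, end_ms: int) -> bytes:
    # Assumption: 8-byte big-endian integers, so byte order equals numeric order.
    return struct.pack(">q", start_ms) + _SEP + struct.pack(">q", end_ms)


def encode_window_prefix_sketch(prefix: bytes, start_ms: int) -> bytes:
    return prefix + _SEP + struct.pack(">q", start_ms)


def parse_window_key_sketch(key: bytes) -> Tuple[bytes, int, int]:
    # Parse from the end, because the message key itself may contain "|".
    end_ms = struct.unpack(">q", key[-8:])[0]
    start_ms = struct.unpack(">q", key[-17:-9])[0]
    message_key = key[:-18]
    return message_key, start_ms, end_ms


starts = [1000, 5, 70000, 256]
keys = [encode_window_key_sketch(s, s + 10) for s in starts]
sorted_starts = [parse_window_key_sketch(b"k" + _SEP + k)[1] for k in sorted(keys)]
full_key = b"user|1" + _SEP + encode_window_key_sketch(10, 20)
parsed = parse_window_key_sketch(full_key)
```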

quixstreams.state.rocksdb.windowed.state

WindowedTransactionState

class WindowedTransactionState(WindowedState)

[VIEW SOURCE]

WindowedTransactionState.__init__

def __init__(transaction: "WindowedRocksDBPartitionTransaction",
             prefix: bytes)

[VIEW SOURCE]

A windowed state to be provided into StreamingDataFrame window functions.

Arguments:

  • transaction: instance of WindowedRocksDBPartitionTransaction

WindowedTransactionState.get_window

def get_window(start_ms: int,
               end_ms: int,
               default: Any = None) -> Optional[Any]

[VIEW SOURCE]

Get the value of the window defined by start and end timestamps if the window is present in the state, else default.

Arguments:

  • start_ms: start of the window in milliseconds
  • end_ms: end of the window in milliseconds
  • default: default value to return if the key is not found

Returns:

value or None if the key is not found and default is not provided

WindowedTransactionState.update_window

def update_window(start_ms: int, end_ms: int, value: Any, timestamp_ms: int)

[VIEW SOURCE]

Set a value for the window.

This method will also update the latest observed timestamp in state partition using the provided timestamp.

Arguments:

  • start_ms: start of the window in milliseconds
  • end_ms: end of the window in milliseconds
  • value: value of the window
  • timestamp_ms: current message timestamp in milliseconds

WindowedTransactionState.get_latest_timestamp

def get_latest_timestamp() -> int

[VIEW SOURCE]

Get the latest observed timestamp for the current state partition.

Use this timestamp to determine if the arriving event is late and should be discarded from the processing.

Returns:

latest observed event timestamp in milliseconds

WindowedTransactionState.expire_windows

def expire_windows(duration_ms: int,
                   grace_ms: int = 0) -> List[Tuple[Tuple[int, int], Any]]

[VIEW SOURCE]

Get a list of expired windows from RocksDB considering the current latest timestamp, window duration and grace period.

It also marks the latest found window as expired in the expiration index, so calling this method multiple times will yield different results for the same "latest timestamp".

quixstreams.state.rocksdb.options

RocksDBOptions

@dataclasses.dataclass(frozen=True)
class RocksDBOptions(RocksDBOptionsType)

[VIEW SOURCE]

RocksDB database options.

Arguments:

  • dumps: function to dump data to JSON
  • loads: function to load data from JSON
  • open_max_retries: number of times to retry opening the database if it's locked by another process. To disable retrying, pass 0
  • open_retry_backoff: number of seconds to wait between each retry.

Please see rocksdict.Options for a complete description of other options.

RocksDBOptions.to_options

def to_options() -> rocksdict.Options

[VIEW SOURCE]

Convert parameters to rocksdict.Options

Returns:

instance of rocksdict.Options

quixstreams.state.rocksdb.store

RocksDBStore

class RocksDBStore(Store)

[VIEW SOURCE]

RocksDB-based state store.

It keeps track of individual store partitions and provides access to the partitions' transactions.

RocksDBStore.__init__

def __init__(
        name: str,
        topic: str,
        base_dir: str,
        changelog_producer_factory: Optional[ChangelogProducerFactory] = None,
        options: Optional[options_type] = None)

[VIEW SOURCE]

Arguments:

  • name: a unique store name
  • topic: a topic name for this store
  • base_dir: path to a directory with the state
  • changelog_producer_factory: a ChangelogProducerFactory instance if using changelogs
  • options: RocksDB options. If None, the default options will be used.

RocksDBStore.topic

@property
def topic() -> str

[VIEW SOURCE]

Store topic name

RocksDBStore.name

@property
def name() -> str

[VIEW SOURCE]

Store name

RocksDBStore.partitions

@property
def partitions() -> Dict[int, RocksDBStorePartition]

[VIEW SOURCE]

Mapping of assigned store partitions

RocksDBStore.assign_partition

def assign_partition(partition: int) -> RocksDBStorePartition

[VIEW SOURCE]

Open and assign store partition.

If the partition is already assigned, it will not re-open it and return the existing partition instead.

Arguments:

  • partition: partition number

Returns:

instance of RocksDBStorePartition

RocksDBStore.revoke_partition

def revoke_partition(partition: int)

[VIEW SOURCE]

Revoke and close the assigned store partition.

If the partition is not assigned, it will log the message and return.

Arguments:

  • partition: partition number

RocksDBStore.start_partition_transaction

def start_partition_transaction(partition: int) -> RocksDBPartitionTransaction

[VIEW SOURCE]

Start a new partition transaction.

RocksDBPartitionTransaction is the primary interface for working with data in the underlying RocksDB.

Arguments:

  • partition: partition number

Returns:

instance of RocksDBPartitionTransaction

RocksDBStore.close

def close()

[VIEW SOURCE]

Close the store and revoke all assigned partitions

quixstreams.state.rocksdb.partition

RocksDBStorePartition

class RocksDBStorePartition(StorePartition)

[VIEW SOURCE]

A base class to access state in RocksDB.

It represents a single RocksDB database.

Responsibilities:

  1. Managing access to the RocksDB instance
  2. Creating transactions to interact with data
  3. Flushing WriteBatches to the RocksDB

It opens the RocksDB on __init__. If the db is locked by another process, it will retry according to open_max_retries and open_retry_backoff options.

Arguments:

  • path: an absolute path to the RocksDB folder
  • options: RocksDB options. If None, the default options will be used.
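The retry-on-lock behaviour controlled by open_max_retries and open_retry_backoff can be sketched as a plain loop. This is a generic illustration, not the library's code: `DatabaseLockedError`, `open_with_retries` and the injectable `sleep` parameter are all hypothetical.

```python
import time
from typing import Callable, List, TypeVar

T = TypeVar("T")


class DatabaseLockedError(Exception):
    """Hypothetical error raised when another process holds the lock."""


def open_with_retries(
    open_db: Callable[[], T],
    open_max_retries: int,
    open_retry_backoff: float,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    attempt = 0
    while True:
        try:
            return open_db()
        except DatabaseLockedError:
            # With open_max_retries=0 the first failure is re-raised at once.
            if attempt >= open_max_retries:
                raise
            attempt += 1
            sleep(open_retry_backoff)


calls = {"n": 0}


def flaky_open() -> str:
    # Fails twice, then succeeds, to simulate a lock being released.
    calls["n"] += 1
    if calls["n"] < 3:
        raise DatabaseLockedError("locked")
    return "db-handle"


waits: List[float] = []
handle = open_with_retries(
    flaky_open, open_max_retries=5, open_retry_backoff=1.0, sleep=waits.append
)
```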

RocksDBStorePartition.begin

def begin() -> RocksDBPartitionTransaction

[VIEW SOURCE]

Create a new RocksDBPartitionTransaction object.

Using RocksDBPartitionTransaction is the recommended way of accessing the data.

Returns:

an instance of RocksDBPartitionTransaction

RocksDBStorePartition.recover_from_changelog_message

def recover_from_changelog_message(
        changelog_message: ConfluentKafkaMessageProto, committed_offset: int)

[VIEW SOURCE]

Updates state from a given changelog message.

The actual update may be skipped when both conditions are met:

  • The changelog message has headers with the processed message offset.
  • This processed offset is larger than the latest committed offset for the same topic partition.

This way the state does not apply the state changes for not-yet-committed messages and improves the state consistency guarantees.
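The skip rule can be expressed as a small predicate that follows the two conditions above literally. It is a sketch only: the function name is hypothetical, and how the processed offset is read from the message headers is left out because the header name is not given in this section.

```python
from typing import Optional


def should_apply_changelog_update(
    processed_offset: Optional[int], committed_offset: int
) -> bool:
    # No processed-offset header: nothing to compare, so apply the update.
    if processed_offset is None:
        return True
    # Skip updates produced for messages beyond the latest committed offset.
    return processed_offset <= committed_offset


decisions = [
    should_apply_changelog_update(None, 10),
    should_apply_changelog_update(7, 10),
    should_apply_changelog_update(11, 10),
]
```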

Arguments:

  • changelog_message: A raw Confluent message read from a changelog topic.
  • committed_offset: latest committed offset for the partition

RocksDBStorePartition.set_changelog_offset

def set_changelog_offset(changelog_offset: int)

[VIEW SOURCE]

Set the changelog offset based on a message (usually an "offset-only" message).

Used during recovery.

Arguments:

  • changelog_offset: A changelog offset

RocksDBStorePartition.write

def write(batch: WriteBatch)

[VIEW SOURCE]

Write WriteBatch to RocksDB

Arguments:

  • batch: an instance of rocksdict.WriteBatch

RocksDBStorePartition.get

def get(key: bytes,
        default: Any = None,
        cf_name: str = "default") -> Union[None, bytes, Any]

[VIEW SOURCE]

Get a key from RocksDB.

Arguments:

  • key: a key encoded to bytes
  • default: a default value to return if the key is not found.
  • cf_name: rocksdb column family name. Default - "default"

Returns:

a value if the key is present in the DB. Otherwise, default

RocksDBStorePartition.exists

def exists(key: bytes, cf_name: str = "default") -> bool

[VIEW SOURCE]

Check if a key is present in the DB.

Arguments:

  • key: a key encoded to bytes.
  • cf_name: rocksdb column family name. Default - "default"

Returns:

True if the key is present, False otherwise.

RocksDBStorePartition.get_processed_offset

def get_processed_offset() -> Optional[int]

[VIEW SOURCE]

Get last processed offset for the given partition

Returns:

offset or None if there's no processed offset yet

RocksDBStorePartition.get_changelog_offset

def get_changelog_offset() -> Optional[int]

[VIEW SOURCE]

Get offset that the changelog is up-to-date with.

Returns:

offset or None if there's no processed offset yet

RocksDBStorePartition.close

def close()

[VIEW SOURCE]

Close the underlying RocksDB

RocksDBStorePartition.path

@property
def path() -> str

[VIEW SOURCE]

Absolute path to RocksDB database folder

Returns:

file path

RocksDBStorePartition.destroy

@classmethod
def destroy(cls, path: str)

[VIEW SOURCE]

Delete underlying RocksDB database

The database must be closed first.

Arguments:

  • path: an absolute path to the RocksDB folder

RocksDBStorePartition.get_column_family_handle

def get_column_family_handle(cf_name: str) -> ColumnFamily

[VIEW SOURCE]

Get a column family handle to pass to WriteBatch.

This method will cache the CF handle instance to avoid creating them repeatedly.

Arguments:

  • cf_name: column family name

Returns:

instance of rocksdict.ColumnFamily

RocksDBStorePartition.get_column_family

def get_column_family(cf_name: str) -> Rdict

[VIEW SOURCE]

Get a column family instance.

This method will cache the CF instance to avoid creating them repeatedly.

Arguments:

  • cf_name: column family name

Returns:

instance of rocksdict.Rdict for the given column family

quixstreams.state.rocksdb.metadata

quixstreams.state.rocksdb.transaction

RocksDBPartitionTransaction

class RocksDBPartitionTransaction(PartitionTransaction)

[VIEW SOURCE]

A transaction class to perform simple key-value operations like "get", "set", "delete" and "exists" on a single RocksDB partition.

Serialization

RocksDBPartitionTransaction automatically serializes keys and values to bytes.

Prefixing

The get(), set(), delete() and exists() methods require prefixes for the keys. Normally, the Kafka message keys are supposed to be used as prefixes.

Transactional properties

RocksDBPartitionTransaction uses a combination of an in-memory update cache and RocksDB's WriteBatch in order to accumulate all the state mutations in a single batch, flush them atomically, and allow the updates to be visible within the transaction before it's flushed (aka the "read-your-own-writes" problem).

If any mutation fails during the transaction (e.g., failed to write the updates to the RocksDB), the whole transaction will be marked as failed and cannot be used anymore. In this case, a new RocksDBPartitionTransaction should be created.

A RocksDBPartitionTransaction can be used only once.
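The update-cache idea can be sketched with a dictionary standing in for RocksDB. This is a toy model, not the library's implementation: `ToyTransaction` is hypothetical, and the "|" separator and JSON encoding of keys and values are assumptions made for the sketch.

```python
import json
from typing import Any, Dict

_DELETED = object()  # marker for keys deleted inside the transaction


class ToyTransaction:
    """In-memory stand-in for a single-use, batch-flushing transaction."""

    def __init__(self, db: Dict[bytes, bytes]) -> None:
        self._db = db  # stands in for the RocksDB instance
        self._cache: Dict[bytes, Any] = {}  # pending updates, not yet in the db
        self.completed = False

    @staticmethod
    def _full_key(key: Any, prefix: bytes) -> bytes:
        # Assumption: prefix and serialized key are joined with "|".
        return prefix + b"|" + json.dumps(key).encode()

    def set(self, key: Any, value: Any, prefix: bytes) -> None:
        self._cache[self._full_key(key, prefix)] = json.dumps(value).encode()

    def delete(self, key: Any, prefix: bytes) -> None:
        self._cache[self._full_key(key, prefix)] = _DELETED

    def get(self, key: Any, prefix: bytes, default: Any = None) -> Any:
        full_key = self._full_key(key, prefix)
        # Check the update cache first so pending writes are visible.
        raw = self._cache.get(full_key, self._db.get(full_key))
        if raw is None or raw is _DELETED:
            return default
        return json.loads(raw)

    def flush(self) -> None:
        if self.completed:
            raise RuntimeError("transaction can be used only once")
        # Apply all accumulated changes in one pass.
        for full_key, raw in self._cache.items():
            if raw is _DELETED:
                self._db.pop(full_key, None)
            else:
                self._db[full_key] = raw
        self.completed = True


db: Dict[bytes, bytes] = {}
tx = ToyTransaction(db)
tx.set("count", 1, prefix=b"user-1")
seen_before_flush = tx.get("count", prefix=b"user-1")
db_was_empty = not db
tx.flush()
```

Before `flush()` the value is readable through the transaction while the backing dictionary is still empty, which is the read-your-own-writes property.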

RocksDBPartitionTransaction.__init__

def __init__(partition: "RocksDBStorePartition",
             dumps: DumpsFunc,
             loads: LoadsFunc,
             changelog_producer: Optional[ChangelogProducer] = None)

[VIEW SOURCE]

Arguments:

  • partition: instance of RocksDBStorePartition to be used for accessing the underlying RocksDB
  • dumps: a function to serialize data to bytes.
  • loads: a function to deserialize data from bytes.

RocksDBPartitionTransaction.get

@_validate_transaction_status(PartitionTransactionStatus.STARTED)
def get(key: Any,
        prefix: bytes,
        default: Any = None,
        cf_name: str = "default") -> Optional[Any]

[VIEW SOURCE]

Get a key from the store.

It first looks up the key in the update cache in case it has been updated but not flushed yet.

It returns None if the key is not found and default is not provided.

Arguments:

  • key: a key to get from DB
  • prefix: a key prefix
  • default: value to return if the key is not present in the state. It can be of any type.
  • cf_name: rocksdb column family name. Default - "default"

Returns:

value or default

RocksDBPartitionTransaction.set

@_validate_transaction_status(PartitionTransactionStatus.STARTED)
def set(key: Any, value: Any, prefix: bytes, cf_name: str = "default")

[VIEW SOURCE]

Set a key to the store.

It first updates the key in the update cache.

Arguments:

  • key: key to store in DB
  • prefix: a key prefix
  • value: value to store in DB
  • cf_name: rocksdb column family name. Default - "default"

RocksDBPartitionTransaction.delete

@_validate_transaction_status(PartitionTransactionStatus.STARTED)
def delete(key: Any, prefix: bytes, cf_name: str = "default")

[VIEW SOURCE]

Delete a key from the store.

It first deletes the key from the update cache.

Arguments:

  • key: a key to delete from DB
  • prefix: a key prefix
  • cf_name: rocksdb column family name. Default - "default"

RocksDBPartitionTransaction.exists

@_validate_transaction_status(PartitionTransactionStatus.STARTED)
def exists(key: Any, prefix: bytes, cf_name: str = "default") -> bool

[VIEW SOURCE]

Check if a key exists in the store.

It first looks up the key in the update cache.

Arguments:

  • key: a key to check in DB
  • prefix: a key prefix
  • cf_name: rocksdb column family name. Default - "default"

Returns:

True if the key exists, False otherwise.

RocksDBPartitionTransaction.prepare

@_validate_transaction_status(PartitionTransactionStatus.STARTED)
def prepare(processed_offset: int)

[VIEW SOURCE]

Produce changelog messages to the changelog topic for all changes accumulated in this transaction and prepare transaction to flush its state to the state store.

After successful prepare(), the transaction status is changed to PREPARED, and it cannot receive updates anymore.

If changelog is disabled for this application, no updates will be produced to the changelog topic.

Arguments:

  • processed_offset: the offset of the latest processed message

RocksDBPartitionTransaction.flush

@_validate_transaction_status(PartitionTransactionStatus.STARTED,
                              PartitionTransactionStatus.PREPARED)
def flush(processed_offset: Optional[int] = None,
          changelog_offset: Optional[int] = None)

[VIEW SOURCE]

Flush the recent updates to the database.

It writes the WriteBatch to RocksDB and marks itself as finished.

If writing fails, the transaction is marked as failed and cannot be used anymore.

NOTE: If no keys have been modified during the transaction (i.e. no "set" or "delete" have been called at least once), it will not flush ANY data to the database including the offset to optimize I/O.

Arguments:

  • processed_offset: offset of the last processed message, optional.
  • changelog_offset: offset of the last produced changelog message, optional.
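The status checks applied to get(), set(), prepare() and flush() can be sketched as a decorator. This is an illustrative model, not the library's `_validate_transaction_status`: the `Status` enum, `validate_status` and `StatusCheckedTransaction` are hypothetical, and only STARTED and PREPARED are status names taken from this reference.

```python
import enum
import functools
from typing import Any, Callable, Dict


class Status(enum.Enum):
    STARTED = "started"
    PREPARED = "prepared"
    COMPLETE = "complete"  # hypothetical name
    FAILED = "failed"  # hypothetical name


def validate_status(*allowed: Status) -> Callable:
    """Reject method calls made while the transaction is in another status."""

    def decorator(method: Callable) -> Callable:
        @functools.wraps(method)
        def wrapper(self: Any, *args: Any, **kwargs: Any) -> Any:
            if self.status not in allowed:
                raise RuntimeError(
                    f"{method.__name__}() is not allowed in status {self.status.name}"
                )
            return method(self, *args, **kwargs)

        return wrapper

    return decorator


class StatusCheckedTransaction:
    def __init__(self) -> None:
        self.status = Status.STARTED
        self.updates: Dict[str, Any] = {}
        self.flushed: Dict[str, Any] = {}

    @validate_status(Status.STARTED)
    def set(self, key: str, value: Any) -> None:
        self.updates[key] = value

    @validate_status(Status.STARTED)
    def prepare(self, processed_offset: int) -> None:
        # After prepare(), no further updates are accepted.
        self.status = Status.PREPARED

    @validate_status(Status.STARTED, Status.PREPARED)
    def flush(self) -> None:
        self.flushed.update(self.updates)
        self.status = Status.COMPLETE


tx = StatusCheckedTransaction()
tx.set("a", 1)
tx.prepare(processed_offset=10)
try:
    tx.set("b", 2)
    rejected = False
except RuntimeError:
    rejected = True
tx.flush()
```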

RocksDBPartitionTransaction.completed

@property
def completed() -> bool

[VIEW SOURCE]

Check if the transaction is completed.

It doesn't indicate whether the transaction is successful or not. Use RocksDBPartitionTransaction.failed for that.

The completed transaction should not be re-used.

Returns:

True if transaction is completed, False otherwise.

RocksDBPartitionTransaction.prepared

@property
def prepared() -> bool

[VIEW SOURCE]

Check if the transaction is in PREPARED status.

A prepared transaction has successfully flushed its changelog and cannot receive updates anymore, but its state is not yet flushed to the disk.

Returns:

True if transaction is prepared, False otherwise.

RocksDBPartitionTransaction.failed

@property
def failed() -> bool

[VIEW SOURCE]

Check if the transaction has failed.

A failed transaction should not be re-used.

Returns:

True if transaction is failed, False otherwise.

RocksDBPartitionTransaction.changelog_topic_partition

@property
def changelog_topic_partition() -> Optional[Tuple[str, int]]

[VIEW SOURCE]

Return the changelog topic-partition for the StorePartition of this transaction.

Returns None if changelog_producer is not provided.

Returns:

(topic, partition) or None

RocksDBPartitionTransaction.as_state

def as_state(prefix: Any = DEFAULT_PREFIX) -> TransactionState

[VIEW SOURCE]

Create a one-time use TransactionState object with a limited CRUD interface to be provided to StreamingDataFrame operations.

The TransactionState will prefix all the keys with the supplied prefix for all underlying operations.

Arguments:

  • prefix: a prefix to be used for all keys

Returns:

an instance of TransactionState
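
To illustrate what "prefix all the keys" means in practice, here is a minimal sketch in plain Python. `ToyTransaction` and `PrefixedState` are hypothetical stand-ins, and the `b"|"` separator is an assumption made for the example; the real key layout is not described in this reference.

```python
class ToyTransaction:
    """Hypothetical stand-in for a partition transaction storing prefixed keys."""

    def __init__(self):
        self.data = {}

    def set(self, key, value, prefix):
        self.data[prefix + b"|" + key] = value

    def get(self, key, prefix, default=None):
        return self.data.get(prefix + b"|" + key, default)


class PrefixedState:
    """Binds a prefix once so that callers work with plain keys."""

    def __init__(self, prefix, transaction):
        self._prefix = prefix
        self._tx = transaction

    def set(self, key, value):
        self._tx.set(key, value, prefix=self._prefix)

    def get(self, key, default=None):
        return self._tx.get(key, prefix=self._prefix, default=default)


tx = ToyTransaction()
state_a = PrefixedState(b"message-key-a", tx)
state_b = PrefixedState(b"message-key-b", tx)
state_a.set(b"count", 1)
state_b.set(b"count", 7)
```

Both state objects write a key named `count`, yet the two values do not collide because each one is stored under its own prefix in the shared transaction.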

quixstreams.state.rocksdb

quixstreams.state.rocksdb.types

quixstreams.state.rocksdb.exceptions

quixstreams.state.rocksdb.serialization

quixstreams.state.recovery

RecoveryPartition

class RecoveryPartition()

[VIEW SOURCE]

A changelog topic partition mapped to a respective StorePartition with helper methods to determine its current recovery status.

Since StorePartitions do recovery directly, it also handles recovery transactions.

RecoveryPartition.offset

@property
def offset() -> int

[VIEW SOURCE]

Get the changelog offset from the underlying StorePartition.

Returns:

changelog offset (int)

RecoveryPartition.needs_recovery_check

@property
def needs_recovery_check() -> bool

[VIEW SOURCE]

Determine whether to attempt recovery for underlying StorePartition.

This does NOT mean that anything actually requires recovering.

RecoveryPartition.has_invalid_offset

@property
def has_invalid_offset() -> bool

[VIEW SOURCE]

Determine if the current changelog offset stored in state is invalid.

RecoveryPartition.recover_from_changelog_message

def recover_from_changelog_message(
        changelog_message: ConfluentKafkaMessageProto)

[VIEW SOURCE]

Recover the StorePartition using a message read from its respective changelog.

Arguments:

  • changelog_message: A confluent kafka message (everything as bytes)

RecoveryPartition.set_watermarks

def set_watermarks(lowwater: int, highwater: int)

[VIEW SOURCE]

Set the changelog watermarks as gathered from Consumer.get_watermark_offsets()

Arguments:

  • lowwater: topic partition lowwater
  • highwater: topic partition highwater

RecoveryPartition.set_recovery_consume_position

def set_recovery_consume_position(offset: int)

[VIEW SOURCE]

Update the recovery partition with the consumer's position (whenever an empty poll is returned during recovery).

It is possible that it may be set more than once.

Arguments:

  • offset: the consumer's current read position of the changelog
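
The relationship between a stored changelog offset and the watermarks can be sketched as follows. This is an illustrative model only: `ToyRecoveryPartition` and its property names are hypothetical, and the comparison rule is an assumption based on the usual Kafka convention that the high watermark is one past the last existing message. It is not the library's actual check.

```python
from dataclasses import dataclass


@dataclass
class ToyRecoveryPartition:
    stored_offset: int   # changelog offset recorded in local state
    lowwater: int = 0
    highwater: int = 0

    def set_watermarks(self, lowwater, highwater):
        self.lowwater = lowwater
        self.highwater = highwater

    @property
    def has_consumable_messages(self):
        # Equal watermarks mean the changelog partition holds no messages.
        return self.lowwater != self.highwater

    @property
    def may_be_behind(self):
        # Assumption: the last existing message sits at highwater - 1.
        return (
            self.has_consumable_messages
            and self.stored_offset < self.highwater - 1
        )


fresh = ToyRecoveryPartition(stored_offset=99)
fresh.set_watermarks(0, 100)

stale = ToyRecoveryPartition(stored_offset=40)
stale.set_watermarks(0, 100)
```

In this model `stale` would be a candidate for recovery while `fresh` would not, and a partition with equal watermarks is never a candidate because there is nothing to consume.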

ChangelogProducerFactory

class ChangelogProducerFactory()

[VIEW SOURCE]

Generates ChangelogProducers, which produce changelog messages to a StorePartition.

ChangelogProducerFactory.__init__

def __init__(changelog_name: str, producer: RowProducer)

[VIEW SOURCE]

Arguments:

  • changelog_name: changelog topic name
  • producer: a RowProducer (not shared with Application instance)

Returns:

a ChangelogProducerFactory instance

ChangelogProducerFactory.get_partition_producer

def get_partition_producer(partition_num) -> "ChangelogProducer"

[VIEW SOURCE]

Generate a ChangelogProducer for producing to a specific partition number (and thus StorePartition).

Arguments:

  • partition_num: source topic partition number

ChangelogProducer

class ChangelogProducer()

[VIEW SOURCE]

Generated for a StorePartition to produce state changes to its respective kafka changelog partition.

ChangelogProducer.__init__

def __init__(changelog_name: str, partition: int, producer: RowProducer)

[VIEW SOURCE]

Arguments:

  • changelog_name: A changelog topic name
  • partition: source topic partition number
  • producer: a RowProducer (not shared with Application instance)

ChangelogProducer.produce

def produce(key: bytes,
            value: Optional[bytes] = None,
            headers: Optional[MessageHeadersMapping] = None)

[VIEW SOURCE]

Produce a message to a changelog topic partition.

Arguments:

  • key: message key (same as state key, including prefixes)
  • value: message value (same as state value)
  • headers: message headers (includes column family info)

RecoveryManager

class RecoveryManager()

[VIEW SOURCE]

Manages all consumer-related aspects of recovery, including:

  • assigning/revoking, pausing/resuming topic partitions (especially changelogs)
  • consuming changelog messages until state is updated fully.

Also tracks/manages RecoveryPartitions, which are assigned/tracked only if recovery for that changelog partition is required.

Recovery is attempted from the Application after any new partition assignment.

RecoveryManager.partitions

@property
def partitions() -> Dict[int, Dict[str, RecoveryPartition]]

[VIEW SOURCE]

Returns a mapping of assigned RecoveryPartitions in the following format: {<partition>: {<store_name>: <RecoveryPartition>}}

RecoveryManager.has_assignments

@property
def has_assignments() -> bool

[VIEW SOURCE]

Whether the Application has assigned RecoveryPartitions

Returns:

has assignments, as bool

RecoveryManager.recovering

@property
def recovering() -> bool

[VIEW SOURCE]

Whether the Application is currently recovering

Returns:

is recovering, as bool

RecoveryManager.register_changelog

def register_changelog(topic_name: str, store_name: str) -> Topic

[VIEW SOURCE]

Register a changelog Topic with the TopicManager.

Arguments:

  • topic_name: source topic name
  • store_name: name of the store

RecoveryManager.do_recovery

def do_recovery()

[VIEW SOURCE]

If there are any active RecoveryPartitions, do a recovery procedure.

Afterwards, normal Application processing resumes.

RecoveryManager.assign_partition

def assign_partition(topic: str, partition: int, committed_offset: int,
                     store_partitions: Dict[str, StorePartition])

[VIEW SOURCE]

Assigns StorePartitions (as RecoveryPartitions) ONLY IF recovery required.

Pauses active consumer partitions as needed.

RecoveryManager.revoke_partition

def revoke_partition(partition_num: int)

[VIEW SOURCE]

Revoke ALL StorePartitions (across all Stores) for a given partition number.

Arguments:

  • partition_num: partition number of source topic

quixstreams.state

quixstreams.state.types

Store

class Store(Protocol)

[VIEW SOURCE]

Abstract state store.

It keeps track of individual store partitions and provides access to the partitions' transactions.

Store.topic

@property
def topic() -> str

[VIEW SOURCE]

Topic name

Store.name

@property
def name() -> str

[VIEW SOURCE]

Store name

Store.partitions

@property
def partitions() -> Dict[int, "StorePartition"]

[VIEW SOURCE]

Mapping of assigned store partitions

Returns:

dict of "{partition: <StorePartition>}"

Store.assign_partition

def assign_partition(partition: int) -> "StorePartition"

[VIEW SOURCE]

Assign new store partition

Arguments:

  • partition: partition number

Returns:

instance of StorePartition

Store.revoke_partition

def revoke_partition(partition: int)

[VIEW SOURCE]

Revoke assigned store partition

Arguments:

  • partition: partition number

Store.start_partition_transaction

def start_partition_transaction(partition: int) -> "PartitionTransaction"

[VIEW SOURCE]

Start a new partition transaction.

PartitionTransaction is the primary interface for working with data in Stores.

Arguments:

  • partition: partition number

Returns:

instance of PartitionTransaction

Store.close

def close()

[VIEW SOURCE]

Close store and revoke all store partitions

StorePartition

class StorePartition(Protocol)

[VIEW SOURCE]

A base class to access state in the underlying storage. It represents a single instance of some storage (e.g. a single database for the persistent storage).

StorePartition.path

@property
def path() -> str

[VIEW SOURCE]

Absolute path to RocksDB database folder

StorePartition.begin

def begin() -> "PartitionTransaction"

[VIEW SOURCE]

Start a new PartitionTransaction

StorePartition.recover_from_changelog_message

def recover_from_changelog_message(
        changelog_message: ConfluentKafkaMessageProto, committed_offset: int)

[VIEW SOURCE]

Updates state from a given changelog message.

Arguments:

  • changelog_message: A raw Confluent message read from a changelog topic.
  • committed_offset: latest committed offset for the partition

StorePartition.get_processed_offset

def get_processed_offset() -> Optional[int]

[VIEW SOURCE]

Get last processed offset for the given partition

Returns:

offset or None if there's no processed offset yet

StorePartition.get_changelog_offset

def get_changelog_offset() -> Optional[int]

[VIEW SOURCE]

Get offset that the changelog is up-to-date with.

Returns:

offset or None if there's no changelog offset yet

StorePartition.set_changelog_offset

def set_changelog_offset(changelog_offset: int)

[VIEW SOURCE]

Set the changelog offset based on a message (usually an "offset-only" message).

Used during recovery.

Arguments:

  • changelog_offset: A changelog offset

State

class State(Protocol)

[VIEW SOURCE]

Primary interface for working with key-value state data from StreamingDataFrame

State.get

def get(key: Any, default: Any = None) -> Optional[Any]

[VIEW SOURCE]

Get the value for key if key is present in the state, else default

Arguments:

  • key: key
  • default: default value to return if the key is not found

Returns:

value or None if the key is not found and default is not provided

State.set

def set(key: Any, value: Any)

[VIEW SOURCE]

Set value for the key.

Arguments:

  • key: key
  • value: value

State.delete

def delete(key: Any)

[VIEW SOURCE]

Delete value for the key.

This function always returns None, even if value is not found.

Arguments:

  • key: key

State.exists

def exists(key: Any) -> bool

[VIEW SOURCE]

Check if the key exists in state.

Arguments:

  • key: key

Returns:

True if key exists, False otherwise
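
The four methods of the State protocol are enough to write typical stateful callbacks. The sketch below uses a hypothetical dict-backed `DictState` class (not part of the library) so that the callback can be exercised without Kafka or RocksDB; `count_events` is likewise an illustrative function.

```python
from typing import Any, Optional


class DictState:
    """In-memory object exposing the get/set/delete/exists methods."""

    def __init__(self):
        self._data = {}

    def get(self, key: Any, default: Any = None) -> Optional[Any]:
        return self._data.get(key, default)

    def set(self, key: Any, value: Any):
        self._data[key] = value

    def delete(self, key: Any):
        # Returns None whether or not the key was present.
        self._data.pop(key, None)

    def exists(self, key: Any) -> bool:
        return key in self._data


def count_events(value: dict, state) -> dict:
    """Keep a running count in state and attach it to the value."""
    total = state.get("count", default=0) + 1
    state.set("count", total)
    return {**value, "count": total}


state = DictState()
results = [count_events({"id": i}, state) for i in range(3)]
```

Because the callback only relies on the protocol methods, the same function could be unit-tested against `DictState` and then passed to a stateful StreamingDataFrame operation.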

PartitionTransaction

class PartitionTransaction(Protocol)

[VIEW SOURCE]

A transaction class to perform simple key-value operations like "get", "set", "delete" and "exists" on a single storage partition.

PartitionTransaction.as_state

def as_state(prefix: Any) -> State

[VIEW SOURCE]

Create an instance implementing the State protocol to be provided to StreamingDataFrame functions. All operations called on this State object will be prefixed with the supplied prefix.

Returns:

an instance implementing the State protocol

PartitionTransaction.get

def get(key: Any, prefix: bytes, default: Any = None) -> Optional[Any]

[VIEW SOURCE]

Get the value for key if key is present in the state, else default

Arguments:

  • key: key
  • prefix: a key prefix
  • default: default value to return if the key is not found

Returns:

value or None if the key is not found and default is not provided

PartitionTransaction.set

def set(key: Any, prefix: bytes, value: Any)

[VIEW SOURCE]

Set value for the key.

Arguments:

  • key: key
  • prefix: a key prefix
  • value: value

PartitionTransaction.delete

def delete(key: Any, prefix: bytes)

[VIEW SOURCE]

Delete value for the key.

This function always returns None, even if value is not found.

Arguments:

  • key: key
  • prefix: a key prefix

PartitionTransaction.exists

def exists(key: Any, prefix: bytes) -> bool

[VIEW SOURCE]

Check if the key exists in state.

Arguments:

  • key: key
  • prefix: a key prefix

Returns:

True if key exists, False otherwise

PartitionTransaction.failed

@property
def failed() -> bool

[VIEW SOURCE]

Return True if transaction failed to update data at some point.

Failed transactions cannot be re-used.

Returns:

bool

PartitionTransaction.completed

@property
def completed() -> bool

[VIEW SOURCE]

Return True if transaction is successfully completed.

Completed transactions cannot be re-used.

Returns:

bool

PartitionTransaction.prepared

@property
def prepared() -> bool

[VIEW SOURCE]

Return True if transaction is prepared.

Prepared transactions cannot receive new updates, but can be flushed.

Returns:

bool

PartitionTransaction.prepare

def prepare(processed_offset: int)

[VIEW SOURCE]

Produce changelog messages to the changelog topic for all changes accumulated in this transaction and prepare the transaction to flush its state to the state store.

After successful prepare(), the transaction status is changed to PREPARED, and it cannot receive updates anymore.

If changelog is disabled for this application, no updates will be produced to the changelog topic.

Arguments:

  • processed_offset: the offset of the latest processed message

PartitionTransaction.changelog_topic_partition

@property
def changelog_topic_partition() -> Optional[Tuple[str, int]]

[VIEW SOURCE]

Return the changelog topic-partition for the StorePartition of this transaction.

Returns None if changelog_producer is not provided.

Returns:

(topic, partition) or None

PartitionTransaction.flush

def flush(processed_offset: Optional[int] = None,
          changelog_offset: Optional[int] = None)

[VIEW SOURCE]

Flush the recent updates to the storage.

Arguments:

  • processed_offset: offset of the last processed message, optional.
  • changelog_offset: offset of the last produced changelog message, optional.

WindowedState

class WindowedState(Protocol)

[VIEW SOURCE]

A windowed state to be provided into StreamingDataFrame window functions.

WindowedState.get_window

def get_window(start_ms: int,
               end_ms: int,
               default: Any = None) -> Optional[Any]

[VIEW SOURCE]

Get the value of the window defined by start and end timestamps if the window is present in the state, else default.

Arguments:

  • start_ms: start of the window in milliseconds
  • end_ms: end of the window in milliseconds
  • default: default value to return if the key is not found

Returns:

value or None if the key is not found and default is not provided

WindowedState.update_window

def update_window(start_ms: int, end_ms: int, value: Any, timestamp_ms: int)

[VIEW SOURCE]

Set a value for the window.

This method will also update the latest observed timestamp in state partition using the provided timestamp.

Arguments:

  • start_ms: start of the window in milliseconds
  • end_ms: end of the window in milliseconds
  • value: value of the window
  • timestamp_ms: current message timestamp in milliseconds

WindowedState.get_latest_timestamp

def get_latest_timestamp() -> int

[VIEW SOURCE]

Get the latest observed timestamp for the current state partition.

Use this timestamp to determine if the arriving event is late and should be discarded from the processing.

Returns:

latest observed event timestamp in milliseconds

WindowedState.expire_windows

def expire_windows(duration_ms: int,
                   grace_ms: int = 0) -> List[Tuple[Tuple[int, int], Any]]

[VIEW SOURCE]

Get a list of expired windows from RocksDB considering the current latest timestamp, window duration and grace period.

It also marks the latest found window as expired in the expiration index, so calling this method multiple times will yield different results for the same "latest timestamp".

Arguments:

  • duration_ms: duration of the windows in milliseconds
  • grace_ms: grace period in milliseconds. Default - "0"

WindowedPartitionTransaction

class WindowedPartitionTransaction(Protocol)

[VIEW SOURCE]

WindowedPartitionTransaction.failed

@property
def failed() -> bool

[VIEW SOURCE]

Return True if transaction failed to update data at some point.

Failed transactions cannot be re-used.

Returns:

bool

WindowedPartitionTransaction.completed

@property
def completed() -> bool

[VIEW SOURCE]

Return True if transaction is successfully completed.

Completed transactions cannot be re-used.

Returns:

bool

WindowedPartitionTransaction.prepared

@property
def prepared() -> bool

[VIEW SOURCE]

Return True if transaction is prepared.

Prepared transactions cannot receive new updates, but can be flushed.

Returns:

bool

WindowedPartitionTransaction.prepare

def prepare(processed_offset: int)

[VIEW SOURCE]

Produce changelog messages to the changelog topic for all changes accumulated in this transaction and prepare the transaction to flush its state to the state store.

After successful prepare(), the transaction status is changed to PREPARED, and it cannot receive updates anymore.

If changelog is disabled for this application, no updates will be produced to the changelog topic.

Arguments:

  • processed_offset: the offset of the latest processed message

WindowedPartitionTransaction.get_window

def get_window(start_ms: int,
               end_ms: int,
               prefix: bytes,
               default: Any = None) -> Optional[Any]

[VIEW SOURCE]

Get the value of the window defined by start and end timestamps if the window is present in the state, else default.

Arguments:

  • start_ms: start of the window in milliseconds
  • end_ms: end of the window in milliseconds
  • prefix: a key prefix
  • default: default value to return if the key is not found

Returns:

value or None if the key is not found and default is not provided

WindowedPartitionTransaction.update_window

def update_window(start_ms: int, end_ms: int, value: Any, timestamp_ms: int,
                  prefix: bytes)

[VIEW SOURCE]

Set a value for the window.

This method will also update the latest observed timestamp in state partition using the provided timestamp.

Arguments:

  • start_ms: start of the window in milliseconds
  • end_ms: end of the window in milliseconds
  • value: value of the window
  • timestamp_ms: current message timestamp in milliseconds
  • prefix: a key prefix

WindowedPartitionTransaction.get_latest_timestamp

def get_latest_timestamp() -> int

[VIEW SOURCE]

Get the latest observed timestamp for the current state partition.

Use this timestamp to determine if the arriving event is late and should be discarded from the processing.

Returns:

latest observed event timestamp in milliseconds

WindowedPartitionTransaction.expire_windows

def expire_windows(duration_ms: int, prefix: bytes, grace_ms: int = 0)

[VIEW SOURCE]

Get a list of expired windows from RocksDB considering the current latest timestamp, window duration and grace period.

It also marks the latest found window as expired in the expiration index, so calling this method multiple times will yield different results for the same "latest timestamp".

Arguments:

  • duration_ms: duration of the windows in milliseconds
  • prefix: a key prefix
  • grace_ms: grace period in milliseconds. Default - "0"

WindowedPartitionTransaction.flush

def flush(processed_offset: Optional[int] = None,
          changelog_offset: Optional[int] = None)

[VIEW SOURCE]

Flush the recent updates to the storage.

Arguments:

  • processed_offset: offset of the last processed message, optional.
  • changelog_offset: offset of the last produced changelog message, optional.

WindowedPartitionTransaction.changelog_topic_partition

@property
def changelog_topic_partition() -> Optional[Tuple[str, int]]

[VIEW SOURCE]

Return the changelog topic-partition for the StorePartition of this transaction.

Returns None if changelog_producer is not provided.

Returns:

(topic, partition) or None

PartitionRecoveryTransaction

class PartitionRecoveryTransaction(Protocol)

[VIEW SOURCE]

A class for managing recovery for a StorePartition from a changelog message

PartitionRecoveryTransaction.flush

def flush()

[VIEW SOURCE]

Flush the recovery update to the storage.

PartitionTransactionStatus

class PartitionTransactionStatus(enum.Enum)

[VIEW SOURCE]

STARTED

Transaction is started and accepts updates

PREPARED

Transaction is prepared, it can no longer receive updates

COMPLETE

Transaction is fully completed, it cannot be used anymore

FAILED

Transaction is failed, it cannot be used anymore

quixstreams.state.exceptions

quixstreams.state.manager

StateStoreManager

class StateStoreManager()

[VIEW SOURCE]

Class for managing state stores and partitions.

StateStoreManager is responsible for:

  • reacting to rebalance callbacks
  • managing the individual state stores
  • providing access to store transactions

StateStoreManager.stores

@property
def stores() -> Dict[str, Dict[str, Store]]

[VIEW SOURCE]

Map of registered state stores

Returns:

dict in format {topic: {store_name: store}}

StateStoreManager.recovery_required

@property
def recovery_required() -> bool

[VIEW SOURCE]

Whether recovery needs to be done.

StateStoreManager.using_changelogs

@property
def using_changelogs() -> bool

[VIEW SOURCE]

Whether the StateStoreManager is using changelog topics

Returns:

using changelogs, as bool

StateStoreManager.do_recovery

def do_recovery()

[VIEW SOURCE]

Perform a state recovery, if necessary.

StateStoreManager.stop_recovery

def stop_recovery()

[VIEW SOURCE]

Stop recovery (called during app shutdown).

StateStoreManager.get_store

def get_store(topic: str, store_name: str = DEFAULT_STATE_STORE_NAME) -> Store

[VIEW SOURCE]

Get a store for given name and topic

Arguments:

  • topic: topic name
  • store_name: store name

Returns:

instance of Store

StateStoreManager.register_store

def register_store(topic_name: str,
                   store_name: str = DEFAULT_STATE_STORE_NAME)

[VIEW SOURCE]

Register a state store to be managed by StateStoreManager.

During processing, the StateStoreManager will react to rebalancing callbacks and assign/revoke the partitions for registered stores.

Each store can be registered only once for each topic.

Arguments:

  • topic_name: topic name
  • store_name: store name
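
The {topic: {store_name: store}} mapping and the assign/revoke callbacks can be modelled in a few lines. This is a sketch under stated assumptions: `ToyStoreManager` is hypothetical, each store is reduced to a dict of partitions, `"default"` stands in for DEFAULT_STATE_STORE_NAME, and keeping the existing store on a repeated registration is an assumed way of honouring "registered only once".

```python
class ToyStoreManager:
    def __init__(self):
        # Same shape as the documented mapping: {topic: {store_name: store}}
        self.stores = {}

    def register_store(self, topic_name, store_name="default"):
        topic_stores = self.stores.setdefault(topic_name, {})
        # Assumption: a repeated registration keeps the existing store.
        if store_name not in topic_stores:
            topic_stores[store_name] = {}  # partition number -> partition data
        return topic_stores[store_name]

    def on_partition_assign(self, topic, partition):
        # Every store registered for the topic gets its own store partition.
        assigned = []
        for store in self.stores.get(topic, {}).values():
            assigned.append(store.setdefault(partition, {}))
        return assigned

    def on_partition_revoke(self, topic, partition):
        for store in self.stores.get(topic, {}).values():
            store.pop(partition, None)


manager = ToyStoreManager()
first = manager.register_store("orders")
again = manager.register_store("orders")
manager.register_store("orders", "totals")
assigned = manager.on_partition_assign("orders", 0)
```

Assigning partition 0 of `orders` creates one store partition per registered store, and revoking it removes them again while the stores themselves stay registered.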

StateStoreManager.register_windowed_store

def register_windowed_store(topic_name: str, store_name: str)

[VIEW SOURCE]

Register a windowed state store to be managed by StateStoreManager.

During processing, the StateStoreManager will react to rebalancing callbacks and assign/revoke the partitions for registered stores.

Each window store can be registered only once for each topic.

Arguments:

  • topic_name: topic name
  • store_name: store name

StateStoreManager.clear_stores

def clear_stores()

[VIEW SOURCE]

Delete all state stores managed by StateStoreManager.

StateStoreManager.on_partition_assign

def on_partition_assign(topic: str, partition: int,
                        committed_offset: int) -> List[StorePartition]

[VIEW SOURCE]

Assign store partitions for each registered store for the given TopicPartition and return a list of assigned StorePartition objects.

Arguments:

  • topic: Kafka topic name
  • partition: Kafka topic partition
  • committed_offset: latest committed offset for the partition

Returns:

list of assigned StorePartition

StateStoreManager.on_partition_revoke

def on_partition_revoke(topic: str, partition: int)

[VIEW SOURCE]

Revoke store partitions for each registered store for the given TopicPartition

Arguments:

  • topic: Kafka topic name
  • partition: Kafka topic partition

StateStoreManager.init

def init()

[VIEW SOURCE]

Initialize StateStoreManager and create a store directory

StateStoreManager.close

def close()

[VIEW SOURCE]

Close all registered stores

quixstreams.state.state

TransactionState

class TransactionState(State)

[VIEW SOURCE]

TransactionState.__init__

def __init__(prefix: bytes, transaction: PartitionTransaction)

[VIEW SOURCE]

Simple key-value state to be provided into StreamingDataFrame functions

Arguments:

  • prefix: a key prefix
  • transaction: instance of PartitionTransaction

TransactionState.get

def get(key: Any, default: Any = None) -> Optional[Any]

[VIEW SOURCE]

Get the value for key if key is present in the state, else default

Arguments:

  • key: key
  • default: default value to return if the key is not found

Returns:

value or None if the key is not found and default is not provided

TransactionState.set

def set(key: Any, value: Any)

[VIEW SOURCE]

Set value for the key.

Arguments:

  • key: key
  • value: value

TransactionState.delete

def delete(key: Any)

[VIEW SOURCE]

Delete value for the key.

This function always returns None, even if value is not found.

Arguments:

  • key: key

TransactionState.exists

def exists(key: Any) -> bool

[VIEW SOURCE]

Check if the key exists in state.

Arguments:

  • key: key

Returns:

True if key exists, False otherwise

quixstreams.exceptions

quixstreams.exceptions.assignment

PartitionAssignmentError

class PartitionAssignmentError(QuixException)

[VIEW SOURCE]

Error happened during partition rebalancing. Raised from on_assign, on_revoke and on_lost callbacks

quixstreams.exceptions.base

quixstreams.context

set_message_context

def set_message_context(context: Optional[MessageContext])

[VIEW SOURCE]

Set a MessageContext for the current message in the given contextvars.Context

NOTE: This is for advanced usage only. If you need to change the message key, StreamingDataFrame.to_topic() has an argument for it.

Example Snippet:

from quixstreams import Application, set_message_context, message_context

# Adds a header to the current message context when the value is greater than 1.
def alter_context(value):
    context = message_context()
    if value > 1:
        # value is numeric here, so convert it to text before encoding it
        context.headers = context.headers + (b"cool_new_header", str(value).encode())
        set_message_context(context)

app = Application()
sdf = app.dataframe()
sdf = sdf.update(lambda value: alter_context(value))

Arguments:

  • context: instance of MessageContext

message_context

def message_context() -> MessageContext

[VIEW SOURCE]

Get a MessageContext for the current message, which houses most of the message metadata, like:

  • key
  • timestamp
  • partition
  • offset

Example Snippet:

from quixstreams import Application, message_context

# Changes the current sdf value based on what the message partition is.

app = Application()
sdf = app.dataframe()
sdf = sdf.apply(lambda value: 1 if message_context().partition == 2 else 0)

Returns:

instance of MessageContext

quixstreams.kafka.configuration

ConnectionConfig

class ConnectionConfig(BaseSettings)

[VIEW SOURCE]

Provides an interface for all librdkafka connection-based configs.

Allows converting to or from a librdkafka dictionary.

Also obscures secrets and handles any case sensitivity issues.

ConnectionConfig.settings_customise_sources

@classmethod
def settings_customise_sources(
    cls, settings_cls: Type[BaseSettings],
    init_settings: PydanticBaseSettingsSource,
    env_settings: PydanticBaseSettingsSource,
    dotenv_settings: PydanticBaseSettingsSource,
    file_secret_settings: PydanticBaseSettingsSource
) -> Tuple[PydanticBaseSettingsSource, ...]

[VIEW SOURCE]

Included to ignore reading/setting values from the environment

ConnectionConfig.from_librdkafka_dict

@classmethod
def from_librdkafka_dict(cls,
                         config: dict,
                         ignore_extras: bool = False) -> Self

[VIEW SOURCE]

Create a ConnectionConfig from a librdkafka config dictionary.

Arguments:

  • config: a dict of configs (like {"bootstrap.servers": "url"})
  • ignore_extras: Ignore non-connection settings (else raise exception)

Returns:

a ConnectionConfig

ConnectionConfig.as_librdkafka_dict

def as_librdkafka_dict(plaintext_secrets=True) -> dict

[VIEW SOURCE]

Dump any non-empty config values as a librdkafka dictionary.

NOTE: All secret values will be dumped in PLAINTEXT by default.

Arguments:

  • plaintext_secrets: whether secret values are plaintext or obscured (***)

Returns:

a librdkafka-compatible dictionary
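
The round trip between Python-style field names and dotted librdkafka keys can be sketched with a plain dataclass. `ToyConnectionConfig` is a hypothetical, heavily reduced stand-in with only four fields; the real class is pydantic-based, supports many more settings, and may differ in how it validates and masks values.

```python
from dataclasses import dataclass, fields
from typing import Optional

SECRET_FIELDS = {"sasl_password"}  # assumed set of secret fields for this toy


@dataclass
class ToyConnectionConfig:
    bootstrap_servers: str
    security_protocol: Optional[str] = None
    sasl_username: Optional[str] = None
    sasl_password: Optional[str] = None

    @classmethod
    def from_librdkafka_dict(cls, config, ignore_extras=False):
        known = {f.name for f in fields(cls)}
        kwargs = {}
        for key, value in config.items():
            name = key.lower().replace(".", "_")
            if name in known:
                kwargs[name] = value
            elif not ignore_extras:
                raise ValueError(f"not a connection setting: {key}")
        return cls(**kwargs)

    def as_librdkafka_dict(self, plaintext_secrets=True):
        result = {}
        for f in fields(self):
            value = getattr(self, f.name)
            if value is None:
                continue  # only non-empty values are dumped
            if f.name in SECRET_FIELDS and not plaintext_secrets:
                value = "***"
            result[f.name.replace("_", ".")] = value
        return result


config = ToyConnectionConfig.from_librdkafka_dict(
    {
        "bootstrap.servers": "localhost:9092",
        "sasl.username": "user",
        "sasl.password": "secret",
        "group.id": "my-group",  # not a connection setting
    },
    ignore_extras=True,
)
```

Passing `plaintext_secrets=False` is the variant to use when the dictionary is going to be logged or printed.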

quixstreams.kafka

quixstreams.kafka.producer

Producer

class Producer()

[VIEW SOURCE]

Producer.__init__

def __init__(broker_address: Union[str, ConnectionConfig],
             logger: logging.Logger = logger,
             error_callback: Callable[[KafkaError], None] = _default_error_cb,
             extra_config: Optional[dict] = None,
             flush_timeout: Optional[int] = None)

[VIEW SOURCE]

A wrapper around confluent_kafka.Producer.

It initializes confluent_kafka.Producer on demand avoiding network calls during __init__, provides typing info for methods and some reasonable defaults.

Arguments:

  • broker_address: Connection settings for Kafka. Accepts string with Kafka broker host and port formatted as <host>:<port>, or a ConnectionConfig object if authentication is required.
  • logger: a Logger instance to attach librdkafka logging to
  • error_callback: callback used for producer errors
  • extra_config: A dictionary with additional options that will be passed to confluent_kafka.Producer as is. Note: values passed as arguments override values in extra_config.
  • flush_timeout: The time the producer is waiting for all messages to be delivered.

Producer.produce

def produce(topic: str,
            value: Optional[Union[str, bytes]] = None,
            key: Optional[Union[str, bytes]] = None,
            headers: Optional[Headers] = None,
            partition: Optional[int] = None,
            timestamp: Optional[int] = None,
            poll_timeout: float = 5.0,
            buffer_error_max_tries: int = 3,
            on_delivery: Optional[DeliveryCallback] = None)

[VIEW SOURCE]

Produce a message to a topic.

It also polls Kafka for callbacks before producing to minimize the probability of BufferError. If BufferError still happens, the method will poll Kafka with timeout to free up the buffer and try again.

Arguments:

  • topic: topic name
  • value: message value
  • key: message key
  • headers: message headers
  • partition: topic partition
  • timestamp: message timestamp
  • poll_timeout: timeout for poll() call in case of BufferError
  • buffer_error_max_tries: max retries for BufferError. Pass 0 to not retry after BufferError.
  • on_delivery: the delivery callback to be triggered on poll() for the produced message.
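
The poll-then-retry behaviour described for BufferError can be sketched against a fake producer. `FlakyProducer` and `produce_with_retry` are hypothetical, and the retry counting shown here is one possible reading of `buffer_error_max_tries` (0 means the first BufferError is re-raised); the wrapper's exact accounting may differ.

```python
class FlakyProducer:
    """Hypothetical stand-in whose queue is full for the first few attempts."""

    def __init__(self, full_for):
        self.full_for = full_for
        self.sent = []
        self.polls = []

    def poll(self, timeout=0):
        self.polls.append(timeout)

    def produce(self, topic, value):
        if self.full_for > 0:
            self.full_for -= 1
            raise BufferError("local queue is full")
        self.sent.append((topic, value))


def produce_with_retry(producer, topic, value,
                       poll_timeout=5.0, buffer_error_max_tries=3):
    producer.poll(0)  # serve pending callbacks first to free queue space
    retries = 0
    while True:
        try:
            producer.produce(topic, value)
            return
        except BufferError:
            retries += 1
            if retries > buffer_error_max_tries:
                raise
            # Block for up to poll_timeout so deliveries can drain the queue.
            producer.poll(poll_timeout)


producer = FlakyProducer(full_for=2)
produce_with_retry(producer, "events", b"payload")
```

In this run the message is accepted on the third attempt, after one non-blocking poll and two blocking polls.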

Producer.poll

def poll(timeout: float = 0)

[VIEW SOURCE]

Polls the producer for events and calls on_delivery callbacks.

Arguments:

  • timeout: poll timeout seconds; Default: 0 (unlike others)

    NOTE: -1 will hang indefinitely if there are no messages to acknowledge

Producer.flush

def flush(timeout: Optional[float] = None) -> int

[VIEW SOURCE]

Wait for all messages in the Producer queue to be delivered.

Arguments:

  • timeout (float): time to attempt flushing (seconds). None uses the producer default; -1 waits indefinitely. Default: None

Returns:

number of messages remaining to flush

TransactionalProducer

class TransactionalProducer(Producer)

[VIEW SOURCE]

A separate producer class used only internally for transactions (transactions are only needed when using a consumer).

quixstreams.kafka.consumer

Consumer

class Consumer()

[VIEW SOURCE]

Consumer.__init__

def __init__(broker_address: Union[str, ConnectionConfig],
             consumer_group: Optional[str],
             auto_offset_reset: AutoOffsetReset,
             auto_commit_enable: bool = True,
             logger: logging.Logger = logger,
             error_callback: Callable[[KafkaError], None] = _default_error_cb,
             on_commit: Optional[Callable[
                 [Optional[KafkaError], List[TopicPartition]], None]] = None,
             extra_config: Optional[dict] = None)

[VIEW SOURCE]

A wrapper around confluent_kafka.Consumer.

It initializes confluent_kafka.Consumer on demand avoiding network calls during __init__, provides typing info for methods and some reasonable defaults.

Arguments:

  • broker_address: Connection settings for Kafka. Accepts string with Kafka broker host and port formatted as <host>:<port>, or a ConnectionConfig object if authentication is required.
  • consumer_group: Kafka consumer group. Passed as group.id to confluent_kafka.Consumer
  • auto_offset_reset: Consumer auto.offset.reset setting. Available values:
    "earliest" - automatically reset the offset to the smallest offset
    "latest" - automatically reset the offset to the largest offset
    "error" - trigger an error (ERR__AUTO_OFFSET_RESET) which is retrieved by consuming messages (used for testing)
  • auto_commit_enable: If true, periodically commit offset of the last message handed to the application. Default - True.
  • logger: a Logger instance to attach librdkafka logging to
  • error_callback: callback used for consumer errors
  • on_commit: Offset commit result propagation callback. Passed as "offset_commit_cb" to confluent_kafka.Consumer.
  • extra_config: A dictionary with additional options that will be passed to confluent_kafka.Consumer as is. Note: values passed as arguments override values in extra_config.

Consumer.poll

def poll(timeout: Optional[float] = None) -> Optional[Message]

[VIEW SOURCE]

Consumes a single message, calls callbacks and returns events.

The application must check the returned Message object's Message.error() method to distinguish between a proper message (error() returns None) and an event or error.

Note: a RebalancingCallback may be called from this method (on_assign, on_revoke, or on_lost).

Arguments:

  • timeout (float): Maximum time in seconds to block waiting for message, event or callback. None or -1 is infinite. Default: None.

Raises:

  • RuntimeError: if called on a closed consumer

Returns:

Optional[Message]: A Message object or None on timeout
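The three possible outcomes of poll() described above (timeout, error or event, proper message) can be told apart as in the sketch below. `StubMessage` and `classify` are hypothetical helpers for illustration; the stub only imitates the `error()` and `value()` methods of a Message.

```python
class StubMessage:
    """Hypothetical stand-in with the two Message methods used here."""

    def __init__(self, value: bytes, error=None):
        self._value = value
        self._error = error

    def value(self) -> bytes:
        return self._value

    def error(self):
        return self._error


def classify(msg) -> str:
    """Classify the result of a single poll() call."""
    if msg is None:
        # poll() returned nothing before the timeout elapsed.
        return "timeout"
    if msg.error() is not None:
        # The object carries an event or error instead of a payload.
        return "error"
    return "message"
```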

Consumer.subscribe

def subscribe(topics: List[str],
              on_assign: Optional[RebalancingCallback] = None,
              on_revoke: Optional[RebalancingCallback] = None,
              on_lost: Optional[RebalancingCallback] = None)

[VIEW SOURCE]

Set subscription to supplied list of topics

This replaces a previous subscription.

Arguments:

  • topics (List[str]): List of topics (strings) to subscribe to.
  • on_assign (Optional[RebalancingCallback]): callback to provide handling of customized offsets on completion of a successful partition re-assignment.
  • on_revoke (Optional[RebalancingCallback]): callback to provide handling of offset commits to a customized store on the start of a rebalance operation.
  • on_lost (Optional[RebalancingCallback]): callback to provide handling in the case the partition assignment has been lost. Partitions that have been lost may already be owned by other members in the group and therefore committing offsets, for example, may fail.

Raises:

  • KafkaException: if a Kafka-based error occurs
  • RuntimeError: if called on a closed consumer

Consumer.unsubscribe

def unsubscribe()

[VIEW SOURCE]

Remove current subscription.

Raises:

  • KafkaException: if a Kafka-based error occurs
  • RuntimeError: if called on a closed consumer

Consumer.store_offsets

def store_offsets(message: Optional[Message] = None,
                  offsets: Optional[List[TopicPartition]] = None)

[VIEW SOURCE]

Store offsets for a message or a list of offsets.

message and offsets are mutually exclusive. The stored offsets will be committed according to 'auto.commit.interval.ms' or manual offset-less commit. Note that 'enable.auto.offset.store' must be set to False when using this API.

Arguments:

  • message (confluent_kafka.Message): Store message's offset+1.
  • offsets (List[TopicPartition]): List of topic+partitions+offsets to store.

Raises:

  • KafkaException: if a Kafka-based error occurs
  • RuntimeError: if called on a closed consumer

Consumer.commit

def commit(message: Optional[Message] = None,
           offsets: Optional[List[TopicPartition]] = None,
           asynchronous: bool = True) -> Optional[List[TopicPartition]]

[VIEW SOURCE]

Commit a message or a list of offsets.

The message and offsets parameters are mutually exclusive. If neither is set, the current partition assignment's offsets are used instead. Use this method to commit offsets if you have 'enable.auto.commit' set to False.

Arguments:

  • message (Message): Commit the message's offset+1. Note: By convention, committed offsets reflect the next message to be consumed, not the last message consumed.
  • offsets (List[TopicPartition]): List of topic+partitions+offsets to commit.
  • asynchronous (bool): If true, asynchronously commit, returning None immediately. If False, the commit() call will block until the commit succeeds or fails and the committed offsets will be returned (on success). Note that specific partitions may have failed and the .err field of each partition should be checked for success.

Raises:

  • KafkaException: if a Kafka-based error occurs
  • RuntimeError: if called on a closed consumer

Consumer.committed

def committed(partitions: List[TopicPartition],
              timeout: Optional[float] = None) -> List[TopicPartition]

[VIEW SOURCE]

Retrieve committed offsets for the specified partitions.

Arguments:

  • partitions (List[TopicPartition]): List of topic+partitions to query for stored offsets.
  • timeout (float): Request timeout (seconds). None or -1 is infinite. Default: None

Raises:

  • KafkaException: if a Kafka-based error occurs
  • RuntimeError: if called on a closed consumer

Returns:

List[TopicPartition]: List of topic+partitions with offset and possibly error set.

Consumer.get_watermark_offsets

def get_watermark_offsets(partition: TopicPartition,
                          timeout: Optional[float] = None,
                          cached: bool = False) -> Tuple[int, int]

[VIEW SOURCE]

Retrieve low and high offsets for the specified partition.

Arguments:

  • partition (TopicPartition): Topic+partition to return offsets for.
  • timeout (float): Request timeout (seconds). None or -1 is infinite. Ignored if cached=True. Default: None
  • cached (bool): Instead of querying the broker, use cached information. Cached values: The low offset is updated periodically (if statistics.interval.ms is set) while the high offset is updated on each message fetched from the broker for this partition.

Raises:

  • KafkaException: if a Kafka-based error occurs
  • RuntimeError: if called on a closed consumer

Returns:

Tuple[int, int]: Tuple of (low,high) on success or None on timeout. The high offset is the offset of the last message + 1.
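Because the high offset is the offset of the last message + 1 and a committed offset points at the next message to consume, the two can be subtracted to estimate consumer lag. This is a sketch under stated assumptions: `partition_lag` is a hypothetical helper, and `OFFSET_INVALID = -1001` is assumed to mirror the constant of the same name in confluent_kafka.

```python
OFFSET_INVALID = -1001  # assumption: same value as confluent_kafka.OFFSET_INVALID


def partition_lag(low: int, high: int, committed: int) -> int:
    """Estimate how many messages are still to be consumed in one partition."""
    if committed < 0:
        # No committed offset yet: everything retained in the partition is pending.
        start = low
    else:
        # A committed offset below the low watermark refers to deleted messages.
        start = max(committed, low)
    return max(high - start, 0)
```

The inputs would come from get_watermark_offsets() and committed(); the function itself does no I/O.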

Consumer.list_topics

def list_topics(topic: Optional[str] = None,
                timeout: Optional[float] = None) -> ClusterMetadata

[VIEW SOURCE]

Request metadata from the cluster.

This method provides the same information as listTopics(), describeTopics() and describeCluster() in the Java Admin client.

Arguments:

  • topic (str): If specified, only request information about this topic, else return results for all topics in cluster. Warning: If auto.create.topics.enable is set to true on the broker and an unknown topic is specified, it will be created.
  • timeout (float): The maximum response time before timing out. None or -1 is infinite. Default: None

Raises:

  • KafkaException: if a Kafka-based error occurs

Consumer.memberid

def memberid() -> Optional[str]

[VIEW SOURCE]

Return this client's broker-assigned group member id.

The member id is assigned by the group coordinator and is propagated to the consumer during rebalance.

Raises:

  • RuntimeError: if called on a closed consumer

Returns:

Optional[str]: Member id string or None

Consumer.offsets_for_times

def offsets_for_times(partitions: List[TopicPartition],
                      timeout: Optional[float] = None) -> List[TopicPartition]

[VIEW SOURCE]

Look up offsets by timestamp for the specified partitions.

The returned offset for each partition is the earliest offset whose timestamp is greater than or equal to the given timestamp in the corresponding partition. If the provided timestamp exceeds that of the last message in the partition, a value of -1 will be returned.

Arguments:

  • partitions (List[TopicPartition]): topic+partitions with timestamps in the TopicPartition.offset field.
  • timeout (float): The maximum response time before timing out. None or -1 is infinite. Default: None

Raises:

  • KafkaException: if a Kafka-based error occurs
  • RuntimeError: if called on a closed consumer

Returns:

List[TopicPartition]: List of topic+partition with offset field set and possibly error set
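The lookup rule described above (earliest offset whose timestamp is greater than or equal to the requested one, or -1 when the timestamp is past the last message) can be modelled locally with a binary search. This is only an illustration of the rule, not how the broker implements it; `offset_for_time` is a hypothetical helper and it assumes timestamps are sorted in ascending order, which real partitions do not strictly guarantee.

```python
from bisect import bisect_left
from typing import Sequence


def offset_for_time(
    timestamps: Sequence[int], target_ms: int, base_offset: int = 0
) -> int:
    """Return the earliest offset whose timestamp is >= target_ms, or -1.

    `timestamps` holds one timestamp per message, sorted ascending;
    `base_offset` is the offset of the first message in the sequence.
    """
    idx = bisect_left(timestamps, target_ms)
    if idx == len(timestamps):
        # The requested timestamp exceeds that of the last message.
        return -1
    return base_offset + idx
```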

Consumer.pause

def pause(partitions: List[TopicPartition])

[VIEW SOURCE]

Pause consumption for the provided list of partitions.

Paused partitions must be tracked manually.

Does NOT affect the result of Consumer.assignment().

Arguments:

  • partitions (List[TopicPartition]): List of topic+partitions to pause.

Raises:

  • KafkaException: if a Kafka-based error occurs

Consumer.resume

def resume(partitions: List[TopicPartition])

[VIEW SOURCE]

Resume consumption for the provided list of partitions.

Arguments:

  • partitions (List[TopicPartition]): List of topic+partitions to resume.

Raises:

  • KafkaException: if a Kafka-based error occurs
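Since paused partitions must be tracked manually and pausing does not change Consumer.assignment(), an application may want a small bookkeeping object next to its pause() and resume() calls. The sketch below is one possible shape; `PausedTracker` is a hypothetical helper that identifies partitions by `(topic, partition)` tuples and does not talk to Kafka itself.

```python
from typing import Iterable, List, Set, Tuple

Partition = Tuple[str, int]  # (topic name, partition number)


class PausedTracker:
    """Remember which partitions the application has paused."""

    def __init__(self) -> None:
        self._paused: Set[Partition] = set()

    def mark_paused(self, partitions: Iterable[Partition]) -> None:
        # Call right after consumer.pause(...) succeeds.
        self._paused.update(partitions)

    def mark_resumed(self, partitions: Iterable[Partition]) -> None:
        # Call right after consumer.resume(...) succeeds.
        self._paused.difference_update(partitions)

    def is_paused(self, topic: str, partition: int) -> bool:
        return (topic, partition) in self._paused

    def paused(self) -> List[Partition]:
        return sorted(self._paused)


tracker = PausedTracker()
tracker.mark_paused([("orders", 0), ("orders", 1)])
tracker.mark_resumed([("orders", 0)])
```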

Consumer.position

def position(partitions: List[TopicPartition]) -> List[TopicPartition]

[VIEW SOURCE]

Retrieve current positions (offsets) for the specified partitions.

Arguments:

  • partitions (List[TopicPartition]): List of topic+partitions to return current offsets for. The current offset is the offset of the last consumed message + 1.

Raises:

  • KafkaException: if a Kafka-based error occurs
  • RuntimeError: if called on a closed consumer

Returns:

List[TopicPartition]: List of topic+partitions with offset and possibly error set.

Consumer.seek

def seek(partition: TopicPartition)

[VIEW SOURCE]

Set consume position for partition to offset.

The offset may be an absolute (>=0) or a logical offset like OFFSET_BEGINNING.

seek() may only be used to update the consume offset of an actively consumed partition (i.e., after Consumer.assign()). To set the starting offset of a partition that is not being consumed, pass the offset in an assign() call instead.

Arguments:

  • partition (TopicPartition): Topic+partition+offset to seek to.

Raises:

  • KafkaException: if a Kafka-based error occurs

Consumer.assignment

def assignment() -> List[TopicPartition]

[VIEW SOURCE]

Returns the current partition assignment.

Raises:

  • KafkaException: if a Kafka-based error occurs
  • RuntimeError: if called on a closed consumer

Returns:

List[TopicPartition]: List of assigned topic+partitions.

Consumer.set_sasl_credentials

def set_sasl_credentials(username: str, password: str)

[VIEW SOURCE]

Sets the SASL credentials used for this client.

These credentials will overwrite the old ones, and will be used the next time the client needs to authenticate. This method will not disconnect existing broker connections that have been established with the old credentials. This method is applicable only to SASL PLAIN and SCRAM mechanisms.

Arguments:

  • username (str): your username
  • password (str): your password

Consumer.incremental_assign

def incremental_assign(partitions: List[TopicPartition])

[VIEW SOURCE]

Assign new partitions.

Can be called outside the Consumer on_assign callback (multiple times). Partitions immediately show on Consumer.assignment().

Any additional partitions besides the ones passed during the Consumer on_assign callback will NOT be associated with the consumer group.

Arguments:

  • partitions (List[TopicPartition]): a list of topic partitions

Consumer.incremental_unassign

def incremental_unassign(partitions: List[TopicPartition])

[VIEW SOURCE]

Revoke partitions.

Can be called outside an on_revoke callback.

Arguments:

  • partitions (List[TopicPartition]): a list of topic partitions

Consumer.close

def close()

[VIEW SOURCE]

Close down and terminate the Kafka Consumer.

Actions performed:

  • Stops consuming.
  • Commits offsets, unless the consumer property 'enable.auto.commit' is set to False.
  • Leaves the consumer group.

Registered callbacks may be called from this method, see poll() for more info.

Consumer.consumer_group_metadata

def consumer_group_metadata() -> GroupMetadata

[VIEW SOURCE]

Used by the producer during consumer offset sending for an EOS transaction.

quixstreams.kafka.exceptions

quixstreams.app

Application

class Application()

[VIEW SOURCE]

The main Application class.

Typically, the primary object needed to get a Kafka application up and running.

Most functionality is explained in the various methods, except for "column assignment".

What it Does:

  • On init:
    • Provides defaults or helper methods for commonly needed objects
    • If quix_sdk_token is passed, configures the app to use the Quix Cloud.
  • When executed via .run() (after setup):
    • Initializes Topics and StreamingDataFrames
    • Facilitates processing of Kafka messages with a StreamingDataFrame
    • Handles all Kafka client consumer/producer responsibilities.

Example Snippet:

from quixstreams import Application

# Set up an `app = Application` and `sdf = StreamingDataFrame`;
# add some operations to `sdf` and then run everything.

app = Application(broker_address='localhost:9092', consumer_group='group')
topic = app.topic('test-topic')
df = app.dataframe(topic)
df.apply(lambda value, context: print('New message', value))

app.run(dataframe=df)

Application.__init__

def __init__(broker_address: Optional[Union[str, ConnectionConfig]] = None,
             quix_sdk_token: Optional[str] = None,
             consumer_group: Optional[str] = None,
             auto_offset_reset: AutoOffsetReset = "latest",
             commit_interval: float = 5.0,
             consumer_extra_config: Optional[dict] = None,
             producer_extra_config: Optional[dict] = None,
             state_dir: str = "state",
             rocksdb_options: Optional[RocksDBOptionsType] = None,
             on_consumer_error: Optional[ConsumerErrorCallback] = None,
             on_processing_error: Optional[ProcessingErrorCallback] = None,
             on_producer_error: Optional[ProducerErrorCallback] = None,
             on_message_processed: Optional[MessageProcessedCallback] = None,
             consumer_poll_timeout: float = 1.0,
             producer_poll_timeout: float = 0.0,
             loglevel: Optional[LogLevel] = "INFO",
             auto_create_topics: bool = True,
             use_changelog_topics: bool = True,
             quix_config_builder: Optional[QuixKafkaConfigsBuilder] = None,
             topic_manager: Optional[TopicManager] = None,
             request_timeout: float = 30,
             topic_create_timeout: float = 60,
             processing_guarantee: ProcessingGuarantee = "at-least-once")

[VIEW SOURCE]

Arguments:

  • broker_address: Connection settings for Kafka. Used by Producer, Consumer, and Admin clients. Accepts string with Kafka broker host and port formatted as <host>:<port>, or a ConnectionConfig object if authentication is required. Either this OR quix_sdk_token must be set to use Application (not both). Takes priority over quix auto-configuration. Linked Environment Variable: Quix__Broker__Address. Default: None
  • quix_sdk_token: If using the Quix Cloud, the SDK token to connect with. Either this OR broker_address must be set to use Application (not both). Linked Environment Variable: Quix__Sdk__Token. Default: None (if not run on Quix Cloud)

    NOTE: the environment variable is set for you in the Quix Cloud

  • consumer_group: Kafka consumer group. Passed as group.id to confluent_kafka.Consumer. Linked Environment Variable: Quix__Consumer__Group. Default - "quixstreams-default" (set during init)

    NOTE: Quix Applications will prefix it with the Quix workspace id.

  • commit_interval: How often to commit the processed messages in seconds. Default - 5.0.
  • auto_offset_reset: Consumer auto.offset.reset setting
  • consumer_extra_config: A dictionary with additional options that will be passed to confluent_kafka.Consumer as is.
  • producer_extra_config: A dictionary with additional options that will be passed to confluent_kafka.Producer as is.
  • state_dir: path to the application state directory. Default - "state".
  • rocksdb_options: RocksDB options. If None, the default options will be used.
  • consumer_poll_timeout: timeout for RowConsumer.poll(). Default - 1.0s
  • producer_poll_timeout: timeout for RowProducer.poll(). Default - 0s.
  • on_message_processed: a callback triggered when message is successfully processed.
  • loglevel: a log level for "quixstreams" logger. Should be a string or None. If None is passed, no logging will be configured. You may pass None and configure "quixstreams" logger externally using logging library. Default - "INFO".
  • auto_create_topics: Create all Topics made via Application.topic(). Default - True.
  • use_changelog_topics: Use changelog topics to back stateful operations. Default - True.
  • topic_manager: A TopicManager instance
  • request_timeout: timeout (seconds) for REST-based requests
  • topic_create_timeout: timeout (seconds) for topic create finalization
  • processing_guarantee: Use "exactly-once" or "at-least-once" processing.

    Error Handlers
    To handle errors, Application accepts callbacks triggered when exceptions occur on different stages of stream processing. If the callback returns True, the exception will be ignored. Otherwise, the exception will be propagated and the processing will eventually stop.
  • on_consumer_error: triggered when internal RowConsumer fails to poll Kafka or cannot deserialize a message.
  • on_processing_error: triggered when exception is raised within StreamingDataFrame.process().
  • on_producer_error: triggered when RowProducer fails to serialize or to produce a message to Kafka.

    Quix Cloud Parameters
  • quix_config_builder: instance of QuixKafkaConfigsBuilder to be used instead of the default one.

    NOTE: It is recommended to just use quix_sdk_token instead.
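As described under Error Handlers, a callback decides whether an exception is ignored (return True) or propagated (any other return value). The sketch below shows one such callback. Its three-argument shape `(exception, message, logger)` is an assumption made for illustration and is not confirmed by this page; check the ConsumerErrorCallback, ProcessingErrorCallback and ProducerErrorCallback types before relying on it. `ignore_value_errors` is a hypothetical name.

```python
import logging
from typing import Any, Optional


def ignore_value_errors(
    exception: Optional[Exception], message: Any, logger: logging.Logger
) -> bool:
    """Ignore ValueError and let every other exception propagate."""
    if isinstance(exception, ValueError):
        logger.warning("Skipping message after error: %s", exception)
        # True tells the Application to ignore the exception.
        return True
    # Anything else is propagated and processing eventually stops.
    return False
```

Under the stated assumption, such a function could be passed as `on_processing_error=ignore_value_errors` when creating the Application.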

Application.Quix

@classmethod
def Quix(
    cls,
    consumer_group: Optional[str] = None,
    auto_offset_reset: AutoOffsetReset = "latest",
    consumer_extra_config: Optional[dict] = None,
    producer_extra_config: Optional[dict] = None,
    state_dir: str = "state",
    rocksdb_options: Optional[RocksDBOptionsType] = None,
    on_consumer_error: Optional[ConsumerErrorCallback] = None,
    on_processing_error: Optional[ProcessingErrorCallback] = None,
    on_producer_error: Optional[ProducerErrorCallback] = None,
    on_message_processed: Optional[MessageProcessedCallback] = None,
    consumer_poll_timeout: float = 1.0,
    producer_poll_timeout: float = 0.0,
    loglevel: Optional[LogLevel] = "INFO",
    quix_config_builder: Optional[QuixKafkaConfigsBuilder] = None,
    auto_create_topics: bool = True,
    use_changelog_topics: bool = True,
    topic_manager: Optional[QuixTopicManager] = None,
    request_timeout: float = 30,
    topic_create_timeout: float = 60,
    processing_guarantee: Literal["at-least-once",
                                  "exactly-once"] = "exactly-once"
) -> Self

[VIEW SOURCE]

NOTE: DEPRECATED: use Application with quix_sdk_token argument instead.

Initialize an Application to work with Quix Cloud, assuming environment is properly configured (by default in Quix Cloud).

It takes the credentials from the environment and configures consumer and producer to properly connect to the Quix Cloud.

NOTE: Quix Cloud requires consumer_group and topic names to be prefixed with workspace id. If the application is created via Application.Quix(), the real consumer group will be <workspace_id>-<consumer_group>, and the real topic names will be <workspace_id>-<topic_name>.
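The naming rule above can be sketched as a small pure function. `with_workspace_prefix` is a hypothetical helper written for illustration; the library ships its own prepend_workspace_id in quixstreams.platforms.quix.config, whose exact behavior may differ from this sketch.

```python
def with_workspace_prefix(workspace_id: str, name: str) -> str:
    """Return <workspace_id>-<name>, leaving already prefixed names unchanged."""
    prefix = f"{workspace_id}-"
    if name.startswith(prefix):
        return name
    return prefix + name
```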

Example Snippet:

from quixstreams import Application

# Set up an `app = Application.Quix` and `sdf = StreamingDataFrame`;
# add some operations to `sdf` and then run everything. Also shows off how to
# use the quix-specific serializers and deserializers.

app = Application.Quix()
input_topic = app.topic("topic-in", value_deserializer="quix")
output_topic = app.topic("topic-out", value_serializer="quix_timeseries")
df = app.dataframe(input_topic)
df = df.to_topic(output_topic)

app.run(dataframe=df)

Arguments:

  • consumer_group: Kafka consumer group. Passed as group.id to confluent_kafka.Consumer. Linked Environment Variable: Quix__Consumer__Group. Default - "quixstreams-default" (set during init).

    NOTE: Quix Applications will prefix it with the Quix workspace id.

  • auto_offset_reset: Consumer auto.offset.reset setting
  • consumer_extra_config: A dictionary with additional options that will be passed to confluent_kafka.Consumer as is.
  • producer_extra_config: A dictionary with additional options that will be passed to confluent_kafka.Producer as is.
  • state_dir: path to the application state directory. Default - "state".
  • rocksdb_options: RocksDB options. If None, the default options will be used.
  • consumer_poll_timeout: timeout for RowConsumer.poll(). Default - 1.0s
  • producer_poll_timeout: timeout for RowProducer.poll(). Default - 0s.
  • on_message_processed: a callback triggered when message is successfully processed.
  • loglevel: a log level for "quixstreams" logger. Should be a string or None. If None is passed, no logging will be configured. You may pass None and configure "quixstreams" logger externally using logging library. Default - "INFO".
  • auto_create_topics: Create all Topics made via Application.topic(). Default - True.
  • use_changelog_topics: Use changelog topics to back stateful operations. Default - True.
  • topic_manager: A QuixTopicManager instance
  • request_timeout: timeout (seconds) for REST-based requests
  • topic_create_timeout: timeout (seconds) for topic create finalization
  • processing_guarantee: Use "exactly-once" or "at-least-once" processing.

    Error Handlers
    To handle errors, Application accepts callbacks triggered when exceptions occur on different stages of stream processing. If the callback returns True, the exception will be ignored. Otherwise, the exception will be propagated and the processing will eventually stop.
  • on_consumer_error: triggered when internal RowConsumer fails to poll Kafka or cannot deserialize a message.
  • on_processing_error: triggered when exception is raised within StreamingDataFrame.process().
  • on_producer_error: triggered when RowProducer fails to serialize or to produce a message to Kafka.

    Quix Cloud Parameters
  • quix_config_builder: instance of QuixKafkaConfigsBuilder to be used instead of the default one.

Returns:

Application object

Application.topic

def topic(name: str,
          value_deserializer: DeserializerType = "json",
          key_deserializer: DeserializerType = "bytes",
          value_serializer: SerializerType = "json",
          key_serializer: SerializerType = "bytes",
          config: Optional[TopicConfig] = None,
          timestamp_extractor: Optional[TimestampExtractor] = None) -> Topic

[VIEW SOURCE]

Create a topic definition.

Allows you to specify serialization that should be used when consuming/producing to the topic in the form of a string name (i.e. "json" for JSON) or a serialization class instance directly, like JSONSerializer().

Example Snippet:

from quixstreams import Application
from quixstreams.models import JSONSerializer

# Specify an input and output topic for a `StreamingDataFrame` instance,
# where the output topic requires adjusting the key serializer.

app = Application()
input_topic = app.topic("input-topic", value_deserializer="json")
output_topic = app.topic(
    "output-topic", key_serializer="str", value_serializer=JSONSerializer()
)
sdf = app.dataframe(input_topic)
sdf.to_topic(output_topic)

Arguments:

  • name: topic name

    NOTE: If the application is created via Application.Quix(), the topic name will be prefixed by Quix workspace id, and it will be <workspace_id>-<name>

  • value_deserializer: a deserializer type for values; default="json"
  • key_deserializer: a deserializer type for keys; default="bytes"
  • value_serializer: a serializer type for values; default="json"
  • key_serializer: a serializer type for keys; default="bytes"
  • config: optional topic configurations (for creation/validation)

    NOTE: will not create without Application's auto_create_topics set to True (is True by default)

  • timestamp_extractor: a callable that returns a timestamp in milliseconds from a deserialized message. Default - None.

Example Snippet:

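from typing import Any, List, Optional, Tuple

from quixstreams import Application
from quixstreams.models import TimestampType

app = Application(...)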


def custom_ts_extractor(
    value: Any,
    headers: Optional[List[Tuple[str, bytes]]],
    timestamp: float,
    timestamp_type: TimestampType,
) -> int:
    return value["timestamp"]

topic = app.topic("input-topic", timestamp_extractor=custom_ts_extractor)

Returns:

Topic object

Application.dataframe

def dataframe(topic: Topic) -> StreamingDataFrame

[VIEW SOURCE]

A simple helper method that generates a StreamingDataFrame, which is used to define your message processing pipeline.

See quixstreams.dataframe.StreamingDataFrame for more details.

Example Snippet:

from quixstreams import Application

# Set up an `app = Application` and  `sdf = StreamingDataFrame`;
# add some operations to `sdf` and then run everything.

app = Application(broker_address='localhost:9092', consumer_group='group')
topic = app.topic('test-topic')
df = app.dataframe(topic)
df.apply(lambda value, context: print('New message', value))

app.run(dataframe=df)

Arguments:

  • topic: a quixstreams.models.Topic instance to be used as an input topic.

Returns:

StreamingDataFrame object

Application.stop

def stop(fail: bool = False)

[VIEW SOURCE]

Stop the internal poll loop and the message processing.

Only necessary when manually managing the lifecycle of the Application (likely through some sort of threading).

To otherwise stop an application, either send a SIGTERM to the process (like Kubernetes does) or perform a typical KeyboardInterrupt (Ctrl+C).

Arguments:

  • fail: if True, signals that application is stopped due to unhandled exception, and it shouldn't commit the current checkpoint.

Application.get_producer

def get_producer() -> Producer

[VIEW SOURCE]

Create and return a pre-configured Producer instance. The Producer is initialized with params passed to Application.

It's useful for producing data to Kafka outside the standard Application processing flow (e.g., to produce test data into a topic). Using this within the StreamingDataFrame functions is not recommended, as it creates a new Producer instance each time, which is not optimized for repeated use in a streaming pipeline.

Example Snippet:

from quixstreams import Application

app = Application.Quix(...)
topic = app.topic("input")

with app.get_producer() as producer:
    for i in range(100):
        producer.produce(topic=topic.name, key=b"key", value=b"value")

Application.get_consumer

def get_consumer(auto_commit_enable: bool = True) -> Consumer

[VIEW SOURCE]

Create and return a pre-configured Consumer instance. The Consumer is initialized with params passed to Application.

It's useful for consuming data from Kafka outside the standard Application processing flow (e.g., to consume test data from a topic). Using it within the StreamingDataFrame functions is not recommended, as it creates a new Consumer instance each time, which is not optimized for repeated use in a streaming pipeline.

Note: By default, this consumer does not autocommit the consumed offsets, to allow at-least-once processing. To store the offset, call store_offsets() after processing a message. If autocommit is necessary, set enable.auto.offset.store to True in the consumer config when creating the app.

Example Snippet:

from quixstreams import Application

app = Application.Quix(...)
topic = app.topic("input")

with app.get_consumer() as consumer:
    consumer.subscribe([topic.name])
    while True:
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            continue
        # Process message
        # Optionally store the offset for a later commit
        # consumer.store_offsets(msg)

Application.clear_state

def clear_state()

[VIEW SOURCE]

Clear the state of the application.

Application.run

def run(dataframe: StreamingDataFrame)

[VIEW SOURCE]

Start processing data from Kafka using provided StreamingDataFrame

Once started, it can be safely terminated with a SIGTERM signal (like Kubernetes does) or a typical KeyboardInterrupt (Ctrl+C).

Example Snippet:

from quixstreams import Application

# Set up an `app = Application` and  `sdf = StreamingDataFrame`;
# add some operations to `sdf` and then run everything.

app = Application(broker_address='localhost:9092', consumer_group='group')
topic = app.topic('test-topic')
df = app.dataframe(topic)
df.apply(lambda value, context: print('New message', value))

app.run(dataframe=df)

Arguments:

  • dataframe: instance of StreamingDataFrame

quixstreams.rowconsumer

RowConsumer

class RowConsumer(Consumer)

[VIEW SOURCE]

RowConsumer.__init__

def __init__(broker_address: Union[str, ConnectionConfig],
             consumer_group: str,
             auto_offset_reset: AutoOffsetReset,
             auto_commit_enable: bool = True,
             on_commit: Callable[[Optional[KafkaError], List[TopicPartition]],
                                 None] = None,
             extra_config: Optional[dict] = None,
             on_error: Optional[ConsumerErrorCallback] = None)

[VIEW SOURCE]

A consumer class that is capable of deserializing Kafka messages to Rows according to the Topics deserialization settings.

It overrides .subscribe() method of Consumer class to accept Topic objects instead of strings.

Arguments:

  • broker_address: Connection settings for Kafka. Accepts string with Kafka broker host and port formatted as <host>:<port>, or a ConnectionConfig object if authentication is required.
  • consumer_group: Kafka consumer group. Passed as group.id to confluent_kafka.Consumer
  • auto_offset_reset: Consumer auto.offset.reset setting. Available values:
    "earliest" - automatically reset the offset to the smallest offset
    "latest" - automatically reset the offset to the largest offset
  • auto_commit_enable: If true, periodically commit offset of the last message handed to the application. Default - True.
  • on_commit: Offset commit result propagation callback. Passed as "offset_commit_cb" to confluent_kafka.Consumer.
  • extra_config: A dictionary with additional options that will be passed to confluent_kafka.Consumer as is. Note: values passed as arguments override values in extra_config.
  • on_error: a callback triggered when RowConsumer.poll_row fails. If consumer fails and the callback returns True, the exception will be logged but not propagated. The default callback logs an exception and returns False.

RowConsumer.subscribe

def subscribe(topics: List[Topic],
              on_assign: Optional[RebalancingCallback] = None,
              on_revoke: Optional[RebalancingCallback] = None,
              on_lost: Optional[RebalancingCallback] = None)

[VIEW SOURCE]

Set subscription to supplied list of topics.

This replaces a previous subscription.

This method also updates the internal mapping with topics that is used to deserialize messages to Rows.

Arguments:

  • topics: list of Topic instances to subscribe to.
  • on_assign (callable): callback to provide handling of customized offsets on completion of a successful partition re-assignment.
  • on_revoke (callable): callback to provide handling of offset commits to a customized store on the start of a rebalance operation.
  • on_lost (callable): callback to provide handling in the case the partition assignment has been lost. Partitions that have been lost may already be owned by other members in the group and therefore committing offsets, for example, may fail.

RowConsumer.poll_row

def poll_row(timeout: float = None) -> Union[Row, List[Row], None]

[VIEW SOURCE]

Consumes a single message and deserializes it to a Row or a list of Rows.

The message is deserialized according to the corresponding Topic. If deserializer raises IgnoreValue exception, this method will return None. If Kafka returns an error, it will be raised as exception.

Arguments:

  • timeout: poll timeout seconds

Returns:

single Row, list of Rows or None
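Because poll_row() may return a single Row, a list of Rows, or None, callers often find it convenient to normalize the result into a list before iterating. A minimal sketch follows; `as_rows` is a hypothetical helper and works with any object type, so it does not depend on the Row class.

```python
from typing import Any, List


def as_rows(result: Any) -> List[Any]:
    """Normalize a poll_row() result into a (possibly empty) list."""
    if result is None:
        # Timeout, or the deserializer asked to ignore the value.
        return []
    if isinstance(result, list):
        return result
    return [result]
```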

quixstreams.checkpointing.checkpoint

Checkpoint

class Checkpoint()

[VIEW SOURCE]

Class to keep track of state updates and consumer offsets and to checkpoint these updates on schedule.

Checkpoint.expired

def expired() -> bool

[VIEW SOURCE]

Returns True if checkpoint deadline has expired.

Checkpoint.empty

def empty() -> bool

[VIEW SOURCE]

Returns True if checkpoint doesn't have any offsets stored yet.

Checkpoint.store_offset

def store_offset(topic: str, partition: int, offset: int)

[VIEW SOURCE]

Store the offset of the processed message to the checkpoint.

Arguments:

  • topic: topic name
  • partition: partition number
  • offset: message offset

Checkpoint.get_store_transaction

def get_store_transaction(
        topic: str,
        partition: int,
        store_name: str = DEFAULT_STATE_STORE_NAME) -> PartitionTransaction

[VIEW SOURCE]

Get a PartitionTransaction for the given store, topic and partition.

It will return already started transaction if there's one.

Arguments:

  • topic: topic name
  • partition: partition number
  • store_name: store name

Returns:

instance of PartitionTransaction

Checkpoint.close

def close()

[VIEW SOURCE]

Perform cleanup (when the checkpoint is empty) instead of committing.

Needed for exactly-once, as Kafka transactions are timeboxed.

Checkpoint.commit

def commit()

[VIEW SOURCE]

Commit the checkpoint.

This method will:

  1. Produce the changelogs for each state store.
  2. Flush the producer to ensure everything is delivered.
  3. Commit topic offsets.
  4. Flush each state store partition to the disk.
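The checkpoint lifecycle described in this module (a deadline, stored offsets, and an ordered commit) can be pictured with a toy model. The class below is illustrative only and is not the quixstreams implementation: `SketchCheckpoint` and its `steps` attribute are hypothetical, no Kafka or state store I/O happens, and the returned offsets follow the offset + 1 convention described for Consumer.commit.

```python
import time
from typing import Dict, List, Tuple


class SketchCheckpoint:
    """Toy model of a checkpoint; performs no real I/O."""

    def __init__(self, commit_interval: float):
        # Deadline after which the checkpoint should be committed.
        self._expires_at = time.monotonic() + commit_interval
        # Latest processed offset per (topic, partition).
        self._offsets: Dict[Tuple[str, int], int] = {}
        self.steps: List[str] = []

    def expired(self) -> bool:
        return time.monotonic() >= self._expires_at

    def empty(self) -> bool:
        return not self._offsets

    def store_offset(self, topic: str, partition: int, offset: int) -> None:
        self._offsets[(topic, partition)] = offset

    def commit(self) -> Dict[Tuple[str, int], int]:
        # Record the documented order of operations instead of performing them.
        self.steps = [
            "produce changelogs",
            "flush producer",
            "commit offsets",
            "flush state stores",
        ]
        # Committed offsets point at the next message to consume.
        return {tp: offset + 1 for tp, offset in self._offsets.items()}


checkpoint = SketchCheckpoint(commit_interval=0.0)
checkpoint.store_offset("orders", 0, 41)
committed = checkpoint.commit()
```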

quixstreams.checkpointing

quixstreams.checkpointing.exceptions