quixstreams

quixstreams.helpers.dotnet.datetimeconverter

DateTimeConverter Objects

class DateTimeConverter()

datetime_to_python

@staticmethod
def datetime_to_python(hptr: ctypes.c_void_p) -> datetime.datetime

Converts dotnet pointer to DateTime and frees the pointer.

Arguments:

  • hptr - Handler Pointer to .Net type DateTime

Returns:

datetime.datetime: Python type datetime

datetime_to_dotnet

@staticmethod
def datetime_to_dotnet(value: datetime.datetime) -> ctypes.c_void_p

Arguments:

  • value - Python type datetime

Returns:

ctypes.c_void_p: Handler Pointer to .Net type DateTime

timespan_to_python

@staticmethod
def timespan_to_python(uptr: ctypes.c_void_p) -> datetime.timedelta

Converts dotnet pointer to TimeSpan as binary and frees the pointer.

Arguments:

  • uptr - Pointer to .Net type TimeSpan

Returns:

datetime.timedelta: Python type timedelta

timedelta_to_dotnet

@staticmethod
def timedelta_to_dotnet(value: datetime.timedelta) -> ctypes.c_void_p

Arguments:

  • value - Python type timedelta

Returns:

ctypes.c_void_p: Pointer to unmanaged memory containing TimeSpan

quixstreams.helpers.timeconverter

TimeConverter Objects

class TimeConverter()

A utility class for converting between different time representations.

offset_from_utc

The number of nanoseconds by which local time is ahead of UTC.

to_unix_nanoseconds

@staticmethod
def to_unix_nanoseconds(value: datetime) -> int

Converts a datetime object to UNIX timestamp in nanoseconds.

Arguments:

  • value - The datetime object to be converted.

Returns:

  • int - The UNIX timestamp in nanoseconds.
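
As a rough illustration of the arithmetic involved, the sketch below converts a datetime to UNIX nanoseconds using only the standard library. It is not the library's implementation: the name to_unix_nanoseconds_sketch is hypothetical, and treating naive datetimes as UTC is an assumption, since the description above does not say how time zones are handled.

```python
from datetime import datetime, timezone

# Hypothetical stand-in for illustration; not part of quixstreams.
_UNIX_EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_unix_nanoseconds_sketch(value: datetime) -> int:
    """Return the nanoseconds elapsed between the UNIX epoch and `value`."""
    if value.tzinfo is None:
        # Assumption: naive datetimes are interpreted as UTC.
        value = value.replace(tzinfo=timezone.utc)
    delta = value - _UNIX_EPOCH
    # Integer arithmetic avoids the rounding that float seconds would introduce.
    whole_seconds = delta.days * 86_400 + delta.seconds
    return whole_seconds * 1_000_000_000 + delta.microseconds * 1_000


one_second_after_epoch = to_unix_nanoseconds_sketch(
    datetime(1970, 1, 1, 0, 0, 1, tzinfo=timezone.utc)
)
```

Because Python datetimes carry microsecond resolution, the last three digits of any result produced this way are always zero.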

to_nanoseconds

@staticmethod
def to_nanoseconds(value: timedelta) -> int

Converts a timedelta object to nanoseconds.

Arguments:

  • value - The timedelta object to be converted.

Returns:

  • int - The duration in nanoseconds.

from_nanoseconds

@staticmethod
def from_nanoseconds(value: int) -> timedelta

Converts a duration in nanoseconds to a timedelta object.

Arguments:

  • value - The duration in nanoseconds.

Returns:

  • timedelta - The corresponding timedelta object.
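
The two duration conversions above can be sketched with the standard library as follows. The function names are hypothetical and this is not the library's code. One point worth noting, which is a property of Python's timedelta rather than of this API: timedelta stores microseconds, so any sub-microsecond part of a nanosecond value cannot survive the round trip. The sketch truncates it; how the library rounds is not stated above.

```python
from datetime import timedelta


def to_nanoseconds_sketch(value: timedelta) -> int:
    """Duration in nanoseconds, computed with integers only."""
    whole_seconds = value.days * 86_400 + value.seconds
    return whole_seconds * 1_000_000_000 + value.microseconds * 1_000


def from_nanoseconds_sketch(value: int) -> timedelta:
    """Nanoseconds to timedelta; the sub-microsecond remainder is dropped."""
    return timedelta(microseconds=value // 1_000)


ns = to_nanoseconds_sketch(timedelta(seconds=1.5))
back = from_nanoseconds_sketch(1_500_000_999)
```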

from_unix_nanoseconds

@staticmethod
def from_unix_nanoseconds(value: int) -> datetime

Converts a UNIX timestamp in nanoseconds to a datetime object.

Arguments:

  • value - The UNIX timestamp in nanoseconds.

Returns:

  • datetime - The corresponding datetime object.
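
For the reverse direction, a minimal standard-library sketch might look like the following. The function name is hypothetical, and returning a timezone-aware UTC datetime is an assumption; the description above does not say whether the library returns a naive or an aware value.

```python
from datetime import datetime, timedelta, timezone


def from_unix_nanoseconds_sketch(value: int) -> datetime:
    """UNIX nanoseconds to an aware UTC datetime (microsecond resolution)."""
    epoch = datetime(1970, 1, 1, tzinfo=timezone.utc)
    # datetime cannot hold nanoseconds, so the value is truncated to microseconds.
    return epoch + timedelta(microseconds=value // 1_000)


new_year_2023 = from_unix_nanoseconds_sketch(1_672_531_200_000_000_000)
```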

from_string

@staticmethod
def from_string(value: str) -> int

Converts a string representation of a timestamp to a UNIX timestamp in nanoseconds.

Arguments:

  • value - The string representation of a timestamp.

Returns:

  • int - The corresponding UNIX timestamp in nanoseconds.

quixstreams.helpers.exceptionconverter

quixstreams.helpers.nativedecorator

quixstreams.helpers

quixstreams.helpers.enumconverter

quixstreams.raw.rawtopicproducer

RawTopicProducer Objects

@nativedecorator
class RawTopicProducer(object)

Class to produce raw messages into a Topic (capable of producing non-quixstreams messages)

__init__

def __init__(net_pointer: ctypes.c_void_p)

Initializes a new instance of the RawTopicProducer class.

Arguments:

  • net_pointer - Pointer to an instance of a .NET RawTopicProducer object.

publish

def publish(message: Union[RawMessage, bytes, bytearray])

Publishes the given message to the associated topic producer.

Arguments:

  • message - The message to be published, which can be either a RawMessage instance, bytes, or a bytearray.
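
Since publish accepts plain bytes, one common pattern is to serialize a payload yourself and hand over the encoded result. The sketch below assumes JSON as the wire format, which is a choice made for illustration and not something the API requires. publish_json and RecordingProducer are hypothetical names; RecordingProducer is a test double that only mimics the publish(message) shape so the example runs without a broker.

```python
import json


def publish_json(producer, payload: dict) -> bytes:
    """Encode `payload` as UTF-8 JSON and pass the bytes to producer.publish."""
    data = json.dumps(payload).encode("utf-8")
    producer.publish(data)
    return data


class RecordingProducer:
    """Hypothetical test double; a real producer comes from the streaming client."""

    def __init__(self) -> None:
        self.sent = []

    def publish(self, message) -> None:
        self.sent.append(message)


recorder = RecordingProducer()
publish_json(recorder, {"sensor": "a1", "reading": 3})
```

With a real RawTopicProducer in place of the test double, the same helper would send the encoded bytes to the topic.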

quixstreams.raw.rawtopicconsumer

RawTopicConsumer Objects

@nativedecorator
class RawTopicConsumer(object)

Topic class to consume incoming raw messages (capable of consuming non-quixstreams messages).

__init__

def __init__(net_pointer: ctypes.c_void_p)

Initializes a new instance of RawTopicConsumer.

Notes:

Do not initialize this class manually, use KafkaStreamingClient.get_raw_topic_consumer.

Arguments:

  • net_pointer - Pointer to an instance of a .net RawTopicConsumer.

on_message_received

@property
def on_message_received() -> Callable[['RawTopicConsumer', RawMessage], None]

Gets the handler for when a topic receives a message.

Returns:

Callable[[RawTopicConsumer, RawMessage], None]: The event handler for when a topic receives a message. The first parameter is the RawTopicConsumer instance for which the message is received, and the second is the RawMessage.

on_message_received

@on_message_received.setter
def on_message_received(
        value: Callable[['RawTopicConsumer', RawMessage], None]) -> None

Sets the handler for when a topic receives a message.

Arguments:

  • value - The new event handler for when a topic receives a message. The first parameter is the RawTopicConsumer instance for which the message is received, and the second is the RawMessage.

on_error_occurred

@property
def on_error_occurred() -> Callable[['RawTopicConsumer', BaseException], None]

Gets the handler for when a stream experiences an exception during the asynchronous write process.

Returns:

Callable[[RawTopicConsumer, BaseException], None]: The event handler for when a stream experiences an exception during the asynchronous write process. The first parameter is the RawTopicConsumer instance for which the error is received, and the second is the exception.

on_error_occurred

@on_error_occurred.setter
def on_error_occurred(
        value: Callable[['RawTopicConsumer', BaseException], None]) -> None

Sets the handler for when a stream experiences an exception during the asynchronous write process.

Arguments:

  • value - The new handler for when a stream experiences an exception during the asynchronous write process. The first parameter is the RawTopicConsumer instance for which the error is received, and the second is the exception.

subscribe

def subscribe()

Starts subscribing to the streams.
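
The handler signatures documented above can be exercised as in the following sketch. It assumes handlers are attached before subscribe is called, which seems the natural order but is not stated above. FakeRawConsumer is a hypothetical test double that only mirrors the documented attribute names, and the message object is simulated with a simple namespace exposing value.

```python
from types import SimpleNamespace

received_values = []
seen_errors = []


def on_message(consumer, message) -> None:
    # Per the RawMessage docs, `value` holds the bytes content of the message.
    received_values.append(bytes(message.value))


def on_error(consumer, error: BaseException) -> None:
    seen_errors.append(error)


def start_consuming(consumer) -> None:
    """Attach both handlers, then begin subscribing."""
    consumer.on_message_received = on_message
    consumer.on_error_occurred = on_error
    consumer.subscribe()


class FakeRawConsumer:
    """Hypothetical test double; a real consumer comes from the streaming client."""

    def __init__(self) -> None:
        self.on_message_received = None
        self.on_error_occurred = None
        self.subscribed = False

    def subscribe(self) -> None:
        self.subscribed = True


fake = FakeRawConsumer()
start_consuming(fake)
# Simulate the broker delivering one message and one error.
fake.on_message_received(fake, SimpleNamespace(value=bytearray(b"hello")))
fake.on_error_occurred(fake, RuntimeError("broker unavailable"))
```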

quixstreams.raw.rawmessage

RawMessage Objects

@nativedecorator
class RawMessage(object)

The message consumed from topic without any transformation.

__init__

def __init__(data: Union[ctypes.c_void_p, bytes, bytearray])

Initializes a new instance of RawMessage.

Arguments:

  • data - The raw data to be stored in the message. Must be one of ctypes.c_void_p, bytes, or bytearray.

get_net_pointer

def get_net_pointer() -> ctypes.c_void_p

Gets the associated .net object pointer of the RawMessage instance.

Returns:

  • ctypes.c_void_p - The .net object pointer of the RawMessage instance.

key

@property
def key() -> bytes

Gets the optional key of the message. Depending on the broker and message, it is not guaranteed.

Returns:

  • bytes - The optional key of the message.

key

@key.setter
def key(value: Union[bytearray, bytes])

Sets the message key.

Arguments:

  • value - The key to set for the message.

value

@property
def value()

Gets the message value (bytes content of the message).

Returns:

Union[bytearray, bytes]: The message value (bytes content of the message).

value

@value.setter
def value(value: Union[bytearray, bytes])

Sets the message value (bytes content of the message).

Arguments:

  • value - The value to set for the message.

metadata

@property
def metadata() -> Dict[str, str]

Gets the wrapped message metadata.

Returns:

Dict[str, str]: The wrapped message metadata.

quixstreams.raw

quixstreams.configuration.securityoptions

SecurityOptions Objects

class SecurityOptions(object)

A class representing security options for configuring SSL encryption with SASL authentication in Kafka.

__init__

def __init__(ssl_certificates: str,
             username: str,
             password: str,
             sasl_mechanism: SaslMechanism = SaslMechanism.ScramSha256)

Initializes a new instance of SecurityOptions configured for SSL encryption with SASL authentication.

Arguments:

  • ssl_certificates - The path to the folder or file containing the certificate authority certificate(s) used to validate the SSL connection. Example: "./certificates/ca.cert"
  • username - The username for SASL authentication.
  • password - The password for SASL authentication.
  • sasl_mechanism - The SASL mechanism to use. Defaults to ScramSha256.

get_net_pointer

def get_net_pointer()

Retrieves the .NET pointer for the current SecurityOptions instance.

Returns:

  • ctypes.c_void_p - The .NET pointer.

quixstreams.configuration.saslmechanism

quixstreams.configuration

quixstreams.quixstreamingclient

TokenValidationConfiguration Objects

@nativedecorator
class TokenValidationConfiguration(object)

__init__

def __init__(net_pointer: ctypes.c_void_p)

Initializes a new instance of TokenValidationConfiguration.

Arguments:

  • net_pointer - Pointer to an instance of a .NET TokenValidationConfiguration.

enabled

@property
def enabled() -> bool

Gets whether token validation and warnings are enabled. Defaults to true.

Returns:

  • bool - True if token validation and warnings are enabled, False otherwise.

enabled

@enabled.setter
def enabled(value: bool)

Sets whether token validation and warnings are enabled. Defaults to true.

Arguments:

  • value - True to enable token validation and warnings, False to disable.

warning_before_expiry

@property
def warning_before_expiry() -> Union[timedelta, None]

Gets the period within which, if the token expires, a warning will be displayed. Defaults to 2 days. Set to None to disable the check.

Returns:

Union[timedelta, None]: The period within which a warning will be displayed if the token expires or None if the check is disabled.

warning_before_expiry

@warning_before_expiry.setter
def warning_before_expiry(value: Union[timedelta, None])

Sets the period within which, if the token expires, a warning will be displayed. Defaults to 2 days. Set to None to disable the check.

Arguments:

  • value - The new period within which a warning will be displayed if the token expires or None to disable the check.
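
The expiry-warning rule described above can be sketched as a small predicate. This is an illustration of the documented behaviour, not the library's code: should_warn is a hypothetical name, and treating the boundary as inclusive is an assumption.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def should_warn(
    expires_at: datetime,
    now: datetime,
    warning_before_expiry: Optional[timedelta] = timedelta(days=2),
) -> bool:
    """True when the token expires within the warning period."""
    if warning_before_expiry is None:
        # None disables the check entirely.
        return False
    # Assumption: a token expiring exactly at the boundary still warns.
    return expires_at - now <= warning_before_expiry


now = datetime(2024, 1, 1, tzinfo=timezone.utc)
warn_soon = should_warn(now + timedelta(days=1), now)
warn_later = should_warn(now + timedelta(days=3), now)
warn_disabled = should_warn(now + timedelta(hours=1), now, None)
```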

warn_about_pat_token

@property
def warn_about_pat_token() -> bool

Gets whether to warn if the provided token is not a PAT token. Defaults to true.

Returns:

  • bool - True if the warning is enabled, False otherwise.

warn_about_pat_token

@warn_about_pat_token.setter
def warn_about_pat_token(value: bool)

Sets whether to warn if the provided token is not a PAT token. Defaults to true.

Arguments:

  • value - True to enable the warning, False to disable.

get_net_pointer

def get_net_pointer() -> ctypes.c_void_p

Gets the associated .NET object pointer.

Returns:

  • ctypes.c_void_p - The .NET pointer

QuixStreamingClient Objects

class QuixStreamingClient(object)

Streaming client for Kafka configured automatically using Environment Variables and Quix platform endpoints. Use this Client when you use this library together with Quix platform.

__init__

def __init__(token: str = None,
             auto_create_topics: bool = True,
             properties: Dict[str, str] = None,
             debug: bool = False)

Initializes a new instance of the QuixStreamingClient capable of creating topic consumers and producers.

Arguments:

  • token - The token to use when talking to Quix. If not provided, the Quix__Sdk__Token environment variable will be used. Defaults to None.
  • auto_create_topics - Whether topics should be auto-created if they don't exist yet. Defaults to True.
  • properties - Additional broker properties. Defaults to None.
  • debug - Whether debugging should be enabled. Defaults to False.
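
The token fallback described for the token argument can be pictured with the following sketch. resolve_token is a hypothetical helper, not a library function, and it assumes that only None counts as "not provided"; how the library treats an empty string is not stated above.

```python
import os
from typing import Mapping, Optional


def resolve_token(
    token: Optional[str] = None,
    env: Mapping[str, str] = os.environ,
) -> Optional[str]:
    """Prefer the explicit token, else fall back to Quix__Sdk__Token."""
    if token is not None:
        return token
    return env.get("Quix__Sdk__Token")


explicit = resolve_token("abc", env={"Quix__Sdk__Token": "from-env"})
from_env = resolve_token(None, env={"Quix__Sdk__Token": "from-env"})
missing = resolve_token(None, env={})
```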

get_topic_consumer

def get_topic_consumer(
    topic_id_or_name: str,
    consumer_group: str = None,
    commit_settings: Union[CommitOptions, CommitMode] = None,
    auto_offset_reset: AutoOffsetReset = AutoOffsetReset.Latest
) -> TopicConsumer

Opens a topic consumer capable of subscribing to receive incoming streams.

Arguments:

  • topic_id_or_name - ID or name of the topic. If name is provided, the workspace will be derived from the environment variable or token, in that order.
  • consumer_group - The consumer group ID to use for consuming messages. If None, no consumer group is used and only new messages are consumed. Defaults to None.
  • commit_settings - The settings to use for committing. If not provided, defaults to committing every 5000 messages or 5 seconds, whichever is sooner.
  • auto_offset_reset - The offset to use when there is no saved offset for the consumer group. Defaults to AutoOffsetReset.Latest.

Returns:

  • TopicConsumer - An instance of TopicConsumer for the specified topic.

get_topic_producer

def get_topic_producer(topic_id_or_name: str) -> TopicProducer

Gets a topic producer capable of producing outgoing streams.

Arguments:

  • topic_id_or_name - ID or name of the topic. If name is provided, the workspace will be derived from the environment variable or token, in that order.

Returns:

  • TopicProducer - An instance of TopicProducer for the specified topic.

get_raw_topic_consumer

def get_raw_topic_consumer(
    topic_id_or_name: str,
    consumer_group: str = None,
    auto_offset_reset: Union[AutoOffsetReset,
                             None] = None) -> RawTopicConsumer

Gets a topic consumer for consuming raw data from the stream.

Arguments:

  • topic_id_or_name - ID or name of the topic. If name is provided, the workspace will be derived from the environment variable or token, in that order.
  • consumer_group - The consumer group ID to use for consuming messages. Defaults to None.
  • auto_offset_reset - The offset to use when there is no saved offset for the consumer group. Defaults to None.

Returns:

  • RawTopicConsumer - An instance of RawTopicConsumer for the specified topic.

get_raw_topic_producer

def get_raw_topic_producer(topic_id_or_name: str) -> RawTopicProducer

Gets a topic producer for producing raw data to the stream.

Arguments:

  • topic_id_or_name - ID or name of the topic. If name is provided, the workspace will be derived from the environment variable or token, in that order.

Returns:

  • RawTopicProducer - An instance of RawTopicProducer for the specified topic.

token_validation_config

@property
def token_validation_config() -> TokenValidationConfiguration

Gets the configuration for token validation.

Returns:

  • TokenValidationConfiguration - The current token validation configuration.

token_validation_config

@token_validation_config.setter
def token_validation_config(value: TokenValidationConfiguration)

Sets the configuration for token validation.

Arguments:

  • value - The new token validation configuration.

api_url

@property
def api_url() -> str

Gets the base API URI. Defaults to https://portal-api.platform.quix.ai, or environment variable Quix__Portal__Api if available.

Returns:

  • str - The current base API URI.

api_url

@api_url.setter
def api_url(value: str)

Sets the base API URI. Defaults to https://portal-api.platform.quix.ai, or environment variable Quix__Portal__Api if available.

Arguments:

  • value - The new base API URI.

cache_period

@property
def cache_period() -> timedelta

Gets the period for which some API responses will be cached to avoid an excessive amount of calls. Defaults to 1 minute.

Returns:

  • timedelta - The current cache period.

cache_period

@cache_period.setter
def cache_period(value: timedelta)

Sets the period for which some API responses will be cached to avoid an excessive amount of calls. Defaults to 1 minute.

Arguments:

  • value - The new cache period.

get_net_pointer

def get_net_pointer() -> ctypes.c_void_p

Gets the associated .NET object pointer.

Returns:

  • ctypes.c_void_p - The .NET pointer

quixstreams.topicproducer

TopicProducer Objects

@nativedecorator
class TopicProducer(object)

Interface to operate with the streaming platform for publishing messages

__init__

def __init__(net_pointer: ctypes.c_void_p)

Initializes a new instance of TopicProducer.

NOTE: Do not initialize this class manually, use KafkaStreamingClient.get_topic_producer to create it.

Arguments:

  • net_pointer - The .net object representing a StreamingClient.

on_disposed

@property
def on_disposed() -> Callable[['TopicProducer'], None]

Gets the handler for when the topic is disposed.

Returns:

Callable[[TopicProducer], None]: The event handler for topic disposal. The first parameter is the TopicProducer instance that got disposed.

on_disposed

@on_disposed.setter
def on_disposed(value: Callable[['TopicProducer'], None]) -> None

Sets the handler for when the topic is disposed.

Arguments:

  • value - The event handler for topic disposal. The first parameter is the TopicProducer instance that got disposed.

create_stream

def create_stream(stream_id: str = None) -> StreamProducer

Creates a new stream and returns the related StreamProducer to operate it.

Arguments:

  • stream_id - Provide if you wish to overwrite the generated stream id. Useful if you wish to always stream a certain source into the same stream.

Returns:

  • StreamProducer - The created StreamProducer instance.

get_stream

def get_stream(stream_id: str) -> StreamProducer

Retrieves a stream that was previously created by this instance, if the stream is not closed.

Arguments:

  • stream_id - The id of the stream.

Returns:

  • StreamProducer - The retrieved StreamProducer instance or None if not found.

get_or_create_stream

def get_or_create_stream(
    stream_id: str,
    on_stream_created: Callable[[StreamProducer],
                                None] = None) -> StreamProducer

Retrieves a stream that was previously created by this instance if the stream is not closed, otherwise creates a new stream.

Arguments:

  • stream_id - The id of the stream you want to get or create.
  • on_stream_created - A callback function that takes a StreamProducer as a parameter.

Returns:

  • StreamProducer - The retrieved or created StreamProducer instance.
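
The relationship between get_stream and get_or_create_stream can be sketched with a plain dictionary. StreamRegistrySketch is a hypothetical stand-in and does not reflect how the library stores streams. It also assumes the on_stream_created callback fires only when a new stream is created, which the parameter name suggests but the description does not spell out.

```python
from typing import Callable, Dict, Optional


class StreamRegistrySketch:
    """Hypothetical illustration of get / get-or-create semantics."""

    def __init__(self) -> None:
        self._streams: Dict[str, object] = {}

    def get_stream(self, stream_id: str) -> Optional[object]:
        # Returns None when no open stream with this id exists.
        return self._streams.get(stream_id)

    def get_or_create_stream(
        self,
        stream_id: str,
        on_stream_created: Optional[Callable[[object], None]] = None,
    ) -> object:
        stream = self._streams.get(stream_id)
        if stream is None:
            stream = object()  # placeholder for a real StreamProducer
            self._streams[stream_id] = stream
            if on_stream_created is not None:
                # Assumption: the callback runs only for newly created streams.
                on_stream_created(stream)
        return stream


registry = StreamRegistrySketch()
created = []
first = registry.get_or_create_stream("car-42", created.append)
second = registry.get_or_create_stream("car-42", created.append)
```

Reusing a fixed stream id this way is what lets a given source always land in the same stream, as noted for create_stream.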

quixstreams.models.commitoptions

CommitOptions Objects

@nativedecorator
class CommitOptions(object)

__init__

def __init__(net_pointer: ctypes.c_void_p = None)

Initializes a new instance of CommitOptions.

Arguments:

  • net_pointer - Pointer to an instance of a .net CommitOptions.

auto_commit_enabled

@property
def auto_commit_enabled() -> bool

Gets whether automatic committing is enabled. If automatic committing is not enabled, other values are ignored. Default is True.

auto_commit_enabled

@auto_commit_enabled.setter
def auto_commit_enabled(value: bool) -> None

Sets whether automatic committing is enabled. If automatic committing is not enabled, other values are ignored. Default is True.

commit_interval

@property
def commit_interval() -> Optional[int]

Gets the interval of automatic commit in ms. Default is 5000.

commit_interval

@commit_interval.setter
def commit_interval(value: Optional[int]) -> None

Sets the interval of automatic commit in ms. Default is 5000.

commit_every

@property
def commit_every() -> Optional[int]

Gets the number of messages to automatically commit at. Default is 5000.

commit_every

@commit_every.setter
def commit_every(value: Optional[int]) -> None

Sets the number of messages to automatically commit at. Default is 5000.
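
How the three settings interact can be sketched as a small decision function. CommitPolicySketch is a hypothetical model of the documented defaults (commit every 5000 messages or every 5000 ms, whichever comes first); it is not the library's commit logic. Treating None as "this trigger is disabled" is an assumption based on the Optional[int] types.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class CommitPolicySketch:
    """Hypothetical model of the documented CommitOptions fields."""

    auto_commit_enabled: bool = True
    commit_interval: Optional[int] = 5000  # milliseconds
    commit_every: Optional[int] = 5000  # number of messages

    def should_commit(self, messages_since_commit: int, ms_since_commit: int) -> bool:
        if not self.auto_commit_enabled:
            # When auto commit is off, the other values are ignored.
            return False
        by_count = (
            self.commit_every is not None
            and messages_since_commit >= self.commit_every
        )
        by_time = (
            self.commit_interval is not None
            and ms_since_commit >= self.commit_interval
        )
        return by_count or by_time


policy = CommitPolicySketch()
manual = CommitPolicySketch(auto_commit_enabled=False)
```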

quixstreams.models.eventlevel

quixstreams.models.timeseriesdata

TimeseriesData Objects

@nativedecorator
class TimeseriesData(object)

Describes timeseries data for multiple timestamps.

__init__

def __init__(net_pointer: ctypes.c_void_p = None)

Initializes a new instance of TimeseriesData.

Arguments:

  • net_pointer - Pointer to an instance of a .net TimeseriesData.

clone

def clone(parameter_filter: Optional[List[str]] = None)

Initializes a new instance of timeseries data with parameters matching the filter if one is provided.

Arguments:

  • parameter_filter - The parameter filter. If one is provided, only parameters present in the list will be cloned.

Returns:

  • TimeseriesData - A new instance of TimeseriesData with filtered parameters.

add_timestamp

def add_timestamp(time: Union[datetime, timedelta]) -> TimeseriesDataTimestamp

Start adding a new set of parameters and their tags at the specified time.

Arguments:

  • time - The time to use for adding new event values.
  • datetime: The datetime to use for adding new event values. The epoch is never added to this value.
  • timedelta: The time since the default epoch to add the event values at.

Returns:

  • TimeseriesDataTimestamp - A new TimeseriesDataTimestamp instance.

add_timestamp_milliseconds

def add_timestamp_milliseconds(milliseconds: int) -> TimeseriesDataTimestamp

Start adding a new set of parameters and their tags at the specified time.

Arguments:

  • milliseconds - The time in milliseconds since the default epoch to add the event values at.

Returns:

  • TimeseriesDataTimestamp - A new TimeseriesDataTimestamp instance.

add_timestamp_nanoseconds

def add_timestamp_nanoseconds(nanoseconds: int) -> TimeseriesDataTimestamp

Start adding a new set of parameters and their tags at the specified time.

Arguments:

  • nanoseconds - The time in nanoseconds since the default epoch to add the event values at.

Returns:

  • TimeseriesDataTimestamp - A new TimeseriesDataTimestamp instance.
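
The three add_timestamp variants differ only in how the time argument relates to the epoch. The sketch below models that rule with hypothetical helper names and standard-library types: a datetime is absolute and ignores the epoch, while a timedelta or a millisecond count is an offset from it. Interpreting naive datetimes as UTC is an assumption.

```python
from datetime import datetime, timedelta, timezone
from typing import Union

_UNIX_EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def _timedelta_to_ns(delta: timedelta) -> int:
    whole_seconds = delta.days * 86_400 + delta.seconds
    return whole_seconds * 1_000_000_000 + delta.microseconds * 1_000


def resolve_timestamp_ns(time: Union[datetime, timedelta], epoch_ns: int = 0) -> int:
    """Absolute nanoseconds for a datetime (epoch ignored) or timedelta (epoch added)."""
    if isinstance(time, datetime):
        if time.tzinfo is None:
            # Assumption: naive datetimes are interpreted as UTC.
            time = time.replace(tzinfo=timezone.utc)
        return _timedelta_to_ns(time - _UNIX_EPOCH)
    return epoch_ns + _timedelta_to_ns(time)


def milliseconds_to_ns(milliseconds: int, epoch_ns: int = 0) -> int:
    """Milliseconds since the epoch, expressed as absolute nanoseconds."""
    return epoch_ns + milliseconds * 1_000_000


absolute = resolve_timestamp_ns(
    datetime(1970, 1, 1, 0, 0, 1, tzinfo=timezone.utc), epoch_ns=999
)
relative = resolve_timestamp_ns(timedelta(seconds=2), epoch_ns=1_000)
```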

timestamps

@property
def timestamps() -> List[TimeseriesDataTimestamp]

Gets the data as rows of TimeseriesDataTimestamp.

Returns:

  • List[TimeseriesDataTimestamp] - A list of TimeseriesDataTimestamp instances.

timestamps

@timestamps.setter
def timestamps(timestamp_list: List[TimeseriesDataTimestamp]) -> None

Sets the data as rows of TimeseriesDataTimestamp.

Arguments:

  • timestamp_list - A list of TimeseriesDataTimestamp instances to set.

to_dataframe

def to_dataframe() -> pd.DataFrame

Converts TimeseriesData to pandas DataFrame.

Returns:

  • pd.DataFrame - Converted pandas DataFrame.

from_panda_dataframe

@staticmethod
def from_panda_dataframe(data_frame: pd.DataFrame,
                         epoch: int = 0) -> 'TimeseriesData'

Converts pandas DataFrame to TimeseriesData.

Arguments:

  • data_frame - The pandas DataFrame to convert to TimeseriesData.
  • epoch - The epoch to add to each time value when converting to TimeseriesData. Defaults to 0.

Returns:

  • TimeseriesData - Converted TimeseriesData instance.

get_net_pointer

def get_net_pointer() -> ctypes.c_void_p

Gets the .net pointer of the current instance.

Returns:

  • ctypes.c_void_p - The .net pointer of the current instance

quixstreams.models.eventdefinition

EventDefinition Objects

class EventDefinition(object)

Describes additional context for the event

__init__

def __init__(net_pointer: ctypes.c_void_p)

Initializes a new instance of EventDefinition.

NOTE: Do not initialize this class manually. Instances of it are available on StreamEventsConsumer.definitions.

Arguments:

  • net_pointer - Pointer to an instance of a .net EventDefinition.

quixstreams.models.streamproducer.streameventsproducer

StreamEventsProducer Objects

@nativedecorator
class StreamEventsProducer(object)

Helper class for producing EventDefinitions and EventData.

__init__

def __init__(net_pointer: ctypes.c_void_p)

Initializes a new instance of StreamEventsProducer.

Arguments:

  • net_pointer - Pointer to an instance of a .NET StreamEventsProducer.

flush

def flush()

Immediately publishes the event definitions from the buffer without waiting for the buffer condition to be fulfilled (200ms timeout). TODO: Verify 200ms timeout value.

default_tags

@property
def default_tags() -> Dict[str, str]

Gets the default tags injected into all event values sent by the producer.

default_location

@property
def default_location() -> str

Gets the default Location of the events. Event definitions added with add_definition will be inserted at this location. See add_location for adding definitions at a different location without changing default. Example: "/Group1/SubGroup2"

default_location

@default_location.setter
def default_location(value: str)

Sets the default Location of the events. Event definitions added with add_definition will be inserted at this location. See add_location for adding definitions at a different location without changing default.

Arguments:

  • value - Location string, e.g., "/Group1/SubGroup2".

epoch

@property
def epoch() -> datetime

Gets the Unix epoch from which all other timestamps in this model are measured, in nanoseconds.

epoch

@epoch.setter
def epoch(value: datetime)

Sets the default epoch used for event values.

publish

def publish(data: Union[EventData, pd.DataFrame], **columns) -> None

Publish an event into the stream.

Arguments:

  • data - EventData object or a pandas dataframe.
  • columns - Column names if the dataframe has different columns from 'id', 'timestamp', and 'value'. For instance, if 'id' is in the column 'event_id', id='event_id' must be passed as an argument.

Raises:

  • TypeError - If the data argument is neither an EventData nor pandas dataframe.
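
The column-name overrides accepted through **columns can be pictured as a mapping layered over the three default names. The sketch below uses plain dictionaries in place of a pandas DataFrame so that it runs with the standard library alone. resolve_event_columns and rows_to_events are hypothetical helpers, not library functions.

```python
from typing import Dict, Iterable, List, Tuple

DEFAULT_COLUMNS = {"id": "id", "timestamp": "timestamp", "value": "value"}


def resolve_event_columns(**columns: str) -> Dict[str, str]:
    """Start from the default column names and apply any overrides."""
    resolved = dict(DEFAULT_COLUMNS)
    resolved.update(columns)
    return resolved


def rows_to_events(rows: Iterable[dict], **columns: str) -> List[Tuple]:
    """Extract (id, timestamp, value) triples using the resolved column names."""
    cols = resolve_event_columns(**columns)
    return [
        (row[cols["id"]], row[cols["timestamp"]], row[cols["value"]])
        for row in rows
    ]


# The event id lives in a column named 'event_id', so id='event_id' is passed.
events = rows_to_events(
    [{"event_id": "motor_start", "timestamp": 1, "value": "ok"}],
    id="event_id",
)
```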

add_timestamp

def add_timestamp(time: Union[datetime, timedelta]) -> EventDataBuilder

Start adding a new set of event values at the given timestamp.

Arguments:

  • time - The time to use for adding new event values.
  • datetime: The datetime to use for adding new event values. NOTE, epoch is not used.
  • timedelta: The time since the default epoch to add the event values at

Returns:

  • EventDataBuilder - Event data builder to add event values at the provided time.

add_timestamp_milliseconds

def add_timestamp_milliseconds(milliseconds: int) -> EventDataBuilder

Start adding a new set of event values at the given timestamp.

Arguments:

  • milliseconds - The time in milliseconds since the default epoch to add the event values at.

Returns:

  • EventDataBuilder - Event data builder to add event values at the provided time.

add_timestamp_nanoseconds

def add_timestamp_nanoseconds(nanoseconds: int) -> EventDataBuilder

Start adding a new set of event values at the given timestamp.

Arguments:

  • nanoseconds - The time in nanoseconds since the default epoch to add the event values at.

Returns:

  • EventDataBuilder - Event data builder to add event values at the provided time.

add_definition

def add_definition(event_id: str,
                   name: str = None,
                   description: str = None) -> EventDefinitionBuilder

Add new Event definition to define properties like Name or Level, among others.

Arguments:

  • event_id - The id of the event. Must match the event id used to send data.
  • name - The human-friendly display name of the event.
  • description - The description of the event.

Returns:

  • EventDefinitionBuilder - EventDefinitionBuilder to define properties of the event or add additional events.

add_location

def add_location(location: str) -> EventDefinitionBuilder

Add a new location in the events groups hierarchy.

Arguments:

  • location - The group location.

Returns:

  • EventDefinitionBuilder - EventDefinitionBuilder to define the events under the specified location.

quixstreams.models.streamproducer.streamtimeseriesproducer

StreamTimeseriesProducer Objects

@nativedecorator
class StreamTimeseriesProducer(object)

Helper class for producing ParameterDefinition and TimeseriesData.

__init__

def __init__(stream_producer, net_pointer: ctypes.c_void_p)

Initializes a new instance of StreamTimeseriesProducer.

Arguments:

  • stream_producer - The Stream producer which owns this stream timeseries producer.
  • net_pointer - Pointer to an instance of a .net StreamTimeseriesProducer.

flush

def flush()

Immediately publishes timeseries data and definitions from the buffer without waiting for the buffer condition of either to be fulfilled.

add_definition

def add_definition(parameter_id: str,
                   name: str = None,
                   description: str = None) -> ParameterDefinitionBuilder

Add new parameter definition to the StreamTimeseriesProducer. Configure it with the builder methods.

Arguments:

  • parameter_id - The id of the parameter. Must match the parameter id used to send data.
  • name - The human friendly display name of the parameter.
  • description - The description of the parameter.

Returns:

  • ParameterDefinitionBuilder - Builder to define the parameter properties.

add_location

def add_location(location: str) -> ParameterDefinitionBuilder

Add a new location in the parameters groups hierarchy.

Arguments:

  • location - The group location.

Returns:

  • ParameterDefinitionBuilder - Builder to define the parameters under the specified location.

default_location

@property
def default_location() -> str

Gets the default location of the parameters. Parameter definitions added with add_definition will be inserted at this location. See add_location for adding definitions at a different location without changing default.

Returns:

  • str - The default location of the parameters, e.g., "/Group1/SubGroup2".

default_location

@default_location.setter
def default_location(value: str)

Sets the default location of the parameters. Parameter definitions added with add_definition will be inserted at this location. See add_location for adding definitions at a different location without changing default.

Arguments:

  • value - The new default location of the parameters, e.g., "/Group1/SubGroup2".

buffer

@property
def buffer() -> TimeseriesBufferProducer

Get the buffer for producing timeseries data.

Returns:

  • TimeseriesBufferProducer - The buffer for producing timeseries data.

publish

def publish(
    packet: Union[TimeseriesData, pd.DataFrame, TimeseriesDataRaw,
                  TimeseriesDataTimestamp]
) -> None

Publish the given packet to the stream without any buffering.

Arguments:

  • packet - The packet containing TimeseriesData, TimeseriesDataRaw, TimeseriesDataTimestamp, or pandas DataFrame.

Notes:

  • Pandas DataFrame should contain 'time' label, else the first integer label will be taken as time.
  • Tags should be prefixed by TAG__ or they will be treated as parameters.

Examples:

Send a pandas DataFrame:

pdf = pandas.DataFrame({'time': [1, 5],
                        'panda_param': [123.2, 5]})
instance.publish(pdf)

Send a pandas DataFrame with multiple values:

pdf = pandas.DataFrame({'time': [1, 5, 10],
                        'panda_param': [123.2, None, 12],
                        'panda_param2': ["val1", "val2", None]})
instance.publish(pdf)

Send a pandas DataFrame with tags:

pdf = pandas.DataFrame({'time': [1, 5, 10],
                        'panda_param': [123.2, 5, 12],
                        'TAG__Tag1': ["v1", 2, None],
                        'TAG__Tag2': [1, None, 3]})
instance.publish(pdf)

Raises:

  • Exception - If the given type is not supported for publishing.
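
The column conventions from the notes above ('time' as the time column, the TAG__ prefix for tags, everything else a parameter) can be sketched as a simple classifier over column names. classify_columns is a hypothetical helper written against plain strings so that it runs without pandas. It does not model the "first integer label" fallback, and stripping the prefix from tag names is an assumption about how tags are named once parsed.

```python
from typing import Dict, List, Optional, Union

TAG_PREFIX = "TAG__"


def classify_columns(columns: List[str]) -> Dict[str, Union[Optional[str], List[str]]]:
    """Split DataFrame-style column names into time, tags, and parameters."""
    time_column: Optional[str] = None
    tags: List[str] = []
    parameters: List[str] = []
    for name in columns:
        if name == "time":
            time_column = name
        elif name.startswith(TAG_PREFIX):
            # Assumption: the tag name is the column name without the prefix.
            tags.append(name[len(TAG_PREFIX):])
        else:
            parameters.append(name)
    return {"time": time_column, "tags": tags, "parameters": parameters}


layout = classify_columns(["time", "panda_param", "TAG__Tag1", "TAG__Tag2"])
```

A column such as 'Tag1' without the prefix would end up under parameters, which matches the warning in the notes.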

quixstreams.models.streamproducer.streampropertiesproducer

StreamPropertiesProducer Objects

@nativedecorator
class StreamPropertiesProducer(object)

Represents properties and metadata of the stream. All changes to these properties are automatically published to the underlying stream.

__init__

def __init__(net_pointer: ctypes.c_void_p)

Initializes a new instance of StreamPropertiesProducer.

Arguments:

  • net_pointer - Pointer to an instance of a .net StreamPropertiesProducer.

name

@property
def name() -> str

Gets the human friendly name of the stream.

Returns:

  • str - The human friendly name of the stream.

name

@name.setter
def name(value: str)

Sets the human friendly name of the stream.

Arguments:

  • value - The new human friendly name of the stream.

location

@property
def location() -> str

Gets the location of the stream in the data catalogue.

Returns:

  • str - The location of the stream in the data catalogue, e.g., "/cars/ai/carA/".

location

@location.setter
def location(value: str)

Sets the location of the stream in the data catalogue.

Arguments:

  • value - The new location of the stream in the data catalogue.

metadata

@property
def metadata() -> Dict[str, str]

Gets the metadata of the stream.

Returns:

Dict[str, str]: The metadata of the stream.

parents

@property
def parents() -> List[str]

Gets the list of stream ids of the parent streams.

Returns:

  • List[str] - The list of stream ids of the parent streams.

time_of_recording

@property
def time_of_recording() -> datetime

Gets the datetime of the stream recording.

Returns:

  • datetime - The datetime of the stream recording.

time_of_recording

@time_of_recording.setter
def time_of_recording(value: datetime)

Sets the time of the stream recording.

Arguments:

  • value - The new time of the stream recording.

flush_interval

@property
def flush_interval() -> int

Gets the automatic flush interval of the properties metadata into the channel (in milliseconds).

Returns:

  • int - The automatic flush interval in milliseconds, default is 30000.

flush_interval

@flush_interval.setter
def flush_interval(value: int)

Sets the automatic flush interval of the properties metadata into the channel (in milliseconds).

Arguments:

  • value - The new flush interval in milliseconds.

flush

def flush()

Immediately publishes the properties yet to be sent instead of waiting for the flush timer (20ms).

quixstreams.models.streamproducer.timeseriesbufferproducer

TimeseriesBufferProducer Objects

@nativedecorator
class TimeseriesBufferProducer(TimeseriesBuffer)

A class for producing timeseries data to a StreamProducer in a buffered manner.

__init__

def __init__(stream_producer, net_pointer: ctypes.c_void_p)

Initializes a new instance of TimeseriesBufferProducer. NOTE: Do not initialize this class manually; use StreamTimeseriesProducer.buffer to access an instance of it.

Arguments:

  • stream_producer - The Stream producer which owns this timeseries buffer producer
  • net_pointer - Pointer to an instance of a .net TimeseriesBufferProducer

Raises:

  • Exception - If net_pointer is None.

default_tags

@property
def default_tags() -> Dict[str, str]

Gets the default tags injected into all parameter values sent by this buffer.

Returns:

Dict[str, str]: A dictionary containing the default tags

epoch

@property
def epoch() -> datetime

Get the default epoch used for parameter values.

Returns:

  • datetime - The default epoch used for parameter values

epoch

@epoch.setter
def epoch(value: datetime)

Sets the default epoch used for parameter values. This datetime is added to every timestamp.

Arguments:

  • value - The default epoch to set for parameter values

add_timestamp

def add_timestamp(time: Union[datetime, timedelta]) -> TimeseriesDataBuilder

Start adding a new set of parameter values at the given timestamp.

Arguments:

  • time - The time to use for adding new parameter values.
  • datetime: The datetime to use for adding new parameter values. NOTE: The epoch is not used.
  • timedelta: The time since the default epoch to add the parameter values at

Returns:

  • TimeseriesDataBuilder - A TimeseriesDataBuilder instance for adding parameter values

Raises:

  • ValueError - If 'time' is None or not an instance of datetime or timedelta
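
The difference between the two accepted types can be sketched in plain Python. This is only an illustration of the rule described above, not the library's implementation: resolve_timestamp_ns and to_unix_ns are hypothetical helpers, and treating naive datetimes as UTC is an assumption.

```python
from datetime import datetime, timedelta, timezone
from typing import Union

UNIX_EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)
ONE_MICROSECOND = timedelta(microseconds=1)


def to_unix_ns(value: datetime) -> int:
    """Convert a datetime to nanoseconds since the Unix epoch.

    Assumption: naive datetimes are interpreted as UTC.
    """
    if value.tzinfo is None:
        value = value.replace(tzinfo=timezone.utc)
    # Integer division by one microsecond avoids float rounding.
    return ((value - UNIX_EPOCH) // ONE_MICROSECOND) * 1000


def resolve_timestamp_ns(time: Union[datetime, timedelta],
                         epoch: datetime) -> int:
    """Hypothetical helper: absolute timestamp in nanoseconds."""
    if isinstance(time, datetime):
        # Absolute time: the default epoch is not used.
        return to_unix_ns(time)
    if isinstance(time, timedelta):
        # Relative time: offset measured from the default epoch.
        return to_unix_ns(epoch) + (time // ONE_MICROSECOND) * 1000
    raise ValueError("'time' must be a datetime or a timedelta")
```

With an epoch of 2020-01-01 UTC, a timedelta of two seconds resolves to two seconds after that epoch, while a datetime resolves to the same value whatever the epoch is.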

add_timestamp_nanoseconds

def add_timestamp_nanoseconds(nanoseconds: int) -> TimeseriesDataBuilder

Start adding a new set of parameter values at the given timestamp.

Arguments:

  • nanoseconds - The time in nanoseconds since the default epoch to add the parameter values at

Returns:

  • TimeseriesDataBuilder - A TimeseriesDataBuilder instance for adding parameter values

flush

def flush()

Immediately publishes the data from the buffer without waiting for the buffer condition to be fulfilled.

publish

def publish(
    packet: Union[TimeseriesData, pd.DataFrame, TimeseriesDataRaw,
                  TimeseriesDataTimestamp]
) -> None

Publish the provided timeseries packet to the buffer.

Arguments:

  • packet - The packet containing TimeseriesData, TimeseriesDataRaw, TimeseriesDataTimestamp, or pandas DataFrame.
  • packet type pandas.DataFrame:
  • Note 1: The pandas DataFrame should contain a 'time' label; otherwise the first integer label is taken as the time.
  • Note 2: Tags should be prefixed with TAG__, or they will be treated as timeseries parameters.

Examples:

Send a pandas DataFrame: pdf = pandas.DataFrame({'time': [1, 5], 'panda_param': [123.2, 5]})

instance.publish(pdf)

Send a pandas DataFrame with multiple values: pdf = pandas.DataFrame({'time': [1, 5, 10], 'panda_param': [123.2, None, 12], 'panda_param2': ["val1", "val2", None]})

instance.publish(pdf)

Send a pandas DataFrame with tags: pdf = pandas.DataFrame({'time': [1, 5, 10], 'panda_param': [123.2, 5, 12], 'TAG__Tag1': ["v1", 2, None], 'TAG__Tag2': [1, None, 3]})

instance.publish(pdf)

Raises:

  • Exception - If the packet type is not supported
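
The column conventions in Notes 1 and 2 can be illustrated without pandas. The sketch below uses a plain dict of lists as a stand-in for a DataFrame; split_columns and TAG_PREFIX are hypothetical names, and the fallback to "the first integer label" is deliberately not modelled.

```python
from typing import Any, Dict, List, Tuple

TAG_PREFIX = "TAG__"

Columns = Dict[str, List[Any]]


def split_columns(frame: Columns) -> Tuple[List[int], Columns, Columns]:
    """Split a column mapping into timestamps, parameters and tags.

    `frame` is a plain dict standing in for a pandas DataFrame.
    """
    if "time" not in frame:
        # Simplification: the real fallback to another label is not modelled.
        raise ValueError("this sketch requires a 'time' column")
    timestamps = list(frame["time"])
    parameters: Columns = {}
    tags: Columns = {}
    for label, values in frame.items():
        if label == "time":
            continue
        if label.startswith(TAG_PREFIX):
            # Strip the prefix so 'TAG__Tag1' becomes the tag 'Tag1'.
            tags[label[len(TAG_PREFIX):]] = list(values)
        else:
            parameters[label] = list(values)
    return timestamps, parameters, tags
```

Under these assumptions, a column named Tag1 without the prefix would land among the parameters, which is the behaviour Note 2 warns about.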

quixstreams.models.streamproducer

quixstreams.models.parameterdefinition

ParameterDefinition Objects

class ParameterDefinition(object)

Describes additional context for the parameter

__init__

def __init__(net_pointer: ctypes.c_void_p)

Initializes a new instance of ParameterDefinition

NOTE: Do not initialize this class manually. Instances of it are available on StreamTimeseriesConsumer.definitions

Arguments:

  • net_pointer: Pointer to an instance of a .net ParameterDefinition.

quixstreams.models.parametervalue

ParameterValue Objects

@nativedecorator
class ParameterValue(object)

Represents a single parameter value of either numeric, string, or binary type.

__init__

def __init__(net_pointer: ctypes.c_void_p)

Initializes a new instance of ParameterValue.

Arguments:

  • net_pointer - The .net object pointer representing ParameterValue.

numeric_value

@property
def numeric_value() -> float

Gets the numeric value of the parameter if the underlying parameter is of numeric type.

numeric_value

@numeric_value.setter
def numeric_value(value: float)

Sets the numeric value of the parameter and updates the type to numeric.

Arguments:

  • value - The numeric value to set.

string_value

@property
def string_value() -> str

Gets the string value of the parameter if the underlying parameter is of string type.

string_value

@string_value.setter
def string_value(value: str)

Sets the string value of the parameter and updates the type to string.

Arguments:

  • value - The string value to set.

binary_value

@property
def binary_value() -> bytes

Gets the binary value of the parameter if the underlying parameter is of binary type.

binary_value

@binary_value.setter
def binary_value(value: Union[bytearray, bytes])

Sets the binary value of the parameter and updates the type to binary.

Arguments:

  • value - The binary value to set.

type

@property
def type() -> ParameterValueType

Gets the type of the value: numeric, string, or binary if a value is set, otherwise empty.

value

@property
def value()

Gets the underlying value.
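
The way each setter also updates the value type can be pictured with a small stand-in class. This is a sketch only: SketchParameterValue and SketchValueType are hypothetical names, and returning None from a getter whose type does not match is an assumption, not documented behaviour.

```python
from enum import Enum
from typing import Optional, Union


class SketchValueType(Enum):
    """Hypothetical stand-in for ParameterValueType."""
    EMPTY = 0
    NUMERIC = 1
    STRING = 2
    BINARY = 3


class SketchParameterValue:
    """Holds one value; setting it also switches the reported type."""

    def __init__(self) -> None:
        self._type = SketchValueType.EMPTY
        self._value: Union[float, str, bytes, None] = None

    def _get(self, expected: SketchValueType):
        # Assumption: a mismatched getter yields None.
        return self._value if self._type is expected else None

    def _set(self, new_type: SketchValueType, value) -> None:
        self._type = new_type
        self._value = value

    @property
    def type(self) -> SketchValueType:
        return self._type

    @property
    def value(self) -> Union[float, str, bytes, None]:
        return self._value

    @property
    def numeric_value(self) -> Optional[float]:
        return self._get(SketchValueType.NUMERIC)

    @numeric_value.setter
    def numeric_value(self, value: float) -> None:
        self._set(SketchValueType.NUMERIC, value)

    @property
    def string_value(self) -> Optional[str]:
        return self._get(SketchValueType.STRING)

    @string_value.setter
    def string_value(self, value: str) -> None:
        self._set(SketchValueType.STRING, value)

    @property
    def binary_value(self) -> Optional[bytes]:
        return self._get(SketchValueType.BINARY)

    @binary_value.setter
    def binary_value(self, value: Union[bytearray, bytes]) -> None:
        # Normalise bytearray input to immutable bytes.
        self._set(SketchValueType.BINARY, bytes(value))
```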

get_net_pointer

def get_net_pointer() -> ctypes.c_void_p

Gets the associated .net object pointer.

quixstreams.models.timeseriesdataraw

TimeseriesDataRaw Objects

@nativedecorator
class TimeseriesDataRaw(object)

Describes timeseries data in a raw format for multiple timestamps. This class is intended for read-only use.

__init__

def __init__(net_pointer: ctypes.c_void_p = None)

Initializes a new instance of TimeseriesDataRaw.

Arguments:

  • net_pointer - Pointer to an instance of a .net TimeseriesDataRaw. Defaults to None.

to_dataframe

def to_dataframe() -> pd.DataFrame

Converts TimeseriesDataRaw to pandas DataFrame.

Returns:

  • pd.DataFrame - Converted pandas DataFrame.

from_dataframe

@staticmethod
def from_dataframe(data_frame: pd.DataFrame,
                   epoch: int = 0) -> 'TimeseriesDataRaw'

Converts from pandas DataFrame to TimeseriesDataRaw.

Arguments:

  • data_frame - The pandas DataFrame to convert to TimeseriesDataRaw.
  • epoch - The epoch to add to each time value when converting to TimeseriesDataRaw. Defaults to 0.

Returns:

  • TimeseriesDataRaw - Converted TimeseriesDataRaw.

set_values

def set_values(epoch: int, timestamps: List[int],
               numeric_values: Dict[str, List[float]],
               string_values: Dict[str, List[str]],
               binary_values: Dict[str,
                                   List[bytes]], tag_values: Dict[str,
                                                                  List[str]])

Sets the values of the timeseries data from the provided dictionaries. Dictionary values are matched by index to the provided timestamps.

Arguments:

  • epoch - The time from which all timestamps are measured.
  • timestamps - The timestamps of values in nanoseconds since epoch as an array.
  • numeric_values - The numeric values where the dictionary key is the parameter name and the value is the array of values.
  • string_values - The string values where the dictionary key is the parameter name and the value is the array of values.
  • binary_values - The binary values where the dictionary key is the parameter name and the value is the array of values.
  • tag_values - The tag values where the dictionary key is the parameter name and the value is the array of values.
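
Index matching means that position i in every value list belongs to position i in timestamps. A minimal sketch of that layout, using plain dicts and lists, is shown below; rows_by_timestamp is a hypothetical helper, and skipping None entries is an assumption made for readability.

```python
from typing import Any, Dict, List, Optional


def rows_by_timestamp(
    epoch: int,
    timestamps: List[int],
    numeric_values: Dict[str, List[Optional[float]]],
    string_values: Dict[str, List[Optional[str]]],
) -> List[Dict[str, Any]]:
    """Hypothetical helper: one row per timestamp, matched by index."""
    columns: Dict[str, List[Any]] = {**numeric_values, **string_values}
    for name, values in columns.items():
        if len(values) != len(timestamps):
            raise ValueError(f"column {name!r} does not match timestamps")
    rows: List[Dict[str, Any]] = []
    for i, offset in enumerate(timestamps):
        # Absolute time is the epoch plus the per-row offset (nanoseconds).
        row: Dict[str, Any] = {"time": epoch + offset}
        for name, values in columns.items():
            if values[i] is not None:
                row[name] = values[i]
        rows.append(row)
    return rows
```

For example, with epoch 1000 and timestamps [1, 5, 10], the second entry of every list describes the row at time 1005.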

epoch

@property
def epoch() -> int

The Unix epoch from which all other timestamps in this model are measured, in nanoseconds.

Returns:

  • int - The Unix epoch (01/01/1970) in nanoseconds.

timestamps

@property
def timestamps() -> List[int]

The timestamps of values in nanoseconds since the epoch. Timestamps are matched by index to numeric_values, string_values, binary_values, and tag_values.

Returns:

  • List[int] - A list of timestamps in nanoseconds since the epoch.

numeric_values

@property
def numeric_values() -> Dict[str, List[Optional[float]]]

The numeric values for parameters. The key is the parameter ID the values belong to. The value is the numerical values of the parameter. Values are matched by index to timestamps.

Returns:

Dict[str, List[Optional[float]]]: A dictionary mapping parameter IDs to lists of numerical values.

string_values

@property
def string_values() -> Dict[str, List[str]]

The string values for parameters. The key is the parameter ID the values belong to. The value is the string values of the parameter. Values are matched by index to timestamps.

Returns:

Dict[str, List[str]]: A dictionary mapping parameter IDs to lists of string values

binary_values

@property
def binary_values() -> Dict[str, List[bytes]]

The binary values for parameters. The key is the parameter ID the values belong to. The value is the binary values of the parameter. Values are matched by index to timestamps.

Returns:

Dict[str, List[bytes]]: A dictionary mapping parameter IDs to lists of bytes values

tag_values

@property
def tag_values() -> Dict[str, List[str]]

The tag values for parameters. The key is the parameter ID the values belong to. The value is the tag values of the parameter. Values are matched by index to timestamps.

Returns:

Dict[str, List[str]]: A dictionary mapping parameter IDs to lists of string values

convert_to_timeseriesdata

def convert_to_timeseriesdata() -> TimeseriesData

Converts TimeseriesDataRaw to TimeseriesData

quixstreams.models.timeseriesbuffer

TimeseriesBuffer Objects

@nativedecorator
class TimeseriesBuffer(object)

Represents a class used to consume and produce stream messages in a buffered manner. When buffer conditions are not configured, it acts as a pass-through, raising each message as it arrives.

__init__

def __init__(stream, net_pointer: ctypes.c_void_p)

Initializes a new instance of TimeseriesBuffer.

NOTE: Do not initialize this class manually, use StreamProducer.timeseries.buffer to create it.

Arguments:

  • stream - The stream the buffer is created for.
  • net_pointer - Pointer to a .net TimeseriesBuffer object.

on_data_released

@property
def on_data_released() -> Callable[
    [Union['StreamConsumer', 'StreamProducer'], TimeseriesData], None]

Gets the handler for when the stream receives data.

Returns:

Callable[[Union['StreamConsumer', 'StreamProducer'], TimeseriesData], None]: The event handler. The first parameter is the stream the data is received for, second is the data in TimeseriesData format.

on_data_released

@on_data_released.setter
def on_data_released(
    value: Callable[
        [Union['StreamConsumer', 'StreamProducer'], TimeseriesData], None]
) -> None

Sets the handler for when the stream receives data.

Arguments:

  • value - The event handler. The first parameter is the stream the data is received for, second is the data in TimeseriesData format.

on_raw_released

@property
def on_raw_released() -> Callable[
    [Union['StreamConsumer', 'StreamProducer'], TimeseriesDataRaw], None]

Gets the handler for when the stream receives raw data.

Returns:

Callable[[Union['StreamConsumer', 'StreamProducer'], TimeseriesDataRaw], None]: The event handler. The first parameter is the stream the data is received for, second is the data in TimeseriesDataRaw format.

on_raw_released

@on_raw_released.setter
def on_raw_released(
    value: Callable[
        [Union['StreamConsumer', 'StreamProducer'], TimeseriesDataRaw], None]
) -> None

Sets the handler for when the stream receives raw data.

Arguments:

  • value - The event handler. The first parameter is the stream the data is received for, second is the data in TimeseriesDataRaw format.

on_dataframe_released

@property
def on_dataframe_released() -> Callable[
    [Union['StreamConsumer', 'StreamProducer'], pandas.DataFrame], None]

Gets the handler for when the stream receives data as a pandas DataFrame.

Returns:

Callable[[Union['StreamConsumer', 'StreamProducer'], pandas.DataFrame], None]: The event handler. The first parameter is the stream the data is received for, second is the data in pandas.DataFrame format.

on_dataframe_released

@on_dataframe_released.setter
def on_dataframe_released(
    value: Callable[
        [Union['StreamConsumer', 'StreamProducer'], pandas.DataFrame], None]
) -> None

Sets the handler for when the stream receives data as a pandas DataFrame.

Arguments:

  • value - The event handler. The first parameter is the stream the data is received for, second is the data in pandas.DataFrame format.

filter

@property
def filter() -> Callable[[TimeseriesDataTimestamp], bool]

Gets the custom function used to filter incoming data before adding it to the buffer. If the function returns True, the data is added; otherwise it is not. Defaults to None (disabled).

filter

@filter.setter
def filter(value: Callable[[TimeseriesDataTimestamp], bool])

Sets the custom function to filter incoming data before adding to the buffer.

The custom function takes a TimeseriesDataTimestamp object as input and returns a boolean value. If the function returns True, the data is added to the buffer, otherwise not. By default, this feature is disabled (None).

Arguments:

  • value - Custom filter function.

custom_trigger

@property
def custom_trigger() -> Callable[[TimeseriesData], bool]

Gets the custom trigger function, which is invoked after adding a new timestamp to the buffer.

If the custom trigger function returns True, the buffer releases content and triggers relevant callbacks. By default, this feature is disabled (None).

Returns:

Callable[[TimeseriesData], bool]: Custom trigger function.

custom_trigger

@custom_trigger.setter
def custom_trigger(value: Callable[[TimeseriesData], bool])

Sets the custom trigger function, which is invoked after adding a new timestamp to the buffer.

If the custom trigger function returns True, the buffer releases content and triggers relevant callbacks. By default, this feature is disabled (None).

Arguments:

  • value - Custom trigger function.
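
How filter and custom_trigger interact can be sketched with a stand-in buffer. Nothing here is the library's code: SketchBuffer and its methods are hypothetical, rows are plain dicts rather than TimeseriesDataTimestamp objects, and only these two hooks are modelled, so rows otherwise wait for an explicit flush.

```python
from typing import Callable, Dict, List, Optional

Row = Dict[str, float]


class SketchBuffer:
    """Stand-in buffer that models only filter and custom_trigger."""

    def __init__(self, on_released: Callable[[List[Row]], None]) -> None:
        self.filter: Optional[Callable[[Row], bool]] = None
        self.custom_trigger: Optional[Callable[[List[Row]], bool]] = None
        self._on_released = on_released
        self._rows: List[Row] = []

    def add(self, row: Row) -> None:
        # The filter runs first: rejected rows never enter the buffer.
        if self.filter is not None and not self.filter(row):
            return
        self._rows.append(row)
        # The trigger runs after the row was added and sees the content.
        if self.custom_trigger is not None and self.custom_trigger(self._rows):
            self.flush()

    def flush(self) -> None:
        # Release everything held so far and start with an empty buffer.
        if self._rows:
            released, self._rows = self._rows, []
            self._on_released(released)
```

A trigger such as lambda rows: len(rows) >= 2 would release the buffer in pairs, while a filter such as lambda row: row["speed"] >= 0 would drop negative readings before they are counted.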

packet_size

@property
def packet_size() -> Optional[int]

Gets the maximum packet size in terms of values for the buffer.

Each time the buffer has this amount of data, a callback method, such as on_data_released, is invoked, and the data is cleared from the buffer. By default, this feature is disabled (None).

Returns:

  • Optional[int] - Maximum packet size for the buffer.

packet_size

@packet_size.setter
def packet_size(value: Optional[int])

Sets the maximum packet size in terms of values for the buffer.

Each time the buffer has this amount of data, a callback method, such as on_data_released, is invoked, and the data is cleared from the buffer. By default, this feature is disabled (None).

Arguments:

  • value - Maximum packet size for the buffer.

time_span_in_nanoseconds

@property
def time_span_in_nanoseconds() -> Optional[int]

Gets the maximum time between timestamps for the buffer in nanoseconds.

When the difference between the earliest and latest buffered timestamp surpasses this number, a callback method, such as on_data_released, is invoked, and the data is cleared from the buffer. By default, this feature is disabled (None).

Returns:

  • Optional[int] - Maximum time between timestamps in nanoseconds.

time_span_in_nanoseconds

@time_span_in_nanoseconds.setter
def time_span_in_nanoseconds(value: Optional[int])

Sets the maximum time between timestamps for the buffer in nanoseconds.

When the difference between the earliest and latest buffered timestamp surpasses this number, a callback method, such as on_data_released, is invoked, and the data is cleared from the buffer. By default, this feature is disabled (None).

Arguments:

  • value - Maximum time between timestamps in nanoseconds.
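
The packet_size and time span conditions can be compared side by side in a small sketch. SketchTimeBuffer is a hypothetical stand-in that stores bare timestamps; counting one entry per timestamp and releasing the whole buffer, including the timestamp that exceeded the span, are simplifying assumptions rather than documented behaviour.

```python
from typing import Callable, List, Optional


class SketchTimeBuffer:
    """Stand-in buffer with packet-size and time-span release conditions."""

    def __init__(self,
                 on_released: Callable[[List[int]], None],
                 packet_size: Optional[int] = None,
                 time_span_ns: Optional[int] = None) -> None:
        self.packet_size = packet_size
        self.time_span_ns = time_span_ns
        self._on_released = on_released
        self._timestamps: List[int] = []

    def add(self, timestamp_ns: int) -> None:
        self._timestamps.append(timestamp_ns)
        if self._should_release():
            released, self._timestamps = self._timestamps, []
            self._on_released(released)

    def _should_release(self) -> bool:
        if self.packet_size is None and self.time_span_ns is None:
            # No conditions configured: behave as a pass-through.
            return True
        if (self.packet_size is not None
                and len(self._timestamps) >= self.packet_size):
            return True
        if self.time_span_ns is not None:
            # Span between the earliest and latest buffered timestamp.
            span = max(self._timestamps) - min(self._timestamps)
            if span > self.time_span_ns:
                return True
        return False
```

When both conditions are set in this sketch, whichever is met first releases the buffer.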

time_span_in_milliseconds

@property
def time_span_in_milliseconds() -> Optional[int]

Gets the maximum time between timestamps for the buffer in milliseconds.

This property retrieves the maximum time between the earliest and latest buffered timestamp in milliseconds. If the difference surpasses this number, a callback method, such as on_data_released, is invoked, and the data is cleared from the buffer. Note that this property is a millisecond converter on top of time_span_in_nanoseconds, and both work with the same underlying value. Defaults to None (disabled).

Returns:

  • Optional[int] - The maximum time difference between timestamps in milliseconds, or None if disabled.

time_span_in_milliseconds

@time_span_in_milliseconds.setter
def time_span_in_milliseconds(value: Optional[int])

Sets the maximum time between timestamps for the buffer in milliseconds.

This property sets the maximum time between the earliest and latest buffered timestamp in milliseconds. If the difference surpasses this number, a callback method, such as on_data_released, is invoked, and the data is cleared from the buffer. Note that this property is a millisecond converter on top of time_span_in_nanoseconds, and both work with the same underlying value. Defaults to None (disabled).

Arguments:

  • value - The maximum time difference between timestamps in milliseconds, or None to disable.
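
The note that both properties work with the same underlying value can be pictured as two views over one stored number. SketchTimeSpan is a hypothetical stand-in, and truncating towards whole milliseconds on read is an assumption.

```python
from typing import Optional

NS_PER_MS = 1_000_000


class SketchTimeSpan:
    """Stores the span once, in nanoseconds, and exposes two views."""

    def __init__(self) -> None:
        self._ns: Optional[int] = None  # None means the condition is disabled

    @property
    def time_span_in_nanoseconds(self) -> Optional[int]:
        return self._ns

    @time_span_in_nanoseconds.setter
    def time_span_in_nanoseconds(self, value: Optional[int]) -> None:
        self._ns = value

    @property
    def time_span_in_milliseconds(self) -> Optional[int]:
        # Assumption: sub-millisecond remainders are truncated.
        return None if self._ns is None else self._ns // NS_PER_MS

    @time_span_in_milliseconds.setter
    def time_span_in_milliseconds(self, value: Optional[int]) -> None:
        self._ns = None if value is None else value * NS_PER_MS
```

Setting 100 through the millisecond view makes the nanosecond view read 100000000, and setting either view to None disables both.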

buffer_timeout

@property
def buffer_timeout() -> Optional[int]

Gets the maximum duration in milliseconds for which the buffer will be held. When the configured value has elapsed or other buffer conditions are met, a callback method, such as on_data_released, is invoked. Defaults to None (disabled).

Returns:

  • Optional[int] - The maximum duration in milliseconds before invoking a callback method, or None if disabled.

buffer_timeout

@buffer_timeout.setter
def buffer_timeout(value: Optional[int])

Sets the maximum duration in milliseconds for which the buffer will be held. When the configured value has elapsed or other buffer conditions are met, a callback method, such as on_data_released, is invoked. Defaults to None (disabled).

Arguments:

  • value - The maximum duration in milliseconds before invoking a callback method, or None to disable.

get_net_pointer

def get_net_pointer() -> ctypes.c_void_p

Gets the associated .net object pointer.

Returns:

  • ctypes.c_void_p - The .net object pointer.

quixstreams.models.streampackage

StreamPackage Objects

@nativedecorator
class StreamPackage(object)

Default model implementation for non-typed message packages of the Telemetry layer. It holds a value and its type.

__init__

def __init__(net_pointer: ctypes.c_void_p)

Initializes a new instance of StreamPackage.

Notes:

Do not initialize this class manually. Will be initialized by StreamConsumer.on_package_received.

Arguments:

  • net_pointer - Pointer to an instance of a .net StreamPackage.

transport_context

@property
def transport_context() -> Dict[str, str]

Context holder for package when transporting through the pipeline.

to_json

def to_json() -> str

Serialize the package into JSON.

Returns:

  • str - The serialized JSON string of the package.

get_net_pointer

def get_net_pointer() -> ctypes.c_void_p

Gets the associated .net object pointer.

Returns:

  • ctypes.c_void_p - The .net object pointer.

quixstreams.models.codecsettings

CodecSettings Objects

class CodecSettings(object)

Global Codec settings for streams.

set_global_codec_type

@staticmethod
def set_global_codec_type(codec_type: CodecType)

Sets the codec type to be used by producers and transfer package value serialization.

quixstreams.models.streamconsumer.streamtimeseriesconsumer

StreamTimeseriesConsumer Objects

@nativedecorator
class StreamTimeseriesConsumer(object)

Consumer for streams, which raises TimeseriesData and ParameterDefinitions related messages

__init__

def __init__(stream_consumer, net_pointer: ctypes.c_void_p)

Initializes a new instance of StreamTimeseriesConsumer. NOTE: Do not initialize this class manually. Use StreamConsumer.timeseries to access an instance of it.

Arguments:

  • stream_consumer - The Stream consumer which owns this stream event consumer.
  • net_pointer - Pointer to an instance of a .net StreamTimeseriesConsumer.

on_data_received

@property
def on_data_received() -> Callable[['StreamConsumer', TimeseriesData], None]

Gets the handler for when data is received (without buffering).

Returns:

Callable[['StreamConsumer', TimeseriesData], None]: The function that handles the data received. The first parameter is the stream that receives the data, and the second is the data in TimeseriesData format.

on_data_received

@on_data_received.setter
def on_data_received(
        value: Callable[['StreamConsumer', TimeseriesData], None]) -> None

Sets the handler for when data is received (without buffering).

Arguments:

  • value - The function that handles the data received. The first parameter is the stream that receives the data, and the second is the data in TimeseriesData format.

on_raw_received

@property
def on_raw_received() -> Callable[['StreamConsumer', TimeseriesDataRaw], None]

Gets the handler for when data is received (without buffering) in raw transport format.

Returns:

Callable[['StreamConsumer', TimeseriesDataRaw], None]: The function that handles the data received. The first parameter is the stream that receives the data, and the second is the data in TimeseriesDataRaw format.

on_raw_received

@on_raw_received.setter
def on_raw_received(
        value: Callable[['StreamConsumer', TimeseriesDataRaw], None]) -> None

Sets the handler for when data is received (without buffering) in raw transport format.

Arguments:

  • value - The function that handles the data received. The first parameter is the stream that receives the data, and the second is the data in TimeseriesDataRaw format.

on_dataframe_received

@property
def on_dataframe_received(
) -> Callable[['StreamConsumer', pandas.DataFrame], None]

Gets the handler for when data is received (without buffering) in pandas DataFrame format.

Returns:

Callable[['StreamConsumer', pandas.DataFrame], None]: The function that handles the data received. The first parameter is the stream that receives the data, and the second is the data in pandas DataFrame format.

on_dataframe_received

@on_dataframe_received.setter
def on_dataframe_received(
        value: Callable[['StreamConsumer', pandas.DataFrame], None]) -> None

Sets the handler for when data is received (without buffering) in pandas DataFrame format.

Arguments:

  • value - The function that handles the data received. The first parameter is the stream that receives the data, and the second is the data in pandas DataFrame format.

on_definitions_changed

@property
def on_definitions_changed() -> Callable[['StreamConsumer'], None]

Gets the handler for when the parameter definitions have changed for the stream.

Returns:

Callable[['StreamConsumer'], None]: The function that handles the parameter definitions change. The first parameter is the stream for which the parameter definitions changed.

on_definitions_changed

@on_definitions_changed.setter
def on_definitions_changed(value: Callable[['StreamConsumer'], None]) -> None

Sets the handler for when the parameter definitions have changed for the stream.

Arguments:

  • value - The function that handles the parameter definitions change. The first parameter is the stream for which the parameter definitions changed.

definitions

@property
def definitions() -> List[ParameterDefinition]

Gets the latest set of parameter definitions.

create_buffer

def create_buffer(
    *parameter_filter: str,
    buffer_configuration: TimeseriesBufferConfiguration = None
) -> TimeseriesBufferConsumer

Creates a new buffer for consuming data according to the provided parameter_filter and buffer_configuration.

Arguments:

  • parameter_filter - Zero or more parameter identifiers to filter as a whitelist. If provided, only those parameters will be available through this buffer.
  • buffer_configuration - An optional TimeseriesBufferConfiguration.

Returns:

  • TimeseriesBufferConsumer - A consumer that raises newly consumed data via the on_data_released event.
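
The whitelist semantics of parameter_filter can be sketched on a plain dict of parameter columns. apply_parameter_filter is a hypothetical helper, and treating an empty filter as "let every parameter through" is an assumption based on the "if provided" wording above.

```python
from typing import Dict, List


def apply_parameter_filter(values: Dict[str, List[float]],
                           *parameter_filter: str) -> Dict[str, List[float]]:
    """Hypothetical helper: keep only whitelisted parameter columns."""
    if not parameter_filter:
        # Assumption: no identifiers given means nothing is filtered out.
        return dict(values)
    allowed = set(parameter_filter)
    return {name: column for name, column in values.items()
            if name in allowed}
```

The variadic signature mirrors create_buffer, so identifiers are passed positionally, for example apply_parameter_filter(values, "speed", "gear").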

get_net_pointer

def get_net_pointer() -> ctypes.c_void_p

Gets the .NET pointer for the StreamTimeseriesConsumer instance.

Returns:

  • ctypes.c_void_p - .NET pointer for the StreamTimeseriesConsumer instance.

quixstreams.models.streamconsumer.streampropertiesconsumer

StreamPropertiesConsumer Objects

@nativedecorator
class StreamPropertiesConsumer(object)

Represents properties and metadata of the stream. All changes to these properties are automatically populated to this class.

__init__

def __init__(stream_consumer: 'StreamConsumer', net_pointer: ctypes.c_void_p)

Initializes a new instance of StreamPropertiesConsumer. NOTE: Do not initialize this class manually, use StreamConsumer.properties to access an instance of it.

Arguments:

  • stream_consumer - The Stream consumer that owns this stream event consumer.
  • net_pointer - Pointer to an instance of a .NET StreamPropertiesConsumer.

on_changed

@property
def on_changed() -> Callable[['StreamConsumer'], None]

Gets the handler for when the stream properties change.

Returns:

Callable[[StreamConsumer], None]: The event handler for stream property changes. The first parameter is the StreamConsumer instance for which the change is invoked.

on_changed

@on_changed.setter
def on_changed(value: Callable[['StreamConsumer'], None]) -> None

Sets the handler for when the stream properties change.

Arguments:

  • value - The event handler for stream property changes. The first parameter is the StreamConsumer instance for which the change is invoked.

name

@property
def name() -> str

Gets the name of the stream.

location

@property
def location() -> str

Gets the location of the stream.

time_of_recording

@property
def time_of_recording() -> datetime

Gets the datetime of the recording.

metadata

@property
def metadata() -> Dict[str, str]

Gets the metadata of the stream.

parents

@property
def parents() -> List[str]

Gets the list of Stream IDs for the parent streams.

get_net_pointer

def get_net_pointer() -> ctypes.c_void_p

Gets the .NET pointer for the StreamPropertiesConsumer instance.

Returns:

  • ctypes.c_void_p - .NET pointer for the StreamPropertiesConsumer instance.

quixstreams.models.streamconsumer.streameventsconsumer

StreamEventsConsumer Objects

@nativedecorator
class StreamEventsConsumer(object)

Consumer for streams, which raises EventData and EventDefinitions related messages

__init__

def __init__(stream_consumer, net_pointer: ctypes.c_void_p)

Initializes a new instance of StreamEventsConsumer. NOTE: Do not initialize this class manually, use StreamConsumer.events to access an instance of it

Arguments:

  • stream_consumer - The Stream consumer which owns this stream event consumer
  • net_pointer - Pointer to an instance of a .net StreamEventsConsumer

on_data_received

@property
def on_data_received() -> Callable[['StreamConsumer', EventData], None]

Gets the handler for when an events data package is received for the stream.

Returns:

Callable[['StreamConsumer', EventData], None]: The first parameter is the stream the event is received for. The second is the event.

on_data_received

@on_data_received.setter
def on_data_received(
        value: Callable[['StreamConsumer', EventData], None]) -> None

Sets the handler for when an events data package is received for the stream.

Arguments:

  • value - The first parameter is the stream the event is received for. The second is the event.

on_definitions_changed

@property
def on_definitions_changed() -> Callable[['StreamConsumer'], None]

Gets the handler for when the event definitions have changed for the stream.

Returns:

Callable[['StreamConsumer'], None]: The first parameter is the stream the event definitions changed for.

on_definitions_changed

@on_definitions_changed.setter
def on_definitions_changed(value: Callable[['StreamConsumer'], None]) -> None

Sets the handler for when the event definitions have changed for the stream.

Arguments:

  • value - The first parameter is the stream the event definitions changed for.

definitions

@property
def definitions() -> List[EventDefinition]

Gets the latest set of event definitions.

quixstreams.models.streamconsumer.timeseriesbufferconsumer

TimeseriesBufferConsumer Objects

class TimeseriesBufferConsumer(TimeseriesBuffer)

Represents a class for consuming data from a stream in a buffered manner.

__init__

def __init__(stream_consumer, net_pointer: ctypes.c_void_p = None)

Initializes a new instance of TimeseriesBufferConsumer.

NOTE: Do not initialize this class manually, use StreamTimeseriesConsumer.create_buffer to create it.

Arguments:

  • stream_consumer - The Stream consumer which owns this timeseries buffer consumer.
  • net_pointer - Pointer to an instance of a .net TimeseriesBufferConsumer. Defaults to None.

Raises:

  • Exception - If net_pointer is None.

get_net_pointer

def get_net_pointer() -> ctypes.c_void_p

Retrieves the pointer to the .net TimeseriesBufferConsumer instance.

Returns:

  • ctypes.c_void_p - The pointer to the .net TimeseriesBufferConsumer instance.

quixstreams.models.streamconsumer

quixstreams.models.commitmode

quixstreams.models.codectype

CodecType Objects

class CodecType(Enum)

Codecs available for serialization and deserialization of streams.

quixstreams.models.netlist

NetReadOnlyList Objects

class NetReadOnlyList(object)

Experimental. Acts as a proxy between a .net collection and a python list. Useful if .net collection is observable and reacts to changes

NetList Objects

class NetList(NetReadOnlyList)

Experimental. Acts as a proxy between a .net collection and a python list. Useful if .net collection is observable and reacts to changes

constructor_for_string

@staticmethod
def constructor_for_string(net_pointer=None)

Creates an empty dotnet list for strings if no pointer provided, else wraps in NetDict with string converters

quixstreams.models.timeseriesbufferconfiguration

TimeseriesBufferConfiguration Objects

@nativedecorator
class TimeseriesBufferConfiguration(object)

Describes the configuration for timeseries buffers. When buffer conditions are not configured, the buffer acts as a pass-through, raising each message as it arrives.

__init__

def __init__(net_pointer: ctypes.c_void_p = None)

Initializes a new instance of TimeseriesBufferConfiguration.

Arguments:

  • net_pointer - The .net object representing a TimeseriesBufferConfiguration. Can be ignored; it is here for internal purposes. Defaults to None.

packet_size

@property
def packet_size() -> Optional[int]

Gets the maximum packet size in terms of values for the buffer.

When the buffer reaches this number of values, a callback method, such as on_data_released, is invoked and the buffer is cleared. If not set, defaults to None (disabled).

Returns:

  • Optional[int] - The maximum packet size in values or None if disabled.

packet_size

@packet_size.setter
def packet_size(value: Optional[int])

Sets the maximum packet size in terms of values for the buffer.

When the buffer reaches this number of values, a callback method, such as on_data_released, is invoked and the buffer is cleared. If not set, defaults to None (disabled).

Arguments:

  • value - The maximum packet size in values or None to disable.

time_span_in_nanoseconds

@property
def time_span_in_nanoseconds() -> Optional[int]

Gets the maximum time difference between timestamps in the buffer, in nanoseconds.

When the difference between the earliest and latest buffered timestamp exceeds this value, a callback method, such as on_data_released, is invoked and the data is cleared from the buffer. If not set, defaults to None (disabled).

Returns:

  • Optional[int] - The maximum time span in nanoseconds or None if disabled.

time_span_in_nanoseconds

@time_span_in_nanoseconds.setter
def time_span_in_nanoseconds(value: Optional[int])

Sets the maximum time difference between timestamps in the buffer, in nanoseconds.

When the difference between the earliest and latest buffered timestamp exceeds this value, a callback method, such as on_data_released, is invoked and the data is cleared from the buffer. If not set, defaults to None (disabled).

Arguments:

  • value - The maximum time span in nanoseconds or None to disable.

time_span_in_milliseconds

@property
def time_span_in_milliseconds() -> Optional[int]

Gets the maximum time difference between timestamps in the buffer, in milliseconds.

When the difference between the earliest and latest buffered timestamp exceeds this value, a callback method, such as on_data_released, is invoked and the data is cleared from the buffer. If not set, defaults to None (disabled).

Note: This is a millisecond converter on top of time_span_in_nanoseconds. They both work with the same underlying value.

Returns:

  • Optional[int] - The maximum time span in milliseconds or None if disabled.

time_span_in_milliseconds

@time_span_in_milliseconds.setter
def time_span_in_milliseconds(value: Optional[int])

Sets the maximum time difference between timestamps in the buffer, in milliseconds.

When the difference between the earliest and latest buffered timestamp exceeds this value, a callback method, such as on_data_released, is invoked and the data is cleared from the buffer. If not set, defaults to None (disabled).

Arguments:

  • value - The maximum time span in milliseconds or None to disable.
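The note above says the millisecond property is a converter over the nanosecond value. The following is a minimal pure-Python sketch of that relationship. `BufferTimeSpan` is a hypothetical stand-in rather than the library class, and truncating on the nanosecond-to-millisecond conversion is an assumption.

```python
from typing import Optional

NANOS_PER_MILLI = 1_000_000


class BufferTimeSpan:
    """Hypothetical stand-in: one stored value exposed in two units."""

    def __init__(self) -> None:
        self._nanoseconds: Optional[int] = None  # None means disabled

    @property
    def time_span_in_nanoseconds(self) -> Optional[int]:
        return self._nanoseconds

    @time_span_in_nanoseconds.setter
    def time_span_in_nanoseconds(self, value: Optional[int]) -> None:
        self._nanoseconds = value

    @property
    def time_span_in_milliseconds(self) -> Optional[int]:
        if self._nanoseconds is None:
            return None
        # Assumption: fractions of a millisecond are truncated.
        return self._nanoseconds // NANOS_PER_MILLI

    @time_span_in_milliseconds.setter
    def time_span_in_milliseconds(self, value: Optional[int]) -> None:
        self._nanoseconds = None if value is None else value * NANOS_PER_MILLI


config = BufferTimeSpan()
config.time_span_in_milliseconds = 100  # also changes the nanosecond view
```

Because both properties read and write the same field, setting either one overrides whatever was set through the other.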

buffer_timeout

@property
def buffer_timeout() -> Optional[int]

Gets the maximum duration for which the buffer will be held before releasing the events through callbacks, such as on_data_released.

A callback will be invoked once the configured value has elapsed or other buffer conditions are met. If not set, defaults to None (disabled).

Returns:

  • Optional[int] - The maximum buffer timeout in milliseconds or None if disabled.

buffer_timeout

@buffer_timeout.setter
def buffer_timeout(value: Optional[int])

Sets the maximum duration for which the buffer will be held before releasing the events through callbacks, such as on_data_released.

A callback will be invoked once the configured value has elapsed or other buffer conditions are met. If not set, defaults to None (disabled).

Arguments:

  • value - The maximum buffer timeout in milliseconds or None to disable.

custom_trigger_before_enqueue

@property
def custom_trigger_before_enqueue(
) -> Callable[[TimeseriesDataTimestamp], bool]

Gets the custom function that is called before adding a timestamp to the buffer.

If the function returns True, the buffer releases content and triggers relevant callbacks before adding the timestamp to the buffer. If not set, defaults to None (disabled).

Returns:

Callable[[TimeseriesDataTimestamp], bool]: The custom function or None if disabled.

custom_trigger_before_enqueue

@custom_trigger_before_enqueue.setter
def custom_trigger_before_enqueue(value: Callable[[TimeseriesDataTimestamp],
                                                  bool])

Sets the custom function that is called before adding a timestamp to the buffer.

If the function returns True, the buffer releases content and triggers relevant callbacks before adding the timestamp to the buffer. If not set, defaults to None (disabled).

Arguments:

  • value - The custom function or None to disable.

filter

@property
def filter() -> Callable[[TimeseriesDataTimestamp], bool]

Gets the custom function used to filter incoming data before adding it to the buffer.

If the function returns True, the data is added to the buffer; otherwise, it is not. If not set, defaults to None (disabled).

Returns:

Callable[[TimeseriesDataTimestamp], bool]: The custom filter function or None if disabled.

filter

@filter.setter
def filter(value: Callable[[TimeseriesDataTimestamp], bool])

Sets the custom function used to filter incoming data before adding it to the buffer.

If the function returns True, the data is added to the buffer; otherwise, it is not. If not set, defaults to None (disabled).

Arguments:

  • value - The custom filter function or None to disable.

custom_trigger

@property
def custom_trigger() -> Callable[[TimeseriesData], bool]

Gets the custom function that is called after adding a new timestamp to the buffer.

If the function returns True, the buffer releases content and triggers relevant callbacks. If not set, defaults to None (disabled).

Returns:

Callable[[TimeseriesData], bool]: The custom trigger function or None if disabled.

custom_trigger

@custom_trigger.setter
def custom_trigger(value: Callable[[TimeseriesData], bool])

Sets the custom function that is called after adding a new timestamp to the buffer.

If the function returns True, the buffer releases content and triggers relevant callbacks. If not set, defaults to None (disabled).

Arguments:

  • value - The custom trigger function or None to disable.
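The interplay of `filter`, `custom_trigger_before_enqueue`, `packet_size` and `custom_trigger` can be illustrated with a toy buffer. This is a sketch under stated assumptions, not the library's implementation: `ToyBuffer` is hypothetical, plain integers stand in for timestamps, the `released` list stands in for a callback such as on_data_released, and the time-based conditions and pass-through mode are left out.

```python
from typing import Callable, List, Optional


class ToyBuffer:
    """Hypothetical stand-in for the documented buffer rules (values are plain ints)."""

    def __init__(self,
                 packet_size: Optional[int] = None,
                 filter: Optional[Callable[[int], bool]] = None,
                 custom_trigger_before_enqueue: Optional[Callable[[int], bool]] = None,
                 custom_trigger: Optional[Callable[[List[int]], bool]] = None) -> None:
        self.packet_size = packet_size
        self.filter = filter
        self.custom_trigger_before_enqueue = custom_trigger_before_enqueue
        self.custom_trigger = custom_trigger
        self.pending: List[int] = []
        self.released: List[List[int]] = []

    def _release(self) -> None:
        # Stands in for invoking a callback such as on_data_released.
        if self.pending:
            self.released.append(self.pending)
            self.pending = []

    def add(self, value: int) -> None:
        # filter: values for which it returns False are never buffered.
        if self.filter is not None and not self.filter(value):
            return
        # custom_trigger_before_enqueue: release first, then add the value.
        if (self.custom_trigger_before_enqueue is not None
                and self.custom_trigger_before_enqueue(value)):
            self._release()
        self.pending.append(value)
        # packet_size and custom_trigger are checked after the value is added.
        size_reached = (self.packet_size is not None
                        and len(self.pending) >= self.packet_size)
        triggered = (self.custom_trigger is not None
                     and self.custom_trigger(self.pending))
        if size_reached or triggered:
            self._release()


buffer = ToyBuffer(packet_size=3,
                   filter=lambda v: v >= 0,
                   custom_trigger_before_enqueue=lambda v: v % 10 == 0)
for v in [1, 2, -5, 10, 11, 12, 13]:
    buffer.add(v)
```

In this run, -5 is filtered out, 10 forces the pending values 1 and 2 to be released before it is added, and the third buffered value after that triggers a release by size.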

get_net_pointer

def get_net_pointer() -> ctypes.c_void_p

Returns the .net pointer for the TimeseriesBufferConfiguration object.

Returns:

  • ctypes.c_void_p - The .net pointer for the TimeseriesBufferConfiguration object.

quixstreams.models.netdict

ReadOnlyNetDict Objects

class ReadOnlyNetDict(object)

Experimental. Acts as a proxy between a .net dictionary and a python dict. Useful if .net dictionary is observable and reacts to changes

NetDict Objects

class NetDict(ReadOnlyNetDict)

Experimental. Acts as a proxy between a .net dictionary and a python dict.

constructor_for_string_string

@staticmethod
def constructor_for_string_string(net_pointer=None)

Creates an empty dotnet dictionary with string keys and string values if no pointer is provided; otherwise wraps the pointer in a NetDict with string converters.

quixstreams.models.timeseriesdatatimestamp

TimeseriesDataTimestamp Objects

@nativedecorator
class TimeseriesDataTimestamp()

Represents a single point in time with parameter values and tags attached to that time.

__init__

def __init__(net_pointer: ctypes.c_void_p)

Initializes a new instance of TimeseriesDataTimestamp.

Arguments:

  • net_pointer - Pointer to an instance of a .net TimeseriesDataTimestamp.

parameters

@property
def parameters() -> Dict[str, ParameterValue]

Gets the parameter values for the timestamp as a dictionary. If a key is not found, returns an empty ParameterValue.

Returns:

Dict[str, ParameterValue]: A dictionary with parameter id as key and ParameterValue as value.

tags

@property
def tags() -> Dict[str, str]

Gets the tags for the timestamp as a dictionary.

Returns:

Dict[str, str]: A dictionary with tag id as key and tag value as value.

timestamp_nanoseconds

@property
def timestamp_nanoseconds() -> int

Gets the timestamp in nanoseconds.

Returns:

  • int - The timestamp in nanoseconds.

timestamp_milliseconds

@property
def timestamp_milliseconds() -> int

Gets the timestamp in milliseconds.

Returns:

  • int - The timestamp in milliseconds.

timestamp

@property
def timestamp() -> datetime

Gets the timestamp in datetime format.

Returns:

  • datetime - The timestamp in datetime format.

timestamp_as_time_span

@property
def timestamp_as_time_span() -> timedelta

Gets the timestamp in timespan format.

Returns:

  • timedelta - The timestamp in timespan format.
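The four timestamp properties are different views of one nanosecond value. A minimal sketch of the arithmetic follows, assuming the value counts nanoseconds from the Unix epoch in UTC. The helper name `nanoseconds_to_views` is hypothetical, and the library's own time zone handling may differ.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def nanoseconds_to_views(timestamp_nanoseconds: int):
    """Return (milliseconds, datetime, timedelta) views of a nanosecond timestamp."""
    milliseconds = timestamp_nanoseconds // 1_000_000
    # timedelta has microsecond resolution, so the last three digits are dropped.
    as_time_span = timedelta(microseconds=timestamp_nanoseconds // 1_000)
    as_datetime = EPOCH + as_time_span
    return milliseconds, as_datetime, as_time_span


ms, dt, span = nanoseconds_to_views(1_700_000_000_123_456_789)
```

Only the nanosecond view is lossless; the datetime and timedelta views cannot represent anything finer than a microsecond.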

add_value

def add_value(
    parameter_id: str, value: Union[numbers.Number, str, bytearray, bytes]
) -> 'TimeseriesDataTimestamp'

Adds a new value for the specified parameter.

Arguments:

  • parameter_id - The parameter id to add the value for.
  • value - The value to add. Can be a number, string, bytearray, or bytes.

Returns:

  • TimeseriesDataTimestamp - The updated TimeseriesDataTimestamp instance.

remove_value

def remove_value(parameter_id: str) -> 'TimeseriesDataTimestamp'

Removes the value for the specified parameter.

Arguments:

  • parameter_id - The parameter id to remove the value for.

Returns:

  • TimeseriesDataTimestamp - The updated TimeseriesDataTimestamp instance.

add_tag

def add_tag(tag_id: str, tag_value: str) -> 'TimeseriesDataTimestamp'

Adds a tag to the timestamp.

Arguments:

  • tag_id - The id of the tag to add.
  • tag_value - The value of the tag to add.

Returns:

  • TimeseriesDataTimestamp - The updated TimeseriesDataTimestamp instance.

remove_tag

def remove_tag(tag_id: str) -> 'TimeseriesDataTimestamp'

Removes a tag from the timestamp.

Arguments:

  • tag_id - The id of the tag to remove.

Returns:

  • TimeseriesDataTimestamp - The updated TimeseriesDataTimestamp instance.

add_tags

def add_tags(tags: Dict[str, str]) -> 'TimeseriesDataTimestamp'

Copies the tags from the specified dictionary. Conflicting tags will be overwritten.

Arguments:

  • tags - The dictionary of tags to add, with tag id as key and tag value as value.

Returns:

  • TimeseriesDataTimestamp - The updated TimeseriesDataTimestamp instance.
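Because each of these methods returns the updated instance, calls can be chained. The sketch below shows the pattern with a hypothetical `ToyTimestamp` stand-in backed by plain dictionaries; it mirrors the documented method names but is not the library class.

```python
from typing import Dict, Union

Value = Union[int, float, str, bytes, bytearray]


class ToyTimestamp:
    """Hypothetical stand-in that returns self from every mutator."""

    def __init__(self) -> None:
        self.parameters: Dict[str, Value] = {}
        self.tags: Dict[str, str] = {}

    def add_value(self, parameter_id: str, value: Value) -> "ToyTimestamp":
        self.parameters[parameter_id] = value
        return self

    def add_tag(self, tag_id: str, tag_value: str) -> "ToyTimestamp":
        self.tags[tag_id] = tag_value
        return self

    def add_tags(self, tags: Dict[str, str]) -> "ToyTimestamp":
        self.tags.update(tags)  # conflicting tags are overwritten
        return self

    def remove_tag(self, tag_id: str) -> "ToyTimestamp":
        self.tags.pop(tag_id, None)
        return self


row = (ToyTimestamp()
       .add_value("speed", 120.5)
       .add_tag("lap", "1")
       .add_tags({"lap": "2", "driver": "A"})
       .remove_tag("driver"))
```

The `lap` tag ends up with the value from `add_tags`, which matches the documented overwrite behavior for conflicting tags.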

get_net_pointer

def get_net_pointer() -> ctypes.c_void_p

Gets the .net pointer of the TimeseriesDataTimestamp instance.

Returns:

  • ctypes.c_void_p - The .net pointer of the TimeseriesDataTimestamp instance.

quixstreams.models.streamendtype

quixstreams.models.eventdata

EventData Objects

@nativedecorator
class EventData(object)

Represents a single point in time with event value and tags attached to it.

__init__

def __init__(event_id: str = None,
             time: Union[int, str, datetime, pd.Timestamp] = None,
             value: str = None,
             net_pointer: ctypes.c_void_p = None)

Initializes a new instance of EventData.

Arguments:

  • event_id - The unique id of the event the value belongs to.
  • time - The time at which the event has occurred in nanoseconds since epoch or as a datetime.
  • value - The value of the event.
  • net_pointer - Pointer to an instance of a .net EventData.

id

@property
def id() -> str

Gets the globally unique identifier of the event.

id

@id.setter
def id(value: str) -> None

Sets the globally unique identifier of the event.

value

@property
def value() -> str

Gets the value of the event.

value

@value.setter
def value(value: str) -> None

Sets the value of the event.

tags

@property
def tags() -> Dict[str, str]

Gets the tags for the event.

If a key is not found, it returns None. The dictionary key is the tag id. The dictionary value is the tag value.

timestamp_nanoseconds

@property
def timestamp_nanoseconds() -> int

Gets timestamp in nanoseconds.

timestamp_milliseconds

@property
def timestamp_milliseconds() -> int

Gets timestamp in milliseconds.

timestamp

@property
def timestamp() -> datetime

Gets the timestamp in datetime format.

timestamp_as_time_span

@property
def timestamp_as_time_span() -> timedelta

Gets the timestamp in timespan format.

clone

def clone()

Clones the event data.

Returns:

  • EventData - Cloned EventData object.

add_tag

def add_tag(tag_id: str, tag_value: str) -> 'EventData'

Adds a tag to the event.

Arguments:

  • tag_id - The id of the tag.
  • tag_value - The value to set.

Returns:

  • EventData - The updated EventData object.

add_tags

def add_tags(tags: Dict[str, str]) -> 'EventData'

Adds tags from the specified dictionary. Conflicting tags will be overwritten.

Arguments:

  • tags - The tags to add.

Returns:

  • EventData - The updated EventData object.

remove_tag

def remove_tag(tag_id: str) -> 'EventData'

Removes a tag from the event.

Arguments:

  • tag_id - The id of the tag to remove.

Returns:

  • EventData - The updated EventData object.

get_net_pointer

def get_net_pointer()

Gets the associated .net object pointer.

Returns:

The .net object pointer.

quixstreams.models

quixstreams.models.autooffsetreset

AutoOffsetReset Objects

class AutoOffsetReset(Enum)

Enum representing the policy on how a consumer should behave when consuming from a topic partition when there is no initial offset.

Latest

Latest: Starts from the newest message if there is no stored offset.

Earliest

Earliest: Starts from the oldest message if there is no stored offset.

Error

Error: Throws an exception if there is no stored offset.
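The three policies only matter when no offset is stored. A small sketch of that decision follows; `ToyAutoOffsetReset` and `starting_offset` are hypothetical names, and the enum's numeric values are assumptions, not the library's.

```python
from enum import Enum
from typing import Optional


class ToyAutoOffsetReset(Enum):
    """Hypothetical mirror of the documented members; values are assumed."""
    Latest = 0
    Earliest = 1
    Error = 2


def starting_offset(policy: ToyAutoOffsetReset,
                    stored_offset: Optional[int],
                    oldest: int,
                    newest: int) -> int:
    """Pick the offset to start consuming from."""
    if stored_offset is not None:
        return stored_offset  # the policy is only a fallback
    if policy is ToyAutoOffsetReset.Latest:
        return newest
    if policy is ToyAutoOffsetReset.Earliest:
        return oldest
    raise LookupError("no stored offset and policy is Error")
```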

quixstreams.app

CancellationTokenSource Objects

class CancellationTokenSource()

Represents a token source that can signal cancellation to a System.Threading.CancellationToken.

__init__

def __init__()

Initializes a new instance of the CancellationTokenSource class.

is_cancellation_requested

def is_cancellation_requested()

Checks if a cancellation has been requested.

Returns:

  • bool - True if the cancellation has been requested, False otherwise.

cancel

def cancel() -> 'CancellationToken'

Signals a cancellation to the CancellationToken.

token

@property
def token() -> 'CancellationToken'

Gets the associated CancellationToken.

Returns:

  • CancellationToken - The CancellationToken associated with this CancellationTokenSource.

get_net_pointer

def get_net_pointer() -> ctypes.c_void_p

Gets the interop pointer of the CancellationTokenSource object.

Returns:

  • ctypes.c_void_p - The interop pointer of the CancellationTokenSource object.

App Objects

class App()

Provides utilities to handle default streaming behaviors and automatic resource cleanup on shutdown.

run

@staticmethod
def run(cancellation_token: CancellationToken = None,
        before_shutdown: Callable[[], None] = None)

Runs the application, managing streaming behaviors and automatic resource cleanup on shutdown.

Arguments:

  • cancellation_token - An optional CancellationToken to abort the application run with.
  • before_shutdown - An optional function to call before shutting down the application.
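The relationship between a token source, its token, and the two arguments of `run` can be sketched with the standard library alone. Everything here is a hypothetical stand-in: `ToyCancellationTokenSource` and `toy_run` are not library names, a `threading.Event` plays the role of the token, and the sketch models only the blocking, the cancellation and the `before_shutdown` call.

```python
import threading
from typing import Callable, List, Optional


class ToyCancellationTokenSource:
    """Hypothetical stand-in; a threading.Event acts as the token."""

    def __init__(self) -> None:
        self._event = threading.Event()

    def is_cancellation_requested(self) -> bool:
        return self._event.is_set()

    def cancel(self) -> None:
        self._event.set()

    @property
    def token(self) -> threading.Event:
        return self._event


def toy_run(cancellation_token: threading.Event,
            before_shutdown: Optional[Callable[[], None]] = None) -> None:
    """Block until the token is cancelled, then run the shutdown hook."""
    cancellation_token.wait()
    if before_shutdown is not None:
        before_shutdown()


calls: List[str] = []
source = ToyCancellationTokenSource()
# Cancel from another thread shortly after the run starts blocking.
threading.Timer(0.05, source.cancel).start()
toy_run(source.token, before_shutdown=lambda: calls.append("before_shutdown"))
```

The hook runs after the cancellation is observed, which is the point where an application would flush or close its own resources.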

get_state_manager

@staticmethod
def get_state_manager() -> AppStateManager

Retrieves the state manager for the application

Returns:

  • AppStateManager - the app's state manager

set_state_storage

@staticmethod
def set_state_storage(storage: IStateStorage) -> None

Sets the state storage for the app

Arguments:

  • storage - The state storage to use for app's state manager

quixstreams.states.topicstatemanager

TopicStateManager Objects

@nativedecorator
class TopicStateManager(object)

Manages the states of a topic.

__init__

def __init__(net_pointer: ctypes.c_void_p)

Initializes a new instance of TopicStateManager.

NOTE: Do not initialize this class manually, use TopicConsumer.get_state_manager

Arguments:

  • net_pointer - The .net object representing a TopicStateManager.

get_stream_states

def get_stream_states() -> List[str]

Returns a collection of all available stream state ids for the current topic.

Returns:

  • List[str] - All available stream state ids for the current topic.

get_stream_state_manager

def get_stream_state_manager(stream_id: str) -> StreamStateManager

Gets an instance of the StreamStateManager for the specified stream_id.

Arguments:

  • stream_id - The ID of the stream

delete_stream_state

def delete_stream_state(stream_id: str) -> bool

Deletes the stream state for the specified stream

Arguments:

  • stream_id - The ID of the stream

Returns:

  • bool - Whether the stream state was deleted

delete_stream_states

def delete_stream_states() -> int

Deletes all stream states for the current topic.

Returns:

  • int - The number of stream states that were deleted
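The return values of the delete methods can be illustrated with a dictionary-backed sketch. `ToyTopicStates` is a hypothetical stand-in that keeps stream states in memory; the real manager works against the configured state storage.

```python
from typing import Dict, List


class ToyTopicStates:
    """Hypothetical in-memory stand-in keyed by stream id."""

    def __init__(self, streams: Dict[str, Dict[str, object]]) -> None:
        self._streams = streams

    def get_stream_states(self) -> List[str]:
        return list(self._streams)

    def delete_stream_state(self, stream_id: str) -> bool:
        # True only if there was a state to delete.
        return self._streams.pop(stream_id, None) is not None

    def delete_stream_states(self) -> int:
        count = len(self._streams)
        self._streams.clear()
        return count


topic = ToyTopicStates({"stream-1": {"count": 1}, "stream-2": {}})
deleted_one = topic.delete_stream_state("stream-1")
deleted_missing = topic.delete_stream_state("no-such-stream")
remaining = topic.get_stream_states()
deleted_all = topic.delete_stream_states()
```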

quixstreams.states.streamstatemanager

StreamStateManager Objects

@nativedecorator
class StreamStateManager(object)

Manages the states of a stream.

__init__

def __init__(net_pointer: ctypes.c_void_p)

Initializes a new instance of StreamStateManager.

NOTE: Do not initialize this class manually, use StreamConsumer.get_state_manager

Arguments:

  • net_pointer - The .net object representing a StreamStateManager.

get_dict_state

def get_dict_state(
        state_name: str,
        default_value_factory: Callable[[str], StreamStateType] = None,
        state_type: StreamStateType = None
) -> DictStreamState[StreamStateType]

Creates a new application state of dictionary type with automatically managed lifecycle for the stream

Arguments:

  • state_name - The name of the state
  • state_type - The type of the state
  • default_value_factory - The default value factory to create value when the key is not yet present in the state

Example:

state_manager.get_dict_state('some_state') This will return a state where the type is 'Any'

state_manager.get_dict_state('some_state', lambda missing_key: {}) This will return a state where the type is a generic dictionary, with an empty dictionary as the default value when the key is not available. The lambda function will be invoked with the 'get_state_type_check' key to determine the type

state_manager.get_dict_state('some_state', lambda missing_key: {}, Dict[str, float]) This will return a state where the type is a specific dictionary type, with a default value

state_manager.get_dict_state('some_state', state_type=float) This will return a state where the type is float without a default value, resulting in a KeyError when the key is not found
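The effect of `default_value_factory` on missing keys can be sketched without the library. `ToyDictState` is a hypothetical stand-in; storing the generated default on first access is an assumption made for the sketch.

```python
from typing import Callable, Dict, Generic, Optional, TypeVar

T = TypeVar("T")


class ToyDictState(Generic[T]):
    """Hypothetical stand-in showing the default value factory contract."""

    def __init__(self,
                 default_value_factory: Optional[Callable[[str], T]] = None) -> None:
        self._values: Dict[str, T] = {}
        self._factory = default_value_factory

    def __getitem__(self, key: str) -> T:
        if key not in self._values:
            if self._factory is None:
                raise KeyError(key)  # no factory: behaves like a plain dict
            # Assumption: the generated default is stored on first access.
            self._values[key] = self._factory(key)
        return self._values[key]

    def __setitem__(self, key: str, value: T) -> None:
        self._values[key] = value


with_default = ToyDictState(lambda missing_key: {})
with_default["car-1"]["laps"] = 3  # the factory supplies the empty dict

without_default = ToyDictState()
```

Note that the factory is written as `lambda missing_key: {}`; a lambda body is an expression, so it cannot contain a `return` statement.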

get_scalar_state

def get_scalar_state(
        state_name: str,
        default_value_factory: Callable[[], StreamStateType] = None,
        state_type: StreamStateType = None
) -> ScalarStreamState[StreamStateType]

Creates a new application state of scalar type with automatically managed lifecycle for the stream

Arguments:

  • state_name - The name of the state
  • default_value_factory - The default value factory to create value when it has not been set yet
  • state_type - The type of the state

Example:

state_manager.get_scalar_state('some_state') This will return a state where the type is 'Any'

state_manager.get_scalar_state('some_state', lambda: 1) This will return a state where the type is 'Any', with the integer 1 as the default when the value has not been set yet. The lambda function will be invoked to determine the type

state_manager.get_scalar_state('some_state', lambda: 0, float) This will return a state where the type is a specific type, with a default value

state_manager.get_scalar_state('some_state', state_type=float) This will return a state where the type is float, with a default value of that type

get_states

def get_states() -> List[str]

Returns an enumerable collection of all available state names for the current stream.

Returns:

  • List[str] - All available state names for this stream

delete_state

def delete_state(state_name: str) -> bool

Deletes the state with the specified name

Arguments:

  • state_name - The state to delete

Returns:

  • bool - Whether the state was deleted

delete_states

def delete_states() -> int

Deletes all states for the current stream.

Returns:

  • int - The number of states that were deleted

quixstreams.states.scalarstreamstate

ScalarStreamState Objects

@nativedecorator
class ScalarStreamState(Generic[StreamStateType])

Represents a state container that stores a scalar value with the ability to flush changes to a specified storage.

__init__

def __init__(net_pointer: ctypes.c_void_p, state_type: StreamStateType,
             default_value_factory: Callable[[], StreamStateType])

Initializes a new instance of ScalarStreamState.

NOTE: Do not initialize this class manually, use StreamStateManager.get_scalar_state

Arguments:

  • net_pointer - The .net object representing a ScalarStreamState.
  • state_type - The type of the state
  • default_value_factory - A function that returns a default value of type T when the value has not been set yet

type

@property
def type() -> type

Gets the type of the ScalarStreamState

Returns:

  • StreamStateType - type of the state

on_flushed

@property
def on_flushed() -> Callable[[], None]

Gets the handler for when flush operation is completed.

Returns:

Callable[[], None]: The event handler for after flush.

on_flushed

@on_flushed.setter
def on_flushed(value: Callable[[], None]) -> None

Sets the handler for when flush operation is completed.

Arguments:

  • value - The parameterless callback to invoke

on_flushing

@property
def on_flushing() -> Callable[[], None]

Gets the handler for when flush operation begins.

Returns:

Callable[[], None]: The event handler for when the flush begins.

on_flushing

@on_flushing.setter
def on_flushing(value: Callable[[], None]) -> None

Sets the handler for when flush operation begins.

Arguments:

  • value - The parameterless callback to invoke

flush

def flush()

Flushes the changes made to the in-memory state to the specified storage.

reset

def reset()

Reset the state to before in-memory modifications

value

@property
def value()

Gets the value of the state.

Returns:

  • StreamStateType - The value of the state.

value

@value.setter
def value(val: StreamStateType)

Sets the value of the state.

Arguments:

  • val - The value of the state.
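How `value`, `flush`, `reset` and the two flush handlers relate can be shown with a small in-memory model. `ToyScalarState` is a hypothetical stand-in that uses a plain dict as storage; the order in which the handlers fire around the write follows the descriptions above, and the rest is an assumption.

```python
from typing import Callable, Dict, Generic, List, Optional, TypeVar

T = TypeVar("T")


class ToyScalarState(Generic[T]):
    """Hypothetical stand-in: an in-memory value backed by a dict 'storage'."""

    def __init__(self, storage: Dict[str, T], name: str,
                 default_value_factory: Optional[Callable[[], T]] = None) -> None:
        self._storage = storage
        self._name = name
        self._factory = default_value_factory
        self.on_flushing: Optional[Callable[[], None]] = None
        self.on_flushed: Optional[Callable[[], None]] = None
        self.value: Optional[T] = None
        self._load()

    def _load(self) -> None:
        # Prefer the stored value; fall back to the factory, then to None.
        if self._name in self._storage:
            self.value = self._storage[self._name]
        elif self._factory is not None:
            self.value = self._factory()
        else:
            self.value = None

    def flush(self) -> None:
        if self.on_flushing is not None:
            self.on_flushing()
        self._storage[self._name] = self.value
        if self.on_flushed is not None:
            self.on_flushed()

    def reset(self) -> None:
        # Discard in-memory modifications made since the last flush.
        self._load()


storage: Dict[str, int] = {}
events: List[str] = []
counter = ToyScalarState(storage, "count", lambda: 0)
counter.on_flushing = lambda: events.append("flushing")
counter.on_flushed = lambda: events.append("flushed")

counter.value += 5
counter.flush()       # persists 5
counter.value += 10   # in memory only
counter.reset()       # back to the flushed value
```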

quixstreams.states.dictstreamstate

DictStreamState Objects

@nativedecorator
class DictStreamState(Generic[StreamStateType])

Represents a state container that stores key-value pairs with the ability to flush changes to a specified storage.

__init__

def __init__(net_pointer: ctypes.c_void_p, state_type: StreamStateType,
             default_value_factory: Callable[[str], StreamStateType])

Initializes a new instance of DictStreamState.

NOTE: Do not initialize this class manually, use StreamStateManager.get_dict_state

Arguments:

  • net_pointer - The .net object representing a DictStreamState.
  • state_type - The type of the state
  • default_value_factory - The default value factory to create value when the key is not yet present in the state

type

@property
def type() -> type

Gets the type of the StreamState

Returns:

  • StreamStateType - type of the state

on_flushed

@property
def on_flushed() -> Callable[[], None]

Gets the handler for when flush operation is completed.

Returns:

Callable[[], None]: The event handler for after flush.

on_flushed

@on_flushed.setter
def on_flushed(value: Callable[[], None]) -> None

Sets the handler for when flush operation is completed.

Arguments:

  • value - The parameterless callback to invoke

on_flushing

@property
def on_flushing() -> Callable[[], None]

Gets the handler for when flush operation begins.

Returns:

Callable[[], None]: The event handler for when the flush begins.

on_flushing

@on_flushing.setter
def on_flushing(value: Callable[[], None]) -> None

Sets the handler for when flush operation begins.

Arguments:

  • value - The parameterless callback to invoke

flush

def flush()

Flushes the changes made to the in-memory state to the specified storage.

reset

def reset()

Reset the state to before in-memory modifications

quixstreams.states

quixstreams.states.appstatemanager

AppStateManager Objects

@nativedecorator
class AppStateManager(object)

Manages the states of an app.

__init__

def __init__(net_pointer: ctypes.c_void_p)

Initializes a new instance of AppStateManager.

NOTE: Do not initialize this class manually, use App.get_state_manager

Arguments:

  • net_pointer - The .net object representing an AppStateManager.

get_topic_states

def get_topic_states() -> List[str]

Returns a collection of all available topic states for the app.

Returns:

  • List[str] - All available app topic states for the app.

get_topic_state_manager

def get_topic_state_manager(topic_name: str) -> TopicStateManager

Gets an instance of the TopicStateManager for the specified topic.

Arguments:

  • topic_name - The name of the topic

delete_topic_state

def delete_topic_state(topic_name: str) -> bool

Deletes the specified topic state

Arguments:

  • topic_name - The name of the topic

Returns:

  • bool - Whether the topic state was deleted

delete_topic_states

def delete_topic_states() -> int

Deletes all topic states for the app.

Returns:

  • int - The number of topic states that were deleted

quixstreams.topicconsumer

TopicConsumer Objects

@nativedecorator
class TopicConsumer(object)

Interface to operate with the streaming platform for consuming messages

__init__

def __init__(net_pointer: ctypes.c_void_p)

Initializes a new instance of TopicConsumer.

NOTE: Do not initialize this class manually, use KafkaStreamingClient.get_topic_consumer to create it.

Arguments:

  • net_pointer - The .net pointer to TopicConsumer instance.

on_stream_received

@property
def on_stream_received() -> Callable[['StreamConsumer'], None]

Gets the event handler for when a stream is received for the topic.

Returns:

Callable[[StreamConsumer], None]: The event handler for when a stream is received for the topic. The first parameter is the StreamConsumer instance that was received.

on_stream_received

@on_stream_received.setter
def on_stream_received(value: Callable[['StreamConsumer'], None]) -> None

Sets the event handler for when a stream is received for the topic.

Arguments:

  • value - The new event handler for when a stream is received for the topic. The first parameter is the StreamConsumer instance that was received.

on_streams_revoked

@property
def on_streams_revoked(
) -> Callable[['TopicConsumer', List['StreamConsumer']], None]

Gets the event handler for when streams are revoked for the topic.

Returns:

Callable[[TopicConsumer, List[StreamConsumer]], None]: The event handler for when streams are revoked for the topic. The first parameter is the TopicConsumer instance for which the streams were revoked, and the second parameter is a list of StreamConsumer instances that were revoked.

on_streams_revoked

@on_streams_revoked.setter
def on_streams_revoked(
        value: Callable[['TopicConsumer', List['StreamConsumer']],
                        None]) -> None

Sets the event handler for when streams are revoked for the topic.

Arguments:

  • value - The new event handler for when streams are revoked for the topic. The first parameter is the TopicConsumer instance for which the streams were revoked, and the second parameter is a list of StreamConsumer instances that were revoked.

on_revoking

@property
def on_revoking() -> Callable[['TopicConsumer'], None]

Gets the event handler for when the topic is being revoked.

Returns:

Callable[[TopicConsumer], None]: The event handler for when the topic is being revoked. The first parameter is the TopicConsumer instance for which the revocation is happening.

on_revoking

@on_revoking.setter
def on_revoking(value: Callable[['TopicConsumer'], None]) -> None

Sets the event handler for when the topic is being revoked.

Arguments:

  • value - The new event handler for when the topic is being revoked. The first parameter is the TopicConsumer instance for which the revocation is happening.

on_committed

@property
def on_committed() -> Callable[['TopicConsumer'], None]

Gets the event handler for when the topic finishes committing consumed data up to this point.

Returns:

Callable[[TopicConsumer], None]: The event handler for when the topic finishes committing consumed data up to this point. The first parameter is the TopicConsumer instance for which the commit happened.

on_committed

@on_committed.setter
def on_committed(value: Callable[['TopicConsumer'], None]) -> None

Sets the event handler for when the topic finishes committing consumed data up to this point.

Arguments:

  • value - The new event handler for when the topic finishes committing consumed data up to this point. The first parameter is the TopicConsumer instance for which the commit happened.

on_committing

@property
def on_committing() -> Callable[['TopicConsumer'], None]

Gets the event handler for when the topic begins committing consumed data up to this point.

Returns:

Callable[[TopicConsumer], None]: The event handler for when the topic begins committing consumed data up to this point. The first parameter is the TopicConsumer instance for which the commit is happening.

on_committing

@on_committing.setter
def on_committing(value: Callable[['TopicConsumer'], None]) -> None

Sets the event handler for when the topic begins committing consumed data up to this point.

Arguments:

  • value - The new event handler for when the topic begins committing consumed data up to this point. The first parameter is the TopicConsumer instance for which the commit is happening.

subscribe

def subscribe()

Subscribes to streams in the topic. Use 'on_stream_received' event to consume incoming streams.

commit

def commit()

Commit packages consumed up until now
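The commit handlers described above bracket the commit itself. A minimal sketch of that ordering follows, using a hypothetical `ToyTopicConsumer` that records events in a list instead of talking to a broker; only the handler names and their single `TopicConsumer` parameter come from this reference.

```python
from typing import Callable, List, Optional


class ToyTopicConsumer:
    """Hypothetical stand-in that only models the commit handlers."""

    def __init__(self) -> None:
        self.on_committing: Optional[Callable[["ToyTopicConsumer"], None]] = None
        self.on_committed: Optional[Callable[["ToyTopicConsumer"], None]] = None
        self.log: List[str] = []

    def commit(self) -> None:
        if self.on_committing is not None:
            self.on_committing(self)
        self.log.append("commit")  # the real consumer would commit here
        if self.on_committed is not None:
            self.on_committed(self)


topic_consumer = ToyTopicConsumer()
# Each handler receives the consumer instance as its first parameter.
topic_consumer.on_committing = lambda topic: topic.log.append("on_committing")
topic_consumer.on_committed = lambda topic: topic.log.append("on_committed")
topic_consumer.commit()
```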

get_state_manager

def get_state_manager() -> TopicStateManager

Gets the manager for the topic states.

Returns:

  • TopicStateManager - The topic state manager

get_net_pointer

def get_net_pointer() -> ctypes.c_void_p

Retrieves the .net pointer to TopicConsumer instance.

Returns:

  • ctypes.c_void_p - The .net pointer to TopicConsumer instance.

quixstreams.streamconsumer

StreamConsumer Objects

@nativedecorator
class StreamConsumer(object)

Handles consuming stream from a topic.

__init__

def __init__(net_pointer: ctypes.c_void_p, topic_consumer: 'TopicConsumer',
             on_close_cb_always: Callable[['StreamConsumer'], None])

Initializes a new instance of StreamConsumer.

NOTE: Do not initialize this class manually. TopicConsumer automatically creates it when a new stream is received.

Arguments:

  • net_pointer - Pointer to an instance of a .NET StreamConsumer.
  • topic_consumer - The topic consumer which owns the stream consumer.
  • on_close_cb_always - The callback function to be executed when the stream is closed.

topic

@property
def topic() -> 'TopicConsumer'

Gets the topic the stream was raised for.

Returns:

  • TopicConsumer - The topic consumer instance associated with the stream.

on_stream_closed

@property
def on_stream_closed() -> Callable[['StreamConsumer', 'StreamEndType'], None]

Gets the handler for when the stream closes.

Returns:

Callable[['StreamConsumer', 'StreamEndType'], None]: The callback function to be executed when the stream closes. The first parameter is the stream that closes, and the second is the close type.

on_stream_closed

@on_stream_closed.setter
def on_stream_closed(
        value: Callable[['StreamConsumer', 'StreamEndType'], None]) -> None

Sets the handler for when the stream closes.

Arguments:

  • value - The new callback function to be executed when the stream closes. The first parameter is the stream that closes, and the second is the close type.

on_package_received

@property
def on_package_received() -> Callable[['StreamConsumer', Any], None]

Gets the handler for when the stream receives a package of any type.

Returns:

Callable[['StreamConsumer', Any], None]: The callback function to be executed when the stream receives a package. The first parameter is the stream that receives the package, and the second is the package itself.

on_package_received

@on_package_received.setter
def on_package_received(
        value: Callable[['StreamConsumer', Any], None]) -> None

Sets the handler for when the stream receives a package of any type.

Arguments:

  • value - The new callback function to be executed when the stream receives a package. The first parameter is the stream that receives the package, and the second is the package itself.

stream_id

@property
def stream_id() -> str

Get the ID of the stream being consumed.

Returns:

  • str - The ID of the stream being consumed.

properties

@property
def properties() -> StreamPropertiesConsumer

Gets the consumer for accessing the properties and metadata of the stream.

Returns:

  • StreamPropertiesConsumer - The stream properties consumer instance.

events

@property
def events() -> StreamEventsConsumer

Gets the consumer for accessing event related information of the stream such as event definitions and values.

Returns:

  • StreamEventsConsumer - The stream events consumer instance.

timeseries

@property
def timeseries() -> StreamTimeseriesConsumer

Gets the consumer for accessing timeseries related information of the stream such as parameter definitions and values.

Returns:

  • StreamTimeseriesConsumer - The stream timeseries consumer instance.

get_dict_state

def get_dict_state(
        state_name: str,
        default_value_factory: Callable[[str], StreamStateType] = None,
        state_type: StreamStateType = None
) -> DictStreamState[StreamStateType]

Creates a new application state of dictionary type with an automatically managed lifecycle for the stream.

Arguments:

  • state_name - The name of the state
  • default_value_factory - The default value factory to create value when the key is not yet present in the state
  • state_type - The type of the state

Returns:

  • DictStreamState - The stream state

Example:

stream_consumer.get_dict_state('some_state') This will return a state where the type is 'Any'.

stream_consumer.get_dict_state('some_state', lambda missing_key: {}) This will return a state where the type is a generic dictionary, with an empty dictionary as the default value when the key is not available. The lambda function will be invoked with the 'get_state_type_check' key to determine the type.

stream_consumer.get_dict_state('some_state', lambda missing_key: {}, Dict[str, float]) This will return a state where the type is a specific dictionary type, with a default value.

stream_consumer.get_dict_state('some_state', state_type=float) This will return a state where the type is float without a default value, resulting in a KeyError when the key is not found.
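The interplay between default_value_factory and missing keys can be sketched with a plain dictionary subclass. DictStateSketch is a hypothetical stand-in, not the library's DictStreamState. It mirrors only the behaviour described above: the factory is called with the missing key, and without a factory a missing key raises KeyError. Storing the created default back into the state is an assumption made for this sketch.

```python
from typing import Any, Callable, Optional


class DictStateSketch(dict):
    """Hypothetical stand-in; mirrors only the default-value behaviour."""

    def __init__(self, default_value_factory: Optional[Callable[[str], Any]] = None) -> None:
        super().__init__()
        self._factory = default_value_factory

    def __missing__(self, key: str) -> Any:
        # dict calls __missing__ from __getitem__ when the key is absent.
        if self._factory is None:
            raise KeyError(key)
        value = self._factory(key)
        self[key] = value  # assumption: the created default is stored
        return value


with_default = DictStateSketch(lambda missing_key: {})
with_default["car-1"]["speed"] = 120.5

without_default = DictStateSketch()
try:
    without_default["car-1"]
    missing_raised = False
except KeyError:
    missing_raised = True

print(dict(with_default), missing_raised)
```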

get_scalar_state

def get_scalar_state(
        state_name: str,
        default_value_factory: Callable[[], StreamStateType] = None,
        state_type: StreamStateType = None
) -> ScalarStreamState[StreamStateType]

Creates a new application state of scalar type with an automatically managed lifecycle for the stream.

Arguments:

  • state_name - The name of the state
  • default_value_factory - The default value factory to create value when it has not been set yet
  • state_type - The type of the state

Returns:

  • ScalarStreamState - The stream state

Example:

stream_consumer.get_scalar_state('some_state') This will return a state where the type is 'Any'.

stream_consumer.get_scalar_state('some_state', lambda: 1) This will return a state where the type is 'Any', with the integer 1 as the default when the value has not been set yet. The lambda function will be invoked to determine the type.

stream_consumer.get_scalar_state('some_state', lambda: 0, float) This will return a state where the type is a specific type, with a default value.

stream_consumer.get_scalar_state('some_state', state_type=float) This will return a state where the type is float, with a default value of that type.

get_state_manager

def get_state_manager() -> StreamStateManager

Gets the manager for the stream states.

Returns:

  • StreamStateManager - The stream state manager

get_net_pointer

def get_net_pointer() -> ctypes.c_void_p

Gets the associated .NET object pointer.

Returns:

  • ctypes.c_void_p - The .NET pointer

quixstreams.state.localfilestorage

LocalFileStorage Objects

class LocalFileStorage(IStateStorage)

A directory-based file storage intended for single-process access. Locking is implemented via an in-memory mutex.

__init__

def __init__(storage_directory=None, auto_create_dir=True)

Initializes the LocalFileStorage instance.

Arguments:

  • storage_directory - The path to the storage directory.
  • auto_create_dir - If True, automatically creates the storage directory if it doesn't exist.

quixstreams.state.inmemorystorage

InMemoryStorage Objects

class InMemoryStorage(IStateStorage)

Basic in-memory storage implementing IStateStorage. It is not thread-safe.

__init__

def __init__()

Initializes the InMemoryStorage instance.

quixstreams.state.statetype

quixstreams.state.statevalue

StateValue Objects

class StateValue(object)

A wrapper class for values that can be stored inside the storage.

__init__

def __init__(value: Any)

Initializes the wrapped value inside the store.

Arguments:

  • value - The value to be wrapped, which can be one of the following types: StateValue, str, int, float, bool, bytes, bytearray, or object (via pickle).
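The accepted value types can be pictured as a small classification step performed when a value is wrapped. The following is a rough sketch of that idea only. StateTypeSketch and classify are hypothetical names, and the real StateValue may organise this differently. One detail matters in any such check: bool must be tested before int, because bool is a subclass of int in Python.

```python
import pickle
from enum import Enum
from typing import Any, Tuple


class StateTypeSketch(Enum):
    """Hypothetical type tags, loosely following the list of accepted types."""
    STRING = "string"
    BOOL = "bool"
    INT = "int"
    FLOAT = "float"
    BINARY = "binary"
    OBJECT = "object"


def classify(value: Any) -> Tuple[StateTypeSketch, Any]:
    """Return a (tag, stored_value) pair for a value about to be stored."""
    if isinstance(value, str):
        return StateTypeSketch.STRING, value
    if isinstance(value, bool):  # must precede the int check
        return StateTypeSketch.BOOL, value
    if isinstance(value, int):
        return StateTypeSketch.INT, value
    if isinstance(value, float):
        return StateTypeSketch.FLOAT, value
    if isinstance(value, (bytes, bytearray)):
        return StateTypeSketch.BINARY, bytes(value)
    # Anything else is serialized with pickle, as the reference describes.
    return StateTypeSketch.OBJECT, pickle.dumps(value)


tag, stored = classify({"laps": 3})
print(tag, pickle.loads(stored))
```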

type

@property
def type()

Gets the type of the wrapped value.

Returns:

  • StateType - The type of the wrapped value.

value

@property
def value()

Gets the wrapped value.

Returns:

The wrapped value.

get_net_pointer

def get_net_pointer() -> ctypes.c_void_p

Gets the .NET pointer of the wrapped value.

Returns:

  • ctypes.c_void_p - The .NET pointer of the wrapped value.

quixstreams.state.istatestorage

IStateStorage Objects

class IStateStorage(object)

The minimum definition for a state storage

__init__

def __init__(net_pointer=ctypes.c_void_p)

Initializes a new instance of IStateStorage.

NOTE: Do not initialize this class manually, it can be returned by classes for certain calls

Arguments:

  • net_pointer - The .net object representing an IStateStorage.

is_case_sensitive

@property
def is_case_sensitive() -> bool

Returns whether the storage is case-sensitive

get_or_create_sub_storage

def get_or_create_sub_storage(sub_storage_name: str) -> 'IStateStorage'

Creates a sub-storage under this storage in the hierarchy, or retrieves it if it already exists.

Arguments:

  • sub_storage_name - The name of the sub storage

Returns:

  • IStateStorage - The state storage for the given storage name

delete_sub_storages

def delete_sub_storages() -> int

Deletes all sub-storages under this storage in the hierarchy.

Returns:

  • int - The number of state storages deleted.

delete_sub_storage

def delete_sub_storage(sub_storage_name: str) -> bool

Deletes the named sub-storage under this storage in the hierarchy.

Arguments:

  • sub_storage_name - The name of the sub storage

Returns:

  • bool - Whether the state storage for the given storage name was deleted

get_sub_storages

def get_sub_storages() -> List[str]

Gets the names of the sub-storages under this storage in the hierarchy.

Returns:

  • List[str] - The names of the sub-storages this storage contains.

get

def get(key: str) -> Any

Gets the value at the specified key.

Arguments:

  • key - The key to retrieve the value for.

Returns:

  • Any - The value at the specified key, which can be one of the following types: str, int, float, bool, bytes, bytearray, or object (via pickle).

set

def set(key: str, value: Any)

Sets the value at the specified key.

Arguments:

  • key - The key to set the value for.
  • value - The value to be set, which can be one of the following types: StateValue, str, int, float, bool, bytes, bytearray, or object (via pickle).

contains_key

def contains_key(key: str) -> bool

Checks if the storage contains the specified key.

Arguments:

  • key - The key to check for.

Returns:

  • bool - True if the storage contains the key, False otherwise.

get_all_keys

def get_all_keys()

Retrieves a set containing all the keys in the storage.

Returns:

  • set[str] - A set of all keys in the storage.

remove

def remove(key) -> None

Removes the specified key from the storage.

Arguments:

  • key - The key to be removed.

clear

def clear()

Clears the storage by removing all keys and their associated values.
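Taken together, the methods above describe a key-value store with named sub-storages. A minimal pure-Python sketch of that contract follows. StorageSketch is a hypothetical stand-in written for illustration, not the library's InMemoryStorage or LocalFileStorage. It ignores case sensitivity and locking, and raising KeyError from get for an unknown key is an assumption.

```python
from typing import Any, Dict, List, Set


class StorageSketch:
    """Hypothetical stand-in mirroring the IStateStorage method names."""

    def __init__(self) -> None:
        self._values: Dict[str, Any] = {}
        self._subs: Dict[str, "StorageSketch"] = {}

    def get_or_create_sub_storage(self, sub_storage_name: str) -> "StorageSketch":
        if sub_storage_name not in self._subs:
            self._subs[sub_storage_name] = StorageSketch()
        return self._subs[sub_storage_name]

    def delete_sub_storage(self, sub_storage_name: str) -> bool:
        return self._subs.pop(sub_storage_name, None) is not None

    def delete_sub_storages(self) -> int:
        count = len(self._subs)
        self._subs.clear()
        return count

    def get_sub_storages(self) -> List[str]:
        return list(self._subs)

    def get(self, key: str) -> Any:
        return self._values[key]  # assumption: unknown keys raise KeyError

    def set(self, key: str, value: Any) -> None:
        self._values[key] = value

    def contains_key(self, key: str) -> bool:
        return key in self._values

    def get_all_keys(self) -> Set[str]:
        return set(self._values)

    def remove(self, key: str) -> None:
        self._values.pop(key, None)

    def clear(self) -> None:
        self._values.clear()


root = StorageSketch()
root.set("laps", 3)
per_stream = root.get_or_create_sub_storage("stream-1")
per_stream.set("speed", 120.5)
print(root.get_all_keys(), root.get_sub_storages())
```

Keys in a sub-storage are independent of the parent, so "speed" exists only in per_stream here.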

quixstreams.state

quixstreams.streamproducer

StreamProducer Objects

@nativedecorator
class StreamProducer(object)

Handles publishing a stream to a topic.

__init__

def __init__(topic_producer: 'TopicProducer', net_pointer: ctypes.c_void_p)

Initializes a new instance of StreamProducer.

NOTE: Do not initialize this class manually, use TopicProducer.get_or_create_stream or create_stream

Arguments:

  • topic_producer - The topic producer the stream producer publishes to.
  • net_pointer - The .net object representing a StreamProducer.

topic

@property
def topic() -> 'TopicProducer'

Gets the topic the stream is producing to.

Returns:

  • TopicProducer - The topic the stream is producing to.

on_write_exception

@property
def on_write_exception() -> Callable[['StreamProducer', BaseException], None]

Gets the handler for when a stream experiences an exception during the asynchronous write process.

Returns:

Callable[['StreamProducer', BaseException], None]: The handler for exceptions during the asynchronous write process. The first parameter is the stream the exception was raised for, and the second is the exception.

on_write_exception

@on_write_exception.setter
def on_write_exception(
        value: Callable[['StreamProducer', BaseException], None]) -> None

Sets the handler for when a stream experiences an exception during the asynchronous write process.

Arguments:

  • value - The handler for exceptions during the asynchronous write process. The first parameter is the stream the exception was raised for, and the second is the exception.

stream_id

@property
def stream_id() -> str

Gets the unique id of the stream being produced.

Returns:

  • str - The unique id of the stream being produced.

epoch

@property
def epoch() -> datetime

Gets the default Epoch used for Timeseries and Events.

Returns:

  • datetime - The default Epoch used for Timeseries and Events.

epoch

@epoch.setter
def epoch(value: datetime)

Set the default Epoch used for Timeseries and Events.

Arguments:

  • value - The default Epoch value to set.

properties

@property
def properties() -> StreamPropertiesProducer

Gets the properties of the stream. The changes will automatically be sent after a slight delay.

Returns:

  • StreamPropertiesProducer - The properties of the stream.

timeseries

@property
def timeseries() -> StreamTimeseriesProducer

Gets the producer for publishing timeseries related information of the stream such as parameter definitions and values.

Returns:

  • StreamTimeseriesProducer - The producer for publishing timeseries related information of the stream.

events

@property
def events() -> StreamEventsProducer

Gets the producer for publishing event related information of the stream such as event definitions and values.

Returns:

  • StreamEventsProducer - The producer for publishing event related information of the stream.

flush

def flush()

Flushes the pending data to the stream.

close

def close(end_type: StreamEndType = StreamEndType.Closed)

Closes the stream and flushes the pending data to the stream.

Arguments:

  • end_type - The type of stream end. Defaults to StreamEndType.Closed.

quixstreams.builders.parameterdefinitionbuilder

ParameterDefinitionBuilder Objects

@nativedecorator
class ParameterDefinitionBuilder(object)

Builder for creating ParameterDefinition for StreamTimeseriesProducer.

__init__

def __init__(net_pointer: ctypes.c_void_p)

Initializes a new instance of ParameterDefinitionBuilder.

Arguments:

  • net_pointer - Pointer to an instance of a .net ParameterDefinitionBuilder.

set_range

def set_range(minimum_value: float,
              maximum_value: float) -> 'ParameterDefinitionBuilder'

Set the minimum and maximum range of the parameter.

Arguments:

  • minimum_value - The minimum value.
  • maximum_value - The maximum value.

Returns:

The builder.

set_unit

def set_unit(unit: str) -> 'ParameterDefinitionBuilder'

Set the unit of the parameter.

Arguments:

  • unit - The unit of the parameter.

Returns:

The builder.

set_format

def set_format(format: str) -> 'ParameterDefinitionBuilder'

Set the format of the parameter.

Arguments:

  • format - The format of the parameter.

Returns:

The builder.

set_custom_properties

def set_custom_properties(
        custom_properties: str) -> 'ParameterDefinitionBuilder'

Set the custom properties of the parameter.

Arguments:

  • custom_properties - The custom properties of the parameter.

Returns:

The builder.

add_definition

def add_definition(parameter_id: str,
                   name: str = None,
                   description: str = None) -> 'ParameterDefinitionBuilder'

Add new parameter definition to the StreamTimeseriesProducer. Configure it with the builder methods.

Arguments:

  • parameter_id - The id of the parameter. Must match the parameter id used to send data.
  • name - The human friendly display name of the parameter.
  • description - The description of the parameter.

Returns:

Parameter definition builder to define the parameter properties
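Because every setter returns the builder, a definition can be configured in one chained expression, and add_definition starts the next one. The sketch below imitates that pattern with a hypothetical DefinitionBuilderSketch that records plain dictionaries. The real builder wraps a .NET object, so the internal bookkeeping shown here is an assumption.

```python
from typing import Any, Dict, List, Optional


class DefinitionBuilderSketch:
    """Hypothetical stand-in that records definitions as dictionaries."""

    def __init__(self) -> None:
        self.definitions: List[Dict[str, Any]] = []

    def add_definition(self, parameter_id: str, name: Optional[str] = None,
                       description: Optional[str] = None) -> "DefinitionBuilderSketch":
        # Start a new definition; later setters apply to this one.
        self.definitions.append(
            {"id": parameter_id, "name": name, "description": description}
        )
        return self

    def set_range(self, minimum_value: float,
                  maximum_value: float) -> "DefinitionBuilderSketch":
        self.definitions[-1]["range"] = (minimum_value, maximum_value)
        return self

    def set_unit(self, unit: str) -> "DefinitionBuilderSketch":
        self.definitions[-1]["unit"] = unit
        return self


builder = (
    DefinitionBuilderSketch()
    .add_definition("speed", "Speed", "Vehicle speed")
    .set_range(0.0, 400.0)
    .set_unit("kph")
    .add_definition("gear")
    .set_range(0.0, 8.0)
)
print(builder.definitions)
```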

quixstreams.builders.eventdatabuilder

EventDataBuilder Objects

@nativedecorator
class EventDataBuilder(object)

Builder for creating event data packages for StreamEventsProducer.

__init__

def __init__(net_pointer: ctypes.c_void_p)

Initializes a new instance of EventDataBuilder.

Arguments:

  • net_pointer - Pointer to an instance of a .net EventDataBuilder.

add_value

def add_value(event_id: str, value: str) -> 'EventDataBuilder'

Adds a new event value at the time the builder was created for.

Arguments:

  • event_id - The id of the event to set the value for.
  • value - The string value.

Returns:

The builder.

add_tag

def add_tag(tag_id: str, value: str) -> 'EventDataBuilder'

Sets tag value for the values.

Arguments:

  • tag_id - The id of the tag.
  • value - The value of the tag.

Returns:

The builder.

add_tags

def add_tags(tags: Dict[str, str]) -> 'EventDataBuilder'

Copies the tags from the specified dictionary. Conflicting tags will be overwritten.

Arguments:

  • tags - The tags to add.

Returns:

The builder.

publish

def publish()

Publishes the values to the StreamEventsProducer buffer.

See StreamEventsProducer buffer settings for more information on when the values are sent to the broker.
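The tag methods can be read as updates to one shared tag dictionary that is attached to the values added through the builder. A small sketch under that reading uses a hypothetical EventBuilderSketch whose publish() appends to a list standing in for the StreamEventsProducer buffer. None of these internals are taken from the library.

```python
from typing import Dict, List, Tuple

BufferEntry = Tuple[str, str, Dict[str, str]]


class EventBuilderSketch:
    """Hypothetical stand-in; `buffer` plays the role of the producer buffer."""

    def __init__(self, buffer: List[BufferEntry]) -> None:
        self._buffer = buffer
        self._values: List[Tuple[str, str]] = []
        self._tags: Dict[str, str] = {}

    def add_value(self, event_id: str, value: str) -> "EventBuilderSketch":
        self._values.append((event_id, value))
        return self

    def add_tag(self, tag_id: str, value: str) -> "EventBuilderSketch":
        self._tags[tag_id] = value
        return self

    def add_tags(self, tags: Dict[str, str]) -> "EventBuilderSketch":
        self._tags.update(tags)  # conflicting tags are overwritten
        return self

    def publish(self) -> None:
        # Each value carries a copy of the tags collected so far.
        for event_id, value in self._values:
            self._buffer.append((event_id, value, dict(self._tags)))


buffer: List[BufferEntry] = []
(
    EventBuilderSketch(buffer)
    .add_value("pit_stop", "entered")
    .add_tag("driver", "A")
    .add_tags({"driver": "B", "lap": "12"})
    .publish()
)
print(buffer)
```

The "driver" tag ends up as "B" because add_tags overwrote the earlier add_tag value.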

quixstreams.builders.timeseriesdatabuilder

TimeseriesDataBuilder Objects

@nativedecorator
class TimeseriesDataBuilder(object)

Builder for managing TimeseriesDataTimestamp instances on TimeseriesBufferProducer.

__init__

def __init__(net_pointer: ctypes.c_void_p)

Initializes a new instance of TimeseriesDataBuilder.

Arguments:

  • net_pointer - Pointer to an instance of a .net TimeseriesDataBuilder.

add_value

def add_value(
    parameter_id: str, value: Union[numbers.Number, str, bytearray, bytes]
) -> 'TimeseriesDataBuilder'

Adds a new parameter value at the time the builder was created for.

Arguments:

  • parameter_id - The id of the parameter to set the value for.
  • value - The value to add. Can be a number, string, bytearray, or bytes.

Returns:

The builder.

add_tag

def add_tag(tag_id: str, value: str) -> 'TimeseriesDataBuilder'

Adds a tag to the values.

Arguments:

  • tag_id - The id of the tag.
  • value - The value of the tag.

Returns:

The builder.

add_tags

def add_tags(tags: Dict[str, str]) -> 'TimeseriesDataBuilder'

Copies the tags from the specified dictionary. Conflicting tags will be overwritten.

Arguments:

  • tags - The tags to add.

Returns:

The builder.

publish

def publish()

Publish the values.

quixstreams.builders.eventdefinitionbuilder

EventDefinitionBuilder Objects

@nativedecorator
class EventDefinitionBuilder(object)

Builder for creating EventDefinitions within StreamPropertiesProducer.

__init__

def __init__(net_pointer: ctypes.c_void_p)

Initializes a new instance of EventDefinitionBuilder.

Arguments:

  • net_pointer - Pointer to an instance of a .net EventDefinitionBuilder.

set_level

def set_level(level: EventLevel) -> 'EventDefinitionBuilder'

Set severity level of the Event.

Arguments:

  • level - The severity level of the event.

Returns:

The builder.

set_custom_properties

def set_custom_properties(custom_properties: str) -> 'EventDefinitionBuilder'

Set custom properties of the Event.

Arguments:

  • custom_properties - The custom properties of the event.

Returns:

The builder.

add_definition

def add_definition(event_id: str,
                   name: str = None,
                   description: str = None) -> 'EventDefinitionBuilder'

Add new Event definition, to define properties like Name or Level, among others.

Arguments:

  • event_id - Event id. This must match the event id you use to publish event values.
  • name - Human friendly display name of the event.
  • description - Description of the event.

Returns:

Event definition builder to define the event properties.

quixstreams.builders

quixstreams.kafkastreamingclient

KafkaStreamingClient Objects

@nativedecorator
class KafkaStreamingClient(object)

A Kafka streaming client capable of creating topic consumers and producers.

__init__

def __init__(broker_address: str,
             security_options: SecurityOptions = None,
             properties: Dict[str, str] = None,
             debug: bool = False)

Initializes a new instance of the KafkaStreamingClient.

Arguments:

  • broker_address - The address of the Kafka cluster.
  • security_options - Optional security options for the Kafka client.
  • properties - Optional extra properties for broker configuration.
  • debug - Whether debugging should be enabled. Defaults to False.

get_topic_consumer

def get_topic_consumer(
    topic: str,
    consumer_group: str = None,
    commit_settings: Union[CommitOptions, CommitMode] = None,
    auto_offset_reset: AutoOffsetReset = AutoOffsetReset.Latest
) -> TopicConsumer

Gets a topic consumer capable of subscribing to receive incoming streams.

Arguments:

  • topic - The name of the topic.
  • consumer_group - The consumer group ID to use for consuming messages. Defaults to None.
  • commit_settings - The settings to use for committing. If not provided, defaults to committing every 5000 messages or 5 seconds, whichever is sooner.
  • auto_offset_reset - The offset to use when there is no saved offset for the consumer group. Defaults to AutoOffsetReset.Latest.

Returns:

  • TopicConsumer - An instance of TopicConsumer for the specified topic.
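The default commit behaviour described for commit_settings (every 5000 messages or 5 seconds, whichever is sooner) amounts to a two-condition check. The function below is only an illustration of that rule. should_commit and its parameter names are hypothetical, and the library performs this bookkeeping internally.

```python
def should_commit(messages_since_commit: int,
                  seconds_since_commit: float,
                  max_messages: int = 5000,
                  max_seconds: float = 5.0) -> bool:
    """Return True once either threshold has been reached."""
    return (messages_since_commit >= max_messages
            or seconds_since_commit >= max_seconds)


print(should_commit(5000, 0.2))  # message threshold reached first
print(should_commit(12, 5.0))    # time threshold reached first
print(should_commit(12, 1.5))    # neither threshold reached
```

On a busy topic the message count tends to trigger the commit, while on a quiet topic the time limit bounds how long consumed messages stay uncommitted.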

get_topic_producer

def get_topic_producer(topic: str) -> TopicProducer

Gets a topic producer capable of publishing stream messages.

Arguments:

  • topic - The name of the topic.

Returns:

  • TopicProducer - An instance of TopicProducer for the specified topic.

get_raw_topic_consumer

def get_raw_topic_consumer(
    topic: str,
    consumer_group: str = None,
    auto_offset_reset: Union[AutoOffsetReset,
                             None] = None) -> RawTopicConsumer

Gets a topic consumer capable of subscribing to receive non-quixstreams incoming messages.

Arguments:

  • topic - The name of the topic.
  • consumer_group - The consumer group ID to use for consuming messages. Defaults to None.
  • auto_offset_reset - The offset to use when there is no saved offset for the consumer group. Defaults to None.

Returns:

  • RawTopicConsumer - An instance of RawTopicConsumer for the specified topic.

get_raw_topic_producer

def get_raw_topic_producer(topic: str) -> RawTopicProducer

Gets a topic producer capable of publishing non-quixstreams messages.

Arguments:

  • topic - The name of the topic.

Returns:

  • RawTopicProducer - An instance of RawTopicProducer for the specified topic.

quixstreams.exceptions.quixapiexception

quixstreams.logging