Table of Contents
- quixstreams
- quixstreams.helpers.dotnet.datetimeconverter
- DateTimeConverter
- quixstreams.helpers.timeconverter
- TimeConverter
- quixstreams.helpers.exceptionconverter
- quixstreams.helpers.nativedecorator
- quixstreams.helpers
- quixstreams.helpers.enumconverter
- quixstreams.raw.rawtopicproducer
- RawTopicProducer
- quixstreams.raw.rawtopicconsumer
- RawTopicConsumer
- quixstreams.raw.rawmessage
- RawMessage
- quixstreams.raw
- quixstreams.configuration.securityoptions
- SecurityOptions
- quixstreams.configuration.saslmechanism
- quixstreams.configuration
- quixstreams.quixstreamingclient
- TokenValidationConfiguration
- QuixStreamingClient
- quixstreams.topicproducer
- TopicProducer
- quixstreams.models.commitoptions
- CommitOptions
- quixstreams.models.eventlevel
- quixstreams.models.timeseriesdata
- TimeseriesData
- quixstreams.models.eventdefinition
- EventDefinition
- quixstreams.models.streamproducer.streameventsproducer
- StreamEventsProducer
- quixstreams.models.streamproducer.streamtimeseriesproducer
- StreamTimeseriesProducer
- quixstreams.models.streamproducer.streampropertiesproducer
- StreamPropertiesProducer
- quixstreams.models.streamproducer.timeseriesbufferproducer
- TimeseriesBufferProducer
- quixstreams.models.streamproducer
- quixstreams.models.parameterdefinition
- ParameterDefinition
- quixstreams.models.parametervalue
- ParameterValue
- quixstreams.models.timeseriesdataraw
- TimeseriesDataRaw
- quixstreams.models.timeseriesbuffer
- TimeseriesBuffer
- __init__
- on_data_released
- on_data_released
- on_raw_released
- on_raw_released
- on_dataframe_released
- on_dataframe_released
- filter
- filter
- custom_trigger
- custom_trigger
- packet_size
- packet_size
- time_span_in_nanoseconds
- time_span_in_nanoseconds
- time_span_in_milliseconds
- time_span_in_milliseconds
- buffer_timeout
- buffer_timeout
- get_net_pointer
- quixstreams.models.streampackage
- StreamPackage
- quixstreams.models.codecsettings
- CodecSettings
- quixstreams.models.streamconsumer.streamtimeseriesconsumer
- StreamTimeseriesConsumer
- quixstreams.models.streamconsumer.streampropertiesconsumer
- StreamPropertiesConsumer
- quixstreams.models.streamconsumer.streameventsconsumer
- StreamEventsConsumer
- quixstreams.models.streamconsumer.timeseriesbufferconsumer
- TimeseriesBufferConsumer
- quixstreams.models.streamconsumer
- quixstreams.models.commitmode
- quixstreams.models.codectype
- CodecType
- quixstreams.models.netlist
- NetReadOnlyList
- NetList
- quixstreams.models.timeseriesbufferconfiguration
- TimeseriesBufferConfiguration
- quixstreams.models.netdict
- ReadOnlyNetDict
- NetDict
- quixstreams.models.timeseriesdatatimestamp
- TimeseriesDataTimestamp
- quixstreams.models.streamendtype
- quixstreams.models.eventdata
- EventData
- quixstreams.models
- quixstreams.models.autooffsetreset
- AutoOffsetReset
- quixstreams.app
- CancellationTokenSource
- App
- quixstreams.states.topicstatemanager
- TopicStateManager
- quixstreams.states.streamstatemanager
- StreamStateManager
- quixstreams.states.scalarstreamstate
- ScalarStreamState
- quixstreams.states.dictstreamstate
- DictStreamState
- quixstreams.states
- quixstreams.states.appstatemanager
- AppStateManager
- quixstreams.topicconsumer
- TopicConsumer
- quixstreams.streamconsumer
- StreamConsumer
- quixstreams.state.localfilestorage
- LocalFileStorage
- quixstreams.state.inmemorystorage
- InMemoryStorage
- quixstreams.state.statetype
- quixstreams.state.statevalue
- StateValue
- quixstreams.state.istatestorage
- IStateStorage
- quixstreams.state
- quixstreams.streamproducer
- StreamProducer
- quixstreams.builders.parameterdefinitionbuilder
- ParameterDefinitionBuilder
- quixstreams.builders.eventdatabuilder
- EventDataBuilder
- quixstreams.builders.timeseriesdatabuilder
- TimeseriesDataBuilder
- quixstreams.builders.eventdefinitionbuilder
- EventDefinitionBuilder
- quixstreams.builders
- quixstreams.kafkastreamingclient
- KafkaStreamingClient
- quixstreams.exceptions.quixapiexception
- quixstreams.logging
quixstreams
quixstreams.helpers.dotnet.datetimeconverter
DateTimeConverter Objects
datetime_to_python
Converts dotnet pointer to DateTime and frees the pointer.
Arguments:
hptr
- Handler Pointer to .Net type DateTime
Returns:
datetime.datetime: Python type datetime
datetime_to_dotnet
Converts a Python datetime to a pointer to a .Net DateTime.
Arguments:
value
- Python type datetime
Returns:
ctypes.c_void_p: Handler Pointer to .Net type DateTime
timespan_to_python
Converts dotnet pointer to Timespan as binary and frees the pointer.
Arguments:
uptr
- Pointer to .Net type TimeSpan
Returns:
datetime.timedelta: Python type timedelta
timedelta_to_dotnet
Converts a Python timedelta to a pointer to a .Net TimeSpan.
Arguments:
value
- Python type timedelta
Returns:
ctypes.c_void_p: Pointer to unmanaged memory containing TimeSpan
quixstreams.helpers.timeconverter
TimeConverter Objects
A utility class for converting between different time representations.
offset_from_utc
The number of nanoseconds by which local time is ahead of UTC.
to_unix_nanoseconds
Converts a datetime object to UNIX timestamp in nanoseconds.
Arguments:
value
- The datetime object to be converted.
Returns:
int
- The UNIX timestamp in nanoseconds.
to_nanoseconds
Converts a timedelta object to nanoseconds.
Arguments:
value
- The timedelta object to be converted.
Returns:
int
- The duration in nanoseconds.
from_nanoseconds
Converts a duration in nanoseconds to a timedelta object.
Arguments:
value
- The duration in nanoseconds.
Returns:
timedelta
- The corresponding timedelta object.
from_unix_nanoseconds
Converts a UNIX timestamp in nanoseconds to a datetime object.
Arguments:
value
- The UNIX timestamp in nanoseconds.
Returns:
datetime
- The corresponding datetime object.
from_string
Converts a string representation of a timestamp to a UNIX timestamp in nanoseconds.
Arguments:
value
- The string representation of a timestamp.
Returns:
int
- The corresponding UNIX timestamp in nanoseconds.
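Example (illustrative sketch of the conversions above; the import path follows the module name listed in the table of contents, and the accepted string format is assumed to be ISO 8601):
from datetime import datetime, timedelta
from quixstreams.helpers.timeconverter import TimeConverter

# datetime <-> UNIX nanoseconds
now_ns = TimeConverter.to_unix_nanoseconds(datetime.utcnow())
restored = TimeConverter.from_unix_nanoseconds(now_ns)

# timedelta <-> nanoseconds
dur_ns = TimeConverter.to_nanoseconds(timedelta(seconds=1.5))
dur = TimeConverter.from_nanoseconds(dur_ns)

# string -> UNIX nanoseconds (ISO 8601 assumed)
ts_ns = TimeConverter.from_string("2023-01-01T00:00:00Z")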
quixstreams.helpers.exceptionconverter
quixstreams.helpers.nativedecorator
quixstreams.helpers
quixstreams.helpers.enumconverter
quixstreams.raw.rawtopicproducer
RawTopicProducer Objects
Class to produce raw messages into a Topic (capable of producing non-quixstreams messages)
__init__
Initializes a new instance of the RawTopicProducer class.
Arguments:
net_pointer
- Pointer to an instance of a .NET RawTopicProducer object.
publish
Publishes the given message to the associated topic producer.
Arguments:
message
- The message to be published, which can be either a RawMessage instance, bytes, or a bytearray.
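Example (illustrative sketch; client is assumed to be a QuixStreamingClient or KafkaStreamingClient created elsewhere, and the topic name is a placeholder):
producer = client.get_raw_topic_producer("raw-output")   # hypothetical topic name
producer.publish(b"hello raw bytes")                     # bytes payload
producer.publish(bytearray(b"more bytes"))               # bytearray payload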
quixstreams.raw.rawtopicconsumer
RawTopicConsumer Objects
Topic class to consume incoming raw messages (capable of consuming non-quixstreams messages).
__init__
Initializes a new instance of RawTopicConsumer.
Notes:
Do not initialize this class manually, use KafkaStreamingClient.get_raw_topic_consumer.
Arguments:
net_pointer
- Pointer to an instance of a .net RawTopicConsumer.
on_message_received
Gets the handler for when a topic receives a message.
Returns:
Callable[[RawTopicConsumer, RawMessage], None]: The event handler for when a topic receives a message. The first parameter is the RawTopicConsumer instance for which the message is received, and the second is the RawMessage.
on_message_received
@on_message_received.setter
def on_message_received(
value: Callable[['RawTopicConsumer', RawMessage], None]) -> None
Sets the handler for when a topic receives a message.
Arguments:
value
- The new event handler for when a topic receives a message. The first parameter is the RawTopicConsumer instance for which the message is received, and the second is the RawMessage.
on_error_occurred
Gets the handler for when a stream experiences an exception during the asynchronous write process.
Returns:
Callable[[RawTopicConsumer, BaseException], None]: The event handler for when a stream experiences an exception during the asynchronous write process. The first parameter is the RawTopicConsumer instance for which the error is received, and the second is the exception.
on_error_occurred
@on_error_occurred.setter
def on_error_occurred(
value: Callable[['RawTopicConsumer', BaseException], None]) -> None
Sets the handler for when a stream experiences an exception during the asynchronous write process.
Arguments:
value
- The new handler for when a stream experiences an exception during the asynchronous write process. The first parameter is the RawTopicConsumer instance for which the error is received, and the second is the exception.
subscribe
Starts subscribing to the streams.
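Example (illustrative sketch wiring the handler and subscribing; client is assumed to be created elsewhere, and the topic name is a placeholder):
def on_raw_message(consumer, message):
    # message is a RawMessage; value holds the bytes content, key is optional
    print(message.key, message.value)

raw_consumer = client.get_raw_topic_consumer("raw-input")   # hypothetical topic name
raw_consumer.on_message_received = on_raw_message
raw_consumer.subscribe()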
quixstreams.raw.rawmessage
RawMessage Objects
The message consumed from topic without any transformation.
__init__
Initializes a new instance of RawMessage.
Arguments:
data
- The raw data to be stored in the message. Must be one of ctypes_c.void_p, bytes, or bytearray.
get_net_pointer
Gets the associated .net object pointer of the RawMessage instance.
Returns:
ctypes.c_void_p
- The .net object pointer of the RawMessage instance.
key
Gets the optional key of the message. Depending on the broker and message, it is not guaranteed.
Returns:
bytes
- The optional key of the message.
key
Sets the message key.
Arguments:
value
- The key to set for the message.
value
Gets the message value (bytes content of the message).
Returns:
Union[bytearray, bytes]: The message value (bytes content of the message).
value
Sets the message value (bytes content of the message).
Arguments:
value
- The value to set for the message.
metadata
Gets the wrapped message metadata.
Returns:
Dict[str, str]: The wrapped message metadata.
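Example (illustrative sketch; raw_producer is assumed to be a RawTopicProducer obtained via get_raw_topic_producer, and the key is a placeholder):
from quixstreams.raw.rawmessage import RawMessage

msg = RawMessage(b"payload bytes")
msg.key = b"device-001"          # optional message key
raw_producer.publish(msg)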
quixstreams.raw
quixstreams.configuration.securityoptions
SecurityOptions Objects
A class representing security options for configuring SSL encryption with SASL authentication in Kafka.
__init__
def __init__(ssl_certificates: str,
username: str,
password: str,
sasl_mechanism: SaslMechanism = SaslMechanism.ScramSha256)
Initializes a new instance of SecurityOptions configured for SSL encryption with SASL authentication.
Arguments:
ssl_certificates
- The path to the folder or file containing the certificate authority certificate(s) used to validate the SSL connection. Example: "./certificates/ca.cert"
username
- The username for SASL authentication.
password
- The password for SASL authentication.
sasl_mechanism
- The SASL mechanism to use. Defaults to ScramSha256.
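Example (illustrative sketch; the import paths follow the module names listed in the table of contents, and the credentials are placeholders):
from quixstreams.configuration.securityoptions import SecurityOptions
from quixstreams.configuration.saslmechanism import SaslMechanism

security = SecurityOptions(
    ssl_certificates="./certificates/ca.cert",
    username="my-user",
    password="my-password",
    sasl_mechanism=SaslMechanism.ScramSha256)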
get_net_pointer
Retrieves the .NET pointer for the current SecurityOptions instance.
Returns:
ctypes.c_void_p
- The .NET pointer.
quixstreams.configuration.saslmechanism
quixstreams.configuration
quixstreams.quixstreamingclient
TokenValidationConfiguration Objects
__init__
Initializes a new instance of TokenValidationConfiguration.
Arguments:
net_pointer
- Pointer to an instance of a .NET TokenValidationConfiguration.
enabled
Gets whether token validation and warnings are enabled. Defaults to true.
Returns:
bool
- True if token validation and warnings are enabled, False otherwise.
enabled
Sets whether token validation and warnings are enabled. Defaults to true.
Arguments:
value
- True to enable token validation and warnings, False to disable.
warning_before_expiry
Gets the period within which, if the token expires, a warning will be displayed. Defaults to 2 days. Set to None to disable the check.
Returns:
Union[timedelta, None]: The period within which a warning will be displayed if the token expires or None if the check is disabled.
warning_before_expiry
Sets the period within which, if the token expires, a warning will be displayed. Defaults to 2 days. Set to None to disable the check.
Arguments:
value
- The new period within which a warning will be displayed if the token expires or None to disable the check.
warn_about_pat_token
Gets whether to warn if the provided token is not a PAT token. Defaults to true.
Returns:
bool
- True if the warning is enabled, False otherwise.
warn_about_pat_token
Sets whether to warn if the provided token is not a PAT token. Defaults to true.
Arguments:
value
- True to enable the warning, False to disable.
get_net_pointer
Gets the associated .NET object pointer.
Returns:
ctypes.c_void_p
- The .NET pointer
QuixStreamingClient Objects
Streaming client for Kafka configured automatically using environment variables and Quix platform endpoints. Use this client when using the library together with the Quix platform.
__init__
def __init__(token: str = None,
auto_create_topics: bool = True,
properties: Dict[str, str] = None,
debug: bool = False)
Initializes a new instance of the QuixStreamingClient capable of creating topic consumers and producers.
Arguments:
token
- The token to use when talking to Quix. If not provided, the Quix__Sdk__Token environment variable will be used. Defaults to None.
auto_create_topics
- Whether topics should be auto-created if they don't exist yet. Defaults to True.
properties
- Additional broker properties. Defaults to None.
debug
- Whether debugging should be enabled. Defaults to False.
get_topic_consumer
def get_topic_consumer(
topic_id_or_name: str,
consumer_group: str = None,
commit_settings: Union[CommitOptions, CommitMode] = None,
auto_offset_reset: AutoOffsetReset = AutoOffsetReset.Latest
) -> TopicConsumer
Opens a topic consumer capable of subscribing to receive incoming streams.
Arguments:
topic_id_or_name
- ID or name of the topic. If a name is provided, the workspace will be derived from the environment variable or token, in that order.
consumer_group
- The consumer group ID to use for consuming messages. If None, the consumer group is not used, and only new messages are consumed. Defaults to None.
commit_settings
- The settings to use for committing. If not provided, defaults to committing every 5000 messages or 5 seconds, whichever is sooner.
auto_offset_reset
- The offset to use when there is no saved offset for the consumer group. Defaults to AutoOffsetReset.Latest.
Returns:
TopicConsumer
- An instance of TopicConsumer for the specified topic.
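Example (illustrative sketch; the token and topic name are placeholders):
from quixstreams.quixstreamingclient import QuixStreamingClient
from quixstreams.models.autooffsetreset import AutoOffsetReset

client = QuixStreamingClient(token="sdk-token-placeholder")   # or rely on the Quix__Sdk__Token environment variable
topic_consumer = client.get_topic_consumer(
    "my-topic",                          # hypothetical topic id or name
    consumer_group="my-consumer-group",
    auto_offset_reset=AutoOffsetReset.Latest)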
get_topic_producer
Gets a topic producer capable of producing outgoing streams.
Arguments:
topic_id_or_name
- ID or name of the topic. If name is provided, the workspace will be derived from the environment variable or token, in that order.
Returns:
TopicProducer
- An instance of TopicProducer for the specified topic.
get_raw_topic_consumer
def get_raw_topic_consumer(
topic_id_or_name: str,
consumer_group: str = None,
auto_offset_reset: Union[AutoOffsetReset,
None] = None) -> RawTopicConsumer
Gets a topic consumer for consuming raw data from the stream.
Arguments:
topic_id_or_name
- ID or name of the topic. If name is provided, the workspace will be derived from the environment variable or token, in that order.
consumer_group
- The consumer group ID to use for consuming messages. Defaults to None.
auto_offset_reset
- The offset to use when there is no saved offset for the consumer group. Defaults to None.
Returns:
RawTopicConsumer
- An instance of RawTopicConsumer for the specified topic.
get_raw_topic_producer
Gets a topic producer for producing raw data to the stream.
Arguments:
topic_id_or_name
- ID or name of the topic. If name is provided, the workspace will be derived from the environment variable or token, in that order.
Returns:
RawTopicProducer
- An instance of RawTopicProducer for the specified topic.
token_validation_config
Gets the configuration for token validation.
Returns:
TokenValidationConfiguration
- The current token validation configuration.
token_validation_config
Sets the configuration for token validation.
Arguments:
value
- The new token validation configuration.
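Example (illustrative sketch of adjusting token validation on an existing client):
from datetime import timedelta

client.token_validation_config.enabled = True
client.token_validation_config.warning_before_expiry = timedelta(days=7)
client.token_validation_config.warn_about_pat_token = True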
api_url
Gets the base API URI. Defaults to https://portal-api.platform.quix.ai, or environment variable Quix__Portal__Api if available.
Returns:
str
- The current base API URI.
api_url
Sets the base API URI. Defaults to https://portal-api.platform.quix.ai, or environment variable Quix__Portal__Api if available.
Arguments:
value
- The new base API URI.
cache_period
Gets the period for which some API responses will be cached to avoid an excessive amount of calls. Defaults to 1 minute.
Returns:
timedelta
- The current cache period.
cache_period
Sets the period for which some API responses will be cached to avoid an excessive amount of calls. Defaults to 1 minute.
Arguments:
value
- The new cache period.
get_net_pointer
Gets the associated .NET object pointer.
Returns:
ctypes.c_void_p
- The .NET pointer
quixstreams.topicproducer
TopicProducer Objects
Interface to operate with the streaming platform for publishing messages
__init__
Initializes a new instance of TopicProducer.
NOTE: Do not initialize this class manually, use KafkaStreamingClient.get_topic_producer to create it.
Arguments:
net_pointer
- The .net object representing a StreamingClient.
on_disposed
Gets the handler for when the topic is disposed.
Returns:
Callable[[TopicProducer], None]: The event handler for topic disposal. The first parameter is the TopicProducer instance that got disposed.
on_disposed
Sets the handler for when the topic is disposed.
Arguments:
value
- The event handler for topic disposal. The first parameter is the TopicProducer instance that got disposed.
create_stream
Creates a new stream and returns the related StreamProducer to operate it.
Arguments:
stream_id
- Provide if you wish to overwrite the generated stream id. Useful if you wish to always stream a certain source into the same stream.
Returns:
StreamProducer
- The created StreamProducer instance.
get_stream
Retrieves a stream that was previously created by this instance, if the stream is not closed.
Arguments:
stream_id
- The id of the stream.
Returns:
StreamProducer
- The retrieved StreamProducer instance or None if not found.
get_or_create_stream
def get_or_create_stream(
stream_id: str,
on_stream_created: Callable[[StreamProducer],
None] = None) -> StreamProducer
Retrieves a stream that was previously created by this instance if the stream is not closed, otherwise creates a new stream.
Arguments:
stream_id
- The id of the stream you want to get or create.
on_stream_created
- A callback function that takes a StreamProducer as a parameter.
Returns:
StreamProducer
- The retrieved or created StreamProducer instance.
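Example (illustrative sketch; the topic name and stream id are placeholders, and the properties attribute on StreamProducer is assumed from the producer classes documented below):
topic_producer = client.get_topic_producer("output-topic")    # hypothetical topic
stream = topic_producer.get_or_create_stream("device-001")    # hypothetical stream id
stream.properties.name = "Device 001"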
quixstreams.models.commitoptions
CommitOptions Objects
__init__
Initializes a new instance of CommitOptions
Arguments:
net_pointer
: Pointer to an instance of a .net CommitOptions.
auto_commit_enabled
Gets whether automatic committing is enabled. If automatic committing is not enabled, other values are ignored. Default is True.
auto_commit_enabled
Sets whether automatic committing is enabled. If automatic committing is not enabled, other values are ignored. Default is True.
commit_interval
Gets the interval of automatic commit in ms. Default is 5000.
commit_interval
Sets the interval of automatic commit in ms. Default is 5000.
commit_every
Gets the number of messages to automatically commit at. Default is 5000.
commit_every
Sets the number of messages to automatically commit at. Default is 5000.
quixstreams.models.eventlevel
quixstreams.models.timeseriesdata
TimeseriesData Objects
Describes timeseries data for multiple timestamps.
__init__
Initializes a new instance of TimeseriesData.
Arguments:
net_pointer
- Pointer to an instance of a .net TimeseriesData.
clone
Initializes a new instance of timeseries data with parameters matching the filter if one is provided.
Arguments:
parameter_filter
- The parameter filter. If one is provided, only parameters present in the list will be cloned.
Returns:
TimeseriesData
- A new instance of TimeseriesData with filtered parameters.
add_timestamp
Start adding a new set of parameters and their tags at the specified time.
Arguments:
time
- The time to use for adding the new values.
- datetime: The datetime to use for adding the new values. Epoch will never be added to this.
- timedelta: The time since the default epoch to add the values at.
Returns:
TimeseriesDataTimestamp
- A new TimeseriesDataTimestamp instance.
add_timestamp_milliseconds
Start adding a new set of parameters and their tags at the specified time.
Arguments:
milliseconds
- The time in milliseconds since the default epoch to add the event values at.
Returns:
TimeseriesDataTimestamp
- A new TimeseriesDataTimestamp instance.
add_timestamp_nanoseconds
Start adding a new set of parameters and their tags at the specified time.
Arguments:
nanoseconds
- The time in nanoseconds since the default epoch to add the event values at.
Returns:
TimeseriesDataTimestamp
- A new TimeseriesDataTimestamp instance.
timestamps
Gets the data as rows of TimeseriesDataTimestamp.
Returns:
List[TimeseriesDataTimestamp]
- A list of TimeseriesDataTimestamp instances.
timestamps
Sets the data as rows of TimeseriesDataTimestamp.
Arguments:
timestamp_list
- A list of TimeseriesDataTimestamp instances to set.
to_dataframe
Converts TimeseriesData to pandas DataFrame.
Returns:
pd.DataFrame
- Converted pandas DataFrame.
from_panda_dataframe
@staticmethod
def from_panda_dataframe(data_frame: pd.DataFrame,
epoch: int = 0) -> 'TimeseriesData'
Converts pandas DataFrame to TimeseriesData.
Arguments:
data_frame
- The pandas DataFrame to convert to TimeseriesData.
epoch
- The epoch to add to each time value when converting to TimeseriesData. Defaults to 0.
Returns:
TimeseriesData
- Converted TimeseriesData instance.
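Example (illustrative sketch; it assumes TimeseriesData can be constructed without arguments and that TimeseriesDataTimestamp exposes the add_value and add_tag builders documented elsewhere in the reference):
from datetime import datetime
from quixstreams.models.timeseriesdata import TimeseriesData

data = TimeseriesData()
data.add_timestamp(datetime.utcnow()) \
    .add_value("speed", 120.5) \
    .add_tag("vehicle", "car-1")

df = data.to_dataframe()                           # to pandas DataFrame
data2 = TimeseriesData.from_panda_dataframe(df)    # and back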
get_net_pointer
Gets the .net pointer of the current instance.
Returns:
ctypes.c_void_p
- The .net pointer of the current instance
quixstreams.models.eventdefinition
EventDefinition Objects
Describes additional context for the event
__init__
Initializes a new instance of EventDefinition
NOTE: Do not initialize this class manually. Instances of it are available on StreamEventsConsumer.definitions
Arguments:
net_pointer
: Pointer to an instance of a .net EventDefinition
quixstreams.models.streamproducer.streameventsproducer
StreamEventsProducer Objects
Helper class for producing EventDefinitions and EventData.
__init__
Initializes a new instance of StreamEventsProducer.
Arguments:
net_pointer
- Pointer to an instance of a .NET StreamEventsProducer.
flush
Immediately publishes the event definitions from the buffer without waiting for buffer condition to fulfill (200ms timeout). TODO: Verify 200ms timeout value.
default_tags
Gets default tags injected to all event values sent by the producer.
default_location
Gets the default Location of the events. Event definitions added with add_definition will be inserted at this location. See add_location for adding definitions at a different location without changing default. Example: "/Group1/SubGroup2"
default_location
Sets the default Location of the events. Event definitions added with add_definition will be inserted at this location. See add_location for adding definitions at a different location without changing default.
Arguments:
value
- Location string, e.g., "/Group1/SubGroup2".
epoch
The Unix epoch from which all other timestamps in this model are measured, in nanoseconds.
epoch
Sets the default epoch used for event values.
publish
Publish an event into the stream.
Arguments:
data
- EventData object or a pandas dataframe.
columns
- Column names if the dataframe has different columns from 'id', 'timestamp', and 'value'. For instance, if 'id' is in the column 'event_id', id='event_id' must be passed as an argument.
Raises:
TypeError
- If the data argument is neither an EventData nor pandas dataframe.
add_timestamp
Start adding a new set of event values at the given timestamp.
Arguments:
time
- The time to use for adding new event values.
- datetime: The datetime to use for adding new event values. NOTE: epoch is not used.
- timedelta: The time since the default epoch to add the event values at.
Returns:
EventDataBuilder
- Event data builder to add event values at the provided time.
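Example (illustrative sketch; stream is assumed to be a StreamProducer, and the EventDataBuilder methods add_value and publish are assumed from the builders module listed in the table of contents):
from datetime import datetime

events = stream.events                     # StreamEventsProducer (assumed attribute)
events.add_timestamp(datetime.utcnow()) \
    .add_value("alert", "temperature threshold exceeded") \
    .publish()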
add_timestamp_milliseconds
Start adding a new set of event values at the given timestamp.
Arguments:
milliseconds
- The time in milliseconds since the default epoch to add the event values at.
Returns:
EventDataBuilder
- Event data builder to add event values at the provided time.
add_timestamp_nanoseconds
Start adding a new set of event values at the given timestamp.
Arguments:
nanoseconds
- The time in nanoseconds since the default epoch to add the event values at.
Returns:
EventDataBuilder
- Event data builder to add event values at the provided time.
add_definition
def add_definition(event_id: str,
name: str = None,
description: str = None) -> EventDefinitionBuilder
Add new Event definition to define properties like Name or Level, among others.
Arguments:
event_id
- The id of the event. Must match the event id used to send data.
name
- The human-friendly display name of the event.
description
- The description of the event.
Returns:
EventDefinitionBuilder
- EventDefinitionBuilder to define properties of the event or add additional events.
add_location
Add a new location in the events groups hierarchy.
Arguments:
location
- The group location.
Returns:
EventDefinitionBuilder
- EventDefinitionBuilder to define the events under the specified location.
quixstreams.models.streamproducer.streamtimeseriesproducer
StreamTimeseriesProducer Objects
Helper class for producing ParameterDefinition and TimeseriesData.
__init__
Initializes a new instance of StreamTimeseriesProducer.
Arguments:
stream_producer
- The Stream producer which owns this stream timeseries producer.
net_pointer
- Pointer to an instance of a .net StreamTimeseriesProducer.
flush
Immediately publishes timeseries data and definitions from the buffer without waiting for the buffer conditions to be fulfilled for either.
add_definition
def add_definition(parameter_id: str,
name: str = None,
description: str = None) -> ParameterDefinitionBuilder
Add new parameter definition to the StreamTimeseriesProducer. Configure it with the builder methods.
Arguments:
parameter_id
- The id of the parameter. Must match the parameter id used to send data.
name
- The human-friendly display name of the parameter.
description
- The description of the parameter.
Returns:
ParameterDefinitionBuilder
- Builder to define the parameter properties.
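Example (illustrative sketch of chaining the returned builder; set_unit and set_range are assumed methods of ParameterDefinitionBuilder, listed in the table of contents):
stream.timeseries \
    .add_definition("speed", name="Vehicle speed") \
    .set_unit("km/h") \
    .set_range(0, 300)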
add_location
Add a new location in the parameters groups hierarchy.
Arguments:
location
- The group location.
Returns:
ParameterDefinitionBuilder
- Builder to define the parameters under the specified location.
default_location
Gets the default location of the parameters. Parameter definitions added with add_definition will be inserted at this location. See add_location for adding definitions at a different location without changing default.
Returns:
str
- The default location of the parameters, e.g., "/Group1/SubGroup2".
default_location
Sets the default location of the parameters. Parameter definitions added with add_definition will be inserted at this location. See add_location for adding definitions at a different location without changing default.
Arguments:
value
- The new default location of the parameters, e.g., "/Group1/SubGroup2".
buffer
Get the buffer for producing timeseries data.
Returns:
TimeseriesBufferProducer
- The buffer for producing timeseries data.
publish
def publish(
packet: Union[TimeseriesData, pd.DataFrame, TimeseriesDataRaw,
TimeseriesDataTimestamp]
) -> None
Publish the given packet to the stream without any buffering.
Arguments:
packet
- The packet containing TimeseriesData, TimeseriesDataRaw, TimeseriesDataTimestamp, or pandas DataFrame.
Notes:
- Pandas DataFrame should contain 'time' label, else the first integer label will be taken as time.
- Tags should be prefixed by TAG__ or they will be treated as parameters.
Examples:
Send a pandas DataFrame:
pdf = pandas.DataFrame({'time': [1, 5],
                        'panda_param': [123.2, 5]})
instance.publish(pdf)
Send a pandas DataFrame with multiple values:
pdf = pandas.DataFrame({'time': [1, 5, 10],
                        'panda_param': [123.2, None, 12],
                        'panda_param2': ["val1", "val2", None]})
instance.publish(pdf)
Send a pandas DataFrame with tags:
pdf = pandas.DataFrame({'time': [1, 5, 10],
                        'panda_param': [123.2, 5, 12],
                        'TAG__Tag1': ["v1", 2, None],
                        'TAG__Tag2': [1, None, 3]})
instance.publish(pdf)
Raises:
Exception
- If the given type is not supported for publishing.
quixstreams.models.streamproducer.streampropertiesproducer
StreamPropertiesProducer Objects
Represents properties and metadata of the stream. All changes to these properties are automatically published to the underlying stream.
__init__
Initializes a new instance of StreamPropertiesProducer.
Arguments:
net_pointer
- Pointer to an instance of a .net StreamPropertiesProducer.
name
Gets the human friendly name of the stream.
Returns:
str
- The human friendly name of the stream.
name
Sets the human friendly name of the stream.
Arguments:
value
- The new human friendly name of the stream.
location
Gets the location of the stream in the data catalogue.
Returns:
str
- The location of the stream in the data catalogue, e.g., "/cars/ai/carA/".
location
Sets the location of the stream in the data catalogue.
Arguments:
value
- The new location of the stream in the data catalogue.
metadata
" Gets the metadata of the stream.
Returns:
Dict[str, str]: The metadata of the stream.
parents
Gets the list of stream ids of the parent streams.
Returns:
List[str]
- The list of stream ids of the parent streams.
time_of_recording
Gets the datetime of the stream recording.
Returns:
datetime
- The datetime of the stream recording.
time_of_recording
Sets the time of the stream recording.
Arguments:
value
- The new time of the stream recording.
flush_interval
Gets the automatic flush interval of the properties metadata into the channel (in milliseconds).
Returns:
int
- The automatic flush interval in milliseconds, default is 30000.
flush_interval
Sets the automatic flush interval of the properties metadata into the channel (in milliseconds).
Arguments:
value
- The new flush interval in milliseconds.
flush
Immediately publishes the properties yet to be sent instead of waiting for the flush timer (20ms).
quixstreams.models.streamproducer.timeseriesbufferproducer
TimeseriesBufferProducer Objects
A class for producing timeseries data to a StreamProducer in a buffered manner.
__init__
Initializes a new instance of TimeseriesBufferProducer. NOTE: Do not initialize this class manually, use StreamTimeseriesProducer.buffer to access an instance of it
Arguments:
stream_producer
- The Stream producer which owns this timeseries buffer producer.
net_pointer
- Pointer to an instance of a .net TimeseriesBufferProducer.
Raises:
Exception
- If TimeseriesBufferProducer is None
default_tags
Get the default tags injected into all parameter values sent by this buffer.
Returns:
Dict[str, str]: A dictionary containing the default tags
epoch
Get the default epoch used for parameter values.
Returns:
datetime
- The default epoch used for parameter values
epoch
Set the default epoch used for parameter values. This datetime is added on top of all the timestamps.
Arguments:
value
- The default epoch to set for parameter values
add_timestamp
Start adding a new set of parameter values at the given timestamp.
Arguments:
time
- The time to use for adding new parameter values.
- datetime: The datetime to use for adding new parameter values. NOTE: epoch is not used.
- timedelta: The time since the default epoch to add the parameter values at.
Returns:
TimeseriesDataBuilder
- A TimeseriesDataBuilder instance for adding parameter values
Raises:
ValueError
- If 'time' is None or not an instance of datetime or timedelta
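Example (illustrative sketch; the TimeseriesDataBuilder methods add_value, add_tag, and publish are assumed from the builders module listed in the table of contents):
from datetime import datetime

buffer = stream.timeseries.buffer          # TimeseriesBufferProducer
buffer.add_timestamp(datetime.utcnow()) \
    .add_value("speed", 123.4) \
    .add_tag("lap", "2") \
    .publish()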
add_timestamp_nanoseconds
Start adding a new set of parameter values at the given timestamp.
Arguments:
nanoseconds
- The time in nanoseconds since the default epoch to add the parameter values at
Returns:
TimeseriesDataBuilder
- A TimeseriesDataBuilder instance for adding parameter values
flush
Immediately publishes the data from the buffer without waiting for the buffer condition to be fulfilled.
publish
def publish(
packet: Union[TimeseriesData, pd.DataFrame, TimeseriesDataRaw,
TimeseriesDataTimestamp]
) -> None
Publish the provided timeseries packet to the buffer.
Arguments:
packet
- The packet containing TimeseriesData, TimeseriesDataRaw, TimeseriesDataTimestamp, or pandas DataFrame.
- Note 1: A pandas DataFrame should contain a 'time' label, else the first integer label will be taken as time.
- Note 2: Tags should be prefixed by TAG__ or they will be treated as timeseries parameters.
Examples:
Send a pandas DataFrame:
pdf = pandas.DataFrame({'time': [1, 5],
                        'panda_param': [123.2, 5]})
instance.publish(pdf)
Send a pandas DataFrame with multiple values:
pdf = pandas.DataFrame({'time': [1, 5, 10],
                        'panda_param': [123.2, None, 12],
                        'panda_param2': ["val1", "val2", None]})
instance.publish(pdf)
Send a pandas DataFrame with tags:
pdf = pandas.DataFrame({'time': [1, 5, 10],
                        'panda_param': [123.2, 5, 12],
                        'TAG__Tag1': ["v1", 2, None],
                        'TAG__Tag2': [1, None, 3]})
instance.publish(pdf)
Raises:
Exception
- If the packet type is not supported
quixstreams.models.streamproducer
quixstreams.models.parameterdefinition
ParameterDefinition Objects
Describes additional context for the parameter
__init__
Initializes a new instance of ParameterDefinition
NOTE: Do not initialize this class manually. Instances of it are available on StreamTimeseriesConsumer.definitions
Arguments:
net_pointer
: Pointer to an instance of a .net ParameterDefinition.
quixstreams.models.parametervalue
ParameterValue Objects
Represents a single parameter value of either numeric, string, or binary type.
__init__
Initializes a new instance of ParameterValue.
Arguments:
net_pointer
- The .net object pointer representing ParameterValue.
numeric_value
Gets the numeric value of the parameter if the underlying parameter is of numeric type.
numeric_value
Sets the numeric value of the parameter and updates the type to numeric.
Arguments:
value
- The numeric value to set.
string_value
Gets the string value of the parameter if the underlying parameter is of string type.
string_value
Sets the string value of the parameter and updates the type to string.
Arguments:
value
- The string value to set.
binary_value
Gets the binary value of the parameter if the underlying parameter is of binary type.
binary_value
Sets the binary value of the parameter and updates the type to binary.
Arguments:
value
- The binary value to set.
type
Gets the type of the value: numeric, string, or binary if set; otherwise empty.
value
Gets the underlying value.
get_net_pointer
Gets the associated .net object pointer.
quixstreams.models.timeseriesdataraw
TimeseriesDataRaw Objects
Describes timeseries data in a raw format for multiple timestamps. This class is intended to be read-only.
__init__
Initializes a new instance of TimeseriesDataRaw.
Arguments:
net_pointer
- Pointer to an instance of a .net TimeseriesDataRaw. Defaults to None.
to_dataframe
Converts TimeseriesDataRaw to pandas DataFrame.
Returns:
pd.DataFrame
- Converted pandas DataFrame.
from_dataframe
Converts from pandas DataFrame to TimeseriesDataRaw.
Arguments:
data_frame
- The pandas DataFrame to convert to TimeseriesData.
epoch
- The epoch to add to each time value when converting to TimeseriesData. Defaults to 0.
Returns:
TimeseriesDataRaw
- Converted TimeseriesDataRaw.
set_values
def set_values(epoch: int, timestamps: List[int],
numeric_values: Dict[str, List[float]],
string_values: Dict[str, List[str]],
binary_values: Dict[str,
List[bytes]], tag_values: Dict[str,
List[str]])
Sets the values of the timeseries data from the provided dictionaries. Dictionary values are matched by index to the provided timestamps.
Arguments:
epoch
- The time from which all timestamps are measured.
timestamps
- The timestamps of values in nanoseconds since epoch, as an array.
numeric_values
- The numeric values where the dictionary key is the parameter name and the value is the array of values.
string_values
- The string values where the dictionary key is the parameter name and the value is the array of values.
binary_values
- The binary values where the dictionary key is the parameter name and the value is the array of values.
tag_values
- The tag values where the dictionary key is the parameter name and the value is the array of values.
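Example (illustrative sketch; it assumes TimeseriesDataRaw can be constructed without arguments, as net_pointer defaults to None):
from quixstreams.models.timeseriesdataraw import TimeseriesDataRaw

raw = TimeseriesDataRaw()
raw.set_values(
    epoch=0,
    timestamps=[1_000_000_000, 2_000_000_000],         # nanoseconds since epoch
    numeric_values={"speed": [10.5, 11.0]},
    string_values={},
    binary_values={},
    tag_values={"vehicle": ["car-1", "car-1"]})

df = raw.to_dataframe()
ts_data = raw.convert_to_timeseriesdata()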
epoch
The Unix epoch from which all other timestamps in this model are measured, in nanoseconds.
Returns:
int
- The Unix epoch (01/01/1970) in nanoseconds.
timestamps
The timestamps of values in nanoseconds since the epoch. Timestamps are matched by index to numeric_values, string_values, binary_values, and tag_values.
Returns:
List[int]
- A list of timestamps in nanoseconds since the epoch.
numeric_values
The numeric values for parameters. The key is the parameter ID the values belong to. The value is the numerical values of the parameter. Values are matched by index to timestamps.
Returns:
Dict[str, List[Optional[float]]]: A dictionary mapping parameter IDs to lists of numerical values.
string_values
The string values for parameters. The key is the parameter ID the values belong to. The value is the string values of the parameter. Values are matched by index to timestamps.
Returns:
Dict[str, List[str]]: A dictionary mapping parameter IDs to lists of string values
binary_values
The binary values for parameters. The key is the parameter ID the values belong to. The value is the binary values of the parameter. Values are matched by index to timestamps.
Returns:
Dict[str, List[bytes]]: A dictionary mapping parameter IDs to lists of bytes values
tag_values
The tag values for parameters. The key is the parameter ID the values belong to. The value is the tag values of the parameter. Values are matched by index to timestamps.
Returns:
Dict[str, List[str]]: A dictionary mapping parameter IDs to lists of string values
convert_to_timeseriesdata
Converts TimeseriesDataRaw to TimeseriesData
quixstreams.models.timeseriesbuffer
TimeseriesBuffer Objects
Represents a class used to consume and produce stream messages in a buffered manner. When buffer conditions are not configured, it acts as a pass-through, raising each message as it arrives.
__init__
Initializes a new instance of TimeseriesBuffer.
NOTE: Do not initialize this class manually, use StreamProducer.timeseries.buffer to create it.
Arguments:
stream
- The stream the buffer is created for.
net_pointer
- Pointer to a .net TimeseriesBuffer object.
on_data_released
@property
def on_data_released() -> Callable[
[Union['StreamConsumer', 'StreamProducer'], TimeseriesData], None]
Gets the handler for when the stream receives data.
Returns:
Callable[[Union['StreamConsumer', 'StreamProducer'], TimeseriesData], None]: The event handler. The first parameter is the stream the data is received for, second is the data in TimeseriesData format.
on_data_released
@on_data_released.setter
def on_data_released(
value: Callable[
[Union['StreamConsumer', 'StreamProducer'], TimeseriesData], None]
) -> None
Sets the handler for when the stream receives data.
Arguments:
value
- The event handler. The first parameter is the stream the data is received for, second is the data in TimeseriesData format.
on_raw_released
@property
def on_raw_released() -> Callable[
[Union['StreamConsumer', 'StreamProducer'], TimeseriesDataRaw], None]
Gets the handler for when the stream receives raw data.
Returns:
Callable[[Union['StreamConsumer', 'StreamProducer'], TimeseriesDataRaw], None]: The event handler. The first parameter is the stream the data is received for, second is the data in TimeseriesDataRaw format.
on_raw_released
@on_raw_released.setter
def on_raw_released(
value: Callable[
[Union['StreamConsumer', 'StreamProducer'], TimeseriesDataRaw], None]
) -> None
Sets the handler for when the stream receives raw data.
Arguments:
value
- The event handler. The first parameter is the stream the data is received for, second is the data in TimeseriesDataRaw format.
on_dataframe_released
@property
def on_dataframe_released() -> Callable[
[Union['StreamConsumer', 'StreamProducer'], pandas.DataFrame], None]
Gets the handler for when the stream receives data as a pandas DataFrame.
Returns:
Callable[[Union['StreamConsumer', 'StreamProducer'], pandas.DataFrame], None]: The event handler. The first parameter is the stream the data is received for, second is the data in pandas.DataFrame format.
on_dataframe_released
@on_dataframe_released.setter
def on_dataframe_released(
value: Callable[
[Union['StreamConsumer', 'StreamProducer'], pandas.DataFrame], None]
) -> None
Sets the handler for when the stream receives data as a pandas DataFrame.
Arguments:
value
- The event handler. The first parameter is the stream the data is received for, second is the data in pandas.DataFrame format.
filter
Gets the custom function to filter the incoming data before adding it to the buffer. If it returns True, the data is added; otherwise, it is not. Defaults to None (disabled).
filter
Sets the custom function to filter incoming data before adding to the buffer.
The custom function takes a TimeseriesDataTimestamp object as input and returns a boolean value. If the function returns True, the data is added to the buffer, otherwise not. By default, this feature is disabled (None).
Arguments:
value
- Custom filter function.
custom_trigger
Gets the custom trigger function, which is invoked after adding a new timestamp to the buffer.
If the custom trigger function returns True, the buffer releases content and triggers relevant callbacks. By default, this feature is disabled (None).
Returns:
Callable[[TimeseriesData], bool]: Custom trigger function.
custom_trigger
Sets the custom trigger function, which is invoked after adding a new timestamp to the buffer.
If the custom trigger function returns True, the buffer releases content and triggers relevant callbacks. By default, this feature is disabled (None).
Arguments:
value
- Custom trigger function.
packet_size
Gets the maximum packet size in terms of values for the buffer.
Each time the buffer has this amount of data, a callback method, such as on_data_released, is invoked, and the data is cleared from the buffer. By default, this feature is disabled (None).
Returns:
Optional[int]
- Maximum packet size for the buffer.
packet_size
Sets the maximum packet size in terms of values for the buffer.
Each time the buffer has this amount of data, a callback method, such as on_data_released, is invoked, and the data is cleared from the buffer. By default, this feature is disabled (None).
Arguments:
value
- Maximum packet size for the buffer.
time_span_in_nanoseconds
Gets the maximum time between timestamps for the buffer in nanoseconds.
When the difference between the earliest and latest buffered timestamp surpasses this number, a callback method, such as on_data_released, is invoked, and the data is cleared from the buffer. By default, this feature is disabled (None).
Returns:
Optional[int]
- Maximum time between timestamps in nanoseconds.
time_span_in_nanoseconds
Sets the maximum time between timestamps for the buffer in nanoseconds.
When the difference between the earliest and latest buffered timestamp surpasses this number, a callback method, such as on_data_released, is invoked, and the data is cleared from the buffer. By default, this feature is disabled (None).
Arguments:
value
- Maximum time between timestamps in nanoseconds.
time_span_in_milliseconds
Gets the maximum time between timestamps for the buffer in milliseconds.
This property retrieves the maximum time between the earliest and latest buffered timestamp in milliseconds. If the difference surpasses this number, a callback method, such as on_data_released, is invoked, and the data is cleared from the buffer. Note that this property is a millisecond converter on top of time_span_in_nanoseconds, and both work with the same underlying value. Defaults to None (disabled).
Returns:
Optional[int]
- The maximum time difference between timestamps in milliseconds, or None if disabled.
time_span_in_milliseconds
Sets the maximum time between timestamps for the buffer in milliseconds.
This property sets the maximum time between the earliest and latest buffered timestamp in milliseconds. If the difference surpasses this number, a callback method, such as on_data_released, is invoked, and the data is cleared from the buffer. Note that this property is a millisecond converter on top of time_span_in_nanoseconds, and both work with the same underlying value. Defaults to None (disabled).
Arguments:
value
- The maximum time difference between timestamps in milliseconds, or None to disable.
buffer_timeout
Gets the maximum duration in milliseconds for which the buffer will be held. When the configured value has elapsed or other buffer conditions are met, a callback method, such as on_data_released, is invoked. Defaults to None (disabled).
Returns:
Optional[int]
- The maximum duration in milliseconds before invoking a callback method, or None if disabled.
buffer_timeout
Sets the maximum duration in milliseconds for which the buffer will be held. When the configured value has elapsed or other buffer conditions are met, a callback method, such as on_data_released, is invoked. Defaults to None (disabled).
Arguments:
value
- The maximum duration in milliseconds before invoking a callback method, or None to disable.
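Example (illustrative sketch combining the buffer settings above on an existing buffer instance; the parameter id used in the filter is a placeholder):
buffer.packet_size = 100                   # release after 100 values
buffer.time_span_in_milliseconds = 500     # or after 500 ms between earliest and latest timestamp
buffer.buffer_timeout = 1000               # or after the buffer has been held for 1 second
buffer.custom_trigger = lambda data: len(data.timestamps) >= 10
buffer.filter = lambda ts: "speed" in ts.parameters     # hypothetical parameter id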
get_net_pointer
Gets the associated .net object pointer.
Returns:
ctypes.c_void_p
- The .net object pointer.
quixstreams.models.streampackage
StreamPackage Objects
Default model implementation for non-typed message packages of the Telemetry layer. It holds a value and its type.
__init__
Initializes a new instance of StreamPackage.
Notes:
Do not initialize this class manually. Will be initialized by StreamConsumer.on_package_received.
Arguments:
net_pointer
- Pointer to an instance of a .net StreamPackage.
transport_context
Context holder for package when transporting through the pipeline.
to_json
Serialize the package into JSON.
Returns:
str
- The serialized JSON string of the package.
get_net_pointer
Gets the associated .net object pointer.
Returns:
ctypes.c_void_p
- The .net object pointer.
quixstreams.models.codecsettings
CodecSettings Objects
Global Codec settings for streams.
set_global_codec_type
Sets the codec type to be used by producers and transfer package value serialization.
quixstreams.models.streamconsumer.streamtimeseriesconsumer
StreamTimeseriesConsumer Objects
Consumer for streams, which raises TimeseriesData and ParameterDefinitions related messages
__init__
Initializes a new instance of StreamTimeseriesConsumer. NOTE: Do not initialize this class manually. Use StreamConsumer.timeseries to access an instance of it.
Arguments:
stream_consumer
- The Stream consumer which owns this stream timeseries consumer.
net_pointer
- Pointer to an instance of a .net StreamTimeseriesConsumer.
on_data_received
Gets the handler for when data is received (without buffering).
Returns:
Callable[['StreamConsumer', TimeseriesData], None]: The function that handles the data received. The first parameter is the stream that receives the data, and the second is the data in TimeseriesData format.
on_data_received
@on_data_received.setter
def on_data_received(
value: Callable[['StreamConsumer', TimeseriesData], None]) -> None
Sets the handler for when data is received (without buffering).
Arguments:
value
- The function that handles the data received. The first parameter is the stream that receives the data, and the second is the data in TimeseriesData format.
on_raw_received
Gets the handler for when data is received (without buffering) in raw transport format.
Returns:
Callable[['StreamConsumer', TimeseriesDataRaw], None]: The function that handles the data received. The first parameter is the stream that receives the data, and the second is the data in TimeseriesDataRaw format.
on_raw_received
@on_raw_received.setter
def on_raw_received(
value: Callable[['StreamConsumer', TimeseriesDataRaw], None]) -> None
Sets the handler for when data is received (without buffering) in raw transport format.
Arguments:
value
- The function that handles the data received. The first parameter is the stream that receives the data, and the second is the data in TimeseriesDataRaw format.
on_dataframe_received
Gets the handler for when data is received (without buffering) in pandas DataFrame format.
Returns:
Callable[['StreamConsumer', pandas.DataFrame], None]: The function that handles the data received. The first parameter is the stream that receives the data, and the second is the data in pandas DataFrame format.
on_dataframe_received
@on_dataframe_received.setter
def on_dataframe_received(
value: Callable[['StreamConsumer', pandas.DataFrame], None]) -> None
Sets the handler for when data is received (without buffering) in pandas DataFrame format.
Arguments:
value
- The function that handles the data received. The first parameter is the stream that receives the data, and the second is the data in pandas DataFrame format.
on_definitions_changed
Gets the handler for when the parameter definitions have changed for the stream.
Returns:
Callable[['StreamConsumer'], None]: The function that handles the parameter definitions change. The first parameter is the stream for which the parameter definitions changed.
on_definitions_changed
@on_definitions_changed.setter
def on_definitions_changed(value: Callable[['StreamConsumer'], None]) -> None
Sets the handler for when the parameter definitions have changed for the stream.
Arguments:
value
- The function that handles the parameter definitions change. The first parameter is the stream for which the parameter definitions changed.
definitions
Gets the latest set of parameter definitions.
create_buffer
def create_buffer(
*parameter_filter: str,
buffer_configuration: TimeseriesBufferConfiguration = None
) -> TimeseriesBufferConsumer
Creates a new buffer for consuming data according to the provided parameter_filter and buffer_configuration.
Arguments:
parameter_filter
- Zero or more parameter identifiers to filter as a whitelist. If provided, only those parameters will be available through this buffer.
buffer_configuration
- An optional TimeseriesBufferConfiguration.
Returns:
TimeseriesBufferConsumer
- A consumer that will raise new data consumed via the on_data_released event.
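Example (illustrative sketch; stream_consumer is assumed to be a StreamConsumer obtained from a topic consumer, and the parameter filter is a placeholder):
def on_buffered_data(stream, data):
    # data is the TimeseriesData released by the buffer
    print(data.to_dataframe())

buffer = stream_consumer.timeseries.create_buffer("speed")   # whitelist a single parameter
buffer.on_data_released = on_buffered_data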
get_net_pointer
Gets the .NET pointer for the StreamTimeseriesConsumer instance.
Returns:
ctypes.c_void_p
- .NET pointer for the StreamTimeseriesConsumer instance.
quixstreams.models.streamconsumer.streampropertiesconsumer
StreamPropertiesConsumer Objects
Represents properties and metadata of the stream. All changes to these properties are automatically populated to this class.
__init__
Initializes a new instance of StreamPropertiesConsumer. NOTE: Do not initialize this class manually, use StreamConsumer.properties to access an instance of it.
Arguments:
stream_consumer
- The Stream consumer that owns this stream properties consumer.
net_pointer
- Pointer to an instance of a .NET StreamPropertiesConsumer.
on_changed
Gets the handler for when the stream properties change.
Returns:
Callable[[StreamConsumer], None]: The event handler for stream property changes. The first parameter is the StreamConsumer instance for which the change is invoked.
on_changed
Sets the handler for when the stream properties change.
Arguments:
value
- The first parameter is the stream it is invoked for.
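Example (illustrative sketch; stream_consumer is assumed to be a StreamConsumer):
def on_properties_changed(stream):
    props = stream.properties
    print(props.name, props.location)

stream_consumer.properties.on_changed = on_properties_changed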
name
Gets the name of the stream.
location
Gets the location of the stream.
time_of_recording
Gets the datetime of the recording.
metadata
Gets the metadata of the stream.
parents
Gets the list of Stream IDs for the parent streams.
get_net_pointer
Gets the .NET pointer for the StreamPropertiesConsumer instance.
Returns:
ctypes.c_void_p
- .NET pointer for the StreamPropertiesConsumer instance.
quixstreams.models.streamconsumer.streameventsconsumer
StreamEventsConsumer Objects
Consumer for streams, which raises EventData and EventDefinitions related messages
__init__
Initializes a new instance of StreamEventsConsumer. NOTE: Do not initialize this class manually, use StreamConsumer.events to access an instance of it
Arguments:
stream_consumer
- The Stream consumer which owns this stream events consumer.
net_pointer
- Pointer to an instance of a .net StreamEventsConsumer.
on_data_received
Gets the handler for when an events data package is received for the stream.
Returns:
Callable[['StreamConsumer', EventData], None]: The first parameter is the stream the event is received for. The second is the event.
on_data_received
@on_data_received.setter
def on_data_received(
value: Callable[['StreamConsumer', EventData], None]) -> None
Sets the handler for when an events data package is received for the stream.
Arguments:
value
- The first parameter is the stream the event is received for. The second is the event.
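Example (illustrative sketch; the EventData attributes id and value are assumed from the EventData model listed in the table of contents):
def on_event(stream, event):
    # event is an EventData instance
    print(event.id, event.value)

stream_consumer.events.on_data_received = on_event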
on_definitions_changed
Gets the handler for when the event definitions have changed for the stream.
Returns:
Callable[['StreamConsumer'], None]: The first parameter is the stream the event definitions changed for.
on_definitions_changed
@on_definitions_changed.setter
def on_definitions_changed(value: Callable[['StreamConsumer'], None]) -> None
Sets the handler for when the event definitions have changed for the stream.
Arguments:
value
- The first parameter is the stream the event definitions changed for.
definitions
Gets the latest set of event definitions.
quixstreams.models.streamconsumer.timeseriesbufferconsumer
TimeseriesBufferConsumer Objects
Represents a class for consuming data from a stream in a buffered manner.
__init__
Initializes a new instance of TimeseriesBufferConsumer.
NOTE: Do not initialize this class manually, use StreamTimeseriesConsumer.create_buffer to create it.
Arguments:
stream_consumer
- The Stream consumer which owns this timeseries buffer consumer.
net_pointer
- Pointer to an instance of a .net TimeseriesBufferConsumer. Defaults to None.
Raises:
Exception
- If net_pointer is None.
get_net_pointer
Retrieves the pointer to the .net TimeseriesBufferConsumer instance.
Returns:
ctypes.c_void_p
- The pointer to the .net TimeseriesBufferConsumer instance.
quixstreams.models.streamconsumer
quixstreams.models.commitmode
quixstreams.models.codectype
CodecType Objects
Codecs available for serialization and deserialization of streams.
quixstreams.models.netlist
NetReadOnlyList Objects
Experimental. Acts as a proxy between a .net collection and a python list. Useful if the .net collection is observable and reacts to changes.
NetList Objects
Experimental. Acts as a proxy between a .net collection and a python list. Useful if the .net collection is observable and reacts to changes.
constructor_for_string
Creates an empty dotnet list for strings if no pointer is provided, else wraps it in NetDict with string converters.
quixstreams.models.timeseriesbufferconfiguration
TimeseriesBufferConfiguration Objects
Describes the configuration for timeseries buffers. When buffer conditions are not configured, it acts as a pass-through, raising each message as it arrives.
__init__
Initializes a new instance of TimeseriesBufferConfiguration.
Arguments:
net_pointer
- Can be ignored; used for internal purposes. The .net object representing a TimeseriesBufferConfiguration. Defaults to None.
packet_size
Gets the maximum packet size in terms of values for the buffer.
When the buffer reaches this number of values, a callback method, such as on_data_released, is invoked and the buffer is cleared. If not set, defaults to None (disabled).
Returns:
Optional[int]
- The maximum packet size in values or None if disabled.
packet_size
Sets the maximum packet size in terms of values for the buffer.
When the buffer reaches this number of values, a callback method, such as on_data_released, is invoked and the buffer is cleared. If not set, defaults to None (disabled).
Arguments:
value
- The maximum packet size in values or None to disable.
time_span_in_nanoseconds
Gets the maximum time difference between timestamps in the buffer, in nanoseconds.
When the difference between the earliest and latest buffered timestamp exceeds this value, a callback method, such as on_data_released, is invoked and the data is cleared from the buffer. If not set, defaults to None (disabled).
Returns:
Optional[int]
- The maximum time span in nanoseconds or None if disabled.
time_span_in_nanoseconds
Sets the maximum time difference between timestamps in the buffer, in nanoseconds.
When the difference between the earliest and latest buffered timestamp exceeds this value, a callback method, such as on_data_released, is invoked and the data is cleared from the buffer. If not set, defaults to None (disabled).
Arguments:
value
- The maximum time span in nanoseconds or None to disable.
time_span_in_milliseconds
Gets the maximum time difference between timestamps in the buffer, in milliseconds.
When the difference between the earliest and latest buffered timestamp exceeds this value, a callback method, such as on_data_released, is invoked and the data is cleared from the buffer. If not set, defaults to None (disabled).
Note: This is a millisecond converter on top of time_span_in_nanoseconds. They both work with the same underlying value.
Returns:
Optional[int]
- The maximum time span in milliseconds or None if disabled.
time_span_in_milliseconds
Sets the maximum time difference between timestamps in the buffer, in milliseconds.
When the difference between the earliest and latest buffered timestamp exceeds this value, a callback method, such as on_data_released, is invoked and the data is cleared from the buffer. If not set, defaults to None (disabled).
Arguments:
value
- The maximum time span in milliseconds or None to disable.
buffer_timeout
Gets the maximum duration for which the buffer will be held before releasing the events through callbacks, such as on_data_released.
A callback will be invoked once the configured value has elapsed or other buffer conditions are met. If not set, defaults to None (disabled).
Returns:
Optional[int]
- The maximum buffer timeout in milliseconds or None if disabled.
buffer_timeout
Sets the maximum duration for which the buffer will be held before releasing the events through callbacks, such as on_data_released.
A callback will be invoked once the configured value has elapsed or other buffer conditions are met. If not set, defaults to None (disabled).
Arguments:
value
- The maximum buffer timeout in milliseconds or None to disable.
custom_trigger_before_enqueue
Gets the custom function that is called before adding a timestamp to the buffer.
If the function returns True, the buffer releases content and triggers relevant callbacks before adding the timestamp to the buffer. If not set, defaults to None (disabled).
Returns:
Callable[[TimeseriesDataTimestamp], bool]: The custom function or None if disabled.
custom_trigger_before_enqueue
@custom_trigger_before_enqueue.setter
def custom_trigger_before_enqueue(value: Callable[[TimeseriesDataTimestamp],
bool])
Sets the custom function that is called before adding a timestamp to the buffer.
If the function returns True, the buffer releases content and triggers relevant callbacks before adding the timestamp to the buffer. If not set, defaults to None (disabled).
Arguments:
value
- The custom function or None to disable.
filter
Gets the custom function used to filter incoming data before adding it to the buffer.
If the function returns True, the data is added to the buffer; otherwise, it is not. If not set, defaults to None (disabled).
Returns:
Callable[[TimeseriesDataTimestamp], bool]: The custom filter function or None if disabled.
filter
Sets the custom function used to filter incoming data before adding it to the buffer.
If the function returns True, the data is added to the buffer; otherwise, it is not. If not set, defaults to None (disabled).
Arguments:
value
- The custom filter function or None to disable.
custom_trigger
Gets the custom function that is called after adding a new timestamp to the buffer.
If the function returns True, the buffer releases content and triggers relevant callbacks. If not set, defaults to None (disabled).
Returns:
Callable[[TimeseriesData], bool]: The custom trigger function or None if disabled.
custom_trigger
Sets the custom function that is called after adding a new timestamp to the buffer.
If the function returns True, the buffer releases content and triggers relevant callbacks. If not set, defaults to None (disabled).
Arguments:
value
- The custom trigger function or None to disable.
get_net_pointer
Returns the .net pointer for the TimeseriesBufferConfiguration object.
Returns:
ctypes.c_void_p
- The .net pointer for the TimeseriesBufferConfiguration object.
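A minimal configuration sketch for the class above, assuming TimeseriesBufferConfiguration is importable from the top-level quixstreams package; the thresholds and the custom trigger are illustrative only:
from quixstreams import TimeseriesBufferConfiguration

buffer_config = TimeseriesBufferConfiguration()
buffer_config.packet_size = 100                 # release after 100 buffered values
buffer_config.buffer_timeout = 500              # or after 500 ms, whichever comes first
buffer_config.time_span_in_milliseconds = 1000  # or once buffered timestamps span 1 s
# Also release early once 50 timestamps have accumulated (illustrative trigger).
buffer_config.custom_trigger = lambda data: len(data.timestamps) >= 50
# The configuration is then typically passed to StreamTimeseriesConsumer.create_buffer
# or used when producing through a timeseries buffer; the exact call site depends on your pipeline.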
quixstreams.models.netdict
ReadOnlyNetDict Objects
Experimental. Acts as a proxy between a .net dictionary and a python dict. Useful if .net dictionary is observable and reacts to changes
NetDict Objects
Experimental. Acts as a proxy between a .net dictionary and a python dict.
constructor_for_string_string
Creates an empty dotnet dictionary with string keys and values if no pointer is provided, else wraps the pointer in a NetDict with string converters.
quixstreams.models.timeseriesdatatimestamp
TimeseriesDataTimestamp Objects
Represents a single point in time with parameter values and tags attached to that time.
__init__
Initializes a new instance of TimeseriesDataTimestamp.
Arguments:
net_pointer
- Pointer to an instance of a .net TimeseriesDataTimestamp.
parameters
Gets the parameter values for the timestamp as a dictionary. If a key is not found, returns an empty ParameterValue.
Returns:
Dict[str, ParameterValue]: A dictionary with parameter id as key and ParameterValue as value.
tags
Gets the tags for the timestamp as a dictionary.
Returns:
Dict[str, str]: A dictionary with tag id as key and tag value as value.
timestamp_nanoseconds
Gets the timestamp in nanoseconds.
Returns:
int
- The timestamp in nanoseconds.
timestamp_milliseconds
Gets the timestamp in milliseconds.
Returns:
int
- The timestamp in milliseconds.
timestamp
Gets the timestamp in datetime format.
Returns:
datetime
- The timestamp in datetime format.
timestamp_as_time_span
Gets the timestamp in timespan format.
Returns:
timedelta
- The timestamp in timespan format.
add_value
def add_value(
parameter_id: str, value: Union[numbers.Number, str, bytearray, bytes]
) -> 'TimeseriesDataTimestamp'
Adds a new value for the specified parameter.
Arguments:
parameter_id
- The parameter id to add the value for.
value
- The value to add. Can be a number, string, bytearray, or bytes.
Returns:
TimeseriesDataTimestamp
- The updated TimeseriesDataTimestamp instance.
remove_value
Removes the value for the specified parameter.
Arguments:
parameter_id
- The parameter id to remove the value for.
Returns:
TimeseriesDataTimestamp
- The updated TimeseriesDataTimestamp instance.
add_tag
Adds a tag to the timestamp.
Arguments:
tag_id
- The id of the tag to add.
tag_value
- The value of the tag to add.
Returns:
TimeseriesDataTimestamp
- The updated TimeseriesDataTimestamp instance.
remove_tag
Removes a tag from the timestamp.
Arguments:
tag_id
- The id of the tag to remove.
Returns:
TimeseriesDataTimestamp
- The updated TimeseriesDataTimestamp instance.
add_tags
Copies the tags from the specified dictionary. Conflicting tags will be overwritten.
Arguments:
tags
- The dictionary of tags to add, with tag id as key and tag value as value.
Returns:
TimeseriesDataTimestamp
- The updated TimeseriesDataTimestamp instance.
get_net_pointer
Gets the .net pointer of the TimeseriesDataTimestamp instance.
Returns:
ctypes.c_void_p
- The .net pointer of the TimeseriesDataTimestamp instance.
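A short sketch of the fluent editing API above; `ts` stands for a TimeseriesDataTimestamp obtained elsewhere (for example from consumed timeseries data), and all parameter ids and tag names are illustrative:
# ts: a TimeseriesDataTimestamp obtained from consumed or constructed data
ts.add_value("speed", 72.5) \
  .add_value("note", "braking zone") \
  .add_tag("vehicle", "car-17") \
  .add_tags({"track": "silverstone", "session": "fp2"}) \
  .remove_value("obsolete_parameter")

print(ts.timestamp_nanoseconds, ts.tags)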
quixstreams.models.streamendtype
quixstreams.models.eventdata
EventData Objects
Represents a single point in time with event value and tags attached to it.
__init__
def __init__(event_id: str = None,
time: Union[int, str, datetime, pd.Timestamp] = None,
value: str = None,
net_pointer: ctypes.c_void_p = None)
Initializes a new instance of EventData.
Arguments:
event_id
- The unique id of the event the value belongs to.
time
- The time at which the event has occurred in nanoseconds since epoch or as a datetime.
value
- The value of the event.
net_pointer
- Pointer to an instance of a .net EventData.
id
Gets the globally unique identifier of the event.
id
Sets the globally unique identifier of the event.
value
Gets the value of the event.
value
Sets the value of the event.
tags
Gets the tags for the timestamp.
If a key is not found, it returns None. The dictionary key is the tag id. The dictionary value is the tag value.
timestamp_nanoseconds
Gets timestamp in nanoseconds.
timestamp_milliseconds
Gets timestamp in milliseconds.
timestamp
Gets the timestamp in datetime format.
timestamp_as_time_span
Gets the timestamp in timespan format.
clone
Clones the event data.
Returns:
EventData
- Cloned EventData object.
add_tag
Adds a tag to the event.
Arguments:
tag_id
- The id of the tag.
tag_value
- The value to set.
Returns:
EventData
- The updated EventData object.
add_tags
Adds tags from the specified dictionary. Conflicting tags will be overwritten.
Arguments:
tags
- The tags to add.
Returns:
EventData
- The updated EventData object.
remove_tag
Removes a tag from the event.
Arguments:
tag_id
- The id of the tag to remove.
Returns:
EventData
- The updated EventData object.
get_net_pointer
Gets the associated .net object pointer.
Returns:
The .net object pointer.
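A small illustrative sketch, assuming EventData is importable from the top-level quixstreams package; the event id, value, and tags are made up:
from datetime import datetime
from quixstreams import EventData

event = EventData(event_id="overheat-alert",
                  time=datetime.utcnow(),
                  value="temperature threshold exceeded")
event.add_tag("severity", "high") \
     .add_tags({"sensor": "probe-3", "line": "A"})

copy = event.clone()        # independent copy of the event data
copy.remove_tag("line")
print(event.timestamp, event.tags)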
quixstreams.models
quixstreams.models.autooffsetreset
AutoOffsetReset Objects
Enum representing the policy on how a consumer should behave when consuming from a topic partition when there is no initial offset.
Latest
Latest: Starts from the newest message if there is no stored offset.
Earliest
Earliest: Starts from the oldest message if there is no stored offset.
Error
Error: Throws an exception if there is no stored offset.
quixstreams.app
CancellationTokenSource Objects
Represents a token source that can signal cancellation to a System.Threading.CancellationToken.
__init__
Initializes a new instance of the CancellationTokenSource class.
is_cancellation_requested
Checks if a cancellation has been requested.
Returns:
bool
- True if the cancellation has been requested, False otherwise.
cancel
Signals a cancellation to the CancellationToken.
token
Gets the associated CancellationToken.
Returns:
CancellationToken
- The CancellationToken associated with this CancellationTokenSource.
get_net_pointer
Gets the interop pointer of the CancellationTokenSource object.
Returns:
ctypes.c_void_p
- The interop pointer of the CancellationTokenSource object.
App Objects
Provides utilities to handle default streaming behaviors and automatic resource cleanup on shutdown.
run
@staticmethod
def run(cancellation_token: CancellationToken = None,
before_shutdown: Callable[[], None] = None)
Runs the application, managing streaming behaviors and automatic resource cleanup on shutdown.
Arguments:
cancellation_token
- An optional CancellationToken to abort the application run with.
before_shutdown
- An optional function to call before shutting down the application.
get_state_manager
Retrieves the state manager for the application
Returns:
AppStateManager
- the app's state manager
set_state_storage
Sets the state storage for the app
Arguments:
storage
- The state storage to use for app's state manager
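A typical run-loop sketch, assuming App and CancellationTokenSource are importable from the top-level quixstreams package (they live in quixstreams.app):
from quixstreams import App, CancellationTokenSource

cts = CancellationTokenSource()

def before_shutdown():
    # Called once before the application shuts down (e.g. to flush producers).
    print("shutting down")

# Blocks the main thread, keeping consumers alive until the token is cancelled
# or the process receives a termination signal.
App.run(cts.token, before_shutdown=before_shutdown)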
quixstreams.states.topicstatemanager
TopicStateManager Objects
Manages the states of a topic.
__init__
Initializes a new instance of TopicStateManager.
NOTE: Do not initialize this class manually, use TopicConsumer.get_state_manager
Arguments:
net_pointer
- The .net object representing a TopicStateManager.
get_stream_states
Returns a collection of all available stream state ids for the current topic.
Returns:
List[str]
- All available stream state ids for the current topic.
get_stream_state_manager
Gets an instance of the StreamStateManager for the specified stream_id.
Arguments:
stream_id
- The ID of the stream
delete_stream_state
Deletes the stream state for the specified stream
Arguments:
stream_id
- The ID of the stream
Returns:
bool
- Whether the stream state was deleted
delete_stream_states
Deletes all stream states for the current topic.
Returns:
int
- The number of stream states that were deleted
quixstreams.states.streamstatemanager
StreamStateManager Objects
Manages the states of a stream.
__init__
Initializes a new instance of StreamStateManager.
NOTE: Do not initialize this class manually, use StreamConsumer.get_state_manager
Arguments:
net_pointer
- The .net object representing a StreamStateManager.
get_dict_state
def get_dict_state(
state_name: str,
default_value_factory: Callable[[str], StreamStateType] = None,
state_type: StreamStateType = None
) -> DictStreamState[StreamStateType]
Creates a new application state of dictionary type with automatically managed lifecycle for the stream
Arguments:
state_name
- The name of the state
state_type
- The type of the state
default_value_factory
- The default value factory to create value when the key is not yet present in the state
Example:
state_manager.get_dict_state('some_state') returns a state where the type is 'Any'.
state_manager.get_dict_state('some_state', lambda missing_key: {}) returns a state where the type is a generic dictionary, with an empty dictionary as the default value when a key is not yet present. The lambda function will be invoked with the 'get_state_type_check' key to determine the type.
state_manager.get_dict_state('some_state', lambda missing_key: {}, Dict[str, float]) returns a state where the type is a specific dictionary type, with a default value.
state_manager.get_dict_state('some_state', state_type=float) returns a state where the type is float, without a default value, resulting in a KeyError when a key is not found.
get_scalar_state
def get_scalar_state(
state_name: str,
default_value_factory: Callable[[], StreamStateType] = None,
state_type: StreamStateType = None
) -> ScalarStreamState[StreamStateType]
Creates a new application state of scalar type with automatically managed lifecycle for the stream
Arguments:
state_name
- The name of the state
default_value_factory
- The default value factory to create value when it has not been set yet
state_type
- The type of the state
Example:
state_manager.get_scalar_state('some_state') returns a state where the type is 'Any'.
state_manager.get_scalar_state('some_state', lambda missing_key: 1) returns a state where the type is 'Any', with the integer 1 as the default when the value has not been set yet. The lambda function will be invoked with the 'get_state_type_check' key to determine the type.
state_manager.get_scalar_state('some_state', lambda missing_key: 0, float) returns a state where the type is float, with a default value.
state_manager.get_scalar_state('some_state', state_type=float) returns a state where the type is float, with a default value of that type.
get_states
Returns an enumerable collection of all available state names for the current stream.
Returns:
List[str]
- All available stream state names for this state
delete_state
Deletes the state with the specified name
Arguments:
state_name
- The state to delete
Returns:
bool
- Whether the state was deleted
delete_states
Deletes all states for the current stream.
Returns:
int
- The number of states that were deleted
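A sketch of working with a stream's state manager, assuming `stream_consumer` is a StreamConsumer received from a topic; the state names are illustrative:
state_manager = stream_consumer.get_state_manager()

# Dictionary-style state with an empty dict as the default value for missing keys.
counts = state_manager.get_dict_state("counts", lambda missing_key: {})

print(state_manager.get_states())        # all state names for this stream
state_manager.delete_state("obsolete")   # returns True if the state existed and was deleted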
quixstreams.states.scalarstreamstate
ScalarStreamState Objects
Represents a state container that stores a scalar value with the ability to flush changes to a specified storage.
__init__
def __init__(net_pointer: ctypes.c_void_p, state_type: StreamStateType,
default_value_factory: Callable[[], StreamStateType])
Initializes a new instance of ScalarStreamState.
NOTE: Do not initialize this class manually, use StreamStateManager.get_scalar_state
Arguments:
net_pointer
- The .net object representing a ScalarStreamState.
state_type
- The type of the state
default_value_factory
- A function that returns a default value of type T when the value has not been set yet
type
Gets the type of the ScalarStreamState
Returns:
StreamStateType
- type of the state
on_flushed
Gets the handler for when flush operation is completed.
Returns:
Callable[[], None]: The event handler for after flush.
on_flushed
Sets the handler for when flush operation is completed.
Arguments:
value
- The parameterless callback to invoke
on_flushing
Gets the handler for when flush operation begins.
Returns:
Callable[[], None]: The event handler invoked when the flush operation begins.
on_flushing
Sets the handler for when flush operation begins.
Arguments:
value
- The parameterless callback to invoke
flush
Flushes the changes made to the in-memory state to the specified storage.
reset
Resets the state to its value before the in-memory modifications.
value
Gets the value of the state.
Returns:
StreamStateType
- The value of the state.
value
Sets the value of the state.
Arguments:
val
- The value of the state.
quixstreams.states.dictstreamstate
DictStreamState Objects
Represents a state container that stores key-value pairs with the ability to flush changes to a specified storage.
__init__
def __init__(net_pointer: ctypes.c_void_p, state_type: StreamStateType,
default_value_factory: Callable[[str], StreamStateType])
Initializes a new instance of DictStreamState.
NOTE: Do not initialize this class manually, use StreamStateManager.get_dict_state
Arguments:
net_pointer
- The .net object representing a DictStreamState.
state_type
- The type of the state
default_value_factory
- The default value factory to create value when the key is not yet present in the state
type
Gets the type of the StreamState
Returns:
StreamStateType
- type of the state
on_flushed
Gets the handler for when flush operation is completed.
Returns:
Callable[[], None]: The event handler for after flush.
on_flushed
Sets the handler for when flush operation is completed.
Arguments:
value
- The parameterless callback to invoke
on_flushing
Gets the handler for when flush operation begins.
Returns:
Callable[[], None]: The event handler invoked when the flush operation begins.
on_flushing
Sets the handler for when flush operation begins.
Arguments:
value
- The parameterless callback to invoke
flush
Flushes the changes made to the in-memory state to the specified storage.
reset
Resets the state to its value before the in-memory modifications.
quixstreams.states
quixstreams.states.appstatemanager
AppStateManager Objects
Manages the states of an app.
__init__
Initializes a new instance of AppStateManager.
NOTE: Do not initialize this class manually, use App.get_state_manager
Arguments:
net_pointer
- The .net object representing an AppStateManager.
get_topic_states
Returns a collection of all available topic states for the app.
Returns:
List[str]
- All available topic state names for the app.
get_topic_state_manager
Gets an instance of the TopicStateManager for the specified topic.
Arguments:
topic_name
- The name of the topic
delete_topic_state
Deletes the specified topic state
Arguments:
topic_name
- The name of the topic
Returns:
bool
- Whether the topic state was deleted
delete_topic_states
Deletes all topic states for the app.
Returns:
int
- The number of topic states that were deleted
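A sketch tying the state managers together, assuming App is importable from the top-level quixstreams package; the topic name is illustrative:
from quixstreams import App

app_state = App.get_state_manager()
print(app_state.get_topic_states())              # topic state names known to the app

topic_state = app_state.get_topic_state_manager("sensor-data")
print(topic_state.get_stream_states())           # stream state ids for that topic

deleted = app_state.delete_topic_states()        # remove all topic states, returns the count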
quixstreams.topicconsumer
TopicConsumer Objects
Interface to operate with the streaming platform for consuming messages
__init__
Initializes a new instance of TopicConsumer.
NOTE: Do not initialize this class manually, use KafkaStreamingClient.get_topic_consumer to create it.
Arguments:
net_pointer
- The .net pointer to TopicConsumer instance.
on_stream_received
Gets the event handler for when a stream is received for the topic.
Returns:
Callable[[StreamConsumer], None]: The event handler for when a stream is received for the topic. The first parameter is the StreamConsumer instance that was received.
on_stream_received
@on_stream_received.setter
def on_stream_received(value: Callable[['StreamConsumer'], None]) -> None
Sets the event handler for when a stream is received for the topic.
Arguments:
value
- The new event handler for when a stream is received for the topic. The first parameter is the StreamConsumer instance that was received.
on_streams_revoked
Gets the event handler for when streams are revoked for the topic.
Returns:
Callable[[TopicConsumer, List[StreamConsumer]], None]: The event handler for when streams are revoked for the topic. The first parameter is the TopicConsumer instance for which the streams were revoked, and the second parameter is a list of StreamConsumer instances that were revoked.
on_streams_revoked
@on_streams_revoked.setter
def on_streams_revoked(
value: Callable[['TopicConsumer', List['StreamConsumer']],
None]) -> None
Sets the event handler for when streams are revoked for the topic.
Arguments:
value
- The new event handler for when streams are revoked for the topic. The first parameter is the TopicConsumer instance for which the streams were revoked, and the second parameter is a list of StreamConsumer instances that were revoked.
on_revoking
Gets the event handler for when the topic is being revoked.
Returns:
Callable[[TopicConsumer], None]: The event handler for when the topic is being revoked. The first parameter is the TopicConsumer instance for which the revocation is happening.
on_revoking
Sets the event handler for when the topic is being revoked.
Arguments:
value
- The new event handler for when the topic is being revoked. The first parameter is the TopicConsumer instance for which the revocation is happening.
on_committed
Gets the event handler for when the topic finishes committing consumed data up to this point.
Returns:
Callable[[TopicConsumer], None]: The event handler for when the topic finishes committing consumed data up to this point. The first parameter is the TopicConsumer instance for which the commit happened.
on_committed
Sets the event handler for when the topic finishes committing consumed data up to this point.
Arguments:
value
- The new event handler for when the topic finishes committing consumed data up to this point. The first parameter is the TopicConsumer instance for which the commit happened.
on_committing
Gets the event handler for when the topic begins committing consumed data up to this point.
Returns:
Callable[[TopicConsumer], None]: The event handler for when the topic begins committing consumed data up to this point. The first parameter is the TopicConsumer instance for which the commit is happening.
on_committing
Sets the event handler for when the topic begins committing consumed data up to this point.
Arguments:
value
- The new event handler for when the topic begins committing consumed data up to this point. The first parameter is the TopicConsumer instance for which the commit is happening.
subscribe
Subscribes to streams in the topic. Use 'on_stream_received' event to consume incoming streams.
commit
Commit packages consumed up until now
get_state_manager
Gets the manager for the topic states.
Returns:
TopicStateManager
- The topic state manager
get_net_pointer
Retrieves the .net pointer to TopicConsumer instance.
Returns:
ctypes.c_void_p
- The .net pointer to TopicConsumer instance.
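A minimal consumption sketch, assuming `topic_consumer` was returned by a streaming client's get_topic_consumer call; the handler bodies are illustrative:
def on_stream_received(stream_consumer):
    print("new stream:", stream_consumer.stream_id)

topic_consumer.on_stream_received = on_stream_received
topic_consumer.on_committed = lambda topic: print("commit finished")

topic_consumer.subscribe()   # start receiving streams; handle them in on_stream_received
# ... later, commit everything consumed so far:
topic_consumer.commit()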
quixstreams.streamconsumer
StreamConsumer Objects
Handles consuming stream from a topic.
__init__
def __init__(net_pointer: ctypes.c_void_p, topic_consumer: 'TopicConsumer',
on_close_cb_always: Callable[['StreamConsumer'], None])
Initializes a new instance of StreamConsumer.
NOTE: Do not initialize this class manually; TopicConsumer automatically creates this when a new stream is received.
Arguments:
net_pointer
- Pointer to an instance of a .NET StreamConsumer.
topic_consumer
- The topic consumer which owns the stream consumer.
on_close_cb_always
- The callback function to be executed when the stream is closed.
topic
Gets the topic the stream was raised for.
Returns:
TopicConsumer
- The topic consumer instance associated with the stream.
on_stream_closed
Gets the handler for when the stream closes.
Returns:
Callable[['StreamConsumer', 'StreamEndType'], None]: The callback function to be executed when the stream closes. The first parameter is the stream that closes, and the second is the close type.
on_stream_closed
@on_stream_closed.setter
def on_stream_closed(
value: Callable[['StreamConsumer', 'StreamEndType'], None]) -> None
Sets the handler for when the stream closes.
Arguments:
value
- The new callback function to be executed when the stream closes. The first parameter is the stream that closes, and the second is the close type.
on_package_received
Gets the handler for when the stream receives a package of any type.
Returns:
Callable[['StreamConsumer', Any], None]: The callback function to be executed when the stream receives a package. The first parameter is the stream that receives the package, and the second is the package itself.
on_package_received
@on_package_received.setter
def on_package_received(
value: Callable[['StreamConsumer', Any], None]) -> None
Sets the handler for when the stream receives a package of any type.
Arguments:
value
- The new callback function to be executed when the stream receives a package. The first parameter is the stream that receives the package, and the second is the package itself.
stream_id
Get the ID of the stream being consumed.
Returns:
str
- The ID of the stream being consumed.
properties
Gets the consumer for accessing the properties and metadata of the stream.
Returns:
StreamPropertiesConsumer
- The stream properties consumer instance.
events
Gets the consumer for accessing event related information of the stream such as event definitions and values.
Returns:
StreamEventsConsumer
- The stream events consumer instance.
timeseries
Gets the consumer for accessing timeseries related information of the stream such as parameter definitions and values.
Returns:
StreamTimeseriesConsumer
- The stream timeseries consumer instance.
get_dict_state
def get_dict_state(
state_name: str,
default_value_factory: Callable[[str], StreamStateType] = None,
state_type: StreamStateType = None
) -> DictStreamState[StreamStateType]
Creates a new application state of dictionary type with automatically managed lifecycle for the stream
Arguments:
state_name
- The name of the state
default_value_factory
- The default value factory to create value when the key is not yet present in the state
state_type
- The type of the state
Returns:
DictStreamState
- The stream state
Example:
stream_consumer.get_dict_state('some_state') returns a state where the type is 'Any'.
stream_consumer.get_dict_state('some_state', lambda missing_key: {}) returns a state where the type is a generic dictionary, with an empty dictionary as the default value when a key is not yet present. The lambda function will be invoked with the 'get_state_type_check' key to determine the type.
stream_consumer.get_dict_state('some_state', lambda missing_key: {}, Dict[str, float]) returns a state where the type is a specific dictionary type, with a default value.
stream_consumer.get_dict_state('some_state', state_type=float) returns a state where the type is float, without a default value, resulting in a KeyError when a key is not found.
get_scalar_state
def get_scalar_state(
state_name: str,
default_value_factory: Callable[[], StreamStateType] = None,
state_type: StreamStateType = None
) -> ScalarStreamState[StreamStateType]
Creates a new application state of scalar type with automatically managed lifecycle for the stream
Arguments:
state_name
- The name of the state
default_value_factory
- The default value factory to create value when it has not been set yet
state_type
- The type of the state
Returns:
ScalarStreamState
- The stream state
Example:
stream_consumer.get_scalar_state('some_state') returns a state where the type is 'Any'.
stream_consumer.get_scalar_state('some_state', lambda missing_key: 1) returns a state where the type is 'Any', with the integer 1 as the default when the value has not been set yet. The lambda function will be invoked with the 'get_state_type_check' key to determine the type.
stream_consumer.get_scalar_state('some_state', lambda missing_key: 0, float) returns a state where the type is float, with a default value.
stream_consumer.get_scalar_state('some_state', state_type=float) returns a state where the type is float, with a default value of that type.
get_state_manager
Gets the manager for the stream states.
Returns:
StreamStateManager
- The stream state manager
get_net_pointer
Gets the associated .NET object pointer.
Returns:
ctypes.c_void_p
- The .NET pointer
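A wiring sketch for a received stream, assuming the buffer's on_data_released handler is invoked with the stream and a TimeseriesData whose timestamps property behaves like a list; handler bodies and state names are illustrative:
def on_stream_received(stream_consumer):
    # React to the stream ending (the second argument is the StreamEndType).
    stream_consumer.on_stream_closed = lambda stream, end_type: print("closed:", end_type)

    # Buffer the stream's timeseries data and handle it when the buffer releases it.
    buffer = stream_consumer.timeseries.create_buffer()
    buffer.on_data_released = lambda stream, data: print(
        stream.stream_id, "released", len(data.timestamps), "timestamps")

    # Per-stream dictionary state with an automatically managed lifecycle.
    totals = stream_consumer.get_dict_state("totals", lambda missing_key: 0)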
quixstreams.state.localfilestorage
LocalFileStorage Objects
A directory storage containing the file storage for single process access purposes. Locking is implemented via in-memory mutex.
__init__
Initializes the LocalFileStorage instance.
Arguments:
storage_directory
- The path to the storage directory.
auto_create_dir
- If True, automatically creates the storage directory if it doesn't exist.
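A sketch pairing LocalFileStorage with the App-level state storage described earlier, assuming both names are importable from the top-level quixstreams package; the directory path is illustrative:
from quixstreams import App, LocalFileStorage

storage = LocalFileStorage(storage_directory="./state", auto_create_dir=True)
App.set_state_storage(storage)   # app/topic/stream states now persist under ./state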
quixstreams.state.inmemorystorage
InMemoryStorage Objects
Basic, non-thread-safe in-memory storage implementing IStateStorage.
__init__
Initializes the InMemoryStorage instance.
quixstreams.state.statetype
quixstreams.state.statevalue
StateValue Objects
A wrapper class for values that can be stored inside the storage.
__init__
Initializes the wrapped value inside the store.
Arguments:
value
- The value to be wrapped, which can be one of the following types: StateValue, str, int, float, bool, bytes, bytearray, or object (via pickle).
type
Gets the type of the wrapped value.
Returns:
StateType
- The type of the wrapped value.
value
Gets the wrapped value.
Returns:
The wrapped value.
get_net_pointer
Gets the .NET pointer of the wrapped value.
Returns:
ctypes.c_void_p
- The .NET pointer of the wrapped value.
quixstreams.state.istatestorage
IStateStorage Objects
The minimum definition for a state storage
__init__
Initializes a new instance of IStateStorage.
NOTE: Do not initialize this class manually, it can be returned by classes for certain calls
Arguments:
net_pointer
- The .net object representing an IStateStorage.
is_case_sensitive
Returns whether the storage is case-sensitive
get_or_create_sub_storage
Creates or retrieves the existing storage under this one in the hierarchy.
Arguments:
sub_storage_name
- The name of the sub storage
Returns:
IStateStorage
- The state storage for the given storage name
delete_sub_storages
Deletes the storages under this one in the hierarchy.
Returns:
int
- The number of state storages deleted
delete_sub_storage
Deletes a storage under this one in the hierarchy.
Arguments:
sub_storage_name
- The name of the sub storage
Returns:
bool
- Whether the state storage for the given storage name was deleted
get_sub_storages
Gets the storages under this one in the hierarchy.
Returns:
List[str]
- The names of the storages this storage contains
get
Gets the value at the specified key.
Arguments:
key
- The key to retrieve the value for.
Returns:
Any
- The value at the specified key, which can be one of the following types: str, int, float, bool, bytes, bytearray, or object (via pickle).
set
Sets the value at the specified key.
Arguments:
key
- The key to set the value for.
value
- The value to be set, which can be one of the following types: StateValue, str, int, float, bool, bytes, bytearray, or object (via pickle).
contains_key
Checks if the storage contains the specified key.
Arguments:
key
- The key to check for.
Returns:
bool
- True if the storage contains the key, False otherwise.
get_all_keys
Retrieves a set containing all the keys in the storage.
Returns:
set[str]
- A set of all keys in the storage.
remove
Removes the specified key from the storage.
Arguments:
key
- The key to be removed.
clear
Clears the storage by removing all keys and their associated values.
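A sketch of the key/value API using the in-memory implementation above, assuming InMemoryStorage is importable from the top-level quixstreams package; keys and values are illustrative:
from quixstreams import InMemoryStorage

storage = InMemoryStorage()
storage.set("last_offset", 1024)
storage.set("calibration", {"gain": 0.98})   # arbitrary objects are stored via pickle

if storage.contains_key("last_offset"):
    print(storage.get("last_offset"))        # -> 1024

print(storage.get_all_keys())                # e.g. {'last_offset', 'calibration'}
storage.remove("calibration")
storage.clear()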
quixstreams.state
quixstreams.streamproducer
StreamProducer Objects
Handles publishing a stream to a topic.
__init__
Initializes a new instance of StreamProducer.
NOTE: Do not initialize this class manually, use TopicProducer.get_or_create_stream or create_stream
Arguments:
topic_producer
- The topic producer the stream producer publishes to.
net_pointer
- The .net object representing a StreamProducer.
topic
Gets the topic the stream is producing to.
Returns:
TopicProducer
- The topic the stream is producing to.
on_write_exception
Gets the handler for when a stream experiences exception during the asynchronous write process.
Returns:
Callable[['StreamProducer', BaseException], None]: The handler for exceptions during the asynchronous write process. The first parameter is the stream the exception was raised for, and the second is the exception.
on_write_exception
@on_write_exception.setter
def on_write_exception(
value: Callable[['StreamProducer', BaseException], None]) -> None
Sets the handler for when a stream experiences exception during the asynchronous write process.
Arguments:
value
- The handler for exceptions during the asynchronous write process. The first parameter is the stream the exception was raised for, and the second is the exception.
stream_id
Gets the unique id of the stream being produced.
Returns:
str
- The unique id of the stream being produced.
epoch
Gets the default Epoch used for Timeseries and Events.
Returns:
datetime
- The default Epoch used for Timeseries and Events.
epoch
Set the default Epoch used for Timeseries and Events.
Arguments:
value
- The default Epoch value to set.
properties
Gets the properties of the stream. The changes will automatically be sent after a slight delay.
Returns:
StreamPropertiesProducer
- The properties of the stream.
timeseries
Gets the producer for publishing timeseries related information of the stream such as parameter definitions and values.
Returns:
StreamTimeseriesProducer
- The producer for publishing timeseries related information of the stream.
events
Gets the producer for publishing event related information of the stream such as event definitions and values.
Returns:
StreamEventsProducer
- The producer for publishing event related information of the stream.
flush
Flushes the pending data to stream.
close
Closes the stream and flushes the pending data to stream.
Arguments:
end_type
- The type of stream end. Defaults to StreamEndType.Closed.
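A sketch of producing a stream, assuming `topic_producer` came from a streaming client's get_topic_producer, that the timeseries producer exposes its buffer as `buffer`, and that StreamEndType is importable from the top-level quixstreams package; the stream content is illustrative:
from datetime import datetime
from quixstreams import StreamEndType

stream = topic_producer.create_stream()
stream.properties.name = "engine telemetry"

stream.timeseries.buffer.add_timestamp(datetime.utcnow()) \
    .add_value("rpm", 3150) \
    .add_tag("engine", "left") \
    .publish()

stream.on_write_exception = lambda s, exc: print("write failed:", exc)

stream.flush()                       # push pending data now
stream.close(StreamEndType.Closed)   # flushes and ends the stream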
quixstreams.builders.parameterdefinitionbuilder
ParameterDefinitionBuilder Objects
Builder for creating ParameterDefinition for StreamTimeseriesProducer.
__init__
Initializes a new instance of ParameterDefinitionBuilder.
Arguments:
net_pointer
- Pointer to an instance of a .net ParameterDefinitionBuilder.
set_range
Set the minimum and maximum range of the parameter.
Arguments:
minimum_value
- The minimum value.
maximum_value
- The maximum value.
Returns:
The builder.
set_unit
Set the unit of the parameter.
Arguments:
unit
- The unit of the parameter.
Returns:
The builder.
set_format
Set the format of the parameter.
Arguments:
format
- The format of the parameter.
Returns:
The builder.
set_custom_properties
Set the custom properties of the parameter.
Arguments:
custom_properties
- The custom properties of the parameter.
Returns:
The builder.
add_definition
def add_definition(parameter_id: str,
name: str = None,
description: str = None) -> 'ParameterDefinitionBuilder'
Add new parameter definition to the StreamTimeseriesProducer. Configure it with the builder methods.
Arguments:
parameter_id
- The id of the parameter. Must match the parameter id used to send data.
name
- The human friendly display name of the parameter.
description
- The description of the parameter.
Returns:
Parameter definition builder to define the parameter properties
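A sketch of defining a parameter with the builder, assuming `stream` is a StreamProducer and that the chain is started from an add_definition call on its timeseries producer, as described above; the id, unit, range, and custom properties are illustrative:
stream.timeseries.add_definition("rpm",
                                 name="Engine RPM",
                                 description="Crankshaft revolutions per minute") \
    .set_unit("rpm") \
    .set_range(0, 9000) \
    .set_custom_properties('{"display": "gauge"}')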
quixstreams.builders.eventdatabuilder
EventDataBuilder Objects
Builder for creating event data packages for StreamEventsProducer.
__init__
Initializes a new instance of EventDataBuilder.
Arguments:
net_pointer
- Pointer to an instance of a .net EventDataBuilder.
add_value
Adds new event at the time the builder is created for.
Arguments:
event_id
- The id of the event to set the value for.
value
- The string value.
Returns:
The builder.
add_tag
Sets tag value for the values.
Arguments:
tag_id
- The id of the tag.
value
- The value of the tag.
Returns:
The builder.
add_tags
Copies the tags from the specified dictionary. Conflicting tags will be overwritten.
Arguments:
tags
- The tags to add.
Returns:
The builder.
publish
Publishes the values to the StreamEventsProducer buffer.
See StreamEventsProducer buffer settings for more information on when the values are sent to the broker.
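A sketch of building and publishing an event with this builder, assuming it is obtained from the stream's events producer via an add_timestamp call; the event id and tags are illustrative:
from datetime import datetime

stream.events.add_timestamp(datetime.utcnow()) \
    .add_value("overheat-alert", "temperature threshold exceeded") \
    .add_tag("severity", "high") \
    .add_tags({"sensor": "probe-3"}) \
    .publish()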
quixstreams.builders.timeseriesdatabuilder
TimeseriesDataBuilder Objects
Builder for managing TimeseriesDataTimestamp instances on TimeseriesBufferProducer.
__init__
Initializes a new instance of TimeseriesDataBuilder.
Arguments:
net_pointer
- Pointer to an instance of a .net TimeseriesDataBuilder.
add_value
def add_value(
parameter_id: str, value: Union[numbers.Number, str, bytearray, bytes]
) -> 'TimeseriesDataBuilder'
Adds new parameter value at the time the builder is created for.
Arguments:
parameter_id
- The id of the parameter to set the value for.
value
- The value to add. Can be a number, string, bytearray, or bytes.
Returns:
The builder.
add_tag
Adds a tag to the values.
Arguments:
tag_id
- The id of the tag.
value
- The value of the tag.
Returns:
The builder.
add_tags
Copies the tags from the specified dictionary. Conflicting tags will be overwritten.
Arguments:
tags
- The tags to add.
Returns:
The builder.
publish
Publish the values.
quixstreams.builders.eventdefinitionbuilder
EventDefinitionBuilder Objects
Builder for creating EventDefinitions within StreamPropertiesProducer.
__init__
Initializes a new instance of EventDefinitionBuilder.
Arguments:
net_pointer
- Pointer to an instance of a .net EventDefinitionBuilder.
set_level
Set severity level of the Event.
Arguments:
level
- The severity level of the event.
Returns:
The builder.
set_custom_properties
Set custom properties of the Event.
Arguments:
custom_properties
- The custom properties of the event.
Returns:
The builder.
add_definition
def add_definition(event_id: str,
name: str = None,
description: str = None) -> 'EventDefinitionBuilder'
Add new Event definition, to define properties like Name or Level, among others.
Arguments:
event_id
- Event id. This must match the event id you use to publish event values.
name
- Human friendly display name of the event.
description
- Description of the event.
Returns:
Event definition builder to define the event properties.
quixstreams.builders
quixstreams.kafkastreamingclient
KafkaStreamingClient Objects
A Kafka streaming client capable of creating topic consumer and producers.
__init__
def __init__(broker_address: str,
security_options: SecurityOptions = None,
properties: Dict[str, str] = None,
debug: bool = False)
Initializes a new instance of the KafkaStreamingClient.
Arguments:
broker_address
- The address of the Kafka cluster.
security_options
- Optional security options for the Kafka client.
properties
- Optional extra properties for broker configuration.
debug
- Whether debugging should be enabled. Defaults to False.
get_topic_consumer
def get_topic_consumer(
topic: str,
consumer_group: str = None,
commit_settings: Union[CommitOptions, CommitMode] = None,
auto_offset_reset: AutoOffsetReset = AutoOffsetReset.Latest
) -> TopicConsumer
Gets a topic consumer capable of subscribing to receive incoming streams.
Arguments:
topic
- The name of the topic.
consumer_group
- The consumer group ID to use for consuming messages. Defaults to None.
commit_settings
- The settings to use for committing. If not provided, defaults to committing every 5000 messages or 5 seconds, whichever is sooner.
auto_offset_reset
- The offset to use when there is no saved offset for the consumer group. Defaults to AutoOffsetReset.Latest.
Returns:
TopicConsumer
- An instance of TopicConsumer for the specified topic.
get_topic_producer
Gets a topic producer capable of publishing stream messages.
Arguments:
topic
- The name of the topic.
Returns:
TopicProducer
- An instance of TopicProducer for the specified topic.
get_raw_topic_consumer
def get_raw_topic_consumer(
topic: str,
consumer_group: str = None,
auto_offset_reset: Union[AutoOffsetReset,
None] = None) -> RawTopicConsumer
Gets a topic consumer capable of subscribing to receive non-quixstreams incoming messages.
Arguments:
topic
- The name of the topic.
consumer_group
- The consumer group ID to use for consuming messages. Defaults to None.
auto_offset_reset
- The offset to use when there is no saved offset for the consumer group. Defaults to None.
Returns:
RawTopicConsumer
- An instance of RawTopicConsumer for the specified topic.
get_raw_topic_producer
Gets a topic producer capable of publishing non-quixstreams messages.
Arguments:
topic
- The name of the topic.
Returns:
RawTopicProducer
- An instance of RawTopicProducer for the specified topic.
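A sketch tying the client calls together, assuming the shown names are importable from the top-level quixstreams package; the broker address, topic names, and consumer group are illustrative:
from quixstreams import KafkaStreamingClient, AutoOffsetReset, App

client = KafkaStreamingClient("127.0.0.1:9092")

topic_consumer = client.get_topic_consumer(
    "sensor-data",
    consumer_group="analytics",
    auto_offset_reset=AutoOffsetReset.Earliest)
topic_producer = client.get_topic_producer("sensor-data-enriched")

topic_consumer.on_stream_received = lambda stream: print("stream:", stream.stream_id)
topic_consumer.subscribe()
App.run()   # keep consuming until the process is asked to shut down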