Publishing data
Quix Streams enables you to use stream context to publish data to your topic. You can create new streams, append data to existing streams, organize streams in folders, and add context to the streams.
Connect to Quix
In order to start publishing data to Quix you first need an instance of `KafkaStreamingClient` (or `QuixStreamingClient` if using the Quix Platform). To create an instance, use code like the following (a minimal sketch; the broker address is illustrative):
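```python
from quixstreams import KafkaStreamingClient, QuixStreamingClient

# connect to a Kafka broker directly (address is illustrative)
client = KafkaStreamingClient('127.0.0.1:9092')

# or, when using the Quix Platform, let the client resolve broker details for you
client = QuixStreamingClient()
```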
You can read about other ways to connect to your message broker in the Connecting to a broker section of this documentation.
Create a topic producer
To publish data to a topic you need an instance of `TopicProducer`. This instance enables you to publish data and additional context for streams using the provided topic. You can create an instance using the client's `get_topic_producer` method, passing the `TOPIC` as the parameter:
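```python
topic_producer = client.get_topic_producer(TOPIC)
```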
Create / reopen a stream
Streams are the central context of data in Quix Streams. Streams make it easy to manage, discover, and work with your data. You can create as many streams as you want using the `create_stream` method of your `TopicProducer` instance:
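```python
stream = topic_producer.create_stream()
```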
A stream ID is auto-generated, but you can also pass a `StreamId` to the method to append data to an existing stream. This is also useful when you want a consistent `StreamId` so you can continue using the same stream in the future:
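```python
# the stream ID value is illustrative
stream = topic_producer.create_stream("my-own-stream-id")
```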
Stream properties
You can add optional context to your streams by adding a name, some metadata, or a default location. You can add these using the `properties` options of the generated `stream` instance:
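```python
stream.properties.name = "Hello World python stream"
stream.properties.location = "/test/location"
# key-value metadata; the dict-style accessor is assumed from the legacy API
stream.properties.metadata["my-metadata"] = "my-metadata-value"
```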
Stream name
When using Quix Platform, the stream name is the display name of your stream in the platform. If you specify one, Quix Platform uses it instead of the Stream ID to represent your stream inside the platform. For example, the following name (an illustrative value):
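```python
stream.properties.name = "Hello World my first stream"
```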
would appear as the stream's display name in the list of streams of your workspace.
Stream location
The stream location property defines a default folder for the stream in the folder structure of your persisted streams.
For example, the following location:
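```python
# the folder path is illustrative
stream.properties.location = "/simulations/trials"
```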
would place the stream under a simulations/trials folder hierarchy among your persisted streams.
Any streams sent without a location property will be located under the "Root" level by default.
Stream exception handler
An exception can occur when writing to a stream. This is handled for you by the default exception handler.
You can create a custom exception handler should you require functionality different to that provided by the default exception handler. The following code demonstrates how to create and register a custom exception handler:
```python
import os
import quixstreams as qx

topic_producer = client.get_topic_producer(topic_id_or_name = os.environ["output"])
stream = topic_producer.create_stream()

# custom exception handler
def my_on_write_exception_handler(stream: qx.StreamProducer, ex: BaseException):
    # your code here
    print('Custom exception handler')
    print('Stream: ', stream.stream_id)
    print('Exception: ', ex.args[0])
    return

# register the exception handler
stream.on_write_exception = my_on_write_exception_handler
```
Close a stream
Streams can be left open 24/7 if you aren't sure when the next data will arrive, but you can and should close a stream once you know you won't be publishing any more data, to signal to consumers that the stream is over.
However, sometimes a stream can be closed for other reasons, such as if an error occurs in the publisher code, or something unexpected happens.
These snippets show you how to close a stream and how to specify the `StreamEndType`:
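```python
# a sketch, assuming StreamEndType is importable from the top-level package
from quixstreams import StreamEndType

# close the stream normally
stream.close()

# or close it with an explicit end type
stream.close(StreamEndType.Aborted)
```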
The `StreamEndType` can be one of the following possible end types:
StreamEndType | Description |
---|---|
Closed | The stream was closed normally |
Aborted | The stream was aborted by your code for your own reasons |
Terminated | The stream was terminated unexpectedly while data was being written |
Note
The `Terminated` state is also used when subscribing to a stream and the `StreamConsumer` is closed before the stream has concluded for other reasons. This can happen when closing the `TopicConsumer`, for example.
Publishing time-series data
You can now start writing data to your stream.
TimeseriesData is for time-series data coming from sources that generate data at regular time intervals, and with a fixed number of Parameters.
Tip
If your data source generates data at irregular time intervals, and you don’t have a defined list of regular Parameters, the EventData format is a better fit for your data.
TimeseriesData format
`TimeseriesData` is the formal class in Quix Streams that represents a time-series data packet in memory. The format consists of a list of timestamps, with the corresponding parameter names and values for each timestamp.
You can imagine a `TimeseriesData` as a table where the `Timestamp` is the first column of that table, and where the parameters are the columns for the values of that table.
The following table shows an example:
Timestamp | Speed | Gear |
---|---|---|
1 | 120 | 3 |
2 | 123 | 3 |
3 | 125 | 3 |
6 | 110 | 2 |
Tip
The Timestamp column plus the Tags assigned to it function as the index of the table. If you add values for the same Timestamp and Tags combination, only the last Values will be sent to the stream.
The following code would generate the previous `TimeseriesData` and publish it to the stream:
```python
from quixstreams import TimeseriesData

data = TimeseriesData()

data.add_timestamp_nanoseconds(1) \
    .add_value("Speed", 120) \
    .add_value("Gear", 3)
data.add_timestamp_nanoseconds(2) \
    .add_value("Speed", 123) \
    .add_value("Gear", 3)
data.add_timestamp_nanoseconds(3) \
    .add_value("Speed", 125) \
    .add_value("Gear", 3)
data.add_timestamp_nanoseconds(6) \
    .add_value("Speed", 110) \
    .add_value("Gear", 2)

stream.timeseries.publish(data)
```
```csharp
var data = new TimeseriesData();

data.AddTimestampNanoseconds(1)
    .AddValue("Speed", 120)
    .AddValue("Gear", 3);
data.AddTimestampNanoseconds(2)
    .AddValue("Speed", 123)
    .AddValue("Gear", 3);
data.AddTimestampNanoseconds(3)
    .AddValue("Speed", 125)
    .AddValue("Gear", 3);
data.AddTimestampNanoseconds(6)
    .AddValue("Speed", 110)
    .AddValue("Gear", 2);

stream.Timeseries.Publish(data);
```
Although Quix Streams enables you to publish `TimeseriesData` to a stream directly, without any buffering, Quix recommends you use the built-in Buffer feature to achieve high throughput. The following code would publish the same `TimeseriesData` through a buffer:
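```python
stream.timeseries.buffer.publish(data)
```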
Visit the Buffer section of this documentation to find out more about the built-in buffer feature.
Quix Streams enables you to attach numbers, strings, or binary data to your timestamps. The following code attaches one of each to the same timestamp:
```python
from quixstreams import TimeseriesData
from datetime import datetime

data = TimeseriesData()

data.add_timestamp(datetime.utcnow()) \
    .add_value("ParameterA", 10) \
    .add_value("ParameterB", "hello") \
    .add_value("ParameterC", bytearray("hello, Quix!", 'utf-8'))  # use bytearray to publish binary data to a stream
```
pandas DataFrame format
If you use the Python version of Quix Streams, you can use a pandas `DataFrame` for writing time-series data. Use the `publish` methods of `stream.timeseries` or `stream.timeseries.buffer`, passing the `DataFrame` instead of a `TimeseriesData`:
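```python
import pandas as pd

# a sketch: the column conventions ("timestamp" plus one column per parameter)
# are assumed and may vary by library version
df = pd.DataFrame({
    "timestamp": [1, 2, 3, 6],
    "Speed": [120, 123, 125, 110],
    "Gear": [3, 3, 3, 2],
})

stream.timeseries.buffer.publish(df)
```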
Alternatively, you can convert a pandas `DataFrame` to a `TimeseriesData` using the `from_dataframe` method:
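```python
from quixstreams import TimeseriesData

# a sketch, assuming from_dataframe is exposed as a static method
data = TimeseriesData.from_dataframe(df)
stream.timeseries.publish(data)
```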
Tip
The conversions from pandas `DataFrame` to `TimeseriesData` have an intrinsic cost overhead. For high-performance models using pandas `DataFrame`, you should use the pandas `DataFrame` methods provided by Quix Streams, which are optimized for doing as few conversions as possible.
Timestamps
Quix Streams supports common date and time formats for timestamps when adding data to a stream.
There are several helper functions to add new timestamps to `Buffer`, `TimeseriesData`, and `EventData` instances with several types of date/time formats.
These are the helper functions:
Python:

- `add_timestamp(datetime: datetime)`: Add a new timestamp in `datetime` format. The default `epoch` will never be added to this.
- `add_timestamp(time: timedelta)`: Add a new timestamp in `timedelta` format since the default `epoch` determined in the stream.
- `add_timestamp_milliseconds(milliseconds: int)`: Add a new timestamp in milliseconds since the default `epoch` determined in the stream.
- `add_timestamp_nanoseconds(nanoseconds: int)`: Add a new timestamp in nanoseconds since the default `epoch` determined in the stream.
C#:

- `AddTimestamp(DateTime dateTime)`: Add a new timestamp in `DateTime` format. The default `Epoch` will never be added to this.
- `AddTimestamp(TimeSpan timeSpan)`: Add a new timestamp in `TimeSpan` format since the default `Epoch` determined in the stream.
- `AddTimestampMilliseconds(long timeMilliseconds)`: Add a new timestamp in milliseconds since the default `Epoch` determined in the stream.
- `AddTimestampNanoseconds(long timeNanoseconds)`: Add a new timestamp in nanoseconds since the default `Epoch` determined in the stream.
Epoch
There is a stream property called `epoch` (set to 0 by default, meaning 00:00:00 on 1 January 1970). This property provides a base value that is added to every timestamp (except for datetime formats) when it's added to the stream. You can use any value you like to act as a base, from which point timestamps will be relative to the base.
The following code tells Quix Streams to use the current date as the `epoch` and add it to each timestamp added to the stream:
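```python
import datetime

# a sketch, assuming epoch is a settable stream property
stream.epoch = datetime.datetime.utcnow()
```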
Adding data without using the `epoch` property:
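```python
stream.timeseries.buffer \
    .add_timestamp(datetime.datetime.utcnow()) \
    .add_value("ParameterA", 10) \
    .publish()
```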
Or you can add a timestamp 1000 ms after the epoch (set to today in the previous example):
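```python
stream.timeseries.buffer \
    .add_timestamp_milliseconds(1000) \
    .add_value("ParameterA", 10) \
    .publish()
```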
Using a Buffer
Quix Streams provides you with an optional programmable buffer which you can configure to your needs. Using buffers to publish data enables you to achieve better compression and higher throughput.
The following code configures the buffer to publish a packet when the size of the buffer reaches 100 timestamps:
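```python
stream.timeseries.buffer.packet_size = 100
```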
Once created, you can then write data to that buffer.
Writing a `TimeseriesData` to that buffer is as simple as using the `publish` method (`Publish` in C#) of the built-in `buffer`:
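```python
stream.timeseries.buffer.publish(data)
```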
Quix Streams also enables you to publish data to the buffer without creating a `TimeseriesData` instance explicitly. To do so, you can use the same helper methods that are supported by the `TimeseriesData` class, such as `add_timestamp`, `add_value`, or `add_tag`. Then use the `publish` method to publish that timestamp to the buffer:
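```python
# parameter and tag names are illustrative
stream.timeseries.buffer \
    .add_timestamp(datetime.datetime.utcnow()) \
    .add_value("ParameterA", 10) \
    .add_tag("quality", "good") \
    .publish()
```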
You can configure multiple conditions to determine when the buffer has to release data. If any of these conditions become true, the buffer releases a new packet of data and that data is cleared from the buffer:
Python:

- `buffer.buffer_timeout`: The maximum duration in milliseconds for which the buffer will be held before releasing the data. A packet of data is released when the configured timeout value has elapsed since the last data was received in the buffer.
- `buffer.packet_size`: The maximum packet size in terms of number of timestamps. Each time the buffer has this number of timestamps, the packet of data is released.
- `buffer.time_span_in_nanoseconds`: The maximum time between timestamps in nanoseconds. When the difference between the earliest and latest buffered timestamp surpasses this number, the packet of data is released.
- `buffer.time_span_in_milliseconds`: The maximum time between timestamps in milliseconds. When the difference between the earliest and latest buffered timestamp surpasses this number, the packet of data is released. Note: this is a millisecond converter on top of `time_span_in_nanoseconds`; they both work with the same underlying value.
- `buffer.custom_trigger_before_enqueue`: A custom function which is invoked before adding a new timestamp to the buffer. If it returns true, the packet of data is released before adding the timestamp to it.
- `buffer.custom_trigger`: A custom function which is invoked after adding a new timestamp to the buffer. If it returns true, the packet of data is released with the entire buffer content.
- `buffer.filter`: A custom function to filter the incoming data before adding it to the buffer. If it returns true, the data is added; otherwise it isn't.
C#:

- `Buffer.BufferTimeout`: The maximum duration in milliseconds for which the buffer will be held before releasing the data. A packet of data is released when the configured timeout value has elapsed since the last data was received in the buffer.
- `Buffer.PacketSize`: The maximum packet size in terms of number of timestamps. Each time the buffer has this number of timestamps, the packet of data is released.
- `Buffer.TimeSpanInNanoseconds`: The maximum time between timestamps in nanoseconds. When the difference between the earliest and latest buffered timestamp surpasses this number, the packet of data is released.
- `Buffer.TimeSpanInMilliseconds`: The maximum time between timestamps in milliseconds. When the difference between the earliest and latest buffered timestamp surpasses this number, the packet of data is released. Note: this is a millisecond converter on top of `TimeSpanInNanoseconds`; they both work with the same underlying value.
- `Buffer.CustomTriggerBeforeEnqueue`: A custom function which is invoked before adding a new timestamp to the buffer. If it returns true, the packet of data is released before adding the timestamp to it.
- `Buffer.CustomTrigger`: A custom function which is invoked after adding a new timestamp to the buffer. If it returns true, the packet of data is released with the entire buffer content.
- `Buffer.Filter`: A custom function to filter the incoming data before adding it to the buffer. If it returns true, data is added; otherwise it isn't.
Examples
The following buffer configuration will publish data every 100ms or, if no data is buffered in the 1 second timeout period, it will flush and empty the buffer anyway:
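```python
stream.timeseries.buffer.time_span_in_milliseconds = 100
stream.timeseries.buffer.buffer_timeout = 1000
```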
The following buffer configuration will publish data every 100ms window, or if critical data is added to it:
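```python
stream.timeseries.buffer.time_span_in_milliseconds = 100
# a sketch: the trigger receives the buffered TimeseriesData; the
# timestamps/tags inspection used here is assumed, and the "is_critical"
# tag is illustrative
stream.timeseries.buffer.custom_trigger = \
    lambda data: data.timestamps[0].tags["is_critical"] == "True"
```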
Parameter definitions
Quix Streams enables you to define metadata for parameters and events to describe them. You can define things like human readable names, descriptions, acceptable ranges of values, and so on. Quix Platform uses some of this configuration when visualizing data on the platform, but you can also use them in your own models, bridges, or visualization implementations.
We call this parameter metadata `ParameterDefinitions`, and all you need to do is use the `add_definition` helper function of the `stream.timeseries` property:
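```python
# a sketch: the (id, name, description) signature is assumed
stream.timeseries.add_definition("ParameterA", "Parameter A", "Human readable description of Parameter A")
```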
Once you have added a new definition, you can attach some additional properties to it. This is the list of visualization and metadata options you can attach to a `ParameterDefinition`:
Python:

- `set_range(minimum_value: float, maximum_value: float)`: Set the minimum and maximum range of the parameter.
- `set_unit(unit: str)`: Set the unit of the parameter.
- `set_format(format: str)`: Set the format of the parameter.
- `set_custom_properties(custom_properties: str)`: Set the custom properties of the parameter for your own needs.
Example:
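```python
# a sketch: add_definition returns a builder, so setters can be chained
# (parameter name, range, and unit are illustrative)
stream.timeseries \
    .add_definition("Speed", "Vehicle speed") \
    .set_range(0, 400) \
    .set_unit("km/h")
```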
C#:

- `SetRange(double minimumValue, double maximumValue)`: Set the minimum and maximum range of the parameter.
- `SetUnit(string unit)`: Set the unit of the parameter.
- `SetFormat(string format)`: Set the format of the parameter.
- `SetCustomProperties(string customProperties)`: Set the custom properties of the parameter for your own needs.
The Min and Max range definition sets the Y axis range in the waveform visualization view in Quix Platform and in the Data explorer.
Adding `Definitions` for each parameter enables you to see data with different ranges on the same waveform view.
You can also define a `Location` before adding parameter and event definitions. Locations are used to organize the parameters and events in hierarchical groups in the data catalogue. To add a location, use the `add_location` method before adding the definitions you want to include in that group.
For example, setting this parameter location:
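```python
# a sketch with an illustrative location path
stream.timeseries \
    .add_location("/Vehicle/Ecu") \
    .add_definition("Speed") \
    .add_definition("Gear")
```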
Will result in this parameter hierarchy in the parameter selection dialogs.
Publishing events
`EventData` is the formal class in Quix Streams which represents an event data packet in memory. `EventData` is meant to be used when the data is intended to be consumed only as a single unit, such as a JSON payload where properties can't be converted to individual parameters. `EventData` can also be a better fit for non-standard, irregular changes, such as when a machine that is shutting down publishes an event named `ShutDown`.
Tip
If your data source generates data at regular time intervals, or the information can be organized in a fixed list of Parameters, the TimeseriesData format is a better fit for your time-series data.
EventData format
`EventData` consists of a record with a `Timestamp`, an `EventId`, and an `EventValue`.
You can imagine a list of `EventData` instances as a table of three columns, where the `Timestamp` is the first column of that table and the `EventId` and `EventValue` are the second and third columns, as shown in the following table:
Timestamp | EventId | EventValue |
---|---|---|
1 | failure23 | Gearbox has a failure |
2 | box-event2 | Car has entered to the box |
3 | motor-off | Motor has stopped |
6 | race-event3 | Race has finished |
The following code would generate the list of `EventData` shown in the previous example and publish it to the stream:
```python
from quixstreams import EventData

stream.events.publish(EventData("failure23", 1, "Gearbox has a failure"))
stream.events.publish(EventData("box-event2", 2, "Car has entered to the box"))
stream.events.publish(EventData("motor-off", 3, "Motor has stopped"))
stream.events.publish(EventData("race-event3", 6, "Race has finished"))
```
```csharp
stream.Events.Publish(new EventData("failure23", 1, "Gearbox has a failure"));
stream.Events.Publish(new EventData("box-event2", 2, "Car has entered to the box"));
stream.Events.Publish(new EventData("motor-off", 3, "Motor has stopped"));
stream.Events.Publish(new EventData("race-event3", 6, "Race has finished"));
```
Quix Streams enables you to publish events without creating `EventData` instances explicitly. To do so, you can use similar helpers to those present in the TimeseriesData format, such as `add_timestamp`, `add_value`, or `add_tag`. Then use the `publish` method to publish that timestamp to the stream:
```python
stream.events \
    .add_timestamp(1) \
    .add_value("failure23", "Gearbox has a failure") \
    .publish()
stream.events \
    .add_timestamp(2) \
    .add_value("box-event2", "Car has entered to the box") \
    .publish()
stream.events \
    .add_timestamp(3) \
    .add_value("motor-off", "Motor has stopped") \
    .publish()
stream.events \
    .add_timestamp(6) \
    .add_value("race-event3", "Race has finished") \
    .publish()
```
```csharp
stream.Events
    .AddTimestamp(1)
    .AddValue("failure23", "Gearbox has a failure")
    .Publish();
stream.Events
    .AddTimestamp(2)
    .AddValue("box-event2", "Car has entered to the box")
    .Publish();
stream.Events
    .AddTimestamp(3)
    .AddValue("motor-off", "Motor has stopped")
    .Publish();
stream.Events
    .AddTimestamp(6)
    .AddValue("race-event3", "Race has finished")
    .Publish();
```
Event definitions
As with parameters, you can attach `Definitions` to each event.
This is the list of visualization and metadata options you can attach to an `EventDefinition`:
- `set_level(level: EventLevel)`: Set the severity level of the event.
- `set_custom_properties(custom_properties: str)`: Set the custom properties of the event for your own needs.
For example, the following code defines a human readable name and a severity level for `EventA`:
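```python
# a sketch, assuming EventLevel is importable from the top-level package
from quixstreams import EventLevel

stream.events \
    .add_definition("EventA", "The Event A") \
    .set_level(EventLevel.Critical)
```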
Tags
The library enables you to tag data for `TimeseriesData` and `EventData` packets. Using tags alongside parameters and events helps when indexing persisted data in the database. Tags enable you to filter and group data with fast queries.
Tags work as part of the primary key inside `TimeseriesData` and `EventData`, in combination with the default Timestamp key. If you add data values with the same timestamp, but a different combination of tags, each combination is treated as a separate row.
For example, the following code:
```python
from quixstreams import TimeseriesData

data = TimeseriesData()

data.add_timestamp_nanoseconds(1) \
    .add_tag("CarId", "car1") \
    .add_value("Speed", 120) \
    .add_value("Gear", 3)
data.add_timestamp_nanoseconds(2) \
    .add_tag("CarId", "car1") \
    .add_value("Speed", 123) \
    .add_value("Gear", 3)
data.add_timestamp_nanoseconds(3) \
    .add_tag("CarId", "car1") \
    .add_value("Speed", 125) \
    .add_value("Gear", 3)
data.add_timestamp_nanoseconds(1) \
    .add_tag("CarId", "car2") \
    .add_value("Speed", 95) \
    .add_value("Gear", 2)
data.add_timestamp_nanoseconds(2) \
    .add_tag("CarId", "car2") \
    .add_value("Speed", 98) \
    .add_value("Gear", 2)
data.add_timestamp_nanoseconds(3) \
    .add_tag("CarId", "car2") \
    .add_value("Speed", 105) \
    .add_value("Gear", 2)
```
```csharp
var data = new TimeseriesData();

data.AddTimestampNanoseconds(1)
    .AddTag("CarId", "car1")
    .AddValue("Speed", 120)
    .AddValue("Gear", 3);
data.AddTimestampNanoseconds(2)
    .AddTag("CarId", "car1")
    .AddValue("Speed", 123)
    .AddValue("Gear", 3);
data.AddTimestampNanoseconds(3)
    .AddTag("CarId", "car1")
    .AddValue("Speed", 125)
    .AddValue("Gear", 3);
data.AddTimestampNanoseconds(1)
    .AddTag("CarId", "car2")
    .AddValue("Speed", 95)
    .AddValue("Gear", 2);
data.AddTimestampNanoseconds(2)
    .AddTag("CarId", "car2")
    .AddValue("Speed", 98)
    .AddValue("Gear", 2);
data.AddTimestampNanoseconds(3)
    .AddTag("CarId", "car2")
    .AddValue("Speed", 105)
    .AddValue("Gear", 2);
```
Will generate the following `TimeseriesData` packet with tagged data:
Timestamp | CarId | Speed | Gear |
---|---|---|---|
1 | car1 | 120 | 3 |
1 | car2 | 95 | 2 |
2 | car1 | 123 | 3 |
2 | car2 | 98 | 2 |
3 | car1 | 125 | 3 |
3 | car2 | 105 | 2 |
Warning
Tags have to be chosen carefully because excessive cardinality leads to performance degradation in the database. You should use tags only for identifiers, not for continuously varying values.
The following example of good tagging practice enables you to query the maximum speed for driver identifier "Peter":
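```python
# DriverId is an identifier with bounded cardinality, so it is a good tag
data.add_timestamp(datetime.datetime.utcnow()) \
    .add_tag("DriverId", "Peter") \
    .add_value("Speed", 53)
```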
The following example of bad tagging practice will lead to excessive cardinality as there will be a large number of different values for the specified tag, Speed:
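```python
# Speed is a continuously varying value: using it as a tag creates
# unbounded cardinality
data.add_timestamp(datetime.datetime.utcnow()) \
    .add_tag("Speed", "53") \
    .add_value("DriverId", "Peter")
```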
Minimal example
This is a minimal code example you can use to publish data to a topic using Quix Streams:
```python
import time
import datetime

from quixstreams import *

client = KafkaStreamingClient('127.0.0.1:9092')

with (topic_producer := client.get_topic_producer(TOPIC_ID)):
    stream = topic_producer.create_stream()
    stream.properties.name = "Hello World python stream"

    for index in range(0, 3000):
        stream.timeseries \
            .buffer \
            .add_timestamp(datetime.datetime.utcnow()) \
            .add_value("ParameterA", index) \
            .publish()
        time.sleep(0.01)

    print("Closing stream")
    stream.close()
```
```csharp
using System;
using System.Threading;

namespace WriteHelloWorld
{
    class Program
    {
        /// <summary>
        /// Main will be invoked when you run the application
        /// </summary>
        static void Main()
        {
            // Create a client which holds generic details for creating input and output topics
            var client = new QuixStreams.Streaming.QuixStreamingClient();

            using var topicProducer = client.GetTopicProducer(TOPIC_ID);

            var stream = topicProducer.CreateStream();
            stream.Properties.Name = "Hello World stream";

            Console.WriteLine("Publishing values for 30 seconds");
            for (var index = 0; index < 3000; index++)
            {
                stream.Timeseries.Buffer
                    .AddTimestamp(DateTime.UtcNow)
                    .AddValue("ParameterA", index)
                    .Publish();
                Thread.Sleep(10);
            }

            Console.WriteLine("Closing stream");
            stream.Close();
            Console.WriteLine("Done!");
        }
    }
}
```
Publish raw Kafka messages
Quix Streams uses an internal protocol that is optimized for both data size and speed, but this requires you to use Quix Streams on both the producer and consumer sides. Custom formats need to be handled manually.
For this, Quix Streams provides a way to publish and subscribe to the raw, unformatted messages, and work with them as bytes. This gives you the ability to implement the protocol as needed and convert between formats.
You can publish messages with or without a key. The following example demonstrates how to publish two messages to Kafka, one message with a key, and one without:
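```python
# a sketch, assuming the raw-message API of the legacy library
# (get_raw_topic_producer / RawMessage)
from quixstreams import RawMessage

raw_producer = client.get_raw_topic_producer(TOPIC_ID)

# message with a key (key and payload values are illustrative)
message = RawMessage(bytearray("message body", "utf-8"))
message.key = bytearray("my-key", "utf-8")
raw_producer.publish(message)

# message without a key
raw_producer.publish(RawMessage(bytearray("another message body", "utf-8")))
```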