Simple stream processing using Python and tumbling windows

Aggregating Real-time Sensor Data with Python and Redpanda - image  on https://aiquantumintelligence.com
Image by author

In this tutorial, I want to show you how to downsample a stream of sensor data using only Python (and Redpanda as a message broker). The goal is to show you how simple stream processing can be, and that you don’t need a heavy-duty stream processing framework to get started.

Until recently, stream processing was a complex task that usually required some Java expertise. But gradually, the Python stream processing ecosystem has matured and there are a few more options available to Python developers — such as Faust, Bytewax and Quix. Later, I’ll provide a bit more background on why these libraries have emerged to compete with the existing Java-centric options.

But first, let’s get to the task at hand. We will use a Python library called Quix Streams as our stream processor. Quix Streams is very similar to Faust, but it has been optimized to be more concise in its syntax and uses a Pandas-like API called StreamingDataFrame.

You can install the Quix Streams library with the following command:

pip install quixstreams

What you’ll build

You’ll build a simple application that calculates windowed aggregations of temperature readings coming from various sensors. The temperature readings will come in at a relatively high frequency, and this application will aggregate the readings and output them at a lower time resolution (every 10 seconds). You can think of this as a form of compression, since we don’t want to work on data at an unnecessarily high resolution.
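To make the idea of downsampling concrete before any streaming tools are involved, here is a minimal pure-Python sketch. The downsample function and the sample readings are hypothetical and are not part of the tutorial code; the sketch simply groups (timestamp, value) pairs into fixed 10-second buckets and averages each bucket.

```python
from collections import defaultdict


def downsample(readings, window_seconds=10):
    """Average (timestamp_seconds, value) pairs per fixed-size time bucket."""
    buckets = defaultdict(list)
    for ts, value in readings:
        # Integer division maps every timestamp to the start of its bucket
        bucket_start = (ts // window_seconds) * window_seconds
        buckets[bucket_start].append(value)
    # One averaged value per bucket, ordered by bucket start time
    return {start: sum(vals) / len(vals) for start, vals in sorted(buckets.items())}


# Hypothetical readings: (seconds since start, temperature)
readings = [(0, 20), (3, 22), (9, 24), (10, 30), (14, 32), (21, 40)]
result = downsample(readings)
print(result)  # {0: 22.0, 10: 31.0, 20: 40.0}
```

Six readings go in and three come out, one per 10-second bucket. The stream processor you’ll build does the same thing, except that the data never stops arriving.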

You can access the complete code in this GitHub repository.

This application includes code that generates synthetic sensor data, but in a real-world scenario this data could come from many kinds of sensors, such as sensors installed in a fleet of vehicles or a warehouse full of machines.

Here’s an illustration of the basic architecture:

Diagram by author

The previous diagram reflects the main components of a stream processing pipeline: You have the sensors which are the data producers, Redpanda as the streaming data platform, and Quix as the stream processor.

Data producers

These are bits of code that are attached to systems that generate data such as firmware on ECUs (Engine Control Units), monitoring modules for cloud platforms, or web servers that log user activity. They take that raw data and send it to the streaming data platform in a format that that platform can understand.

Streaming data platform

This is where you put your streaming data. It plays more or less the same role as a database does for static data. But instead of tables, you use topics. Otherwise, it has similar features to a static database. You’ll want to manage who can consume and produce data and which schemas the data should adhere to. Unlike a database though, the data is constantly in flux, so it’s not designed to be queried. You’d usually use a stream processor to transform the data and put it somewhere else for data scientists to explore, or sink the raw data into a queryable system optimized for streaming data such as RisingWave or Apache Pinot. However, for automated systems that are triggered by patterns in streaming data (such as recommendation engines), this isn’t an ideal solution. In this case, you definitely want to use a dedicated stream processor.

Stream processors

These are engines that perform continuous operations on the data as it arrives. They could be compared to just regular old microservices that process data in any application back end, but there’s one big difference. For microservices, data arrives in drips like droplets of rain, and each “drip” is processed discretely. Even if it “rains” heavily, it’s not too hard for the service to keep up with the “drops” without overflowing (think of a filtration system that filters out impurities in the water).

For a stream processor, the data arrives as a continuous, wide gush of water. A filtration system would be quickly overwhelmed unless you change the design, for example by breaking the stream up and routing smaller streams to a battery of filtration systems. That’s kind of how stream processors work. They’re designed to be horizontally scaled and work in parallel as a battery. And they never stop: they process the data continuously, outputting the filtered data to the streaming data platform, which acts as a kind of reservoir for streaming data. To make things more complicated, stream processors often need to keep track of data that was received previously, such as in the windowing example you’ll try out here.
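One common way to “break the stream up” is to route each message to a partition based on its key, so that all readings from the same sensor end up with the same worker. The following is only an illustrative sketch: partition_for is a hypothetical helper, and the CRC32 hash is my own choice for the example, since real Kafka-compatible clients use their own partitioning logic.

```python
import zlib


def partition_for(key: str, num_partitions: int) -> int:
    """Map a message key to a partition number (illustrative hash only)."""
    # CRC32 is deterministic across runs, unlike Python's built-in hash() for strings
    return zlib.crc32(key.encode("utf-8")) % num_partitions


sensors = ["Sensor1", "Sensor2", "Sensor3", "Sensor4", "Sensor5"]
assignment = {sensor: partition_for(sensor, 3) for sensor in sensors}

for sensor, partition in assignment.items():
    print(f"{sensor} -> partition {partition}")
```

Because the mapping depends only on the key, a given sensor always lands on the same partition, which is what lets a worker keep per-sensor state such as a running window.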

Note that there are also “data consumers” and “data sinks” — systems that consume the processed data (such as front end applications and mobile apps) or store it for offline analysis (data warehouses like Snowflake or AWS Redshift). Since we won’t be covering those in this tutorial, I’ll skip over them for now.

In this tutorial, I’ll show you how to use a local installation of Redpanda for managing your streaming data. I’ve chosen Redpanda because it’s very easy to run locally.

You’ll use Docker Compose to quickly spin up a cluster, including the Redpanda console, so make sure you have Docker installed first.

First, you’ll create separate files to produce and process your streaming data. This makes it easier to manage the running processes independently. For example, you can stop the producer without stopping the stream processor too. Here’s an overview of the two files that you’ll create:

  • The stream producer: sensor_stream_producer.py
    Generates synthetic temperature data and produces (i.e. writes) that data to a “raw data” source topic in Redpanda. It produces readings at random intervals of between 0 and 1 second, which works out to around 2 readings a second on average.
  • The stream processor: sensor_stream_processor.py
    Consumes (reads) the raw temperature data from the “source” topic and performs a tumbling window calculation to decrease the resolution of the data. It calculates the average of the data received in 10-second windows so that you get one reading for every 10 seconds. It then produces these aggregated readings to the agg-temperature topic in Redpanda.

As you can see the stream processor does most of the heavy lifting and is the core of this tutorial. The stream producer is a stand-in for a proper data ingestion process. For example, in a production scenario, you might use something like this MQTT connector to get data from your sensors and produce it to a topic.

For a tutorial, it’s simpler to simulate the data, so let’s get that set up first.

You’ll start by creating a new file called sensor_stream_producer.py and define the main Quix application. (This example has been developed on Python 3.10, but different versions of Python 3 should work as well, as long as you are able to run pip install quixstreams.)

Create the file sensor_stream_producer.py and add all the required dependencies (including Quix Streams):

from dataclasses import dataclass, asdict # used to define the data schema
from datetime import datetime # used to manage timestamps
from time import sleep # used to slow down the data generator
import random # used to pick sensors and generate mock readings
import uuid # used for message id creation
import json # used for serializing data

from quixstreams import Application

Then, define a Quix application and destination topic to send the data.


app = Application(broker_address='localhost:19092')

destination_topic = app.topic(name='raw-temp-data', value_serializer="json")

The value_serializer parameter defines the format of the expected source data (to be serialized into bytes). In this case, you’ll be sending JSON.

Let’s use the dataclass module to define a very basic schema for the temperature data and add a function to serialize it to JSON.

@dataclass
class Temperature:
    ts: datetime
    value: int

    def to_json(self):
        # Convert the dataclass to a dictionary
        data = asdict(self)
        # Format the datetime object as a string
        data['ts'] = self.ts.isoformat()
        # Serialize the dictionary to a JSON string
        return json.dumps(data)
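As a quick sanity check of what to_json produces, here is a small self-contained sketch that repeats the dataclass and serializes one fixed reading. The second half rests on an assumption: that a JSON value serializer behaves like json.dumps and encodes whatever it is given. Under that assumption, a value that is already a JSON string gets encoded a second time, so a consumer gets a string back after one decoding pass and needs a second json.loads to reach the dictionary.

```python
import json
from dataclasses import dataclass, asdict
from datetime import datetime


@dataclass
class Temperature:
    ts: datetime
    value: int

    def to_json(self):
        data = asdict(self)
        data["ts"] = self.ts.isoformat()
        return json.dumps(data)


# A fixed reading so that the output is reproducible
reading = Temperature(datetime(2024, 1, 1, 12, 0, 0, 500000), 42)
payload = reading.to_json()
print(payload)  # {"ts": "2024-01-01T12:00:00.500000", "value": 42}

# Assumption: the serializer JSON-encodes its input, so a string is encoded again
wire_bytes = json.dumps(payload).encode("utf-8")

once = json.loads(wire_bytes)  # one decoding pass gives the JSON string back
twice = json.loads(once)       # a second pass gives the dictionary
print(type(once).__name__, type(twice).__name__)  # str dict
```

This is one plausible reason why the aggregation functions later in this tutorial call json.loads on each incoming value.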

Next, add the code that will be responsible for sending the mock temperature sensor data into our Redpanda source topic.

i = 0
with app.get_producer() as producer:
    while i < 10000:
        sensor_id = random.choice(["Sensor1", "Sensor2", "Sensor3", "Sensor4", "Sensor5"])
        temperature = Temperature(datetime.now(), random.randint(0, 100))
        value = temperature.to_json()

        print(f"Producing value {value}")
        serialized = destination_topic.serialize(
            key=sensor_id, value=value, headers={"uuid": str(uuid.uuid4())}
        )
        producer.produce(
            topic=destination_topic.name,
            headers=serialized.headers,
            key=serialized.key,
            value=serialized.value,
        )
        i += 1
        sleep(random.randint(0, 1000) / 1000)

This generates 10,000 records separated by random time intervals between 0 and 1 second. It also randomly selects a sensor name from a list of 5 options.
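It can help to estimate how much data this loop produces before you aggregate it. The following back-of-the-envelope calculation is a sketch only: it ignores the time spent producing each message, and the per-sensor figure assumes that readings are spread evenly across the sensors and that windows are tracked separately per message key.

```python
# Mean of random.randint(0, 1000) milliseconds, converted to seconds
mean_sleep_seconds = (0 + 1000) / 2 / 1000

# Average throughput of the producer loop (ignoring produce overhead)
readings_per_second = 1 / mean_sleep_seconds

window_seconds = 10  # length of the tumbling window used later
num_sensors = 5      # number of sensor names in the producer

readings_per_window = readings_per_second * window_seconds
readings_per_sensor_window = readings_per_window / num_sensors

print(readings_per_second)         # 2.0
print(readings_per_window)         # 20.0
print(readings_per_sensor_window)  # 4.0
```

So each aggregated result will be based on only a handful of readings, which is worth keeping in mind when you look at the min, max and mean values later.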

Now, try out the producer by running the following in the command line:

python sensor_stream_producer.py

You should see data being logged to the console like this:

[data produced]

Once you’ve confirmed that it works, stop the process for now (you’ll run it alongside the stream processing process later).

The stream processor performs three main tasks: 1) consume the raw temperature readings from the source topic, 2) continuously aggregate the data, and 3) produce the aggregated results to a sink topic.

Let’s add the code for each of these tasks. In your IDE, create a new file called sensor_stream_processor.py.

First, add the dependencies as before:

import os
import random
import json
from datetime import datetime, timedelta
from dataclasses import dataclass
import logging
from quixstreams import Application

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

Let’s also set some variables that our stream processing application needs:

TOPIC = "raw-temp-data" # defines the input topic (must match the topic that the producer writes to)
SINK = "agg-temperature" # defines the output topic
WINDOW = 10 # defines the length of the time window in seconds
WINDOW_EXPIRES = 1 # defines, in seconds, how late data can arrive before it is excluded from the window

We’ll go into more detail on what the window variables mean a bit later, but for now, let’s crack on with defining the main Quix application.

app = Application(
    broker_address='localhost:19092',
    consumer_group="quix-stream-processor",
    auto_offset_reset="earliest",
)

Note that there are a few more application variables this time around, namely consumer_group and auto_offset_reset. To learn more about the interplay between these settings, check out the article “Understanding Kafka’s auto offset reset configuration: Use cases and pitfalls”.
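To give a rough idea of how these two settings interact, here is a toy model of how a Kafka-style consumer decides where to start reading. The starting_offset function is hypothetical and heavily simplified (it ignores out-of-range offsets, for example): the reset policy only matters when the consumer group has no committed offset yet.

```python
from typing import Optional


def starting_offset(
    committed: Optional[int], earliest: int, latest: int, auto_offset_reset: str
) -> int:
    """Toy model of where a consumer group starts reading a partition."""
    # A committed offset for this consumer group always wins
    if committed is not None:
        return committed
    # Otherwise, fall back to the reset policy
    if auto_offset_reset == "earliest":
        return earliest
    if auto_offset_reset == "latest":
        return latest
    raise ValueError(f"Unsupported policy: {auto_offset_reset}")


# A new consumer group reads the whole topic with "earliest"
print(starting_offset(None, earliest=0, latest=120, auto_offset_reset="earliest"))  # 0
# The same group with "latest" only sees messages that arrive from now on
print(starting_offset(None, earliest=0, latest=120, auto_offset_reset="latest"))  # 120
# A group that has already committed offset 57 resumes from there
print(starting_offset(57, earliest=0, latest=120, auto_offset_reset="earliest"))  # 57
```

In this tutorial, "earliest" means that the processor will also pick up readings that were produced before it was started for the first time.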

Next, define the input and output topics on either side of the core stream processing function and add a function to put the incoming data into a DataFrame.

input_topic = app.topic(TOPIC, value_deserializer="json")
output_topic = app.topic(SINK, value_serializer="json")

sdf = app.dataframe(input_topic)
sdf = sdf.update(lambda value: logger.info(f"Input value received: {value}"))

We’ve also added a logging line to make sure the incoming data is intact.

Next, let’s add a custom timestamp extractor to use the timestamp from the message payload instead of the Kafka timestamp. For your aggregations, this basically means that you want to use the time that the reading was generated rather than the time that it was received by Redpanda. Or in even simpler terms: “Use the sensor’s definition of time rather than Redpanda’s”.

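def custom_ts_extractor(value, *_):
    # Some Quix Streams versions also pass the headers, the Kafka timestamp and the
    # timestamp type as extra positional arguments; *_ accepts and ignores them

    # The producer sends a JSON string, so decode it if it is not a dictionary yet
    if isinstance(value, str):
        value = json.loads(value)

    # Extract the sensor's timestamp and convert to a datetime object
    dt_obj = datetime.strptime(value["ts"], "%Y-%m-%dT%H:%M:%S.%f")

    # Convert to milliseconds since the Unix epoch for efficient processing with Quix
    milliseconds = int(dt_obj.timestamp() * 1000)
    value["timestamp"] = milliseconds
    logger.info(f"Value of new timestamp is: {value['timestamp']}")

    return value["timestamp"]

# Override the previously defined input_topic variable so that it uses the custom timestamp extractor
# (the Streaming DataFrame needs to be created from this topic object for the extractor to take effect)
input_topic = app.topic(TOPIC, timestamp_extractor=custom_ts_extractor, value_deserializer="json")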

Why are we doing this? Well, we could get into a philosophical rabbit hole about which kind of time to use for processing, but that’s a subject for another article. With the custom timestamp, I just wanted to illustrate that there are many ways to interpret time in stream processing, and you don’t necessarily have to use the time of data arrival.
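If you want to experiment with the timestamp conversion on its own, here is a standalone sketch that does not depend on Quix. The to_epoch_ms helper is hypothetical and makes two choices of its own: it uses datetime.fromisoformat, which also accepts timestamps without a fractional part (isoformat() drops the microseconds when they are exactly zero), and it treats timestamps without a timezone as UTC so that the result does not depend on the machine’s local timezone.

```python
from datetime import datetime, timezone


def to_epoch_ms(ts: str) -> int:
    """Convert an ISO 8601 timestamp string to milliseconds since the Unix epoch."""
    dt = datetime.fromisoformat(ts)
    if dt.tzinfo is None:
        # Assumption for this sketch: timestamps without a timezone are in UTC
        dt = dt.replace(tzinfo=timezone.utc)
    return int(dt.timestamp() * 1000)


print(to_epoch_ms("2024-01-01T00:00:00.250000"))  # 1704067200250
print(to_epoch_ms("2024-01-01T00:00:00"))         # 1704067200000
```

Whichever convention you pick, the important part is that the producer and the processor agree on it; otherwise readings can land in the wrong window.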

Next, initialize the state for the aggregation when a new window starts. It will prime the aggregation when the first record arrives in the window.

def initializer(value: str) -> dict:
    # The incoming value is a JSON string, so parse it into a dictionary first
    value_dict = json.loads(value)
    return {
        'count': 1,
        'min': value_dict['value'],
        'max': value_dict['value'],
        'mean': value_dict['value'],
    }

This sets the initial values for the window. In the case of min, max, and mean, they are all identical because you’re just taking the first sensor reading as the starting point.

Now, let’s add the aggregation logic in the form of a “reducer” function.

def reducer(aggregated: dict, value: str) -> dict:
    aggcount = aggregated['count'] + 1
    # The incoming value is a JSON string, so parse it into a dictionary first
    value_dict = json.loads(value)
    return {
        'count': aggcount,
        'min': min(aggregated['min'], value_dict['value']),
        'max': max(aggregated['max'], value_dict['value']),
        # Update the running mean without storing every reading
        'mean': (aggregated['mean'] * aggregated['count'] + value_dict['value']) / aggcount
    }

This function is only necessary when you’re performing multiple aggregations on a window. In our case, we’re creating count, min, max, and mean values for each window, so we need to define these in advance.
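To see how the initializer and reducer work together, you can run the same logic over a plain Python list with functools.reduce. This is a simplified sketch rather than the tutorial code: the readings are hypothetical, and the values are already dictionaries here, so there is no JSON parsing.

```python
from functools import reduce


def initializer(value: dict) -> dict:
    # The first reading in a window seeds every aggregate
    return {
        "count": 1,
        "min": value["value"],
        "max": value["value"],
        "mean": value["value"],
    }


def reducer(aggregated: dict, value: dict) -> dict:
    count = aggregated["count"] + 1
    return {
        "count": count,
        "min": min(aggregated["min"], value["value"]),
        "max": max(aggregated["max"], value["value"]),
        # Running mean: restore the previous sum, add the new value, divide by the new count
        "mean": (aggregated["mean"] * aggregated["count"] + value["value"]) / count,
    }


# Hypothetical readings that all fall into the same window
readings = [{"value": 20}, {"value": 26}, {"value": 23}, {"value": 31}]

first, *rest = readings
summary = reduce(reducer, rest, initializer(first))
print(summary)  # {'count': 4, 'min': 20, 'max': 31, 'mean': 25.0}
```

The running mean ends up at 25.0, which is the same as (20 + 26 + 23 + 31) / 4, even though the function never holds more than one reading at a time.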

Next up, the juicy part — adding the tumbling window functionality:

### Define the window parameters such as type and length
sdf = (
    # Define a tumbling window of 10 seconds
    sdf.tumbling_window(timedelta(seconds=WINDOW), grace_ms=timedelta(seconds=WINDOW_EXPIRES))

    # Create a "reduce" aggregation with "reducer" and "initializer" functions
    .reduce(reducer=reducer, initializer=initializer)

    # Emit results only for closed 10 second windows
    .final()
)

### Apply the window to the Streaming DataFrame and define the data points to include in the output
sdf = sdf.apply(
    lambda value: {
        "time": value["end"], # Use the window end time as the timestamp for message sent to the 'agg-temperature' topic
        "temperature": value["value"], # Send a dictionary of {count, min, max, mean} values for the temperature parameter
    }
)

This defines the Streaming DataFrame as a set of aggregations based on a tumbling window — a set of aggregations performed on 10-second non-overlapping segments of time.
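To illustrate what “non-overlapping segments” means in practice, here is a small sketch of how a timestamp maps to a tumbling window, and how a grace period might decide whether a late reading still counts. Both functions are hypothetical, and the acceptance rule is a simplified model of my own for illustration, not necessarily the exact rule that Quix Streams applies internally.

```python
def window_bounds(timestamp_ms: int, duration_ms: int) -> tuple:
    """Return the (start, end) of the tumbling window containing the timestamp."""
    # Round the timestamp down to the nearest multiple of the window length
    start = timestamp_ms - (timestamp_ms % duration_ms)
    # The end is exclusive: a reading exactly at `end` belongs to the next window
    return start, start + duration_ms


def is_accepted(timestamp_ms: int, stream_time_ms: int, duration_ms: int, grace_ms: int) -> bool:
    """Simplified model: a window stays open until stream time reaches its end plus the grace period."""
    _, end = window_bounds(timestamp_ms, duration_ms)
    return stream_time_ms < end + grace_ms


print(window_bounds(12_345, 10_000))  # (10000, 20000)
print(window_bounds(20_000, 10_000))  # (20000, 30000)

# A reading stamped 19.5s arrives when the newest timestamp seen so far is 20.5s
print(is_accepted(19_500, 20_500, 10_000, 1_000))  # True, still within the 1s grace period
# The same reading arriving when stream time is 21.5s is too late
print(is_accepted(19_500, 21_500, 10_000, 1_000))  # False
```

With WINDOW set to 10 and WINDOW_EXPIRES set to 1, this corresponds to 10-second windows that tolerate readings arriving up to roughly one second late.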

Tip: If you need a refresher on the different types of windowed calculations, check out this article: “A guide to windowing in stream processing”.

Finally, produce the results to the downstream output topic:

sdf = sdf.to_topic(output_topic)
sdf = sdf.update(lambda value: logger.info(f"Produced value: {value}"))

if __name__ == "__main__":
    logger.info("Starting application")
    app.run(sdf)

Note: You might wonder why the producer code looks very different to the producer code used to send the synthetic temperature data (the part that uses with app.get_producer() as producer). This is because Quix uses a different producer function for transformation tasks (i.e. a task that sits between input and output topics).

As you might notice when following along, we iteratively change the Streaming DataFrame (the sdf variable) until it is in the final form that we want to send downstream. Thus, the sdf.to_topic function simply streams the final state of the Streaming DataFrame back to the output topic, row by row.

The producer function on the other hand, is used to ingest data from an external source such as a CSV file, an MQTT broker, or in our case, a generator function.

Finally, you get to run our streaming applications and see if all the moving parts work in harmony.

First, in a terminal window, start the producer again:

python sensor_stream_producer.py

Then, in a second terminal window, start the stream processor:

python sensor_stream_processor.py

Pay attention to the log output in each window, to make sure everything is running smoothly.

You can also check the Redpanda console to make sure that the aggregated data is being streamed to the sink topic correctly (you’ll find the topic browser at: http://localhost:8080/topics).

Screenshot by author

What you’ve tried out here is just one way to do stream processing. Naturally, there are heavy-duty tools such as Apache Flink and Apache Spark Streaming, which have also been covered extensively online. But those are predominantly Java-based tools. Sure, you can use their Python wrappers, but when things go wrong, you’ll still be debugging Java errors rather than Python errors. And Java skills aren’t exactly ubiquitous among data folks, who are increasingly working alongside software engineers to tune stream processing algorithms.

In this tutorial, we ran a simple aggregation as our stream processing algorithm, but in reality, these algorithms often employ machine learning models to transform that data — and the software ecosystem for machine learning is heavily dominated by Python.

An oft overlooked fact is that Python is the lingua franca for data specialists, ML engineers, and software engineers to work together. It’s even better than SQL because you can use it to do non-data-related things like make API calls and trigger webhooks. That’s one of the reasons why libraries like Faust, Bytewax and Quix evolved — to bridge the so-called impedance gap between these different disciplines.

Hopefully, I’ve managed to show you that Python is a viable language for stream processing, and that the Python ecosystem for stream processing is maturing at a steady rate and can hold its own against the older Java-based ecosystem.


