2. Event Detection
Our event detection pipeline is centered around this service, which runs an ML model to detect whether a vehicle has been involved in an accident.
In reality our ML model was trained to detect the difference between a phone being shaken and a phone being used normally. You don't actually have to use an ML model at all! This service could have been written in various ways: for example, you could detect a sudden change in speed, or combine speed with another parameter, to determine whether an event has occurred. One such approach is sketched below.
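For instance, a minimal non-ML detector could simply look for a sharp drop in speed between consecutive readings. The sketch below is purely illustrative: the `time` and `speed` column names and the 30 km/h threshold are assumptions, not part of this tutorial's data.

```python
import pandas as pd

def detect_crash_by_speed(df: pd.DataFrame, max_drop_kmh: float = 30.0) -> pd.DataFrame:
    """Flag rows where speed falls sharply from one reading to the next."""
    df = df.sort_values("time").copy()
    # A positive value means the vehicle decelerated between readings
    df["speed_drop"] = -df["speed"].diff().fillna(0)
    df["crash"] = df["speed_drop"] > max_drop_kmh
    return df
```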
Service template
Follow these steps to start creating the crash detection service:

- Navigate to **Code Samples** and search for **Starter transformation**.
- Ensure you have located the Python starter transformation and click **Preview code**.
- Click **Edit code**.
- Change the name to `Crash event detection`.
- Enter `phone-data` into the input field.
- Enter `phone-out` into the output field.
- Click **Save as project**.
You now have the basic template for the service saved to your workspace.
Test
At this stage you should test the code to make sure it passes some basic functional tests:
- Ensure that the data source you deployed earlier is running.
- Click the **Run** button in the top right of your browser.
- Explore the **Console** and **Messages** tabs and verify that data is arriving in the `phone-data` topic.
- Stop the code once you have finished investigating the tabs.
Adding functionality
Next you will add code to detect the crash events, making use of some code snippets and Python libraries.
requirements.txt
Follow these steps:

- Open the `requirements.txt` file and add the lines needed to install the extra packages (a best-guess reconstruction follows this list).
- Save the file.
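The exact lines were lost from this copy of the tutorial. Since the pretrained model is a pickled XGBoost classifier used through pandas, the additions were plausibly along these lines (treat this as a guess, not the tutorial's exact list):

```txt
xgboost
scikit-learn
```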
main.py
- Open `main.py` and add the new imports to the existing ones (the snippets for all four of these steps are reconstructed after this list).
- Next add the lines that download the `.pkl` file (the pretrained model) from our storage account and load the model into memory.
- Next, to ensure that the latest messages are processed instead of any historical data, set the "ConsumerGroup" parameter to `None` when opening the input topic. Look for the line with `open_input_topic`.
- Locate the line that instantiates the `QuixFunction` and pass `loaded_model` as the last parameter. The `QuixFunction` class will use this to predict crash events.
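The individual snippets for these steps were dropped from this copy of the tutorial, but each one can be read back out of the completed `main.py` shown below. Gathered into one sketch (the comments here are added for orientation):

```python
# New imports (AutoOffsetReset is also added to the existing quixstreaming import):
import pickle
from urllib import request

# Download the .pkl file (the pretrained model) and load it into memory:
f = request.urlopen("https://quixtutorials.blob.core.windows.net/tutorials/event-detection/XGB_model.pkl")
with open("XGB_model.pkl", "wb") as model_file:
    model_file.write(f.read())
loaded_model = pickle.load(open("XGB_model.pkl", 'rb'))

# open_input_topic with the consumer group argument set to None, so only the
# latest messages are processed rather than any historical data:
input_topic = client.open_input_topic(os.environ["input"], None, auto_offset_reset=AutoOffsetReset.Latest)

# QuixFunction instantiated with loaded_model as its last parameter:
quix_function = QuixFunction(input_stream, output_stream, loaded_model)
```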
The completed `main.py` should look like this:

```python
from quixstreaming import QuixStreamingClient, StreamEndType, StreamReader, AutoOffsetReset
from quixstreaming.app import App
from quix_function import QuixFunction
import os
import pickle
from urllib import request

# download the model
f = request.urlopen("https://quixtutorials.blob.core.windows.net/tutorials/event-detection/XGB_model.pkl")
with open("XGB_model.pkl", "wb") as model_file:
    model_file.write(f.read())

# load it with pickle
loaded_model = pickle.load(open("XGB_model.pkl", 'rb'))

# Quix injects credentials automatically to the client.
# Alternatively, you can always pass an SDK token manually as an argument.
client = QuixStreamingClient()

# Change consumer group to a different constant if you want to run model locally.
print("Opening input and output topics")

input_topic = client.open_input_topic(os.environ["input"], None, auto_offset_reset=AutoOffsetReset.Latest)
output_topic = client.open_output_topic(os.environ["output"])

# Callback called for each incoming stream
def read_stream(input_stream: StreamReader):

    # Create a new stream to output data
    output_stream = output_topic.create_stream(input_stream.stream_id)
    output_stream.properties.parents.append(input_stream.stream_id)

    # handle the data in a function to simplify the example
    quix_function = QuixFunction(input_stream, output_stream, loaded_model)

    # React to new data received from input topic.
    input_stream.events.on_read += quix_function.on_event_data_handler
    input_stream.parameters.on_read_pandas += quix_function.on_pandas_frame_handler

    # When input stream closes, we close output stream as well.
    def on_stream_close(endType: StreamEndType):
        output_stream.close()
        print("Stream closed:" + output_stream.stream_id)

    input_stream.on_stream_closed += on_stream_close

# Hook up events before initiating read to avoid losing out on any data
input_topic.on_stream_received += read_stream

# Hook up to termination signal (for docker image) and CTRL-C
print("Listening to streams. Press CTRL-C to exit.")

# Handle graceful exit of the model.
App.run()
```
- Save `main.py`.
quix_function.py
- Open `quix_function.py`. This file contains handlers for tabular data and event data. You will add code to the `on_pandas_frame_handler` function to handle the input stream's tabular data.
- Start by locating the `__init__` function definition.
- Add `loaded_model` as the last parameter to the `__init__` function; this will receive the loaded model from `main.py`.
- Store the `loaded_model` in a class property by adding `self.loaded_model = loaded_model` to the `__init__` function (you can see this in the completed file below).
- Now replace the current `on_pandas_frame_handler` with this code:

```python
def on_pandas_frame_handler(self, df: pd.DataFrame):

    if "gForceX" in df:
        df["gForceTotal"] = df["gForceX"].abs() + df["gForceY"].abs() + df["gForceZ"].abs()
        df["shaking"] = self.loaded_model.predict(df[["gForceZ", "gForceY", "gForceX", "gForceTotal"]])

        if df["shaking"].max() == 1:
            print("Crash detected.")
            self.output_stream.events.add_timestamp_nanoseconds(df.iloc[0]["time"]) \
                .add_value("crash", "Crash detected.") \
                .write()
```
This code first checks that the required columns are present in the DataFrame, then uses the ML model with the g-force data to determine whether shaking is occurring. If shaking is detected, an event is generated and streamed to the output topic.
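If you want to sanity-check this logic outside of Quix, you can run the same feature computation against the downloaded model with fabricated readings. The g-force values below are invented for illustration; only the column names and ordering match the handler above.

```python
import pickle
import pandas as pd

# Assumes XGB_model.pkl has already been downloaded, as in main.py
loaded_model = pickle.load(open("XGB_model.pkl", "rb"))

# One made-up violent-shaking sample and one normal-handling sample
df = pd.DataFrame({
    "gForceX": [3.1, 0.02],
    "gForceY": [-2.7, 0.01],
    "gForceZ": [4.0, 0.98],
})
df["gForceTotal"] = df["gForceX"].abs() + df["gForceY"].abs() + df["gForceZ"].abs()

# Expect output like [1 0]: 1 = shaking detected, 0 = normal use
print(loaded_model.predict(df[["gForceZ", "gForceY", "gForceX", "gForceTotal"]]))
```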
The completed `quix_function.py` should look like this:

```python
from quixstreaming import StreamReader, StreamWriter, EventData, ParameterData
import pandas as pd


class QuixFunction:
    def __init__(self, input_stream: StreamReader, output_stream: StreamWriter, loaded_model):
        self.input_stream = input_stream
        self.output_stream = output_stream
        self.loaded_model = loaded_model

    # Callback triggered for each new event.
    def on_event_data_handler(self, data: EventData):
        print(data.value)

        # Here transform your data.

        self.output_stream.events.write(data)

    # Callback triggered for each new parameter data.
    def on_pandas_frame_handler(self, df: pd.DataFrame):

        # if the expected column is in the dataframe
        if "gForceX" in df:
            # calc total g-force
            df["gForceTotal"] = df["gForceX"].abs() + df["gForceY"].abs() + df["gForceZ"].abs()
            # predict 'shaking'
            df["shaking"] = self.loaded_model.predict(df[["gForceZ", "gForceY", "gForceX", "gForceTotal"]])

            # if 'shaking'
            if df["shaking"].max() == 1:
                print("Crash detected.")
                # write the event to the output stream
                self.output_stream.events.add_timestamp_nanoseconds(df.iloc[0]["time"]) \
                    .add_value("crash", "Crash detected.") \
                    .write()
```
- Save `quix_function.py`.
dockerfile
- Now update the `dockerfile` in the `build` folder. Under the line with `COPY --from=git /project .` add the line `RUN apt-get install libgomp1` (you can see it in the completed file below). This installs `libgomp1`, which is a requirement of XGBoost.
The completed `dockerfile` should look like this:

```dockerfile
FROM quixpythonbaseimage
WORKDIR /app
COPY --from=git /project .
RUN apt-get install libgomp1
RUN find | grep requirements.txt | xargs -I '{}' python3 -m pip install -i http://pip-cache.pip-cache.svc.cluster.local/simple --trusted-host pip-cache.pip-cache.svc.cluster.local -r '{}' --extra-index-url https://pypi.org/simple --extra-index-url https://pkgs.dev.azure.com/quix-analytics/53f7fe95-59fe-4307-b479-2473b96de6d1/_packaging/public/pypi/simple/
ENTRYPOINT python3 main.py
```
- Save the `dockerfile`.
Test again
You can once again run the code in the development environment to test the functionality:
- Ensure that the data source you deployed earlier is running.
- Click **Run** in the top right of the browser to run the event detection code.
- If you chose to stream live data, gently shake your phone.
- If you chose to use CSV data, wait for a crash event to be streamed from the data set, or stop and restart the service in another browser tab.
- Observe the **Console** tab. You should see a message saying "Crash detected."
- On the **Messages** tab, select `output : phone-out` from the first drop-down.
- Gently shake your phone, or wait for another crash event from the CSV data, and observe that crash events are streamed to the output topic. You can click these rows to investigate the event data.
- Stop the code.
Success
The crash detection service is working as expected and can now be deployed.
Deploy crash detection
Now that you have verified the service is working you can go ahead and deploy the service:
- Tag the code by clicking the **add tag** icon at the top of the code panel.
- Enter a tag such as `v1`.
- Now click the **Deploy** button near the top right of the code panel.
- From the **Version tag** drop-down, select the tag you created.
- Click **Deploy**.
Success
You now have a data source and the crash detection service running in your workspace.
Next you’ll deploy a real-time UI to visualize the route being taken, the location of any crash events, and some of the sensor data.