Quick start
Complete quick start example demonstrating integration of Fluxtion into an application
Introduction
A 5-minute tutorial demonstrating stream data processing with Fluxtion. The goal is to read a stream of sensor data for a set of rooms, calculate aggregate values per room, and notify a user class when a room breaches the set temperature criteria. When notified, the user class sends an SMS alert, provided a number has been registered.
To get the most out of this tutorial you should have:
A passing understanding of stream processing
Intermediate Java coding skills combined with basic knowledge of git and maven
Requirements
Read room sensor temperatures as a stream of CSV records or as instances of SensorReading events
Merge CSV records and SensorReading instances into a single event stream for processing
The event stream can be infinite
For each room calculate the max and average temperature
Run a tumbling window, zeroing all room values every 3 readings
Register a user class with the stream processor to act as a controller of an external system
If a room has an average temperature > 60 and a max temperature > 90, then:
Log a warning
A user class (TempertureController) will attempt to send an SMS listing the rooms to investigate
Register an SMS endpoint with the controller by sending a String as an event into the processor
The SMS number can be updated in real time
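The aggregation and windowing requirements above can be sketched in plain Java. This is only an illustration of the intended behaviour, not Fluxtion code: the names RoomStats and TumblingRoomMonitor are hypothetical, the CSV layout "room,temperature" is an assumption, and the window is assumed to count readings across all rooms.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical plain-Java sketch; none of these types are part of Fluxtion.
final class RoomStats {
    int count;
    double sum;
    double max = Double.NEGATIVE_INFINITY;

    void add(double temperature) {
        count++;
        sum += temperature;
        max = Math.max(max, temperature);
    }

    double average() {
        return count == 0 ? 0.0 : sum / count;
    }
}

final class TumblingRoomMonitor {
    private final int windowSize;
    private final Map<String, RoomStats> rooms = new LinkedHashMap<>();
    private int readingsInWindow;

    TumblingRoomMonitor(int windowSize) {
        this.windowSize = windowSize;
    }

    /** Parses one CSV record, assumed to have the form "room,temperature". */
    Map<String, RoomStats> onCsvRecord(String line) {
        String[] fields = line.split(",");
        return onReading(fields[0].trim(), Double.parseDouble(fields[1].trim()));
    }

    /** Adds a reading; returns the per-room snapshot when the window closes, else an empty map. */
    Map<String, RoomStats> onReading(String room, double temperature) {
        rooms.computeIfAbsent(room, r -> new RoomStats()).add(temperature);
        if (++readingsInWindow < windowSize) {
            return Map.of();
        }
        Map<String, RoomStats> snapshot = new LinkedHashMap<>(rooms);
        rooms.clear();        // zero all room values
        readingsInWindow = 0; // start the next window
        return snapshot;
    }
}
```

Both input paths (CSV text and typed readings) feed the same onReading method, which mirrors the requirement to merge the two sources into one stream.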
Running the application
Clone the quickstart project and execute the sensorquickstart.jar in the dist directory, as shown below:
The application processes the file temperatureData.csv as input in place of a real sensor source. The last three records trigger an alert condition. No SMS endpoint is registered, so the controller is unable to send a message.
After reading the CSV file, the application sends events to the processor programmatically: a String to register an SMS number and SensorReading events to create an alert condition. This time the controller can send an SMS message.
The system property fluxtion.cacheDirectory sets the cache directory for outputs of the Fluxtion compiler. A second execution takes significantly less time because the classes in the cache directory are reused and the stream processor is not recompiled. Deleting the cache directory forces the application to recompile at the next start.
Solution description
Dependencies
Build a processor and stream events
The code below, from SensorMonitor, builds a stream processing engine and sends events to it.
A method reference is passed to the reuseOrBuild function on line 2 to build the graph. The two string parameters are used as the fully qualified name of the generated stream processing class. The call to reuseOrBuild checks the classpath for a class that matches the fully qualified name. If no class can be loaded for that fqn, then a new stream processor is generated.
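The lookup-then-generate behaviour described above can be approximated as follows. This is a hedged sketch of the general pattern, not Fluxtion's implementation: ReuseOrBuildSketch, its method signature, and the order of the two string parameters are all assumptions, and the builder here is a plain Supplier rather than a graph-building method reference.

```java
import java.util.function.Supplier;

// Hypothetical sketch of a "reuse or build" lookup; this is not Fluxtion's implementation.
final class ReuseOrBuildSketch {

    /** Joins package and class name into a fully qualified name. */
    static String fqn(String packageName, String className) {
        return packageName.isEmpty() ? className : packageName + "." + className;
    }

    /**
     * Tries to load and instantiate the named class; falls back to the
     * supplied builder when the class is not on the classpath.
     */
    static Object reuseOrBuild(String className, String packageName, Supplier<Object> builder) {
        try {
            Class<?> clazz = Class.forName(fqn(packageName, className));
            return clazz.getDeclaredConstructor().newInstance();
        } catch (ClassNotFoundException e) {
            return builder.get(); // nothing cached: build a new processor
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Found the class but could not instantiate it", e);
        }
    }
}
```

The fallback only runs when the class cannot be found, which is why a second run with a populated cache avoids the generation step.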
Processing events
Once built, the application can send CharEvents from a file to the generated StaticEventProcessor using the utility method CharStreamer.stream() on line 5, or programmatically via the onEvent method on line 7. The processor will dispatch events within the execution graph to the correct instance.
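As a rough illustration of what dispatching events to the correct instance means, the sketch below routes each event to handlers registered for its runtime type. TypeDispatcher is a hypothetical class written for this tutorial; the generated Fluxtion processor is not implemented this way and resolves its dispatch when the processor is generated.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical sketch of type-based event routing; not the generated Fluxtion processor.
final class TypeDispatcher {
    private final Map<Class<?>, List<Consumer<Object>>> handlers = new LinkedHashMap<>();

    /** Registers a handler that receives every event of exactly the given type. */
    <T> void register(Class<T> type, Consumer<? super T> handler) {
        handlers.computeIfAbsent(type, t -> new ArrayList<>())
                .add(event -> handler.accept(type.cast(event)));
    }

    /** Single entry point: looks up handlers by the event's runtime class. */
    void onEvent(Object event) {
        for (Consumer<Object> handler : handlers.getOrDefault(event.getClass(), List.of())) {
            handler.accept(event);
        }
    }
}
```

Events with no registered handler are silently ignored in this sketch.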
Defining the calculation
The builder method constructs the processor on lines 14-31. Method references are used throughout to increase type safety, make refactoring easier, and let IDEs offer better support when building event processors.
Integrated user classes
The builder refers to two helper instances that define the input and output data types. Lombok is used to reduce boilerplate code for getters and setters.
A user supplied mapping function converts the aggregated sensor data for all rooms and creates a collection of room names that require investigation:
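A mapping function of this kind might look like the sketch below. RoomAggregate and AlertMapper are hypothetical names introduced for illustration, and the quickstart's own types may differ; the thresholds are taken from the requirements (average > 60 and max > 90).

```java
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical types for illustration; the quickstart's own classes may differ.
final class RoomAggregate {
    final String room;
    final double average;
    final double max;

    RoomAggregate(String room, double average, double max) {
        this.room = room;
        this.average = average;
        this.max = max;
    }
}

final class AlertMapper {
    /** Returns the names of rooms whose average is above 60 and whose max is above 90. */
    static List<String> roomsToInvestigate(Collection<RoomAggregate> aggregates) {
        return aggregates.stream()
                .filter(a -> a.average > 60 && a.max > 90)
                .map(a -> a.room)
                .collect(Collectors.toList());
    }
}
```

Both conditions must hold, so a room with a single hot spike but a low average is not reported.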
A user supplied controller instance is registered with the stream processor. When the list of rooms to investigate is not empty, the list is pushed to the user controller class.
To register an SMS endpoint the controller class annotates a method to receive a String as an event. The processor will route any String to the controller class method.
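The controller behaviour described above can be sketched without any Fluxtion dependency. SmsControllerSketch and its method names are hypothetical, the annotation that marks the String handler is deliberately omitted, and sending is simulated by recording the message text.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical controller sketch: names and behaviour are assumptions,
// and the annotation that marks the String handler is omitted.
final class SmsControllerSketch {
    private String smsNumber; // null until a number is registered
    private final List<String> sentMessages = new ArrayList<>();

    /** Receives a String event: registers or replaces the SMS number. */
    void onSmsNumber(String number) {
        smsNumber = number;
    }

    /** Receives the rooms to investigate; returns true only if a message was sent. */
    boolean investigate(List<String> rooms) {
        if (rooms.isEmpty() || smsNumber == null || smsNumber.isEmpty()) {
            return false; // nothing to report, or no endpoint registered
        }
        sentMessages.add("SMS to " + smsNumber + ": investigate " + String.join(", ", rooms));
        return true;
    }

    /** Messages recorded so far, in send order. */
    List<String> sentMessages() {
        return sentMessages;
    }
}
```

Because the number is just mutable state updated by a String event, it can be changed at any point while the stream is running.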
Cached compilation
The application generates a solution in a cache directory, set with a system property:
-Dfluxtion.cacheDirectory=fluxtion
The fluxtion cache directory contains three sub-directories:
classes - compiled classes generated by the Fluxtion compiler
resources - meta-data describing the processor
sources - The java source files used to generate the classes
The classes directory is added to the classpath by the reuseOrBuild method. If an event processor cannot be loaded with the fqn supplied, the classes generated by the ahead-of-time compiler are written to the classes directory.
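One way to picture loading from the cache is a class loader pointed at the classes sub-directory. The sketch below uses the standard URLClassLoader and is an assumption about the general mechanism, not a description of how reuseOrBuild is implemented; CacheClassLoaderSketch is a hypothetical name.

```java
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Path;
import java.util.Optional;

// Hypothetical sketch: looks for a class under <cacheDir>/classes.
final class CacheClassLoaderSketch {

    /** Returns the class if it can be loaded from the cache or the parent classpath. */
    static Optional<Class<?>> loadFromCache(Path cacheDir, String fqn) {
        try {
            URL classesUrl = cacheDir.resolve("classes").toUri().toURL();
            // The loader is left open so the loaded class can resolve its dependencies later.
            URLClassLoader loader = new URLClassLoader(new URL[] {classesUrl});
            return Optional.of(loader.loadClass(fqn));
        } catch (ClassNotFoundException e) {
            return Optional.empty(); // not cached: the caller would generate and compile
        } catch (MalformedURLException e) {
            throw new IllegalArgumentException("Bad cache directory: " + cacheDir, e);
        }
    }
}
```

An empty result corresponds to the case where the processor has to be generated and its classes written to the cache.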
Executing the jar a second time takes significantly less time because the application loads the compiled processor from the first run. With the cached classes, the application responds to the input event stream almost immediately.
Deleting the cache directory will cause the regeneration and compilation of the solution.
Generated files
sources - A CSV marshaller and a stream processing solution are generated here.
The main entry point to the stream processor is RoomSensorSEP.java
The CSV marshaller is SensorReadingCsvDecoder0.java
classes - The sources are compiled in this directory, ready for subsequent executions
resources - A description of the generated process graph for rendering, plus non-class resources required at runtime
An image describing the processing graph
A GraphML file that can be explored interactively with a NetBeans plugin
Any user lambdas used in the processor are serialised for loading in the generated processor